Merge branch 'refactor-hw' into dev

This commit is contained in:
2Shirt 2022-05-01 16:35:19 -07:00
commit af13a88c81
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
22 changed files with 2090 additions and 1839 deletions

View file

@ -10,14 +10,14 @@ import wk
if __name__ == '__main__':
try:
docopt(wk.hw.ddrescue.DOCSTRING)
docopt(wk.clone.ddrescue.DOCSTRING)
except SystemExit:
print('')
wk.std.pause('Press Enter to exit...')
raise
try:
wk.hw.ddrescue.main()
wk.clone.ddrescue.main()
except SystemExit:
raise
except: #pylint: disable=bare-except

View file

@ -4,6 +4,7 @@
from sys import version_info as version
from . import cfg
from . import clone
from . import debug
from . import exe
from . import graph

View file

@ -16,9 +16,22 @@ ATTRIBUTE_COLORS = (
)
# NOTE: Force 4K read block size for disks >= 3TB
BADBLOCKS_LARGE_DISK = 3 * 1024**4
BADBLOCKS_REGEX = re.compile(
r'^Pass completed, (\d+) bad blocks found. .(\d+)/(\d+)/(\d+) errors',
re.IGNORECASE,
)
BADBLOCKS_SKIP_REGEX = re.compile(r'^(Checking|\[)', re.IGNORECASE)
CPU_CRITICAL_TEMP = 99
CPU_FAILURE_TEMP = 90
CPU_TEST_MINUTES = 7
IO_GRAPH_WIDTH = 40
IO_ALT_TEST_SIZE_FACTOR = 0.01
IO_BLOCK_SIZE = 512 * 1024
IO_CHUNK_SIZE = 32 * 1024**2
IO_MINIMUM_TEST_SIZE = 10 * 1024**3
IO_RATE_REGEX = re.compile(
r'(?P<bytes>\d+) bytes.* (?P<seconds>\S+) s(?:,|ecs )',
)
KEY_NVME = 'nvme_smart_health_information_log'
KEY_SMART = 'ata_smart_attributes'
KNOWN_DISK_ATTRIBUTES = {
@ -60,9 +73,15 @@ KNOWN_RAM_VENDOR_IDS = {
'0xAD00': 'Hynix',
'0xCE00': 'Samsung',
}
NVME_WARNING_KEYS = (
'spare_below_threshold',
'reliability_degraded',
'volatile_memory_backup_failed',
)
REGEX_POWER_ON_TIME = re.compile(
r'^(\d+)([Hh].*|\s+\(\d+\s+\d+\s+\d+\).*)'
)
SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS = 120
SMC_IDS = {
# Sources: https://github.com/beltex/SMCKit/blob/master/SMCKit/SMC.swift
# http://www.opensource.apple.com/source/net_snmp/
@ -110,6 +129,18 @@ SMC_IDS = {
'Tp5P': {'CPU Temp': False, 'Source': 'PSU2 Secondary Component'},
'TS0C': {'CPU Temp': False, 'Source': 'CPU B DIMM Exit Ambient'},
}
STATUS_COLORS = {
'Passed': 'GREEN',
'Aborted': 'YELLOW',
'N/A': 'YELLOW',
'Skipped': 'YELLOW',
'Unknown': 'YELLOW',
'Working': 'YELLOW',
'Denied': 'RED',
'ERROR': 'RED',
'Failed': 'RED',
'TimedOut': 'RED',
}
TEMP_COLORS = {
float('-inf'): 'CYAN',
00: 'BLUE',

View file

@ -0,0 +1,3 @@
"""WizardKit: ddrescue-tui module init"""
from . import ddrescue

View file

@ -27,7 +27,12 @@ from wk.cfg.ddrescue import (
DDRESCUE_SETTINGS,
DDRESCUE_SPECIFIC_PASS_SETTINGS,
)
from wk.hw import obj as hw_obj
from wk.hw import disk as hw_disk
from wk.hw.smart import (
check_attributes,
smart_status_ok,
update_smart_details,
)
# STATIC VARIABLES
@ -128,12 +133,12 @@ class BlockPair():
NOTE: source should be a wk.hw.obj.Disk() object
and destination should be a pathlib.Path() object.
"""
self.sector_size = source.details.get('phy-sec', 512)
self.sector_size = source.phy_sec
self.source = source.path
self.destination = destination
self.map_data = {}
self.map_path = None
self.size = source.details['size']
self.size = source.size
self.status = OrderedDict({
'read-skip': 'Pending',
'read-full': 'Pending',
@ -145,9 +150,9 @@ class BlockPair():
# Set map path
# e.g. '(Clone|Image)_Model[_p#]_Size[_Label].map'
map_name = model if model else 'None'
if source.details['bus'] == 'Image':
if source.bus == 'Image':
map_name = 'Image'
if source.details['parent']:
if source.parent:
part_num = re.sub(r"^.*?(\d+)$", r"\1", source.path.name)
map_name += f'_p{part_num}'
size_str = std.bytes_to_string(
@ -155,8 +160,8 @@ class BlockPair():
use_binary=False,
)
map_name += f'_{size_str.replace(" ", "")}'
if source.details.get('label', ''):
map_name += f'_{source.details["label"]}'
if source.raw_details.get('label', ''):
map_name += f'_{source.raw_details["label"]}'
map_name = map_name.replace(' ', '_')
map_name = map_name.replace('/', '_')
if destination.is_dir():
@ -272,8 +277,8 @@ class BlockPair():
"""Run safety check and abort if necessary."""
dest_size = -1
if self.destination.exists():
dest_obj = hw_obj.Disk(self.destination)
dest_size = dest_obj.details['size']
dest_obj = hw_disk.Disk(self.destination)
dest_size = dest_obj.size
del dest_obj
# Check destination size if cloning
@ -311,8 +316,8 @@ class BlockPair():
# Mark future passes as skipped if applicable
if percent == 100:
status_keys = list(self.status.keys())
for i in status_keys[status_keys.index(pass_name)+1:]:
self.status[status_keys[i]] = 'Skipped'
for pass_n in status_keys[status_keys.index(pass_name)+1:]:
self.status[pass_n] = 'Skipped'
class State():
@ -337,13 +342,13 @@ class State():
BlockPair(
source=source,
destination=destination,
model=self.source.details['model'],
model=self.source.model,
working_dir=self.working_dir,
))
def _get_clone_settings_path(self):
"""get Clone settings file path, returns pathlib.Path obj."""
description = self.source.details['model']
description = self.source.model
if not description:
description = self.source.path.name
return pathlib.Path(f'{self.working_dir}/Clone_{description}.json')
@ -436,10 +441,10 @@ class State():
else:
bail = False
for key in ('model', 'serial'):
if settings['Source'][key] != self.source.details[key]:
if settings['Source'][key] != getattr(self.source, key):
std.print_error(f"Clone settings don't match source {key}")
bail = True
if settings['Destination'][key] != self.destination.details[key]:
if settings['Destination'][key] != getattr(self.destination, key):
std.print_error(f"Clone settings don't match destination {key}")
bail = True
if bail:
@ -450,13 +455,13 @@ class State():
settings = CLONE_SETTINGS.copy()
if not settings['Source']:
settings['Source'] = {
'model': self.source.details['model'],
'serial': self.source.details['serial'],
'model': self.source.model,
'serial': self.source.serial,
}
if not settings['Destination']:
settings['Destination'] = {
'model': self.destination.details['model'],
'serial': self.destination.details['serial'],
'model': self.destination.model,
'serial': self.destination.serial,
}
# Done
@ -488,7 +493,7 @@ class State():
if settings['Partition Mapping']:
# Resume previous run, load pairs from settings file
for part_map in settings['Partition Mapping']:
bp_source = hw_obj.Disk(
bp_source = hw_disk.Disk(
f'{self.source.path}{source_sep}{part_map[0]}',
)
bp_dest = pathlib.Path(
@ -616,7 +621,7 @@ class State():
for part in source_parts:
report.append(
f'{part.path.name:<9} '
f'{std.bytes_to_string(part.details["size"], use_binary=False)}'
f'{std.bytes_to_string(part.size, use_binary=False)}'
)
report.append(' ')
@ -875,7 +880,7 @@ class State():
# Add selected partition(s)
for part in source_parts:
num_sectors = part.details['size'] / self.destination.details['log-sec']
num_sectors = part.size / self.destination.log_sec
num_sectors = math.ceil(num_sectors)
part_num += 1
sfdisk_script.append(
@ -883,7 +888,7 @@ class State():
table_type=settings['Table Type'],
dev_path=f'{dest_prefix}{part_num}',
size=num_sectors,
details=part.details,
details=part.raw_details,
),
)
@ -946,13 +951,23 @@ class State():
def safety_check_destination(self):
"""Run safety checks for destination and abort if necessary."""
try:
self.destination.safety_checks()
except hw_obj.CriticalHardwareError as err:
errors_detected = False
# Check for critical errors
if not smart_status_ok(self.destination):
std.print_error(
f'Critical error(s) detected for: {self.destination.path}',
)
raise std.GenericAbort() from err
# Check for minor errors
if not check_attributes(self.destination, only_blocking=False):
std.print_warning(
f'Attribute error(s) detected for: {self.destination.path}',
)
# Done
if errors_detected:
raise std.GenericAbort()
def safety_check_size(self):
"""Run size safety check and abort if necessary."""
@ -966,7 +981,7 @@ class State():
# 1 LBA for the protective MBR
# 33 LBAs each for the primary and backup GPT tables
# Source: https://en.wikipedia.org/wiki/GUID_Partition_Table
required_size += (1 + 33 + 33) * self.destination.details['phy-sec']
required_size += (1 + 33 + 33) * self.destination.phy_sec
if settings['Create Boot Partition']:
# 384MiB EFI System Partition and a 16MiB MS Reserved partition
required_size += (384 + 16) * 1024**2
@ -989,7 +1004,7 @@ class State():
# Check destination size
if self.mode == 'Clone':
destination_size = self.destination.details['size']
destination_size = self.destination.size
error_msg = 'A larger destination disk is required'
else:
# NOTE: Adding an extra 5% here to better ensure it will fit
@ -1097,7 +1112,7 @@ class State():
string = ''
# Build base string
if isinstance(obj, hw_obj.Disk):
if isinstance(obj, hw_disk.Disk):
string = f'{obj.path} {obj.description}'
elif obj.is_dir():
string = f'{obj}/'
@ -1122,7 +1137,7 @@ class State():
if self.source:
source_exists = self.source.path.exists()
if self.destination:
if isinstance(self.destination, hw_obj.Disk):
if isinstance(self.destination, hw_disk.Disk):
dest_exists = self.destination.path.exists()
else:
dest_exists = self.destination.exists()
@ -1296,13 +1311,13 @@ def build_directory_report(path):
def build_disk_report(dev):
"""Build device report, returns list."""
children = dev.details.get('children', [])
children = dev.raw_details.get('children', [])
report = []
# Get widths
widths = {
'fstype': max(6, len(str(dev.details.get('fstype', '')))),
'label': max(5, len(str(dev.details.get('label', '')))),
'fstype': max(6, len(str(dev.filesystem))),
'label': max(5, len(str(dev.raw_details.get('label', '')))),
'name': max(4, len(dev.path.name)),
}
for child in children:
@ -1317,10 +1332,10 @@ def build_disk_report(dev):
# Disk details
report.append(f'{dev.path.name} {dev.description}')
report.append(' ')
dev_fstype = dev.details.get('fstype', '')
dev_label = dev.details.get('label', '')
dev_fstype = dev.filesystem
dev_label = dev.raw_details.get('label', '')
dev_name = dev.path.name
dev_size = std.bytes_to_string(dev.details["size"], use_binary=False)
dev_size = std.bytes_to_string(dev.size, use_binary=False)
# Partition details
report.append(
@ -1485,19 +1500,17 @@ def check_destination_health(destination):
result = ''
# Bail early
if not isinstance(destination, hw_obj.Disk):
if not isinstance(destination, hw_disk.Disk):
# Return empty string
return result
# Run safety checks
try:
destination.safety_checks()
except hw_obj.CriticalHardwareError:
result = 'Critical hardware error detected on destination'
except hw_obj.SMARTSelfTestInProgressError:
result = 'SMART self-test in progress on destination'
except hw_obj.SMARTNotSupportedError:
pass
# Check for critical errors
if not smart_status_ok(destination):
result = 'Critical error(s) detected for: {destination.path}'
# Check for minor errors
if not check_attributes(destination, only_blocking=False):
result = f'Attribute error(s) detected for: {destination.path}'
# Done
return result
@ -1668,20 +1681,19 @@ def get_object(path):
# Check path
path = pathlib.Path(path).resolve()
if path.is_block_device() or path.is_char_device():
obj = hw_obj.Disk(path)
obj = hw_disk.Disk(path)
# Child/Parent check
parent = obj.details['parent']
if parent:
if obj.parent:
std.print_warning(f'"{obj.path}" is a child device')
if std.ask(f'Use parent device "{parent}" instead?'):
obj = hw_obj.Disk(parent)
if std.ask(f'Use parent device "{obj.parent}" instead?'):
obj = hw_disk.Disk(obj.parent)
elif path.is_dir():
obj = path
elif path.is_file():
# Assuming file is a raw image, mounting
loop_path = mount_raw_image(path)
obj = hw_obj.Disk(loop_path)
obj = hw_disk.Disk(loop_path)
# Abort if obj not set
if not obj:
@ -1723,7 +1735,7 @@ def get_table_type(disk):
NOTE: If resulting table type is not GPT or MBR
then an exception is raised.
"""
table_type = str(disk.details.get('pttype', '')).upper()
table_type = str(disk.raw_details.get('pttype', '')).upper()
table_type = table_type.replace('DOS', 'MBR')
# Check type
@ -1830,10 +1842,10 @@ def source_or_destination_changed(state):
elif hasattr(obj, 'exists'):
# Assuming dest path
changed = changed or not obj.exists()
elif isinstance(obj, hw_obj.Disk):
compare_dev = hw_obj.Disk(obj.path)
elif isinstance(obj, hw_disk.Disk):
compare_dev = hw_disk.Disk(obj.path)
for key in ('model', 'serial'):
changed = changed or obj.details[key] != compare_dev.details[key]
changed = changed or getattr(obj, key) != getattr(compare_dev, key)
# Update top panes
state.update_top_panes()
@ -2031,7 +2043,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
def _update_smart_pane():
"""Update SMART pane every 30 seconds."""
state.source.update_smart_details()
update_smart_details(state.source)
now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z')
with open(f'{state.log_dir}/smart.out', 'w', encoding='utf-8') as _f:
_f.write(
@ -2217,7 +2229,7 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True):
def select_disk(prompt, skip_disk=None):
"""Select disk from list, returns Disk()."""
std.print_info('Scanning disks...')
disks = hw_obj.get_disks()
disks = hw_disk.get_disks()
menu = std.Menu(
title=std.color_string(f'ddrescue TUI: {prompt} Selection', 'GREEN'),
)
@ -2226,23 +2238,22 @@ def select_disk(prompt, skip_disk=None):
menu.add_action('Quit')
for disk in disks:
disable_option = False
size = disk.details["size"]
size = disk.size
# Check if option should be disabled
if skip_disk:
parent = skip_disk.details.get('parent', None)
if (disk.path.samefile(skip_disk.path)
or (parent and disk.path.samefile(parent))):
or (skip_disk.parent and disk.path.samefile(skip_disk.parent))):
disable_option = True
# Add to menu
menu.add_option(
name=(
f'{str(disk.path):<12} '
f'{disk.details["bus"]:<5} '
f'{disk.bus:<5} '
f'{std.bytes_to_string(size, decimals=1, use_binary=False):<8} '
f'{disk.details["model"]} '
f'{disk.details["serial"]}'
f'{disk.model} '
f'{disk.serial}'
),
details={'Disabled': disable_option, 'Object': disk},
)
@ -2292,12 +2303,12 @@ def select_disk_parts(prompt, disk):
return [disk]
# Bail early if child device selected
if disk.details.get('parent', False):
if disk.parent:
return [disk]
# Add parts
whole_disk_str = f'{str(disk.path):<14} (Whole device)'
for part in disk.details.get('children', []):
for part in disk.raw_details.get('children', []):
size = part["size"]
name = (
f'{str(part["path"]):<14} '
@ -2320,17 +2331,17 @@ def select_disk_parts(prompt, disk):
object_list.append(option['Path'])
# Check if whole disk selected
if len(object_list) == len(disk.details.get('children', [])):
if len(object_list) == len(disk.raw_details.get('children', [])):
# NOTE: This is not true if the disk has no partitions
msg = f'Preserve partition table and unused space in {prompt.lower()}?'
if std.ask(msg):
# Replace part list with whole disk obj
object_list = [disk.path]
# Convert object_list to hw_obj.Disk() objects
# Convert object_list to hw_disk.Disk() objects
print(' ')
std.print_info('Getting disk/partition details...')
object_list = [hw_obj.Disk(path) for path in object_list]
object_list = [hw_disk.Disk(path) for path in object_list]
# Done
return object_list

View file

@ -20,13 +20,20 @@ METHOD_TYPE = type(DEBUG_CLASS.method)
def generate_object_report(obj, indent=0):
"""Generate debug report for obj, returns list."""
report = []
attr_list = []
# Get attribute list
if hasattr(obj, '__slots__'):
attr_list = list(obj.__slots__)
else:
attr_list = [name for name in dir(obj) if not name.startswith('_')]
# Dump object data
for name in dir(obj):
for name in attr_list:
attr = getattr(obj, name)
# Skip methods and private attributes
if isinstance(attr, METHOD_TYPE) or name.startswith('_'):
# Skip methods
if isinstance(attr, METHOD_TYPE):
continue
# Add attribute to report (expanded if necessary)

View file

@ -1,6 +1,15 @@
"""WizardKit: hw module init"""
from . import ddrescue
from . import audio
from . import benchmark
from . import cpu
from . import diags
from . import obj
from . import disk
from . import keyboard
from . import network
from . import screensavers
from . import sensors
from . import smart
from . import surface_scan
from . import system
from . import test

37
scripts/wk/hw/audio.py Normal file
View file

@ -0,0 +1,37 @@
"""WizardKit: Audio test functions"""
# vim: sts=2 sw=2 ts=2
import logging
from wk.exe import run_program
from wk.std import PLATFORM
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Functions
def audio_test() -> None:
"""Run an OS-specific audio test."""
if PLATFORM == 'Linux':
audio_test_linux()
def audio_test_linux() -> None:
"""Run an audio test using amixer and speaker-test."""
LOG.info('Audio Test')
# Set volume
for source in ('Master', 'PCM'):
cmd = f'amixer -q set "{source}" 80% unmute'.split()
run_program(cmd, check=False)
# Run audio tests
for mode in ('pink', 'wav'):
cmd = f'speaker-test -c 2 -l 1 -t {mode}'.split()
run_program(cmd, check=False, pipe=False)
if __name__ == '__main__':
print("This file is not meant to be called directly.")

212
scripts/wk/hw/benchmark.py Normal file
View file

@ -0,0 +1,212 @@
"""WizardKit: Benchmark test functions"""
# vim: sts=2 sw=2 ts=2
import logging
from subprocess import PIPE, STDOUT
from wk import graph
from wk.cfg.hw import (
IO_ALT_TEST_SIZE_FACTOR,
IO_BLOCK_SIZE,
IO_CHUNK_SIZE,
IO_GRAPH_WIDTH,
IO_MINIMUM_TEST_SIZE,
IO_RATE_REGEX,
THRESH_HDD_AVG_HIGH,
THRESH_HDD_AVG_LOW,
THRESH_HDD_MIN,
THRESH_SSD_AVG_HIGH,
THRESH_SSD_AVG_LOW,
THRESH_SSD_MIN,
)
from wk.exe import run_program
from wk.std import (
PLATFORM,
strip_colors,
color_string,
)
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Error Classes
class DeviceTooSmallError(RuntimeError):
"""Raised when a device is too small to test."""
# Functions
def calc_io_dd_values(dev_size) -> dict[str, int]:
"""Calculate I/O benchmark dd values, returns dict.
Calculations:
The minimum dev size is IO_GRAPH_WIDTH * IO_CHUNK_SIZE
(e.g. 1.25 GB for a width of 40 and a chunk size of 32MB)
read_total is the area to be read in bytes
If the dev is < IO_MINIMUM_TEST_SIZE then it's the whole dev
Else it's the larger of IO_MINIMUM_TEST_SIZE or the alt test size
(determined by dev * IO_ALT_TEST_SIZE_FACTOR)
read_chunks is the number of groups of IO_CHUNK_SIZE in test_obj.dev
This number is reduced to a multiple of IO_GRAPH_WIDTH in order
to allow for the data to be condensed cleanly
read_blocks is the chunk size in number of blocks
(e.g. 64 if block size is 512KB and chunk size is 32MB
skip_total is the number of IO_BLOCK_SIZE groups not tested
skip_blocks is the number of blocks to skip per IO_CHUNK_SIZE
skip_extra_rate is how often to add an additional skip block
This is needed to ensure an even testing across the dev
This is calculated by using the fractional amount left off
of the skip_blocks variable
"""
read_total = min(IO_MINIMUM_TEST_SIZE, dev_size)
read_total = max(read_total, dev_size*IO_ALT_TEST_SIZE_FACTOR)
read_chunks = int(read_total // IO_CHUNK_SIZE)
read_chunks -= read_chunks % IO_GRAPH_WIDTH
if read_chunks < IO_GRAPH_WIDTH:
raise DeviceTooSmallError
read_blocks = int(IO_CHUNK_SIZE / IO_BLOCK_SIZE)
read_total = read_chunks * IO_CHUNK_SIZE
skip_total = int((dev_size - read_total) // IO_BLOCK_SIZE)
skip_blocks = int((skip_total / read_chunks) // 1)
skip_extra_rate = 0
try:
skip_extra_rate = 1 + int(1 / ((skip_total / read_chunks) % 1))
except ZeroDivisionError:
# skip_extra_rate == 0 is fine
pass
# Done
return {
'Read Chunks': read_chunks,
'Read Blocks': read_blocks,
'Skip Blocks': skip_blocks,
'Skip Extra': skip_extra_rate,
}
def check_io_results(test_obj, rate_list, graph_width) -> None:
"""Check I/O restuls and generate report using rate_list."""
avg_read = sum(rate_list) / len(rate_list)
min_read = min(rate_list)
max_read = max(rate_list)
if test_obj.dev.ssd:
thresh_min = THRESH_SSD_MIN
thresh_avg_high = THRESH_SSD_AVG_HIGH
thresh_avg_low = THRESH_SSD_AVG_LOW
else:
thresh_min = THRESH_HDD_MIN
thresh_avg_high = THRESH_HDD_AVG_HIGH
thresh_avg_low = THRESH_HDD_AVG_LOW
# Add horizontal graph to report
for line in graph.generate_horizontal_graph(rate_list, graph_width):
if not strip_colors(line).strip():
# Skip empty lines
continue
test_obj.report.append(line)
# Add read rates to report
test_obj.report.append(
f'Read speeds avg: {avg_read/(1000**2):3.1f}'
f' min: {min_read/(1000**2):3.1f}'
f' max: {max_read/(1000**2):3.1f}'
)
# Compare against thresholds
if min_read <= thresh_min and avg_read <= thresh_avg_high:
test_obj.failed = True
elif avg_read <= thresh_avg_low:
test_obj.failed = True
else:
test_obj.passed = True
# Set status
if test_obj.failed:
test_obj.set_status('Failed')
elif test_obj.passed:
test_obj.set_status('Passed')
else:
test_obj.set_status('Unknown')
def run_io_test(test_obj, log_path) -> None:
"""Run I/O benchmark and handle exceptions."""
dev_path = test_obj.dev.path
if PLATFORM == 'Darwin':
# Use "RAW" disks under macOS
dev_path = dev_path.with_name(f'r{dev_path.name}')
LOG.info('Using %s for better performance', dev_path)
offset = 0
read_rates = []
test_obj.report.append(color_string('I/O Benchmark', 'BLUE'))
# Get dd values or bail
try:
dd_values = calc_io_dd_values(test_obj.dev.size)
except DeviceTooSmallError:
test_obj.set_status('N/A')
test_obj.report.append(
color_string('Disk too small to test', 'YELLOW'),
)
return
# Run dd read tests
for _i in range(dd_values['Read Chunks']):
_i += 1
# Build cmd
skip = dd_values['Skip Blocks']
if dd_values['Skip Extra'] and _i % dd_values['Skip Extra'] == 0:
skip += 1
cmd = [
'sudo', 'dd',
f'bs={IO_BLOCK_SIZE}',
f'skip={offset+skip}',
f'count={dd_values["Read Blocks"]}',
f'if={dev_path}',
'of=/dev/null',
]
if PLATFORM == 'Linux':
cmd.append('iflag=direct')
# Run and get read rate
try:
proc = run_program(
cmd,
pipe=False,
stdout=PIPE,
stderr=STDOUT,
)
except PermissionError as err:
# Since we're using sudo we can't kill dd
# Assuming this happened during a CTRL+c
raise KeyboardInterrupt from err
match = IO_RATE_REGEX.search(proc.stdout)
if match:
read_rates.append(
int(match.group('bytes')) / float(match.group('seconds')),
)
match.group(1)
# Show progress
with open(log_path, 'a', encoding='utf-8') as _f:
if _i % 5 == 0:
percent = (_i / dd_values['Read Chunks']) * 100
_f.write(f' {graph.vertical_graph_line(percent, read_rates[-1])}\n')
# Update offset
offset += dd_values['Read Blocks'] + skip
# Check results
check_io_results(test_obj, read_rates, IO_GRAPH_WIDTH)
if __name__ == '__main__':
print("This file is not meant to be called directly.")

208
scripts/wk/hw/cpu.py Normal file
View file

@ -0,0 +1,208 @@
"""WizardKit: CPU test functions"""
# vim: sts=2 sw=2 ts=2
import logging
import re
import subprocess
from typing import TextIO
from wk import exe
from wk.cfg.hw import CPU_FAILURE_TEMP
from wk.os.mac import set_fans as macos_set_fans
from wk.std import (
PLATFORM,
color_string,
print_error,
print_warning,
)
from wk.tmux import respawn_pane as tmux_respawn_pane
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
SysbenchType = tuple[subprocess.Popen, TextIO]
# Functions
def check_cooling_results(test_obj, sensors, run_sysbench=False) -> None:
"""Check cooling results and update test_obj."""
max_temp = sensors.cpu_max_temp()
temp_labels = ['Idle', 'Max', 'Cooldown']
if run_sysbench:
temp_labels.append('Sysbench')
# Check temps
if not max_temp:
test_obj.set_status('Unknown')
elif max_temp >= CPU_FAILURE_TEMP:
test_obj.failed = True
test_obj.set_status('Failed')
elif 'Aborted' not in test_obj.status:
test_obj.passed = True
test_obj.set_status('Passed')
# Add temps to report
for line in sensors.generate_report(*temp_labels, only_cpu=True):
test_obj.report.append(f' {line}')
def check_mprime_results(test_obj, working_dir) -> None:
"""Check mprime log files and update test_obj."""
passing_lines = {}
warning_lines = {}
def _read_file(log_name) -> list[str]:
"""Read file and split into lines, returns list."""
lines = []
try:
with open(f'{working_dir}/{log_name}', 'r', encoding='utf-8') as _f:
lines = _f.readlines()
except FileNotFoundError:
# File may be missing on older systems
lines = []
return lines
# results.txt (check if failed)
for line in _read_file('results.txt'):
line = line.strip()
if re.search(r'(error|fail)', line, re.IGNORECASE):
warning_lines[line] = None
# print.log (check if passed)
for line in _read_file('prime.log'):
line = line.strip()
match = re.search(
r'(completed.*(\d+) errors, (\d+) warnings)', line, re.IGNORECASE)
if match:
if int(match.group(2)) + int(match.group(3)) > 0:
# Errors and/or warnings encountered
warning_lines[match.group(1).capitalize()] = None
else:
# No errors/warnings
passing_lines[match.group(1).capitalize()] = None
# Update status
if warning_lines:
test_obj.failed = True
test_obj.set_status('Failed')
elif passing_lines and 'Aborted' not in test_obj.status:
test_obj.passed = True
test_obj.set_status('Passed')
else:
test_obj.set_status('Unknown')
# Update report
for line in passing_lines:
test_obj.report.append(f' {line}')
for line in warning_lines:
test_obj.report.append(color_string(f' {line}', 'YELLOW'))
if not (passing_lines or warning_lines):
test_obj.report.append(color_string(' Unknown result', 'YELLOW'))
def start_mprime(working_dir, log_path) -> subprocess.Popen:
"""Start mprime and save filtered output to log, returns Popen object."""
set_apple_fan_speed('max')
proc_mprime = subprocess.Popen( # pylint: disable=consider-using-with
['mprime', '-t'],
cwd=working_dir,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
proc_grep = subprocess.Popen( # pylint: disable=consider-using-with
'grep --ignore-case --invert-match --line-buffered stress.txt'.split(),
stdin=proc_mprime.stdout,
stdout=subprocess.PIPE,
)
proc_mprime.stdout.close()
save_nsbr = exe.NonBlockingStreamReader(proc_grep.stdout)
exe.start_thread(
save_nsbr.save_to_file,
args=(proc_grep, log_path),
)
# Return objects
return proc_mprime
def start_sysbench(sensors, sensors_out, log_path, pane) -> SysbenchType:
"""Start sysbench, returns tuple with Popen object and file handle."""
set_apple_fan_speed('max')
sysbench_cmd = [
'sysbench',
f'--threads={exe.psutil.cpu_count()}',
'--cpu-max-prime=1000000000',
'cpu',
'run',
]
# Restart background monitor for Sysbench
sensors.stop_background_monitor()
sensors.start_background_monitor(
sensors_out,
alt_max='Sysbench',
thermal_action=('killall', 'sysbench', '-INT'),
)
# Update bottom pane
tmux_respawn_pane(pane, watch_file=log_path, watch_cmd='tail')
# Start sysbench
filehandle_sysbench = open( # pylint: disable=consider-using-with
log_path, 'a', encoding='utf-8',
)
proc_sysbench = exe.popen_program(sysbench_cmd, stdout=filehandle_sysbench)
# Done
return (proc_sysbench, filehandle_sysbench)
def set_apple_fan_speed(speed) -> None:
"""Set Apple fan speed."""
cmd = None
# Check
if speed not in ('auto', 'max'):
raise RuntimeError(f'Invalid speed {speed}')
# Set cmd
if PLATFORM == 'Darwin':
try:
macos_set_fans(speed)
except (RuntimeError, ValueError, subprocess.CalledProcessError) as err:
LOG.error('Failed to set fans to %s', speed)
LOG.error('Error: %s', err)
print_error(f'Failed to set fans to {speed}')
for line in str(err).splitlines():
print_warning(f' {line.strip()}')
elif PLATFORM == 'Linux':
cmd = ['apple-fans', speed]
exe.run_program(cmd, check=False)
def stop_mprime(proc_mprime) -> None:
"""Stop mprime gracefully, then forcefully as needed."""
proc_mprime.terminate()
try:
proc_mprime.wait(timeout=5)
except subprocess.TimeoutExpired:
proc_mprime.kill()
set_apple_fan_speed('auto')
def stop_sysbench(proc_sysbench, filehandle_sysbench) -> None:
"""Stop sysbench."""
proc_sysbench.terminate()
try:
proc_sysbench.wait(timeout=5)
except subprocess.TimeoutExpired:
proc_sysbench.kill()
filehandle_sysbench.flush()
filehandle_sysbench.close()
set_apple_fan_speed('auto')
if __name__ == '__main__':
print("This file is not meant to be called directly.")

File diff suppressed because it is too large Load diff

394
scripts/wk/hw/disk.py Normal file
View file

@ -0,0 +1,394 @@
"""WizardKit: Disk object and functions"""
# vim: sts=2 sw=2 ts=2
import logging
import pathlib
import plistlib
import re
from dataclasses import dataclass, field
from typing import Any, Union
from wk.cfg.main import KIT_NAME_SHORT
from wk.exe import get_json_from_command, run_program
from wk.hw.test import Test
from wk.hw.smart import (
enable_smart,
generate_attribute_report,
update_smart_details,
)
from wk.std import PLATFORM, bytes_to_string, color_string
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
WK_LABEL_REGEX = re.compile(
fr'{KIT_NAME_SHORT}_(LINUX|UFD)',
re.IGNORECASE,
)
# Classes
@dataclass(slots=True)
class Disk:
# pylint: disable=too-many-instance-attributes
"""Object for tracking disk specific data."""
attributes: dict[Any, dict] = field(init=False, default_factory=dict)
bus: str = field(init=False)
description: str = field(init=False)
filesystem: str = field(init=False)
log_sec: int = field(init=False)
model: str = field(init=False)
name: str = field(init=False)
notes: list[str] = field(init=False, default_factory=list)
path: Union[pathlib.Path, str]
parent: str = field(init=False)
phy_sec: int = field(init=False)
raw_details: dict[str, Any] = field(init=False)
raw_smartctl: dict[str, Any] = field(init=False)
serial: str = field(init=False)
size: int = field(init=False)
ssd: bool = field(init=False)
tests: list[Test] = field(init=False, default_factory=list)
use_sat: bool = field(init=False, default=False)
def __post_init__(self) -> None:
self.path = pathlib.Path(self.path).resolve()
self.get_details()
self.set_description()
enable_smart(self)
update_smart_details(self)
if not self.attributes and self.bus == 'USB':
# Try using SAT
LOG.warning('Using SAT for smartctl for %s', self.path)
self.notes = []
self.use_sat = True
enable_smart(self)
update_smart_details(self)
if not self.is_4k_aligned():
self.add_note('One or more partitions are not 4K aligned', 'YELLOW')
def add_note(self, note, color=None) -> None:
"""Add note that will be included in the disk report."""
if color:
note = color_string(note, color)
if note not in self.notes:
self.notes.append(note)
self.notes.sort()
def disable_disk_tests(self) -> None:
"""Disable all tests."""
LOG.warning('Disabling all tests for: %s', self.path)
for test in self.tests:
if test.status in ('Pending', 'Working'):
test.set_status('Denied')
test.disabled = True
def generate_report(self, header=True) -> list[str]:
"""Generate Disk report, returns list."""
report = []
if header:
report.append(color_string(f'Device ({self.path.name})', 'BLUE'))
report.append(f' {self.description}')
# Attributes
if self.attributes:
if header:
report.append(color_string('Attributes', 'BLUE'))
report.extend(generate_attribute_report(self))
# Notes
if self.notes:
report.append(color_string('Notes', 'BLUE'))
for note in self.notes:
report.append(f' {note}')
# Tests
for test in self.tests:
report.extend(test.report)
return report
def get_details(self) -> None:
"""Get disk details using OS specific methods.
Required details default to generic descriptions
and are converted to the correct type.
"""
if PLATFORM == 'Darwin':
self.raw_details = get_disk_details_macos(self.path)
elif PLATFORM == 'Linux':
self.raw_details = get_disk_details_linux(self.path)
# Set necessary details
self.bus = str(self.raw_details.get('bus', '???')).upper()
self.bus = self.bus.replace('IMAGE', 'Image')
self.bus = self.bus.replace('NVME', 'NVMe')
self.filesystem = self.raw_details.get('fstype', 'Unknown')
self.log_sec = self.raw_details.get('log-sec', 512)
self.model = self.raw_details.get('model', 'Unknown Model')
self.name = self.raw_details.get('name', self.path)
self.parent = self.raw_details.get('parent', None)
self.phy_sec = self.raw_details.get('phy-sec', 512)
self.serial = self.raw_details.get('serial', 'Unknown Serial')
self.size = self.raw_details.get('size', -1)
self.ssd = self.raw_details.get('ssd', False)
# Ensure certain attributes types
## NOTE: This is ugly, deal.
for attr in ['bus', 'model', 'name', 'serial']:
setattr(self, attr, str(getattr(self, attr)))
for attr in ['log_sec', 'phy_sec', 'size']:
try:
setattr(self, attr, int(getattr(self, attr)))
except (TypeError, ValueError):
LOG.error('Invalid disk %s: %s', attr, getattr(self, attr))
if attr == 'size':
setattr(self, attr, -1)
# Set description
self.description = (
f'{bytes_to_string(self.size, use_binary=False)}'
f' ({self.bus})'
f' {self.model}'
f' {self.serial}'
)
def get_labels(self) -> list[str]:
"""Build list of labels for this disk, returns list."""
labels = []
# Add all labels from raw_details
for details in [self.raw_details, *self.raw_details.get('children', [])]:
labels.append(details.get('label', ''))
labels.append(details.get('partlabel', ''))
# Remove empty labels
labels = [str(label) for label in labels if label]
# Done
return labels
def is_4k_aligned(self) -> bool:
"""Check that all disk partitions are aligned, returns bool."""
aligned = True
if PLATFORM == 'Darwin':
aligned = is_4k_aligned_macos(self.raw_details)
elif PLATFORM == 'Linux':
aligned = is_4k_aligned_linux(self.path, self.phy_sec)
return aligned
def set_description(self) -> None:
"""Set disk description from details."""
self.description = (
f'{bytes_to_string(self.size, use_binary=False)}'
f' ({self.bus}) {self.model} {self.serial}'
)
# Functions
def get_disk_details_linux(path) -> dict[Any, Any]:
"""Get disk details using lsblk, returns dict."""
cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', path]
json_data = get_json_from_command(cmd, check=False)
details = json_data.get('blockdevices', [{}])[0]
# Fix details
for dev in [details, *details.get('children', [])]:
dev['bus'] = dev.pop('tran', '???')
dev['parent'] = dev.pop('pkname', None)
dev['ssd'] = not dev.pop('rota', True)
if 'loop' in str(path) and dev['bus'] is None:
dev['bus'] = 'Image'
dev['model'] = ''
dev['serial'] = ''
# Done
return details
def get_disk_details_macos(path) -> dict[Any, Any]:
"""Get disk details using diskutil, returns dict."""
details = {}
# Get "list" details
cmd = ['diskutil', 'list', '-plist', path]
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# Invalid / corrupt plist data? return empty dict to avoid crash
LOG.error('Failed to get diskutil list for %s', path)
return details
# Parse "list" details
details = plist_data.get('AllDisksAndPartitions', [{}])[0]
details['children'] = details.pop('Partitions', [])
details['path'] = path
for child in details['children']:
child['path'] = path.with_name(child.get('DeviceIdentifier', 'null'))
# Get "info" details
for dev in [details, *details['children']]:
cmd = ['diskutil', 'info', '-plist', dev['path']]
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
LOG.error('Failed to get diskutil info for %s', path)
continue #Skip
# Parse "info" details
dev.update(plist_data)
dev['bus'] = dev.pop('BusProtocol', '???')
dev['fstype'] = dev.pop('FilesystemType', '')
dev['label'] = dev.pop('VolumeName', '')
dev['model'] = dev.pop('MediaName', 'Unknown')
dev['mountpoint'] = dev.pop('MountPoint', '')
dev['name'] = dev.get('name', str(dev['path']))
dev['phy-sec'] = dev.pop('DeviceBlockSize', 512)
dev['serial'] = get_disk_serial_macos(dev['path'])
dev['size'] = dev.pop('Size', -1)
dev['ssd'] = dev.pop('SolidState', False)
dev['vendor'] = ''
if dev.get('WholeDisk', True):
dev['parent'] = None
else:
dev['parent'] = dev.pop('ParentWholeDisk', None)
# Fix details if main dev is a child
for child in details['children']:
if path == child['path']:
for key in ('fstype', 'label', 'name', 'size'):
details[key] = child[key]
break
# Done
return details
def get_disk_serial_macos(path) -> str:
"""Get disk serial using system_profiler, returns str."""
cmd = ['sudo', 'smartctl', '--info', '--json', path]
smart_info = get_json_from_command(cmd)
return smart_info.get('serial_number', 'Unknown Serial')
def get_disks(skip_kits=False) -> list[Disk]:
"""Get disks using OS-specific methods, returns list."""
disks = []
if PLATFORM == 'Darwin':
disks = get_disks_macos()
elif PLATFORM == 'Linux':
disks = get_disks_linux()
# Skip WK disks
if skip_kits:
disks = [
disk_obj for disk_obj in disks
if not any(
WK_LABEL_REGEX.search(label) for label in disk_obj.get_labels()
)
]
# Done
return disks
def get_disks_linux() -> list[Disk]:
"""Get disks via lsblk, returns list."""
cmd = ['lsblk', '--json', '--nodeps', '--paths']
disks = []
# Add valid disks
json_data = get_json_from_command(cmd)
for disk in json_data.get('blockdevices', []):
disk_obj = Disk(disk['name'])
# Skip loopback devices, optical devices, etc
if disk_obj.raw_details.get('type', '???') != 'disk':
continue
# Add disk
disks.append(disk_obj)
# Done
return disks
def get_disks_macos() -> list[Disk]:
"""Get disks via diskutil, returns list."""
cmd = ['diskutil', 'list', '-plist', 'physical']
disks = []
# Get info from diskutil
proc = run_program(cmd, encoding=None, errors=None, check=False)
if proc.returncode != 0:
# Assuming we're running on an older macOS version
cmd.pop(-1)
proc = run_program(cmd, encoding=None, errors=None, check=False)
# Parse plist data
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# Invalid / corrupt plist data? return empty list to avoid crash
LOG.error('Failed to get diskutil list')
return disks
# Add valid disks
for disk in plist_data['WholeDisks']:
disks.append(Disk(f'/dev/{disk}'))
# Remove virtual disks
# TODO: Test more to figure out why some drives are being marked 'Unknown'
disks = [
d for d in disks if d.details.get('VirtualOrPhysical') != 'Virtual'
]
# Done
return disks
def is_4k_aligned_macos(disk_details) -> bool:
"""Check partition alignment using diskutil info, returns bool."""
aligned = True
# Check partitions
for part in disk_details.get('children', []):
offset = part.get('PartitionMapPartitionOffset', 0)
if not offset:
# Assuming offset couldn't be found and it defaulted to 0
# NOTE: Just logging the error, not bailing
LOG.error('Failed to get partition offset for %s', part['path'])
aligned = aligned and offset >= 0 and offset % 4096 == 0
# Done
return aligned
def is_4k_aligned_linux(dev_path, physical_sector_size) -> bool:
"""Check partition alignment using lsblk, returns bool."""
aligned = True
cmd = [
'sudo',
'sfdisk',
'--json',
dev_path,
]
# Get partition details
json_data = get_json_from_command(cmd)
# Check partitions
for part in json_data.get('partitiontable', {}).get('partitions', []):
offset = physical_sector_size * part.get('start', -1)
aligned = aligned and offset >= 0 and offset % 4096 == 0
# Done
return aligned
if __name__ == '__main__':
print("This file is not meant to be called directly.")

31
scripts/wk/hw/keyboard.py Normal file
View file

@ -0,0 +1,31 @@
"""WizardKit: Keyboard test functions"""
# vim: sts=2 sw=2 ts=2
import logging
from wk.exe import run_program
from wk.std import PLATFORM, print_warning
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Functions
def keyboard_test() -> None:
"""Test keyboard using OS specific functions."""
if PLATFORM == 'Linux':
run_xev()
else:
print_warning(f'Not supported under this OS: {PLATFORM}')
def run_xev() -> None:
"""Test keyboard using xev."""
LOG.info('Keyboard Test (xev)')
cmd = ['xev', '-event', 'keyboard']
run_program(cmd, check=False, pipe=False)
if __name__ == '__main__':
print("This file is not meant to be called directly.")

58
scripts/wk/hw/network.py Normal file
View file

@ -0,0 +1,58 @@
"""WizardKit: Network test functions"""
# vim: sts=2 sw=2 ts=2
import logging
from wk.net import (
connected_to_private_network,
ping,
show_valid_addresses,
speedtest,
)
from wk.std import (
TryAndPrint,
pause,
print_warning,
)
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Functions
def network_test() -> None:
"""Run network tests."""
LOG.info('Network Test')
try_and_print = TryAndPrint()
result = try_and_print.run(
message='Network connection...',
function=connected_to_private_network,
msg_good='OK',
raise_on_error=True,
)
# Bail if not connected
if result['Failed']:
print_warning('Please connect to a network and try again')
pause('Press Enter to return to main menu...')
return
# Show IP address(es)
show_valid_addresses()
# Ping tests
try_and_print.run(
'Internet connection...', ping, msg_good='OK', addr='8.8.8.8')
try_and_print.run(
'DNS resolution...', ping, msg_good='OK', addr='google.com')
# Speedtest
try_and_print.run('Speedtest...', speedtest)
# Done
pause('Press Enter to return to main menu...')
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -1,909 +0,0 @@
"""WizardKit: Hardware objects (mostly)"""
# vim: sts=2 sw=2 ts=2
import logging
import pathlib
import plistlib
import re
from collections import OrderedDict
from wk.cfg.hw import (
ATTRIBUTE_COLORS,
KEY_NVME,
KEY_SMART,
KNOWN_DISK_ATTRIBUTES,
KNOWN_DISK_MODELS,
KNOWN_RAM_VENDOR_IDS,
REGEX_POWER_ON_TIME,
)
from wk.cfg.main import KIT_NAME_SHORT
from wk.exe import get_json_from_command, run_program
from wk.std import (
PLATFORM,
bytes_to_string,
color_string,
sleep,
string_to_bytes,
)
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
NVME_WARNING_KEYS = (
'spare_below_threshold',
'reliability_degraded',
'volatile_memory_backup_failed',
)
SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS = 120
WK_LABEL_REGEX = re.compile(
fr'{KIT_NAME_SHORT}_(LINUX|UFD)',
re.IGNORECASE,
)
# Exception Classes
class CriticalHardwareError(RuntimeError):
"""Exception used for critical hardware failures."""
class SMARTNotSupportedError(TypeError):
"""Exception used for disks lacking SMART support."""
class SMARTSelfTestInProgressError(RuntimeError):
"""Exception used when a SMART self-test is in progress."""
# Classes
class BaseObj():
"""Base object for tracking device data."""
def __init__(self):
self.tests = OrderedDict()
def all_tests_passed(self):
"""Check if all tests passed, returns bool."""
return all(results.passed for results in self.tests.values())
def any_test_failed(self):
"""Check if any test failed, returns bool."""
return any(results.failed for results in self.tests.values())
class CpuRam(BaseObj):
"""Object for tracking CPU & RAM specific data."""
def __init__(self):
super().__init__()
self.description = 'Unknown'
self.details = {}
self.ram_total = 'Unknown'
self.ram_dimms = []
self.tests = OrderedDict()
# Update details
self.get_cpu_details()
self.get_ram_details()
def generate_report(self):
"""Generate CPU & RAM report, returns list."""
report = []
report.append(color_string('Device', 'BLUE'))
report.append(f' {self.description}')
# Include RAM details
report.append(color_string('RAM', 'BLUE'))
report.append(f' {self.ram_total} ({", ".join(self.ram_dimms)})')
# Tests
for test in self.tests.values():
report.extend(test.report)
return report
def get_cpu_details(self):
"""Get CPU details using OS specific methods."""
if PLATFORM == 'Darwin':
cmd = 'sysctl -n machdep.cpu.brand_string'.split()
proc = run_program(cmd, check=False)
self.description = re.sub(r'\s+', ' ', proc.stdout.strip())
elif PLATFORM == 'Linux':
cmd = ['lscpu', '--json']
json_data = get_json_from_command(cmd)
for line in json_data.get('lscpu', [{}]):
_field = line.get('field', '').replace(':', '')
_data = line.get('data', '')
if not (_field or _data):
# Skip
continue
self.details[_field] = _data
self.description = self.details.get('Model name', '')
# Replace empty description
if not self.description:
self.description = 'Unknown CPU'
def get_ram_details(self):
"""Get RAM details using OS specific methods."""
if PLATFORM == 'Darwin':
dimm_list = get_ram_list_macos()
elif PLATFORM == 'Linux':
dimm_list = get_ram_list_linux()
details = {'Total': 0}
for dimm_details in dimm_list:
size, manufacturer = dimm_details
if size <= 0:
# Skip empty DIMMs
continue
description = f'{bytes_to_string(size)} {manufacturer}'
details['Total'] += size
if description in details:
details[description] += 1
else:
details[description] = 1
# Save details
self.ram_total = bytes_to_string(details.pop('Total', 0))
self.ram_dimms = [
f'{count}x {desc}' for desc, count in sorted(details.items())
]
class Disk(BaseObj):
"""Object for tracking disk specific data."""
def __init__(self, path):
super().__init__()
self.attributes = {}
self.description = 'Unknown'
self.details = {}
self.notes = []
self.path = pathlib.Path(path).resolve()
self.smartctl = {}
self.tests = OrderedDict()
# Update details
self.get_details()
self.enable_smart()
self.update_smart_details()
if self.details['bus'] == 'USB' and not self.attributes:
# Try using SAT
LOG.warning('Using SAT for smartctl for %s', self.path)
self.enable_smart(use_sat=True)
self.update_smart_details(use_sat=True)
if not self.is_4k_aligned():
self.add_note('One or more partitions are not 4K aligned', 'YELLOW')
def abort_self_test(self):
"""Abort currently running non-captive self-test."""
cmd = ['sudo', 'smartctl', '--abort', self.path]
run_program(cmd, check=False)
def add_note(self, note, color=None):
"""Add note that will be included in the disk report."""
if color:
note = color_string(note, color)
if note not in self.notes:
self.notes.append(note)
self.notes.sort()
def check_attributes(self, only_blocking=False):
"""Check if any known attributes are failing, returns bool."""
attributes_ok = True
known_attributes = get_known_disk_attributes(self.details['model'])
for attr, value in self.attributes.items():
# Skip unknown attributes
if attr not in known_attributes:
continue
# Get thresholds
blocking_attribute = known_attributes[attr].get('Blocking', False)
err_thresh = known_attributes[attr].get('Error', None)
max_thresh = known_attributes[attr].get('Maximum', None)
if not max_thresh:
max_thresh = float('inf')
# Skip non-blocking attributes if necessary
if only_blocking and not blocking_attribute:
continue
# Skip informational attributes
if not err_thresh:
continue
# Check attribute
if known_attributes[attr].get('PercentageLife', False):
if 0 <= value['raw'] <= err_thresh:
attributes_ok = False
elif err_thresh <= value['raw'] < max_thresh:
attributes_ok = False
# Done
return attributes_ok
def disable_disk_tests(self):
"""Disable all tests."""
LOG.warning('Disabling all tests for: %s', self.path)
for test in self.tests.values():
if test.status in ('Pending', 'Working'):
test.set_status('Denied')
test.disabled = True
def enable_smart(self, use_sat=False):
"""Try enabling SMART for this disk."""
cmd = [
'sudo',
'smartctl',
f'--device={"sat,auto" if use_sat else "auto"}',
'--tolerance=permissive',
'--smart=on',
self.path,
]
run_program(cmd, check=False)
def generate_attribute_report(self):
"""Generate attribute report, returns list."""
known_attributes = get_known_disk_attributes(self.details['model'])
report = []
for attr, value in sorted(self.attributes.items()):
note = ''
value_color = 'GREEN'
# Skip attributes not in our list
if attr not in known_attributes:
continue
# Check for attribute note
note = known_attributes[attr].get('Note', '')
# ID / Name
label = f'{attr:>3}'
if isinstance(attr, int):
# Assuming SMART, include hex ID and name
label += f' / {str(hex(attr))[2:].upper():0>2}: {value["name"]}'
label = f' {label.replace("_", " "):38}'
# Value color
if known_attributes[attr].get('PercentageLife', False):
# PercentageLife values
if 0 <= value['raw'] <= known_attributes[attr]['Error']:
value_color = 'RED'
note = '(failed, % life remaining)'
elif value['raw'] < 0 or value['raw'] > 100:
value_color = 'PURPLE'
note = '(invalid?)'
else:
for threshold, color in ATTRIBUTE_COLORS:
threshold_val = known_attributes[attr].get(threshold, None)
if threshold_val and value['raw'] >= threshold_val:
value_color = color
if threshold == 'Error':
note = '(failed)'
elif threshold == 'Maximum':
note = '(invalid?)'
# 199/C7 warning
if str(attr) == '199' and value['raw'] > 0:
note = '(bad cable?)'
# Build colored string and append to report
line = color_string(
[label, value['raw_str'], note],
[None, value_color, 'YELLOW'],
)
report.append(line)
# Done
return report
def generate_report(self, header=True):
"""Generate Disk report, returns list."""
report = []
if header:
report.append(color_string(f'Device ({self.path.name})', 'BLUE'))
report.append(f' {self.description}')
# Attributes
if self.attributes:
if header:
report.append(color_string('Attributes', 'BLUE'))
report.extend(self.generate_attribute_report())
# Notes
if self.notes:
report.append(color_string('Notes', 'BLUE'))
for note in self.notes:
report.append(f' {note}')
# Tests
for test in self.tests.values():
report.extend(test.report)
return report
def get_details(self):
"""Get disk details using OS specific methods.
Required details default to generic descriptions
and are converted to the correct type.
"""
if PLATFORM == 'Darwin':
self.details = get_disk_details_macos(self.path)
elif PLATFORM == 'Linux':
self.details = get_disk_details_linux(self.path)
# Set necessary details
self.details['bus'] = str(self.details.get('bus', '???')).upper()
self.details['bus'] = self.details['bus'].replace('IMAGE', 'Image')
self.details['bus'] = self.details['bus'].replace('NVME', 'NVMe')
self.details['fstype'] = self.details.get('fstype', 'Unknown')
self.details['log-sec'] = self.details.get('log-sec', 512)
self.details['model'] = self.details.get('model', 'Unknown Model')
self.details['name'] = self.details.get('name', self.path)
self.details['phy-sec'] = self.details.get('phy-sec', 512)
self.details['serial'] = self.details.get('serial', 'Unknown Serial')
self.details['size'] = self.details.get('size', -1)
self.details['ssd'] = self.details.get('ssd', False)
# Ensure certain attributes types
for attr in ['bus', 'model', 'name', 'serial']:
if not isinstance(self.details[attr], str):
self.details[attr] = str(self.details[attr])
for attr in ['phy-sec', 'size']:
if not isinstance(self.details[attr], int):
try:
self.details[attr] = int(self.details[attr])
except (TypeError, ValueError):
LOG.error('Invalid disk %s: %s', attr, self.details[attr])
self.details[attr] = -1
# Set description
self.description = (
f'{bytes_to_string(self.details["size"], use_binary=False)}'
f' ({self.details["bus"]})'
f' {self.details["model"]}'
f' {self.details["serial"]}'
)
def get_labels(self):
"""Build list of labels for this disk, returns list."""
labels = []
# Add all labels from lsblk
for disk in [self.details, *self.details.get('children', [])]:
labels.append(disk.get('label', ''))
labels.append(disk.get('partlabel', ''))
# Remove empty labels
labels = [str(label) for label in labels if label]
# Done
return labels
def get_smart_self_test_details(self):
"""Shorthand to get deeply nested self-test details, returns dict."""
details = {}
try:
details = self.smartctl['ata_smart_data']['self_test']
except (KeyError, TypeError):
# Assuming disk lacks SMART support, ignore and return empty dict.
pass
# Done
return details
def is_4k_aligned(self):
"""Check that all disk partitions are aligned, returns bool."""
aligned = True
if PLATFORM == 'Darwin':
aligned = is_4k_aligned_macos(self.details)
elif PLATFORM == 'Linux':
aligned = is_4k_aligned_linux(self.path, self.details['phy-sec'])
return aligned
def safety_checks(self):
"""Run safety checks and raise an exception if necessary."""
blocking_event_encountered = False
self.update_smart_details()
# Attributes
if not self.check_attributes(only_blocking=True):
blocking_event_encountered = True
LOG.error('%s: Blocked for failing attribute(s)', self.path)
# NVMe status
nvme_status = self.smartctl.get('smart_status', {}).get('nvme', {})
if nvme_status.get('media_read_only', False):
blocking_event_encountered = True
msg = 'Media has been placed in read-only mode'
self.add_note(msg, 'RED')
LOG.error('%s %s', self.path, msg)
for key in NVME_WARNING_KEYS:
if nvme_status.get(key, False):
msg = key.replace('_', ' ')
self.add_note(msg, 'YELLOW')
LOG.warning('%s %s', self.path, msg)
# SMART overall assessment
smart_passed = True
try:
smart_passed = self.smartctl['smart_status']['passed']
except (KeyError, TypeError):
# Assuming disk doesn't support SMART overall assessment
pass
if not smart_passed:
blocking_event_encountered = True
msg = 'SMART overall self-assessment: Failed'
self.add_note(msg, 'RED')
LOG.error('%s %s', self.path, msg)
# Raise blocking exception if necessary
if blocking_event_encountered:
raise CriticalHardwareError(f'Critical error(s) for: {self.path}')
# SMART self-test status
test_details = self.get_smart_self_test_details()
if 'remaining_percent' in test_details.get('status', ''):
msg = f'SMART self-test in progress for: {self.path}'
LOG.error(msg)
raise SMARTSelfTestInProgressError(msg)
def run_self_test(self, log_path):
"""Run disk self-test and check if it passed, returns bool.
NOTE: This function is here to reserve a place for future
NVMe self-tests announced in NVMe spec v1.3.
"""
result = self.run_smart_self_test(log_path)
return result
def run_smart_self_test(self, log_path):
"""Run SMART self-test and check if it passed, returns bool.
NOTE: An exception will be raised if the disk lacks SMART support.
"""
finished = False
result = None
started = False
status_str = 'Starting self-test...'
test_details = self.get_smart_self_test_details()
test_minutes = 15
size_str = bytes_to_string(self.details["size"], use_binary=False)
header_str = color_string(
['[', self.path.name, ' ', size_str, ']'],
[None, 'BLUE', None, 'CYAN', None],
sep='',
)
# Check if disk supports self-tests
if not test_details:
raise SMARTNotSupportedError(
f'SMART self-test not supported for {self.path}')
# Get real test length
test_minutes = test_details.get('polling_minutes', {}).get('short', 5)
test_minutes = int(test_minutes) + 10
# Start test
with open(log_path, 'w', encoding='utf-8') as _f:
_f.write(f'{header_str}\nInitializing...')
cmd = [
'sudo',
'smartctl',
'--tolerance=normal',
'--test=short',
self.path,
]
run_program(cmd, check=False)
# Monitor progress (in five second intervals)
for _i in range(int(test_minutes*60/5)):
sleep(5)
# Update status
self.update_smart_details()
test_details = self.get_smart_self_test_details()
# Check test progress
if started:
status_str = test_details.get('status', {}).get('string', 'Unknown')
status_str = status_str.capitalize()
# Update log
with open(log_path, 'w', encoding='utf-8') as _f:
_f.write(f'{header_str}\nSMART self-test status:\n {status_str}')
# Check if finished
if 'remaining_percent' not in test_details.get('status', {}):
finished = True
break
elif 'remaining_percent' in test_details.get('status', {}):
started = True
elif _i * 5 >= SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS:
# Test didn't start within limit, stop waiting
break
# Check result
if finished:
result = test_details.get('status', {}).get('passed', False)
elif started:
raise TimeoutError(f'SMART self-test timed out for {self.path}')
# Done
return result
def update_smart_details(self, use_sat=False):
"""Update SMART details via smartctl."""
self.attributes = {}
# Check if SAT is needed
if not use_sat:
# use_sat not set, check previous run (if possible)
for arg in self.smartctl.get('smartctl', {}).get('argv', []):
if arg == '--device=sat,auto':
use_sat = True
break
# Get SMART data
cmd = [
'sudo',
'smartctl',
f'--device={"sat,auto" if use_sat else "auto"}',
'--tolerance=verypermissive',
'--all',
'--json',
self.path,
]
self.smartctl = get_json_from_command(cmd, check=False)
# Check for attributes
if KEY_NVME in self.smartctl:
for name, value in self.smartctl[KEY_NVME].items():
try:
self.attributes[name] = {
'name': name,
'raw': int(value),
'raw_str': str(value),
}
except (TypeError, ValueError):
# Ignoring invalid attribute
LOG.error('Invalid NVMe attribute: %s %s', name, value)
elif KEY_SMART in self.smartctl:
for attribute in self.smartctl[KEY_SMART].get('table', {}):
try:
_id = int(attribute['id'])
except (KeyError, ValueError):
# Ignoring invalid attribute
LOG.error('Invalid SMART attribute: %s', attribute)
continue
name = str(attribute.get('name', 'Unknown')).replace('_', ' ').title()
raw = int(attribute.get('raw', {}).get('value', -1))
raw_str = attribute.get('raw', {}).get('string', 'Unknown')
# Fix power-on time
match = REGEX_POWER_ON_TIME.match(raw_str)
if _id == 9 and match:
raw = int(match.group(1))
# Add to dict
self.attributes[_id] = {
'name': name, 'raw': raw, 'raw_str': raw_str}
# Add note if necessary
if not self.attributes:
self.add_note('No NVMe or SMART data available', 'YELLOW')
class Test():
# pylint: disable=too-few-public-methods
"""Object for tracking test specific data."""
def __init__(self, dev, label):
self.dev = dev
self.disabled = False
self.failed = False
self.label = label
self.passed = False
self.report = []
self.status = 'Pending'
def set_status(self, status):
"""Update status string."""
if self.disabled:
# Don't change status if disabled
return
self.status = status
# Functions
def get_disk_details_linux(path):
"""Get disk details using lsblk, returns dict."""
cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', path]
json_data = get_json_from_command(cmd, check=False)
details = json_data.get('blockdevices', [{}])[0]
# Fix details
for dev in [details, *details.get('children', [])]:
dev['bus'] = dev.pop('tran', '???')
dev['parent'] = dev.pop('pkname', None)
dev['ssd'] = not dev.pop('rota', True)
if 'loop' in str(path) and dev['bus'] is None:
dev['bus'] = 'Image'
dev['model'] = ''
dev['serial'] = ''
# Done
return details
def get_disk_details_macos(path):
"""Get disk details using diskutil, returns dict."""
details = {}
# Get "list" details
cmd = ['diskutil', 'list', '-plist', path]
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# Invalid / corrupt plist data? return empty dict to avoid crash
LOG.error('Failed to get diskutil list for %s', path)
return details
# Parse "list" details
details = plist_data.get('AllDisksAndPartitions', [{}])[0]
details['children'] = details.pop('Partitions', [])
details['path'] = path
for child in details['children']:
child['path'] = path.with_name(child.get('DeviceIdentifier', 'null'))
# Get "info" details
for dev in [details, *details['children']]:
cmd = ['diskutil', 'info', '-plist', dev['path']]
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
LOG.error('Failed to get diskutil info for %s', path)
continue #Skip
# Parse "info" details
dev.update(plist_data)
dev['bus'] = dev.pop('BusProtocol', '???')
dev['fstype'] = dev.pop('FilesystemType', '')
dev['label'] = dev.pop('VolumeName', '')
dev['model'] = dev.pop('MediaName', 'Unknown')
dev['mountpoint'] = dev.pop('MountPoint', '')
dev['name'] = dev.get('name', str(dev['path']))
dev['phy-sec'] = dev.pop('DeviceBlockSize', 512)
dev['serial'] = get_disk_serial_macos(dev['path'])
dev['size'] = dev.pop('Size', -1)
dev['ssd'] = dev.pop('SolidState', False)
dev['vendor'] = ''
if dev.get('WholeDisk', True):
dev['parent'] = None
else:
dev['parent'] = dev.pop('ParentWholeDisk', None)
# Fix details if main dev is a child
for child in details['children']:
if path == child['path']:
for key in ('fstype', 'label', 'name', 'size'):
details[key] = child[key]
break
# Done
return details
def get_disk_serial_macos(path):
"""Get disk serial using system_profiler, returns str."""
cmd = ['sudo', 'smartctl', '--info', '--json', path]
smart_info = get_json_from_command(cmd)
return smart_info.get('serial_number', 'Unknown Serial')
def get_disks(skip_kits=False):
"""Get disks using OS-specific methods, returns list."""
disks = []
if PLATFORM == 'Darwin':
disks = get_disks_macos()
elif PLATFORM == 'Linux':
disks = get_disks_linux()
# Skip WK disks
if skip_kits:
disks = [
disk_obj for disk_obj in disks
if not any(
WK_LABEL_REGEX.search(label) for label in disk_obj.get_labels()
)
]
# Done
return disks
def get_disks_linux():
"""Get disks via lsblk, returns list."""
cmd = ['lsblk', '--json', '--nodeps', '--paths']
disks = []
# Add valid disks
json_data = get_json_from_command(cmd)
for disk in json_data.get('blockdevices', []):
disk_obj = Disk(disk['name'])
# Skip loopback devices, optical devices, etc
if disk_obj.details['type'] != 'disk':
continue
# Add disk
disks.append(disk_obj)
# Done
return disks
def get_disks_macos():
"""Get disks via diskutil, returns list."""
cmd = ['diskutil', 'list', '-plist', 'physical']
disks = []
# Get info from diskutil
proc = run_program(cmd, encoding=None, errors=None, check=False)
if proc.returncode != 0:
# Assuming we're running on an older macOS version
cmd.pop(-1)
proc = run_program(cmd, encoding=None, errors=None, check=False)
# Parse plist data
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# Invalid / corrupt plist data? return empty list to avoid crash
LOG.error('Failed to get diskutil list')
return disks
# Add valid disks
for disk in plist_data['WholeDisks']:
disks.append(Disk(f'/dev/{disk}'))
# Remove virtual disks
# TODO: Test more to figure out why some drives are being marked 'Unknown'
disks = [
d for d in disks if d.details.get('VirtualOrPhysical') != 'Virtual'
]
# Done
return disks
def get_known_disk_attributes(model):
"""Get known NVMe/SMART attributes (model specific), returns str."""
known_attributes = KNOWN_DISK_ATTRIBUTES.copy()
# Apply model-specific data
for regex, data in KNOWN_DISK_MODELS.items():
if re.search(regex, model):
for attr, thresholds in data.items():
if attr in known_attributes:
known_attributes[attr].update(thresholds)
else:
known_attributes[attr] = thresholds
# Done
return known_attributes
def get_ram_list_linux():
"""Get RAM list using dmidecode."""
cmd = ['sudo', 'dmidecode', '--type', 'memory']
dimm_list = []
manufacturer = 'Unknown'
size = 0
# Get DMI data
proc = run_program(cmd)
dmi_data = proc.stdout.splitlines()
# Parse data
for line in dmi_data:
line = line.strip()
if line == 'Memory Device':
# Reset vars
manufacturer = 'Unknown'
size = 0
elif line.startswith('Size:'):
size = line.replace('Size: ', '')
try:
size = string_to_bytes(size, assume_binary=True)
except ValueError:
# Assuming empty module
size = 0
elif line.startswith('Manufacturer:'):
manufacturer = line.replace('Manufacturer: ', '')
dimm_list.append([size, manufacturer])
# Save details
return dimm_list
def get_ram_list_macos():
"""Get RAM list using system_profiler."""
dimm_list = []
# Get and parse plist data
cmd = [
'system_profiler',
'-xml',
'SPMemoryDataType',
]
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# Ignore and return an empty list
return dimm_list
# Check DIMM data
dimm_details = plist_data[0].get('_items', [{}])[0].get('_items', [])
for dimm in dimm_details:
manufacturer = dimm.get('dimm_manufacturer', None)
manufacturer = KNOWN_RAM_VENDOR_IDS.get(
manufacturer,
f'Unknown ({manufacturer})')
size = dimm.get('dimm_size', '0 GB')
try:
size = string_to_bytes(size, assume_binary=True)
except ValueError:
# Empty DIMM?
LOG.error('Invalid DIMM size: %s', size)
continue
dimm_list.append([size, manufacturer])
# Save details
return dimm_list
def is_4k_aligned_macos(disk_details):
"""Check partition alignment using diskutil info, returns bool."""
aligned = True
# Check partitions
for part in disk_details.get('children', []):
offset = part.get('PartitionMapPartitionOffset', 0)
if not offset:
# Assuming offset couldn't be found and it defaulted to 0
# NOTE: Just logging the error, not bailing
LOG.error('Failed to get partition offset for %s', part['path'])
aligned = aligned and offset >= 0 and offset % 4096 == 0
# Done
return aligned
def is_4k_aligned_linux(dev_path, physical_sector_size):
"""Check partition alignment using lsblk, returns bool."""
aligned = True
cmd = [
'sudo',
'sfdisk',
'--json',
dev_path,
]
# Get partition details
json_data = get_json_from_command(cmd)
# Check partitions
for part in json_data.get('partitiontable', {}).get('partitions', []):
offset = physical_sector_size * part.get('start', -1)
aligned = aligned and offset >= 0 and offset % 4096 == 0
# Done
return aligned
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -0,0 +1,40 @@
"""WizardKit: Screensaver functions"""
# vim: sts=2 sw=2 ts=2
import logging
from subprocess import PIPE
from wk.exe import run_program
from wk.tmux import zoom_pane as tmux_zoom_pane
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Functions
def screensaver(name) -> None:
"""Show screensaver"""
LOG.info('Screensaver (%s)', name)
if name == 'matrix':
cmd = ['cmatrix', '-abs']
elif name == 'pipes':
cmd = [
'pipes.sh',
'-t', '0',
'-t', '1',
'-t', '2',
'-t', '3',
'-t', '5',
'-R', '-r', '4000',
]
# Switch pane to fullscreen and start screensaver
tmux_zoom_pane()
run_program(cmd, check=False, pipe=False, stderr=PIPE)
tmux_zoom_pane()
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -7,6 +7,7 @@ import pathlib
import re
from subprocess import CalledProcessError
from typing import Any
from wk.cfg.hw import CPU_CRITICAL_TEMP, SMC_IDS, TEMP_COLORS
from wk.exe import run_program, start_thread
@ -39,14 +40,14 @@ class Sensors():
self.data = get_sensor_data()
self.out_path = None
def clear_temps(self):
def clear_temps(self) -> None:
"""Clear saved temps but keep structure"""
for adapters in self.data.values():
for sources in adapters.values():
for source_data in sources.values():
source_data['Temps'] = []
def cpu_max_temp(self):
def cpu_max_temp(self) -> float:
"""Get max temp from any CPU source, returns float.
NOTE: If no temps are found this returns zero.
@ -64,7 +65,7 @@ class Sensors():
# Done
return max_temp
def cpu_reached_critical_temp(self):
def cpu_reached_critical_temp(self) -> bool:
"""Check if CPU reached CPU_CRITICAL_TEMP, returns bool."""
for section, adapters in self.data.items():
if not section.startswith('CPU'):
@ -80,7 +81,8 @@ class Sensors():
# Didn't return above so temps are within the threshold
return False
def generate_report(self, *temp_labels, colored=True, only_cpu=False):
def generate_report(
self, *temp_labels, colored=True, only_cpu=False) -> list[str]:
"""Generate report based on given temp_labels, returns list."""
report = []
@ -117,7 +119,8 @@ class Sensors():
def monitor_to_file(
self, out_path, alt_max=None,
exit_on_thermal_limit=True, temp_labels=None, thermal_action=None):
exit_on_thermal_limit=True, temp_labels=None,
thermal_action=None) -> None:
# pylint: disable=too-many-arguments
"""Write report to path every second until stopped.
@ -151,13 +154,12 @@ class Sensors():
# Sleep before next loop
sleep(0.5)
def save_average_temps(self, temp_label, seconds=10):
# pylint: disable=unused-variable
def save_average_temps(self, temp_label, seconds=10) -> None:
"""Save average temps under temp_label over provided seconds.."""
self.clear_temps()
# Get temps
for i in range(seconds):
for _ in range(seconds):
self.update_sensor_data(exit_on_thermal_limit=False)
sleep(1)
@ -178,7 +180,8 @@ class Sensors():
def start_background_monitor(
self, out_path, alt_max=None,
exit_on_thermal_limit=True, temp_labels=None, thermal_action=None):
exit_on_thermal_limit=True, temp_labels=None,
thermal_action=None) -> None:
# pylint: disable=too-many-arguments
"""Start background thread to save report to file.
@ -195,7 +198,7 @@ class Sensors():
),
)
def stop_background_monitor(self):
def stop_background_monitor(self) -> None:
"""Stop background thread."""
self.out_path.with_suffix('.stop').touch()
self.background_thread.join()
@ -204,14 +207,16 @@ class Sensors():
self.background_thread = None
self.out_path = None
def update_sensor_data(self, alt_max=None, exit_on_thermal_limit=True):
def update_sensor_data(
self, alt_max=None, exit_on_thermal_limit=True) -> None:
"""Update sensor data via OS-specific means."""
if PLATFORM == 'Darwin':
self.update_sensor_data_macos(alt_max, exit_on_thermal_limit)
elif PLATFORM == 'Linux':
self.update_sensor_data_linux(alt_max, exit_on_thermal_limit)
def update_sensor_data_linux(self, alt_max, exit_on_thermal_limit=True):
def update_sensor_data_linux(
self, alt_max, exit_on_thermal_limit=True) -> None:
"""Update sensor data via lm_sensors."""
lm_sensor_data = get_sensor_data_lm()
for section, adapters in self.data.items():
@ -234,7 +239,8 @@ class Sensors():
if source_data['Current'] >= CPU_CRITICAL_TEMP:
raise ThermalLimitReachedError('CPU temps reached limit')
def update_sensor_data_macos(self, alt_max, exit_on_thermal_limit=True):
def update_sensor_data_macos(
self, alt_max, exit_on_thermal_limit=True) -> None:
"""Update sensor data via SMC."""
for section, adapters in self.data.items():
for sources in adapters.values():
@ -262,7 +268,7 @@ class Sensors():
# Functions
def fix_sensor_name(name):
def fix_sensor_name(name) -> str:
"""Cleanup sensor name, returns str."""
name = re.sub(r'^(\w+)-(\w+)-(\w+)', r'\1 (\2 \3)', name, re.IGNORECASE)
name = name.title()
@ -281,7 +287,7 @@ def fix_sensor_name(name):
return name
def get_sensor_data():
def get_sensor_data() -> dict[Any, Any]:
"""Get sensor data via OS-specific means, returns dict."""
sensor_data = {}
if PLATFORM == 'Darwin':
@ -292,7 +298,7 @@ def get_sensor_data():
return sensor_data
def get_sensor_data_linux():
def get_sensor_data_linux() -> dict[Any, Any]:
"""Get sensor data via lm_sensors, returns dict."""
raw_lm_sensor_data = get_sensor_data_lm()
sensor_data = {'CPUTemps': {}, 'Others': {}}
@ -333,7 +339,7 @@ def get_sensor_data_linux():
return sensor_data
def get_sensor_data_lm():
def get_sensor_data_lm() -> dict[Any, Any]:
"""Get raw sensor data via lm_sensors, returns dict."""
raw_lm_sensor_data = {}
cmd = ['sensors', '-j']
@ -364,7 +370,7 @@ def get_sensor_data_lm():
return raw_lm_sensor_data
def get_sensor_data_macos():
def get_sensor_data_macos() -> dict[Any, Any]:
"""Get sensor data via SMC, returns dict.
NOTE: The data is structured like the lm_sensor data.
@ -408,7 +414,7 @@ def get_sensor_data_macos():
return sensor_data
def get_temp_str(temp, colored=True):
def get_temp_str(temp, colored=True) -> str:
"""Get colored string based on temp, returns str."""
temp_color = None

437
scripts/wk/hw/smart.py Normal file
View file

@ -0,0 +1,437 @@
"""WizardKit: SMART test functions"""
# vim: sts=2 sw=2 ts=2
import logging
import re
from typing import Any
from wk.cfg.hw import (
ATTRIBUTE_COLORS,
KEY_NVME,
KEY_SMART,
KNOWN_DISK_ATTRIBUTES,
KNOWN_DISK_MODELS,
NVME_WARNING_KEYS,
REGEX_POWER_ON_TIME,
SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS,
)
from wk.exe import get_json_from_command, run_program
from wk.std import bytes_to_string, color_string, sleep
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Functions
def abort_self_test(dev) -> None:
"""Abort currently running non-captive self-test."""
cmd = ['sudo', 'smartctl', '--abort', dev.path]
run_program(cmd, check=False)
def build_self_test_report(test_obj, aborted=False) -> None:
"""Check self-test results and build report (saved to test_obj).
NOTE: Not updating SMART data to preserve the result for the report.
For instance if the test was aborted the report should include the
last known progress instead of just "was aborted by host."
"""
report = [color_string('Self-Test', 'BLUE')]
test_details = get_smart_self_test_details(test_obj.dev)
test_result = test_details.get('status', {}).get('string', 'Unknown')
# Build report
if test_obj.disabled or test_obj.status == 'Denied':
report.append(color_string(f' {test_obj.status}', 'RED'))
elif test_obj.status == 'N/A' or not test_obj.dev.attributes:
report.append(color_string(f' {test_obj.status}', 'YELLOW'))
elif test_obj.status == 'TestInProgress':
report.append(color_string(' Failed to stop previous test', 'RED'))
test_obj.set_status('Failed')
else:
# Other cases include self-test result string
report.append(f' {test_result.capitalize()}')
if aborted and not (test_obj.passed or test_obj.failed):
report.append(color_string(' Aborted', 'YELLOW'))
test_obj.set_status('Aborted')
elif test_obj.status == 'TimedOut':
report.append(color_string(' TimedOut', 'YELLOW'))
# Done
test_obj.report.extend(report)
def check_attributes(dev, only_blocking=False) -> bool:
"""Check if any known attributes are failing, returns bool."""
attributes_ok = True
known_attributes = get_known_disk_attributes(dev.model)
for attr, value in dev.attributes.items():
# Skip unknown attributes
if attr not in known_attributes:
continue
# Get thresholds
blocking_attribute = known_attributes[attr].get('Blocking', False)
err_thresh = known_attributes[attr].get('Error', None)
max_thresh = known_attributes[attr].get('Maximum', None)
if not max_thresh:
max_thresh = float('inf')
# Skip non-blocking attributes if necessary
if only_blocking and not blocking_attribute:
continue
# Skip informational attributes
if not err_thresh:
continue
# Check attribute
if known_attributes[attr].get('PercentageLife', False):
if 0 <= value['raw'] <= err_thresh:
attributes_ok = False
elif err_thresh <= value['raw'] < max_thresh:
attributes_ok = False
# Done
return attributes_ok
def enable_smart(dev) -> None:
"""Try enabling SMART for this disk."""
cmd = [
'sudo',
'smartctl',
f'--device={"sat,auto" if dev.use_sat else "auto"}',
'--tolerance=permissive',
'--smart=on',
dev.path,
]
run_program(cmd, check=False)
def generate_attribute_report(dev) -> list[str]:
"""Generate attribute report, returns list."""
known_attributes = get_known_disk_attributes(dev.model)
report = []
for attr, value in sorted(dev.attributes.items()):
note = ''
value_color = 'GREEN'
# Skip attributes not in our list
if attr not in known_attributes:
continue
# Check for attribute note
note = known_attributes[attr].get('Note', '')
# ID / Name
label = f'{attr:>3}'
if isinstance(attr, int):
# Assuming SMART, include hex ID and name
label += f' / {str(hex(attr))[2:].upper():0>2}: {value["name"]}'
label = f' {label.replace("_", " "):38}'
# Value color
if known_attributes[attr].get('PercentageLife', False):
# PercentageLife values
if 0 <= value['raw'] <= known_attributes[attr]['Error']:
value_color = 'RED'
note = '(failed, % life remaining)'
elif value['raw'] < 0 or value['raw'] > 100:
value_color = 'PURPLE'
note = '(invalid?)'
else:
for threshold, color in ATTRIBUTE_COLORS:
threshold_val = known_attributes[attr].get(threshold, None)
if threshold_val and value['raw'] >= threshold_val:
value_color = color
if threshold == 'Error':
note = '(failed)'
elif threshold == 'Maximum':
note = '(invalid?)'
# 199/C7 warning
if str(attr) == '199' and value['raw'] > 0:
note = '(bad cable?)'
# Build colored string and append to report
line = color_string(
[label, value['raw_str'], note],
[None, value_color, 'YELLOW'],
)
report.append(line)
# Done
return report
def get_known_disk_attributes(model) -> dict[Any, dict]:
"""Get known NVMe/SMART attributes (model specific), returns dict."""
known_attributes = KNOWN_DISK_ATTRIBUTES.copy()
# Apply model-specific data
for regex, data in KNOWN_DISK_MODELS.items():
if re.search(regex, model):
for attr, thresholds in data.items():
if attr in known_attributes:
known_attributes[attr].update(thresholds)
else:
known_attributes[attr] = thresholds
# Done
return known_attributes
def get_smart_self_test_details(dev) -> dict[Any, Any]:
"""Shorthand to get deeply nested self-test details, returns dict."""
details = {}
try:
details = dev.raw_smartctl['ata_smart_data']['self_test']
except (KeyError, TypeError):
# Assuming disk lacks SMART support, ignore and return empty dict.
pass
# Done
return details
def monitor_smart_self_test(test_obj, header_str, log_path) -> bool:
"""Monitor SMART self-test status and update test_obj, returns bool."""
started = False
finished = False
status_str = 'Starting self-test...'
test_details = get_smart_self_test_details(test_obj.dev)
test_minutes = 15
# Get real test length
test_minutes = test_details.get('polling_minutes', {}).get('short', 5)
test_minutes = int(test_minutes) + 10
# Monitor progress (in five second intervals)
for _i in range(int(test_minutes*60/5)):
sleep(5)
# Update log
## NOTE: This is run at least once with the default "Starting..." status
with open(log_path, 'w', encoding='utf-8') as _f:
_f.write(f'{header_str}\nSMART self-test status:\n {status_str}')
# Update status
update_smart_details(test_obj.dev)
test_details = get_smart_self_test_details(test_obj.dev)
# Check if test started
started = started or 'remaining_percent' in test_details.get('status', {})
if not started:
if _i * 5 >= SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS:
# Test didn't start within limit, stop waiting
abort_self_test(test_obj.dev)
test_obj.failed = True
test_obj.set_status('TimedOut')
break
# Still within starting limit, continue to next loop
continue
# Check test progress
status_str = test_details.get('status', {}).get('string', 'Unknown')
status_str = status_str.capitalize()
# Check if finished
if 'remaining_percent' not in test_details.get('status', {}):
finished = True
break
# Done
return finished
def run_self_test(test_obj, log_path) -> None:
"""Run disk self-test and update test results.
NOTE: This function is here to reserve a place for future
NVMe self-tests announced in NVMe spec v1.3.
"""
run_smart_self_test(test_obj, log_path)
def run_smart_self_test(test_obj, log_path) -> bool:
"""Run SMART self-test and check if it passed, returns bool.
NOTE: An exception will be raised if the disk lacks SMART support.
"""
finished = False
test_details = get_smart_self_test_details(test_obj.dev)
size_str = bytes_to_string(test_obj.dev.size, use_binary=False)
header_str = color_string(
['[', test_obj.dev.path.name, ' ', size_str, ']'],
[None, 'BLUE', None, 'CYAN', None],
sep='',
)
# Check if disk supports self-tests
if not test_details:
# Mark test as passed since it doesn't apply
test_obj.passed = True
test_obj.set_status('N/A')
build_self_test_report(test_obj)
return
# Update status
with open(log_path, 'w', encoding='utf-8') as _f:
_f.write(f'{header_str}\nInitializing...')
# Check for, and stop, self-test if currently in-progress
if self_test_in_progress(test_obj.dev):
abort_self_test(test_obj.dev)
for _ in range(6):
# Wait up to a minute for current test to exit
sleep(10)
update_smart_details(test_obj.dev)
if not self_test_in_progress(test_obj.dev):
break
# Recheck if self-test is in-progress, bail if so
if self_test_in_progress(test_obj.dev):
test_obj.failed = True
test_obj.set_status('TestInProgress')
build_self_test_report(test_obj)
return
# Start test
cmd = [
'sudo',
'smartctl',
'--tolerance=normal',
'--test=short',
test_obj.dev.path,
]
run_program(cmd, check=False)
# Monitor progress
finished = monitor_smart_self_test(test_obj, header_str, log_path)
# Check result
if finished:
test_obj.passed = test_details.get('status', {}).get('passed', False)
test_obj.failed = test_obj.failed or not test_obj.passed
# Set status
if test_obj.failed and test_obj.status != 'TimedOut':
test_obj.set_status('Failed')
elif test_obj.passed:
test_obj.set_status('Passed')
else:
test_obj.set_status('Unknown')
# Build report
build_self_test_report(test_obj)
def smart_status_ok(dev) -> bool:
"""Check SMART attributes and overall assessment, returns bool."""
blocking_event_encountered = False
update_smart_details(dev)
# Attributes
if not check_attributes(dev, only_blocking=True):
blocking_event_encountered = True
LOG.error('%s: Blocked for failing attribute(s)', dev.path)
# NVMe status
nvme_status = dev.raw_smartctl.get('smart_status', {}).get('nvme', {})
if nvme_status.get('media_read_only', False):
blocking_event_encountered = True
msg = 'Media has been placed in read-only mode'
dev.add_note(msg, 'RED')
LOG.error('%s %s', dev.path, msg)
for key in NVME_WARNING_KEYS:
if nvme_status.get(key, False):
msg = key.replace('_', ' ')
dev.add_note(msg, 'YELLOW')
LOG.warning('%s %s', dev.path, msg)
# SMART overall assessment
smart_passed = True
try:
smart_passed = dev.raw_smartctl['smart_status']['passed']
except (KeyError, TypeError):
# Assuming disk doesn't support SMART overall assessment
pass
if not smart_passed:
blocking_event_encountered = True
msg = 'SMART overall self-assessment: Failed'
dev.add_note(msg, 'RED')
LOG.error('%s %s', dev.path, msg)
# Done
return not blocking_event_encountered
def self_test_in_progress(dev) -> bool:
"""Check if SMART self-test is in progress, returns bool."""
test_details = get_smart_self_test_details(dev)
return 'remaining_percent' in test_details.get('status', '')
def update_smart_details(dev) -> None:
"""Update SMART details via smartctl."""
updated_attributes = {}
# Get SMART data
cmd = [
'sudo',
'smartctl',
f'--device={"sat,auto" if dev.use_sat else "auto"}',
'--tolerance=verypermissive',
'--all',
'--json',
dev.path,
]
dev.raw_smartctl = get_json_from_command(cmd, check=False)
# Check for attributes
if KEY_NVME in dev.raw_smartctl:
for name, value in dev.raw_smartctl[KEY_NVME].items():
try:
updated_attributes[name] = {
'name': name,
'raw': int(value),
'raw_str': str(value),
}
except (TypeError, ValueError):
# Ignoring invalid attribute
LOG.error('Invalid NVMe attribute: %s %s', name, value)
elif KEY_SMART in dev.raw_smartctl:
for attribute in dev.raw_smartctl[KEY_SMART].get('table', {}):
try:
_id = int(attribute['id'])
except (KeyError, ValueError):
# Ignoring invalid attribute
LOG.error('Invalid SMART attribute: %s', attribute)
continue
name = str(attribute.get('name', 'Unknown')).replace('_', ' ').title()
raw = int(attribute.get('raw', {}).get('value', -1))
raw_str = attribute.get('raw', {}).get('string', 'Unknown')
# Fix power-on time
match = REGEX_POWER_ON_TIME.match(raw_str)
if _id == 9 and match:
raw = int(match.group(1))
# Add to dict
updated_attributes[_id] = {
'name': name, 'raw': raw, 'raw_str': raw_str}
# Add note if necessary
if not updated_attributes:
dev.add_note('No NVMe or SMART data available', 'YELLOW')
# Done
dev.attributes.update(updated_attributes)
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -0,0 +1,93 @@
"""WizardKit: Surface scan test functions"""
# vim: sts=2 sw=2 ts=2
import logging
from subprocess import STDOUT
from wk.cfg.hw import (
BADBLOCKS_LARGE_DISK,
BADBLOCKS_REGEX,
BADBLOCKS_SKIP_REGEX,
)
from wk.exe import run_program
from wk.std import (
PLATFORM,
bytes_to_string,
color_string,
strip_colors,
)
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Functions
def check_surface_scan_results(test_obj, log_path) -> None:
"""Check results and set test status."""
with open(log_path, 'r', encoding='utf-8') as _f:
for line in _f.readlines():
line = strip_colors(line.strip())
if not line or BADBLOCKS_SKIP_REGEX.match(line):
# Skip
continue
match = BADBLOCKS_REGEX.search(line)
if match:
if all(s == '0' for s in match.groups()):
test_obj.passed = True
test_obj.report.append(f' {line}')
test_obj.set_status('Passed')
else:
test_obj.failed = True
test_obj.report.append(f' {color_string(line, "YELLOW")}')
test_obj.set_status('Failed')
else:
test_obj.report.append(f' {color_string(line, "YELLOW")}')
if not (test_obj.passed or test_obj.failed):
test_obj.set_status('Unknown')
def run_scan(test_obj, log_path) -> None:
"""Run surface scan and handle exceptions."""
block_size = '1024'
dev = test_obj.dev
dev_path = test_obj.dev.path
if PLATFORM == 'Darwin':
# Use "RAW" disks under macOS
dev_path = dev_path.with_name(f'r{dev_path.name}')
LOG.info('Using %s for better performance', dev_path)
test_obj.report.append(color_string('badblocks', 'BLUE'))
test_obj.set_status('Working')
# Increase block size if necessary
if (dev.phy_sec == 4096
or dev.size >= BADBLOCKS_LARGE_DISK):
block_size = '4096'
# Start scan
cmd = ['sudo', 'badblocks', '-sv', '-b', block_size, '-e', '1', dev_path]
with open(log_path, 'a', encoding='utf-8') as _f:
size_str = bytes_to_string(dev.size, use_binary=False)
_f.write(
color_string(
['[', dev.path.name, ' ', size_str, ']\n'],
[None, 'BLUE', None, 'CYAN', None],
sep='',
),
)
_f.flush()
run_program(
cmd,
check=False,
pipe=False,
stderr=STDOUT,
stdout=_f,
)
# Check results
check_surface_scan_results(test_obj, log_path)
if __name__ == '__main__':
print("This file is not meant to be called directly.")

183
scripts/wk/hw/system.py Normal file
View file

@ -0,0 +1,183 @@
"""WizardKit: System object and functions"""
# vim: sts=2 sw=2 ts=2
import logging
import plistlib
import re
from dataclasses import dataclass, field
from typing import Any
from wk.cfg.hw import KNOWN_RAM_VENDOR_IDS
from wk.exe import get_json_from_command, run_program
from wk.hw.test import Test
from wk.std import (
PLATFORM,
bytes_to_string,
color_string,
string_to_bytes,
)
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
@dataclass(slots=True)
class System:
"""Object for tracking system specific hardware data."""
cpu_description: str = field(init=False)
ram_dimms: list[str] = field(init=False, default_factory=list)
ram_total: str = field(init=False, default='Unknown')
raw_details: dict[Any, Any] = field(init=False, default_factory=dict)
tests: list[Test] = field(init=False, default_factory=list)
def __post_init__(self) -> None:
self.get_cpu_details()
self.set_cpu_description()
self.get_ram_details()
def generate_report(self) -> list[str]:
"""Generate CPU & RAM report, returns list."""
report = []
report.append(color_string('Device', 'BLUE'))
report.append(f' {self.cpu_description}')
# Include RAM details
report.append(color_string('RAM', 'BLUE'))
report.append(f' {self.ram_total} ({", ".join(self.ram_dimms)})')
# Tests
for test in self.tests:
report.extend(test.report)
return report
def get_cpu_details(self) -> None:
"""Get CPU details using OS specific methods."""
cmd = ['lscpu', '--json']
# Bail early
if PLATFORM != 'Linux':
# Only Linux is supported ATM
return
# Parse details
json_data = get_json_from_command(cmd)
for line in json_data.get('lscpu', [{}]):
_field = line.get('field', '').replace(':', '')
_data = line.get('data', '')
if not (_field or _data):
# Skip
continue
self.raw_details[_field] = _data
def get_ram_details(self) -> None:
"""Get RAM details using OS specific methods."""
if PLATFORM == 'Darwin':
dimm_list = get_ram_list_macos()
elif PLATFORM == 'Linux':
dimm_list = get_ram_list_linux()
details = {'Total': 0}
for dimm_details in dimm_list:
size, manufacturer = dimm_details
if size <= 0:
# Skip empty DIMMs
continue
description = f'{bytes_to_string(size)} {manufacturer}'
details['Total'] += size
if description in details:
details[description] += 1
else:
details[description] = 1
# Save details
self.ram_total = bytes_to_string(details.pop('Total', 0))
self.ram_dimms = [
f'{count}x {desc}' for desc, count in sorted(details.items())
]
def set_cpu_description(self) -> None:
"""Set CPU description."""
self.cpu_description = self.raw_details.get('Model name', 'Unknown CPU')
# macOS
if PLATFORM == 'Darwin':
cmd = 'sysctl -n machdep.cpu.brand_string'.split()
proc = run_program(cmd, check=False)
self.cpu_description = re.sub(r'\s+', ' ', proc.stdout.strip())
def get_ram_list_linux() -> list[list]:
"""Get RAM list using dmidecode."""
cmd = ['sudo', 'dmidecode', '--type', 'memory']
dimm_list = []
manufacturer = 'Unknown'
size = 0
# Get DMI data
proc = run_program(cmd)
dmi_data = proc.stdout.splitlines()
# Parse data
for line in dmi_data:
line = line.strip()
if line == 'Memory Device':
# Reset vars
manufacturer = 'Unknown'
size = 0
elif line.startswith('Size:'):
size = line.replace('Size: ', '')
try:
size = string_to_bytes(size, assume_binary=True)
except ValueError:
# Assuming empty module
size = 0
elif line.startswith('Manufacturer:'):
manufacturer = line.replace('Manufacturer: ', '')
dimm_list.append([size, manufacturer])
# Save details
return dimm_list
def get_ram_list_macos() -> list[list]:
"""Get RAM list using system_profiler."""
dimm_list = []
# Get and parse plist data
cmd = [
'system_profiler',
'-xml',
'SPMemoryDataType',
]
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# Ignore and return an empty list
return dimm_list
# Check DIMM data
dimm_details = plist_data[0].get('_items', [{}])[0].get('_items', [])
for dimm in dimm_details:
manufacturer = dimm.get('dimm_manufacturer', None)
manufacturer = KNOWN_RAM_VENDOR_IDS.get(
manufacturer,
f'Unknown ({manufacturer})')
size = dimm.get('dimm_size', '0 GB')
try:
size = string_to_bytes(size, assume_binary=True)
except ValueError:
# Empty DIMM?
LOG.error('Invalid DIMM size: %s', size)
continue
dimm_list.append([size, manufacturer])
# Save details
return dimm_list
if __name__ == '__main__':
print("This file is not meant to be called directly.")

35
scripts/wk/hw/test.py Normal file
View file

@ -0,0 +1,35 @@
"""WizardKit: Test object and functions"""
# vim: sts=2 sw=2 ts=2
from dataclasses import dataclass, field
from typing import Any, Callable
@dataclass(slots=True)
class Test:
# pylint: disable=too-many-instance-attributes
"""Object for tracking test specific data."""
dev: Any
label: str
name: str
disabled: bool = field(init=False, default=False)
failed: bool = field(init=False, default=False)
hidden: bool = False
passed: bool = field(init=False, default=False)
report: list[str] = field(init=False, default_factory=list)
status: str = field(init=False, default='Pending')
def set_status(self, status) -> None:
"""Update status string."""
if self.disabled:
# Don't change status if disabled
return
self.status = status
@dataclass(slots=True)
class TestGroup:
"""Object for tracking groups of tests."""
name: str
function: Callable
test_objects: list[Test] = field(default_factory=list)

View file

@ -9,7 +9,7 @@ import subprocess
from wk import std
from wk.exe import popen_program, run_program
from wk.hw.obj import Disk
from wk.hw.disk import Disk
from wk.log import format_log_path