diff --git a/scripts/wk/cfg/hw.py b/scripts/wk/cfg/hw.py index ed040437..65fa156a 100644 --- a/scripts/wk/cfg/hw.py +++ b/scripts/wk/cfg/hw.py @@ -60,9 +60,15 @@ KNOWN_RAM_VENDOR_IDS = { '0xAD00': 'Hynix', '0xCE00': 'Samsung', } +NVME_WARNING_KEYS = ( + 'spare_below_threshold', + 'reliability_degraded', + 'volatile_memory_backup_failed', + ) REGEX_POWER_ON_TIME = re.compile( r'^(\d+)([Hh].*|\s+\(\d+\s+\d+\s+\d+\).*)' ) +SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS = 120 SMC_IDS = { # Sources: https://github.com/beltex/SMCKit/blob/master/SMCKit/SMC.swift # http://www.opensource.apple.com/source/net_snmp/ diff --git a/scripts/wk/hw/__init__.py b/scripts/wk/hw/__init__.py index 741d3151..092a7428 100644 --- a/scripts/wk/hw/__init__.py +++ b/scripts/wk/hw/__init__.py @@ -2,5 +2,7 @@ from . import ddrescue from . import diags -from . import obj +from . import disk from . import sensors +from . import system +from . import test diff --git a/scripts/wk/hw/obj.py b/scripts/wk/hw/disk.py similarity index 64% rename from scripts/wk/hw/obj.py rename to scripts/wk/hw/disk.py index d6923432..5703e1d2 100644 --- a/scripts/wk/hw/obj.py +++ b/scripts/wk/hw/disk.py @@ -1,4 +1,4 @@ -"""WizardKit: Hardware objects (mostly)""" +"""WizardKit: Disk object and functions""" # vim: sts=2 sw=2 ts=2 import logging @@ -6,7 +6,8 @@ import pathlib import plistlib import re -from collections import OrderedDict +from dataclasses import dataclass, field +from typing import Any, Union from wk.cfg.hw import ( ATTRIBUTE_COLORS, @@ -14,28 +15,23 @@ from wk.cfg.hw import ( KEY_SMART, KNOWN_DISK_ATTRIBUTES, KNOWN_DISK_MODELS, - KNOWN_RAM_VENDOR_IDS, + NVME_WARNING_KEYS, REGEX_POWER_ON_TIME, + SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS, ) from wk.cfg.main import KIT_NAME_SHORT from wk.exe import get_json_from_command, run_program +from wk.hw.test import Test from wk.std import ( PLATFORM, bytes_to_string, color_string, sleep, - string_to_bytes, ) # STATIC VARIABLES LOG = logging.getLogger(__name__) -NVME_WARNING_KEYS = ( - 'spare_below_threshold', - 'reliability_degraded', - 'volatile_memory_backup_failed', - ) -SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS = 120 WK_LABEL_REGEX = re.compile( fr'{KIT_NAME_SHORT}_(LINUX|UFD)', re.IGNORECASE, @@ -54,130 +50,50 @@ class SMARTSelfTestInProgressError(RuntimeError): # Classes -class BaseObj(): - """Base object for tracking device data.""" - def __init__(self): - self.tests = OrderedDict() - - def all_tests_passed(self): - """Check if all tests passed, returns bool.""" - return all(results.passed for results in self.tests.values()) - - def any_test_failed(self): - """Check if any test failed, returns bool.""" - return any(results.failed for results in self.tests.values()) - - -class CpuRam(BaseObj): - """Object for tracking CPU & RAM specific data.""" - def __init__(self): - super().__init__() - self.description = 'Unknown' - self.details = {} - self.ram_total = 'Unknown' - self.ram_dimms = [] - self.tests = OrderedDict() - - # Update details - self.get_cpu_details() - self.get_ram_details() - - def generate_report(self): - """Generate CPU & RAM report, returns list.""" - report = [] - report.append(color_string('Device', 'BLUE')) - report.append(f' {self.description}') - - # Include RAM details - report.append(color_string('RAM', 'BLUE')) - report.append(f' {self.ram_total} ({", ".join(self.ram_dimms)})') - - # Tests - for test in self.tests.values(): - report.extend(test.report) - - return report - - def get_cpu_details(self): - """Get CPU details using OS specific methods.""" - if PLATFORM == 'Darwin': - cmd = 'sysctl -n machdep.cpu.brand_string'.split() - proc = run_program(cmd, check=False) - self.description = re.sub(r'\s+', ' ', proc.stdout.strip()) - elif PLATFORM == 'Linux': - cmd = ['lscpu', '--json'] - json_data = get_json_from_command(cmd) - for line in json_data.get('lscpu', [{}]): - _field = line.get('field', '').replace(':', '') - _data = line.get('data', '') - if not (_field or _data): - # Skip - continue - self.details[_field] = _data - - self.description = self.details.get('Model name', '') - - # Replace empty description - if not self.description: - self.description = 'Unknown CPU' - - def get_ram_details(self): - """Get RAM details using OS specific methods.""" - if PLATFORM == 'Darwin': - dimm_list = get_ram_list_macos() - elif PLATFORM == 'Linux': - dimm_list = get_ram_list_linux() - - details = {'Total': 0} - for dimm_details in dimm_list: - size, manufacturer = dimm_details - if size <= 0: - # Skip empty DIMMs - continue - description = f'{bytes_to_string(size)} {manufacturer}' - details['Total'] += size - if description in details: - details[description] += 1 - else: - details[description] = 1 - - # Save details - self.ram_total = bytes_to_string(details.pop('Total', 0)) - self.ram_dimms = [ - f'{count}x {desc}' for desc, count in sorted(details.items()) - ] - - -class Disk(BaseObj): +@dataclass(slots=True) +class Disk: + # pylint: disable=too-many-instance-attributes """Object for tracking disk specific data.""" - def __init__(self, path): - super().__init__() - self.attributes = {} - self.description = 'Unknown' - self.details = {} - self.notes = [] - self.path = pathlib.Path(path).resolve() - self.smartctl = {} - self.tests = OrderedDict() + attributes: dict[Any, dict] = field(init=False, default_factory=dict) + bus: str = field(init=False) + description: str = field(init=False) + filesystem: str = field(init=False) + log_sec: int = field(init=False) + model: str = field(init=False) + name: str = field(init=False) + notes: list[str] = field(init=False, default_factory=list) + path: Union[pathlib.Path, str] + phy_sec: int = field(init=False) + raw_details: dict[str, Any] = field(init=False) + raw_smartctl: dict[str, Any] = field(init=False) + serial: str = field(init=False) + size: int = field(init=False) + ssd: bool = field(init=False) + tests: list[Test] = field(init=False, default_factory=list) + use_sat: bool = field(init=False, default=False) - # Update details + def __post_init__(self) -> None: + self.path = pathlib.Path(self.path).resolve() self.get_details() + self.set_description() self.enable_smart() self.update_smart_details() - if self.details['bus'] == 'USB' and not self.attributes: + if not self.attributes and self.bus == 'USB': # Try using SAT LOG.warning('Using SAT for smartctl for %s', self.path) - self.enable_smart(use_sat=True) - self.update_smart_details(use_sat=True) + self.notes = [] + self.use_sat = True + self.enable_smart() + self.update_smart_details() if not self.is_4k_aligned(): self.add_note('One or more partitions are not 4K aligned', 'YELLOW') - def abort_self_test(self): + def abort_self_test(self) -> None: """Abort currently running non-captive self-test.""" cmd = ['sudo', 'smartctl', '--abort', self.path] run_program(cmd, check=False) - def add_note(self, note, color=None): + def add_note(self, note, color=None) -> None: """Add note that will be included in the disk report.""" if color: note = color_string(note, color) @@ -185,10 +101,10 @@ class Disk(BaseObj): self.notes.append(note) self.notes.sort() - def check_attributes(self, only_blocking=False): + def check_attributes(self, only_blocking=False) -> bool: """Check if any known attributes are failing, returns bool.""" attributes_ok = True - known_attributes = get_known_disk_attributes(self.details['model']) + known_attributes = get_known_disk_attributes(self.model) for attr, value in self.attributes.items(): # Skip unknown attributes if attr not in known_attributes: @@ -219,29 +135,29 @@ class Disk(BaseObj): # Done return attributes_ok - def disable_disk_tests(self): + def disable_disk_tests(self) -> None: """Disable all tests.""" LOG.warning('Disabling all tests for: %s', self.path) - for test in self.tests.values(): + for test in self.tests: if test.status in ('Pending', 'Working'): test.set_status('Denied') test.disabled = True - def enable_smart(self, use_sat=False): + def enable_smart(self) -> None: """Try enabling SMART for this disk.""" cmd = [ 'sudo', 'smartctl', - f'--device={"sat,auto" if use_sat else "auto"}', + f'--device={"sat,auto" if self.use_sat else "auto"}', '--tolerance=permissive', '--smart=on', self.path, ] run_program(cmd, check=False) - def generate_attribute_report(self): + def generate_attribute_report(self) -> list[str]: """Generate attribute report, returns list.""" - known_attributes = get_known_disk_attributes(self.details['model']) + known_attributes = get_known_disk_attributes(self.model) report = [] for attr, value in sorted(self.attributes.items()): note = '' @@ -294,7 +210,7 @@ class Disk(BaseObj): # Done return report - def generate_report(self, header=True): + def generate_report(self, header=True) -> list[str]: """Generate Disk report, returns list.""" report = [] if header: @@ -314,63 +230,63 @@ class Disk(BaseObj): report.append(f' {note}') # Tests - for test in self.tests.values(): + for test in self.tests: report.extend(test.report) return report - def get_details(self): + def get_details(self) -> None: """Get disk details using OS specific methods. Required details default to generic descriptions and are converted to the correct type. """ if PLATFORM == 'Darwin': - self.details = get_disk_details_macos(self.path) + self.raw_details = get_disk_details_macos(self.path) elif PLATFORM == 'Linux': - self.details = get_disk_details_linux(self.path) + self.raw_details = get_disk_details_linux(self.path) # Set necessary details - self.details['bus'] = str(self.details.get('bus', '???')).upper() - self.details['bus'] = self.details['bus'].replace('IMAGE', 'Image') - self.details['bus'] = self.details['bus'].replace('NVME', 'NVMe') - self.details['fstype'] = self.details.get('fstype', 'Unknown') - self.details['log-sec'] = self.details.get('log-sec', 512) - self.details['model'] = self.details.get('model', 'Unknown Model') - self.details['name'] = self.details.get('name', self.path) - self.details['phy-sec'] = self.details.get('phy-sec', 512) - self.details['serial'] = self.details.get('serial', 'Unknown Serial') - self.details['size'] = self.details.get('size', -1) - self.details['ssd'] = self.details.get('ssd', False) + self.bus = str(self.raw_details.get('bus', '???')).upper() + self.bus = self.bus.replace('IMAGE', 'Image') + self.bus = self.bus.replace('NVME', 'NVMe') + self.filesystem = self.raw_details.get('fstype', 'Unknown') + self.log_sec = self.raw_details.get('log-sec', 512) + self.model = self.raw_details.get('model', 'Unknown Model') + self.name = self.raw_details.get('name', self.path) + self.phy_sec = self.raw_details.get('phy-sec', 512) + self.serial = self.raw_details.get('serial', 'Unknown Serial') + self.size = self.raw_details.get('size', -1) + self.ssd = self.raw_details.get('ssd', False) # Ensure certain attributes types + ## NOTE: This is ugly, deal. for attr in ['bus', 'model', 'name', 'serial']: - if not isinstance(self.details[attr], str): - self.details[attr] = str(self.details[attr]) - for attr in ['phy-sec', 'size']: - if not isinstance(self.details[attr], int): - try: - self.details[attr] = int(self.details[attr]) - except (TypeError, ValueError): - LOG.error('Invalid disk %s: %s', attr, self.details[attr]) - self.details[attr] = -1 + setattr(self, attr, str(getattr(self, attr))) + for attr in ['log_sec', 'phy_sec', 'size']: + try: + setattr(self, attr, int(getattr(self, attr))) + except (TypeError, ValueError): + LOG.error('Invalid disk %s: %s', attr, getattr(self, attr)) + if attr == 'size': + setattr(self, attr, -1) # Set description self.description = ( - f'{bytes_to_string(self.details["size"], use_binary=False)}' - f' ({self.details["bus"]})' - f' {self.details["model"]}' - f' {self.details["serial"]}' + f'{bytes_to_string(self.size, use_binary=False)}' + f' ({self.bus})' + f' {self.model}' + f' {self.serial}' ) - def get_labels(self): + def get_labels(self) -> list[str]: """Build list of labels for this disk, returns list.""" labels = [] - # Add all labels from lsblk - for disk in [self.details, *self.details.get('children', [])]: - labels.append(disk.get('label', '')) - labels.append(disk.get('partlabel', '')) + # Add all labels from raw_details + for details in [self.raw_details, *self.raw_details.get('children', [])]: + labels.append(details.get('label', '')) + labels.append(details.get('partlabel', '')) # Remove empty labels labels = [str(label) for label in labels if label] @@ -378,11 +294,11 @@ class Disk(BaseObj): # Done return labels - def get_smart_self_test_details(self): + def get_smart_self_test_details(self) -> dict[Any, Any]: """Shorthand to get deeply nested self-test details, returns dict.""" details = {} try: - details = self.smartctl['ata_smart_data']['self_test'] + details = self.raw_smartctl['ata_smart_data']['self_test'] except (KeyError, TypeError): # Assuming disk lacks SMART support, ignore and return empty dict. pass @@ -390,17 +306,17 @@ class Disk(BaseObj): # Done return details - def is_4k_aligned(self): + def is_4k_aligned(self) -> bool: """Check that all disk partitions are aligned, returns bool.""" aligned = True if PLATFORM == 'Darwin': - aligned = is_4k_aligned_macos(self.details) + aligned = is_4k_aligned_macos(self.raw_details) elif PLATFORM == 'Linux': - aligned = is_4k_aligned_linux(self.path, self.details['phy-sec']) + aligned = is_4k_aligned_linux(self.path, self.phy_sec) return aligned - def safety_checks(self): + def safety_checks(self) -> None: """Run safety checks and raise an exception if necessary.""" blocking_event_encountered = False self.update_smart_details() @@ -411,7 +327,7 @@ class Disk(BaseObj): LOG.error('%s: Blocked for failing attribute(s)', self.path) # NVMe status - nvme_status = self.smartctl.get('smart_status', {}).get('nvme', {}) + nvme_status = self.raw_smartctl.get('smart_status', {}).get('nvme', {}) if nvme_status.get('media_read_only', False): blocking_event_encountered = True msg = 'Media has been placed in read-only mode' @@ -426,7 +342,7 @@ class Disk(BaseObj): # SMART overall assessment smart_passed = True try: - smart_passed = self.smartctl['smart_status']['passed'] + smart_passed = self.raw_smartctl['smart_status']['passed'] except (KeyError, TypeError): # Assuming disk doesn't support SMART overall assessment pass @@ -447,7 +363,7 @@ class Disk(BaseObj): LOG.error(msg) raise SMARTSelfTestInProgressError(msg) - def run_self_test(self, log_path): + def run_self_test(self, log_path) -> bool: """Run disk self-test and check if it passed, returns bool. NOTE: This function is here to reserve a place for future @@ -456,7 +372,7 @@ class Disk(BaseObj): result = self.run_smart_self_test(log_path) return result - def run_smart_self_test(self, log_path): + def run_smart_self_test(self, log_path) -> bool: """Run SMART self-test and check if it passed, returns bool. NOTE: An exception will be raised if the disk lacks SMART support. @@ -467,7 +383,7 @@ class Disk(BaseObj): status_str = 'Starting self-test...' test_details = self.get_smart_self_test_details() test_minutes = 15 - size_str = bytes_to_string(self.details["size"], use_binary=False) + size_str = bytes_to_string(self.size, use_binary=False) header_str = color_string( ['[', self.path.name, ' ', size_str, ']'], [None, 'BLUE', None, 'CYAN', None], @@ -532,35 +448,34 @@ class Disk(BaseObj): # Done return result - def update_smart_details(self, use_sat=False): - """Update SMART details via smartctl.""" - self.attributes = {} + def set_description(self) -> None: + """Set disk description from details.""" + self.description = ( + f'{bytes_to_string(self.size, use_binary=False)}' + f' ({self.bus}) {self.model} {self.serial}' + ) - # Check if SAT is needed - if not use_sat: - # use_sat not set, check previous run (if possible) - for arg in self.smartctl.get('smartctl', {}).get('argv', []): - if arg == '--device=sat,auto': - use_sat = True - break + def update_smart_details(self) -> None: + """Update SMART details via smartctl.""" + updated_attributes = {} # Get SMART data cmd = [ 'sudo', 'smartctl', - f'--device={"sat,auto" if use_sat else "auto"}', + f'--device={"sat,auto" if self.use_sat else "auto"}', '--tolerance=verypermissive', '--all', '--json', self.path, ] - self.smartctl = get_json_from_command(cmd, check=False) + self.raw_smartctl = get_json_from_command(cmd, check=False) # Check for attributes - if KEY_NVME in self.smartctl: - for name, value in self.smartctl[KEY_NVME].items(): + if KEY_NVME in self.raw_smartctl: + for name, value in self.raw_smartctl[KEY_NVME].items(): try: - self.attributes[name] = { + updated_attributes[name] = { 'name': name, 'raw': int(value), 'raw_str': str(value), @@ -568,8 +483,8 @@ class Disk(BaseObj): except (TypeError, ValueError): # Ignoring invalid attribute LOG.error('Invalid NVMe attribute: %s %s', name, value) - elif KEY_SMART in self.smartctl: - for attribute in self.smartctl[KEY_SMART].get('table', {}): + elif KEY_SMART in self.raw_smartctl: + for attribute in self.raw_smartctl[KEY_SMART].get('table', {}): try: _id = int(attribute['id']) except (KeyError, ValueError): @@ -586,37 +501,19 @@ class Disk(BaseObj): raw = int(match.group(1)) # Add to dict - self.attributes[_id] = { + updated_attributes[_id] = { 'name': name, 'raw': raw, 'raw_str': raw_str} # Add note if necessary - if not self.attributes: + if not updated_attributes: self.add_note('No NVMe or SMART data available', 'YELLOW') - -class Test(): - # pylint: disable=too-few-public-methods - """Object for tracking test specific data.""" - def __init__(self, dev, label): - self.dev = dev - self.disabled = False - self.failed = False - self.label = label - self.passed = False - self.report = [] - self.status = 'Pending' - - def set_status(self, status): - """Update status string.""" - if self.disabled: - # Don't change status if disabled - return - - self.status = status + # Done + self.attributes.update(updated_attributes) # Functions -def get_disk_details_linux(path): +def get_disk_details_linux(path) -> dict[Any, Any]: """Get disk details using lsblk, returns dict.""" cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', path] json_data = get_json_from_command(cmd, check=False) @@ -636,7 +533,7 @@ def get_disk_details_linux(path): return details -def get_disk_details_macos(path): +def get_disk_details_macos(path) -> dict[Any, Any]: """Get disk details using diskutil, returns dict.""" details = {} @@ -696,14 +593,14 @@ def get_disk_details_macos(path): return details -def get_disk_serial_macos(path): +def get_disk_serial_macos(path) -> str: """Get disk serial using system_profiler, returns str.""" cmd = ['sudo', 'smartctl', '--info', '--json', path] smart_info = get_json_from_command(cmd) return smart_info.get('serial_number', 'Unknown Serial') -def get_disks(skip_kits=False): +def get_disks(skip_kits=False) -> list[Disk]: """Get disks using OS-specific methods, returns list.""" disks = [] if PLATFORM == 'Darwin': @@ -724,7 +621,7 @@ def get_disks(skip_kits=False): return disks -def get_disks_linux(): +def get_disks_linux() -> list[Disk]: """Get disks via lsblk, returns list.""" cmd = ['lsblk', '--json', '--nodeps', '--paths'] disks = [] @@ -735,7 +632,7 @@ def get_disks_linux(): disk_obj = Disk(disk['name']) # Skip loopback devices, optical devices, etc - if disk_obj.details['type'] != 'disk': + if disk_obj.raw_details.get('type', '???') != 'disk': continue # Add disk @@ -745,7 +642,7 @@ def get_disks_linux(): return disks -def get_disks_macos(): +def get_disks_macos() -> list[Disk]: """Get disks via diskutil, returns list.""" cmd = ['diskutil', 'list', '-plist', 'physical'] disks = [] @@ -779,8 +676,8 @@ def get_disks_macos(): return disks -def get_known_disk_attributes(model): - """Get known NVMe/SMART attributes (model specific), returns str.""" +def get_known_disk_attributes(model) -> dict[Any, dict]: + """Get known NVMe/SMART attributes (model specific), returns dict.""" known_attributes = KNOWN_DISK_ATTRIBUTES.copy() # Apply model-specific data @@ -796,77 +693,7 @@ def get_known_disk_attributes(model): return known_attributes -def get_ram_list_linux(): - """Get RAM list using dmidecode.""" - cmd = ['sudo', 'dmidecode', '--type', 'memory'] - dimm_list = [] - manufacturer = 'Unknown' - size = 0 - - # Get DMI data - proc = run_program(cmd) - dmi_data = proc.stdout.splitlines() - - # Parse data - for line in dmi_data: - line = line.strip() - if line == 'Memory Device': - # Reset vars - manufacturer = 'Unknown' - size = 0 - elif line.startswith('Size:'): - size = line.replace('Size: ', '') - try: - size = string_to_bytes(size, assume_binary=True) - except ValueError: - # Assuming empty module - size = 0 - elif line.startswith('Manufacturer:'): - manufacturer = line.replace('Manufacturer: ', '') - dimm_list.append([size, manufacturer]) - - # Save details - return dimm_list - - -def get_ram_list_macos(): - """Get RAM list using system_profiler.""" - dimm_list = [] - - # Get and parse plist data - cmd = [ - 'system_profiler', - '-xml', - 'SPMemoryDataType', - ] - proc = run_program(cmd, check=False, encoding=None, errors=None) - try: - plist_data = plistlib.loads(proc.stdout) - except (TypeError, ValueError): - # Ignore and return an empty list - return dimm_list - - # Check DIMM data - dimm_details = plist_data[0].get('_items', [{}])[0].get('_items', []) - for dimm in dimm_details: - manufacturer = dimm.get('dimm_manufacturer', None) - manufacturer = KNOWN_RAM_VENDOR_IDS.get( - manufacturer, - f'Unknown ({manufacturer})') - size = dimm.get('dimm_size', '0 GB') - try: - size = string_to_bytes(size, assume_binary=True) - except ValueError: - # Empty DIMM? - LOG.error('Invalid DIMM size: %s', size) - continue - dimm_list.append([size, manufacturer]) - - # Save details - return dimm_list - - -def is_4k_aligned_macos(disk_details): +def is_4k_aligned_macos(disk_details) -> bool: """Check partition alignment using diskutil info, returns bool.""" aligned = True @@ -883,7 +710,7 @@ def is_4k_aligned_macos(disk_details): return aligned -def is_4k_aligned_linux(dev_path, physical_sector_size): +def is_4k_aligned_linux(dev_path, physical_sector_size) -> bool: """Check partition alignment using lsblk, returns bool.""" aligned = True cmd = [ diff --git a/scripts/wk/hw/system.py b/scripts/wk/hw/system.py new file mode 100644 index 00000000..00abf736 --- /dev/null +++ b/scripts/wk/hw/system.py @@ -0,0 +1,183 @@ +"""WizardKit: System object and functions""" +# vim: sts=2 sw=2 ts=2 + +import logging +import plistlib +import re + +from dataclasses import dataclass, field +from typing import Any + +from wk.cfg.hw import KNOWN_RAM_VENDOR_IDS +from wk.exe import get_json_from_command, run_program +from wk.hw.test import Test +from wk.std import ( + PLATFORM, + bytes_to_string, + color_string, + string_to_bytes, + ) + + +# STATIC VARIABLES +LOG = logging.getLogger(__name__) + + +@dataclass(slots=True) +class System: + """Object for tracking system specific hardware data.""" + cpu_description: str = field(init=False) + ram_dimms: list[str] = field(init=False, default_factory=list) + ram_total: str = field(init=False, default='Unknown') + raw_details: dict[Any, Any] = field(init=False, default_factory=dict) + tests: list[Test] = field(init=False, default_factory=list) + + def __post_init__(self) -> None: + self.get_cpu_details() + self.set_cpu_description() + self.get_ram_details() + + def generate_report(self) -> list[str]: + """Generate CPU & RAM report, returns list.""" + report = [] + report.append(color_string('Device', 'BLUE')) + report.append(f' {self.cpu_description}') + + # Include RAM details + report.append(color_string('RAM', 'BLUE')) + report.append(f' {self.ram_total} ({", ".join(self.ram_dimms)})') + + # Tests + for test in self.tests: + report.extend(test.report) + + return report + + def get_cpu_details(self) -> None: + """Get CPU details using OS specific methods.""" + cmd = ['lscpu', '--json'] + + # Bail early + if PLATFORM != 'Linux': + # Only Linux is supported ATM + return + + # Parse details + json_data = get_json_from_command(cmd) + for line in json_data.get('lscpu', [{}]): + _field = line.get('field', '').replace(':', '') + _data = line.get('data', '') + if not (_field or _data): + # Skip + continue + self.raw_details[_field] = _data + + def get_ram_details(self) -> None: + """Get RAM details using OS specific methods.""" + if PLATFORM == 'Darwin': + dimm_list = get_ram_list_macos() + elif PLATFORM == 'Linux': + dimm_list = get_ram_list_linux() + + details = {'Total': 0} + for dimm_details in dimm_list: + size, manufacturer = dimm_details + if size <= 0: + # Skip empty DIMMs + continue + description = f'{bytes_to_string(size)} {manufacturer}' + details['Total'] += size + if description in details: + details[description] += 1 + else: + details[description] = 1 + + # Save details + self.ram_total = bytes_to_string(details.pop('Total', 0)) + self.ram_dimms = [ + f'{count}x {desc}' for desc, count in sorted(details.items()) + ] + + def set_cpu_description(self) -> None: + """Set CPU description.""" + self.cpu_description = self.raw_details.get('Model name', 'Unknown CPU') + + # macOS + if PLATFORM == 'Darwin': + cmd = 'sysctl -n machdep.cpu.brand_string'.split() + proc = run_program(cmd, check=False) + self.cpu_description = re.sub(r'\s+', ' ', proc.stdout.strip()) + + +def get_ram_list_linux() -> list[list]: + """Get RAM list using dmidecode.""" + cmd = ['sudo', 'dmidecode', '--type', 'memory'] + dimm_list = [] + manufacturer = 'Unknown' + size = 0 + + # Get DMI data + proc = run_program(cmd) + dmi_data = proc.stdout.splitlines() + + # Parse data + for line in dmi_data: + line = line.strip() + if line == 'Memory Device': + # Reset vars + manufacturer = 'Unknown' + size = 0 + elif line.startswith('Size:'): + size = line.replace('Size: ', '') + try: + size = string_to_bytes(size, assume_binary=True) + except ValueError: + # Assuming empty module + size = 0 + elif line.startswith('Manufacturer:'): + manufacturer = line.replace('Manufacturer: ', '') + dimm_list.append([size, manufacturer]) + + # Save details + return dimm_list + + +def get_ram_list_macos() -> list[list]: + """Get RAM list using system_profiler.""" + dimm_list = [] + + # Get and parse plist data + cmd = [ + 'system_profiler', + '-xml', + 'SPMemoryDataType', + ] + proc = run_program(cmd, check=False, encoding=None, errors=None) + try: + plist_data = plistlib.loads(proc.stdout) + except (TypeError, ValueError): + # Ignore and return an empty list + return dimm_list + + # Check DIMM data + dimm_details = plist_data[0].get('_items', [{}])[0].get('_items', []) + for dimm in dimm_details: + manufacturer = dimm.get('dimm_manufacturer', None) + manufacturer = KNOWN_RAM_VENDOR_IDS.get( + manufacturer, + f'Unknown ({manufacturer})') + size = dimm.get('dimm_size', '0 GB') + try: + size = string_to_bytes(size, assume_binary=True) + except ValueError: + # Empty DIMM? + LOG.error('Invalid DIMM size: %s', size) + continue + dimm_list.append([size, manufacturer]) + + # Save details + return dimm_list + + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/scripts/wk/hw/test.py b/scripts/wk/hw/test.py new file mode 100644 index 00000000..0f2305df --- /dev/null +++ b/scripts/wk/hw/test.py @@ -0,0 +1,27 @@ +"""WizardKit: Test object and functions""" +# vim: sts=2 sw=2 ts=2 + +from dataclasses import dataclass, field +from typing import Any + +@dataclass(slots=True) +class Test: + # pylint: disable=too-many-instance-attributes + """Object for tracking test specific data.""" + dev: Any + label: str + name: str + disabled: bool = field(init=False, default=False) + failed: bool = field(init=False, default=False) + hidden: bool = False + passed: bool = field(init=False, default=False) + report: list[str] = field(init=False, default_factory=list) + status: str = field(init=False, default='Pending') + + def set_status(self, status): + """Update status string.""" + if self.disabled: + # Don't change status if disabled + return + + self.status = status