From 3733da17fc98ecb686bc1f21e913720e1d383af7 Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Thu, 12 Dec 2019 18:36:57 -0700 Subject: [PATCH] Moved get_disks() from wk/hw/diags to wk/hw/obj --- scripts/wk/hw/diags.py | 78 +----------------------------------------- scripts/wk/hw/obj.py | 69 +++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 77 deletions(-) diff --git a/scripts/wk/hw/diags.py b/scripts/wk/hw/diags.py index 0cc4f42f..58f8825f 100644 --- a/scripts/wk/hw/diags.py +++ b/scripts/wk/hw/diags.py @@ -88,10 +88,6 @@ STATUS_COLORS = { 'Failed': 'RED', 'TimedOut': 'RED', } -WK_LABEL_REGEX = re.compile( - fr'{cfg.main.KIT_NAME_SHORT}_(LINUX|UFD)', - re.IGNORECASE, - ) # Error Classes @@ -263,7 +259,7 @@ class State(): # Add HW Objects self.cpu = hw_obj.CpuRam() - self.disks = get_disks() + self.disks = hw_obj.get_disks(skip_kits=True) # Add test objects for name, details in menu.options.items(): @@ -1078,78 +1074,6 @@ def disk_surface_scan(state, test_objects): raise std.GenericAbort('Aborted') -def get_disks(): - """Get disks using OS-specific methods, returns list.""" - disks = [] - if platform.system() == 'Darwin': - disks = get_disks_macos() - elif platform.system() == 'Linux': - disks = get_disks_linux() - - # Done - return disks - - -def get_disks_linux(): - """Get disks via lsblk, returns list.""" - cmd = ['lsblk', '--json', '--nodeps', '--paths'] - disks = [] - - # Add valid disks - json_data = exe.get_json_from_command(cmd) - for disk in json_data.get('blockdevices', []): - disk_obj = hw_obj.Disk(disk['name']) - skip = False - - # Skip loopback devices, optical devices, etc - if disk_obj.details['type'] != 'disk': - skip = True - - # Skip WK disks - for label in disk_obj.get_labels(): - if WK_LABEL_REGEX.search(label): - skip = True - - # Add disk - if not skip: - disks.append(disk_obj) - - # Done - return disks - - -def get_disks_macos(): - """Get disks via diskutil, returns list.""" - cmd = ['diskutil', 'list', '-plist', 'physical'] - disks = [] - - # Get info from diskutil - proc = exe.run_program(cmd, encoding=None, errors=None) - try: - plist_data = plistlib.loads(proc.stdout) - except (TypeError, ValueError): - # Invalid / corrupt plist data? return empty list to avoid crash - LOG.error('Failed to get diskutil list') - return disks - - # Add valid disks - for disk in plist_data['WholeDisks']: - disk_obj = hw_obj.Disk(f'/dev/{disk}') - skip = False - - # Skip WK disks - for label in disk_obj.get_labels(): - if WK_LABEL_REGEX.search(label): - skip = True - - # Add disk - if not skip: - disks.append(disk_obj) - - # Done - return disks - - def keyboard_test(): """Test keyboard using xev.""" LOG.info('Keyboard Test (xev)') diff --git a/scripts/wk/hw/obj.py b/scripts/wk/hw/obj.py index bc1df561..82b52e22 100644 --- a/scripts/wk/hw/obj.py +++ b/scripts/wk/hw/obj.py @@ -18,6 +18,7 @@ from wk.cfg.hw import ( KNOWN_RAM_VENDOR_IDS, REGEX_POWER_ON_TIME, ) +from wk.cfg.main import KIT_NAME_SHORT from wk.exe import get_json_from_command, run_program from wk.std import bytes_to_string, color_string, sleep, string_to_bytes @@ -29,6 +30,10 @@ NVME_WARNING_KEYS = ( 'reliability_degraded', 'volatile_memory_backup_failed', ) +WK_LABEL_REGEX = re.compile( + fr'{KIT_NAME_SHORT}_(LINUX|UFD)', + re.IGNORECASE, + ) # Exception Classes @@ -626,6 +631,70 @@ def get_disk_serial_macos(path): return smart_info.get('serial_number', 'Unknown Serial') +def get_disks(skip_kits=False): + """Get disks using OS-specific methods, returns list.""" + disks = [] + if platform.system() == 'Darwin': + disks = get_disks_macos() + elif platform.system() == 'Linux': + disks = get_disks_linux() + + # Skip WK disks + if skip_kits: + disks = [ + disk_obj for disk_obj in disks + if not any( + [WK_LABEL_REGEX.search(label) for label in disk_obj.get_labels()] + ) + ] + + # Done + return disks + + +def get_disks_linux(): + """Get disks via lsblk, returns list.""" + cmd = ['lsblk', '--json', '--nodeps', '--paths'] + disks = [] + + # Add valid disks + json_data = get_json_from_command(cmd) + for disk in json_data.get('blockdevices', []): + disk_obj = Disk(disk['name']) + + # Skip loopback devices, optical devices, etc + if disk_obj.details['type'] != 'disk': + continue + + # Add disk + disks.append(disk_obj) + + # Done + return disks + + +def get_disks_macos(): + """Get disks via diskutil, returns list.""" + cmd = ['diskutil', 'list', '-plist', 'physical'] + disks = [] + + # Get info from diskutil + proc = exe.run_program(cmd, encoding=None, errors=None) + try: + plist_data = plistlib.loads(proc.stdout) + except (TypeError, ValueError): + # Invalid / corrupt plist data? return empty list to avoid crash + LOG.error('Failed to get diskutil list') + return disks + + # Add valid disks + for disk in plist_data['WholeDisks']: + disks.append(Disk(f'/dev/{disk}')) + + # Done + return disks + + def get_known_disk_attributes(model): """Get known NVMe/SMART attributes (model specific), returns str.""" known_attributes = KNOWN_DISK_ATTRIBUTES.copy()