Moved get_disks() from wk/hw/diags to wk/hw/obj

This commit is contained in:
2Shirt 2019-12-12 18:36:57 -07:00
parent 48a6b3200b
commit 3733da17fc
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
2 changed files with 70 additions and 77 deletions

View file

@ -88,10 +88,6 @@ STATUS_COLORS = {
'Failed': 'RED',
'TimedOut': 'RED',
}
WK_LABEL_REGEX = re.compile(
fr'{cfg.main.KIT_NAME_SHORT}_(LINUX|UFD)',
re.IGNORECASE,
)
# Error Classes
@ -263,7 +259,7 @@ class State():
# Add HW Objects
self.cpu = hw_obj.CpuRam()
self.disks = get_disks()
self.disks = hw_obj.get_disks(skip_kits=True)
# Add test objects
for name, details in menu.options.items():
@ -1078,78 +1074,6 @@ def disk_surface_scan(state, test_objects):
raise std.GenericAbort('Aborted')
def get_disks():
"""Get disks using OS-specific methods, returns list."""
disks = []
if platform.system() == 'Darwin':
disks = get_disks_macos()
elif platform.system() == 'Linux':
disks = get_disks_linux()
# Done
return disks
def get_disks_linux():
"""Get disks via lsblk, returns list."""
cmd = ['lsblk', '--json', '--nodeps', '--paths']
disks = []
# Add valid disks
json_data = exe.get_json_from_command(cmd)
for disk in json_data.get('blockdevices', []):
disk_obj = hw_obj.Disk(disk['name'])
skip = False
# Skip loopback devices, optical devices, etc
if disk_obj.details['type'] != 'disk':
skip = True
# Skip WK disks
for label in disk_obj.get_labels():
if WK_LABEL_REGEX.search(label):
skip = True
# Add disk
if not skip:
disks.append(disk_obj)
# Done
return disks
def get_disks_macos():
"""Get disks via diskutil, returns list."""
cmd = ['diskutil', 'list', '-plist', 'physical']
disks = []
# Get info from diskutil
proc = exe.run_program(cmd, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# Invalid / corrupt plist data? return empty list to avoid crash
LOG.error('Failed to get diskutil list')
return disks
# Add valid disks
for disk in plist_data['WholeDisks']:
disk_obj = hw_obj.Disk(f'/dev/{disk}')
skip = False
# Skip WK disks
for label in disk_obj.get_labels():
if WK_LABEL_REGEX.search(label):
skip = True
# Add disk
if not skip:
disks.append(disk_obj)
# Done
return disks
def keyboard_test():
"""Test keyboard using xev."""
LOG.info('Keyboard Test (xev)')

View file

@ -18,6 +18,7 @@ from wk.cfg.hw import (
KNOWN_RAM_VENDOR_IDS,
REGEX_POWER_ON_TIME,
)
from wk.cfg.main import KIT_NAME_SHORT
from wk.exe import get_json_from_command, run_program
from wk.std import bytes_to_string, color_string, sleep, string_to_bytes
@ -29,6 +30,10 @@ NVME_WARNING_KEYS = (
'reliability_degraded',
'volatile_memory_backup_failed',
)
WK_LABEL_REGEX = re.compile(
fr'{KIT_NAME_SHORT}_(LINUX|UFD)',
re.IGNORECASE,
)
# Exception Classes
@ -626,6 +631,70 @@ def get_disk_serial_macos(path):
return smart_info.get('serial_number', 'Unknown Serial')
def get_disks(skip_kits=False):
"""Get disks using OS-specific methods, returns list."""
disks = []
if platform.system() == 'Darwin':
disks = get_disks_macos()
elif platform.system() == 'Linux':
disks = get_disks_linux()
# Skip WK disks
if skip_kits:
disks = [
disk_obj for disk_obj in disks
if not any(
[WK_LABEL_REGEX.search(label) for label in disk_obj.get_labels()]
)
]
# Done
return disks
def get_disks_linux():
"""Get disks via lsblk, returns list."""
cmd = ['lsblk', '--json', '--nodeps', '--paths']
disks = []
# Add valid disks
json_data = get_json_from_command(cmd)
for disk in json_data.get('blockdevices', []):
disk_obj = Disk(disk['name'])
# Skip loopback devices, optical devices, etc
if disk_obj.details['type'] != 'disk':
continue
# Add disk
disks.append(disk_obj)
# Done
return disks
def get_disks_macos():
"""Get disks via diskutil, returns list."""
cmd = ['diskutil', 'list', '-plist', 'physical']
disks = []
# Get info from diskutil
proc = exe.run_program(cmd, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# Invalid / corrupt plist data? return empty list to avoid crash
LOG.error('Failed to get diskutil list')
return disks
# Add valid disks
for disk in plist_data['WholeDisks']:
disks.append(Disk(f'/dev/{disk}'))
# Done
return disks
def get_known_disk_attributes(model):
"""Get known NVMe/SMART attributes (model specific), returns str."""
known_attributes = KNOWN_DISK_ATTRIBUTES.copy()