From 8582046948c8e3f5da249a25f3f200d44246ce9f Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Mon, 4 Apr 2022 18:31:52 -0600 Subject: [PATCH] Update HW diags and ddrescue to use new HW classes --- scripts/wk/hw/ddrescue.py | 39 ++++++++++++++------------ scripts/wk/hw/diags.py | 59 ++++++++++++++++++++------------------- 2 files changed, 52 insertions(+), 46 deletions(-) diff --git a/scripts/wk/hw/ddrescue.py b/scripts/wk/hw/ddrescue.py index e457dfe2..1184d583 100644 --- a/scripts/wk/hw/ddrescue.py +++ b/scripts/wk/hw/ddrescue.py @@ -27,7 +27,10 @@ from wk.cfg.ddrescue import ( DDRESCUE_SETTINGS, DDRESCUE_SPECIFIC_PASS_SETTINGS, ) -from wk.hw import obj as hw_obj +from wk.hw import disk as hw_disk +from wk.hw import sensors as hw_sensors +from wk.hw import system as hw_system +from wk.hw.test import Test # STATIC VARIABLES @@ -272,7 +275,7 @@ class BlockPair(): """Run safety check and abort if necessary.""" dest_size = -1 if self.destination.exists(): - dest_obj = hw_obj.Disk(self.destination) + dest_obj = hw_disk.Disk(self.destination) dest_size = dest_obj.details['size'] del dest_obj @@ -488,7 +491,7 @@ class State(): if settings['Partition Mapping']: # Resume previous run, load pairs from settings file for part_map in settings['Partition Mapping']: - bp_source = hw_obj.Disk( + bp_source = hw_disk.Disk( f'{self.source.path}{source_sep}{part_map[0]}', ) bp_dest = pathlib.Path( @@ -948,7 +951,7 @@ class State(): """Run safety checks for destination and abort if necessary.""" try: self.destination.safety_checks() - except hw_obj.CriticalHardwareError as err: + except hw_disk.CriticalHardwareError as err: std.print_error( f'Critical error(s) detected for: {self.destination.path}', ) @@ -1097,7 +1100,7 @@ class State(): string = '' # Build base string - if isinstance(obj, hw_obj.Disk): + if isinstance(obj, hw_disk.Disk): string = f'{obj.path} {obj.description}' elif obj.is_dir(): string = f'{obj}/' @@ -1122,7 +1125,7 @@ class State(): if self.source: source_exists = self.source.path.exists() if self.destination: - if isinstance(self.destination, hw_obj.Disk): + if isinstance(self.destination, hw_disk.Disk): dest_exists = self.destination.path.exists() else: dest_exists = self.destination.exists() @@ -1485,18 +1488,18 @@ def check_destination_health(destination): result = '' # Bail early - if not isinstance(destination, hw_obj.Disk): + if not isinstance(destination, hw_disk.Disk): # Return empty string return result # Run safety checks try: destination.safety_checks() - except hw_obj.CriticalHardwareError: + except hw_disk.CriticalHardwareError: result = 'Critical hardware error detected on destination' - except hw_obj.SMARTSelfTestInProgressError: + except hw_disk.SMARTSelfTestInProgressError: result = 'SMART self-test in progress on destination' - except hw_obj.SMARTNotSupportedError: + except hw_disk.SMARTNotSupportedError: pass # Done @@ -1668,20 +1671,20 @@ def get_object(path): # Check path path = pathlib.Path(path).resolve() if path.is_block_device() or path.is_char_device(): - obj = hw_obj.Disk(path) + obj = hw_disk.Disk(path) # Child/Parent check parent = obj.details['parent'] if parent: std.print_warning(f'"{obj.path}" is a child device') if std.ask(f'Use parent device "{parent}" instead?'): - obj = hw_obj.Disk(parent) + obj = hw_disk.Disk(parent) elif path.is_dir(): obj = path elif path.is_file(): # Assuming file is a raw image, mounting loop_path = mount_raw_image(path) - obj = hw_obj.Disk(loop_path) + obj = hw_disk.Disk(loop_path) # Abort if obj not set if not obj: @@ -1830,8 +1833,8 @@ def source_or_destination_changed(state): elif hasattr(obj, 'exists'): # Assuming dest path changed = changed or not obj.exists() - elif isinstance(obj, hw_obj.Disk): - compare_dev = hw_obj.Disk(obj.path) + elif isinstance(obj, hw_disk.Disk): + compare_dev = hw_disk.Disk(obj.path) for key in ('model', 'serial'): changed = changed or obj.details[key] != compare_dev.details[key] @@ -2217,7 +2220,7 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True): def select_disk(prompt, skip_disk=None): """Select disk from list, returns Disk().""" std.print_info('Scanning disks...') - disks = hw_obj.get_disks() + disks = hw_disk.get_disks() menu = std.Menu( title=std.color_string(f'ddrescue TUI: {prompt} Selection', 'GREEN'), ) @@ -2327,10 +2330,10 @@ def select_disk_parts(prompt, disk): # Replace part list with whole disk obj object_list = [disk.path] - # Convert object_list to hw_obj.Disk() objects + # Convert object_list to hw_disk.Disk() objects print(' ') std.print_info('Getting disk/partition details...') - object_list = [hw_obj.Disk(path) for path in object_list] + object_list = [hw_disk.Disk(path) for path in object_list] # Done return object_list diff --git a/scripts/wk/hw/diags.py b/scripts/wk/hw/diags.py index f380239e..457d0116 100644 --- a/scripts/wk/hw/diags.py +++ b/scripts/wk/hw/diags.py @@ -15,8 +15,10 @@ from docopt import docopt from wk import cfg, debug, exe, graph, log, net, std, tmux from wk import os as wk_os -from wk.hw import obj as hw_obj +from wk.hw import disk as hw_disk from wk.hw import sensors as hw_sensors +from wk.hw import system as hw_system +from wk.hw.test import Test # STATIC VARIABLES @@ -100,11 +102,11 @@ class DeviceTooSmallError(RuntimeError): class State(): """Object for tracking hardware diagnostic data.""" def __init__(self): - self.cpu = None self.disks = [] self.layout = cfg.hw.TMUX_LAYOUT.copy() self.log_dir = None self.panes = {} + self.system = None self.tests = OrderedDict({ 'CPU & Cooling': { 'Enabled': False, @@ -165,12 +167,12 @@ class State(): disable_tests = False # Skip already disabled devices - if all(test.disabled for test in disk.tests.values()): + if all(test.disabled for test in disk.tests): continue try: disk.safety_checks() - except hw_obj.CriticalHardwareError: + except hw_disk.CriticalHardwareError: disable_tests = True disk.add_note('Critical hardware error detected.', 'RED') if 'Disk Attributes' in disk.tests: @@ -183,7 +185,7 @@ class State(): 'Critical hardware error detected during diagnostics', 'YELLOW', ) - except hw_obj.SMARTSelfTestInProgressError as err: + except hw_disk.SMARTSelfTestInProgressError as err: if prep: std.print_warning(f'SMART self-test(s) in progress for {disk.path}') if std.ask('Continue with all tests disabled for this device?'): @@ -293,8 +295,8 @@ class State(): ) # Add HW Objects - self.cpu = hw_obj.CpuRam() - self.disks = hw_obj.get_disks(skip_kits=True) + self.system = hw_system.System() + self.disks = hw_disk.get_disks(skip_kits=True) # Add test objects for name, details in menu.options.items(): @@ -304,16 +306,17 @@ class State(): if 'CPU' in name: # Create two Test objects which will both be used by cpu_stress_tests # NOTE: Prime95 should be added first - test_mprime_obj = hw_obj.Test(dev=self.cpu, label='Prime95') - test_cooling_obj = hw_obj.Test(dev=self.cpu, label='Cooling') - self.cpu.tests[test_mprime_obj.label] = test_mprime_obj - self.cpu.tests[test_cooling_obj.label] = test_cooling_obj - self.tests[name]['Objects'].append(test_mprime_obj) - self.tests[name]['Objects'].append(test_cooling_obj) + self.system.tests.append( + Test(dev=self.system, label='Prime95', name=name), + ) + self.system.tests.append( + Test(dev=self.system, label='Cooling', name=name), + ) + self.tests[name]['Objects'].extend(self.system.tests) elif 'Disk' in name: for disk in self.disks: - test_obj = hw_obj.Test(dev=disk, label=disk.path.name) - disk.tests[name] = test_obj + test_obj = Test(dev=disk, label=disk.path.name, name=name) + disk.test.append(test_obj) self.tests[name]['Objects'].append(test_obj) # Run safety checks @@ -360,14 +363,6 @@ class State(): with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f: _f.write('\n'.join(debug.generate_object_report(self))) - # CPU/RAM - with open(f'{debug_dir}/cpu.report', 'a', encoding='utf-8') as _f: - _f.write('\n'.join(debug.generate_object_report(self.cpu))) - _f.write('\n\n[Tests]') - for name, test in self.cpu.tests.items(): - _f.write(f'\n{name}:\n') - _f.write('\n'.join(debug.generate_object_report(test, indent=1))) - # Disks for disk in self.disks: with open( @@ -375,8 +370,8 @@ class State(): encoding='utf-8') as _f: _f.write('\n'.join(debug.generate_object_report(disk))) _f.write('\n\n[Tests]') - for name, test in disk.tests.items(): - _f.write(f'\n{name}:\n') + for test in disk.tests: + _f.write(f'\n{test.name}:\n') _f.write('\n'.join(debug.generate_object_report(test, indent=1))) # SMC @@ -394,6 +389,14 @@ class State(): with open(f'{debug_dir}/smc.data', 'a', encoding='utf-8') as _f: _f.write('\n'.join(data)) + # System + with open(f'{debug_dir}/system.report', 'a', encoding='utf-8') as _f: + _f.write('\n'.join(debug.generate_object_report(self.system))) + _f.write('\n\n[Tests]') + for test in self.system.tests: + _f.write(f'\n{test.name}:\n') + _f.write('\n'.join(debug.generate_object_report(test, indent=1))) + def update_clock(self): """Update 'Started' pane following clock sync.""" tmux.respawn_pane( @@ -725,7 +728,7 @@ def cpu_stress_tests(state, test_objects): return # Prep - state.update_top_pane(test_mprime_obj.dev.description) + state.update_top_pane(test_mprime_obj.dev.cpu_description) test_cooling_obj.set_status('Working') test_mprime_obj.set_status('Working') @@ -1008,7 +1011,7 @@ def disk_self_test(state, test_objects): except TimeoutError: test_obj.failed = True result = 'TimedOut' - except hw_obj.SMARTNotSupportedError: + except hw_disk.SMARTNotSupportedError: # Pass test since it doesn't apply test_obj.passed = True result = 'N/A' @@ -1463,7 +1466,7 @@ def show_results(state): if name.startswith('CPU')] if any(cpu_tests_enabled): std.print_success('CPU:') - std.print_report(state.cpu.generate_report()) + std.print_report(state.system.generate_report()) std.print_standard(' ') # Disk Tests