Update HW diags and ddrescue to use new HW classes

This commit is contained in:
2Shirt 2022-04-04 18:31:52 -06:00
parent 56e145942a
commit 8582046948
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
2 changed files with 52 additions and 46 deletions

View file

@ -27,7 +27,10 @@ from wk.cfg.ddrescue import (
DDRESCUE_SETTINGS,
DDRESCUE_SPECIFIC_PASS_SETTINGS,
)
from wk.hw import obj as hw_obj
from wk.hw import disk as hw_disk
from wk.hw import sensors as hw_sensors
from wk.hw import system as hw_system
from wk.hw.test import Test
# STATIC VARIABLES
@ -272,7 +275,7 @@ class BlockPair():
"""Run safety check and abort if necessary."""
dest_size = -1
if self.destination.exists():
dest_obj = hw_obj.Disk(self.destination)
dest_obj = hw_disk.Disk(self.destination)
dest_size = dest_obj.details['size']
del dest_obj
@ -488,7 +491,7 @@ class State():
if settings['Partition Mapping']:
# Resume previous run, load pairs from settings file
for part_map in settings['Partition Mapping']:
bp_source = hw_obj.Disk(
bp_source = hw_disk.Disk(
f'{self.source.path}{source_sep}{part_map[0]}',
)
bp_dest = pathlib.Path(
@ -948,7 +951,7 @@ class State():
"""Run safety checks for destination and abort if necessary."""
try:
self.destination.safety_checks()
except hw_obj.CriticalHardwareError as err:
except hw_disk.CriticalHardwareError as err:
std.print_error(
f'Critical error(s) detected for: {self.destination.path}',
)
@ -1097,7 +1100,7 @@ class State():
string = ''
# Build base string
if isinstance(obj, hw_obj.Disk):
if isinstance(obj, hw_disk.Disk):
string = f'{obj.path} {obj.description}'
elif obj.is_dir():
string = f'{obj}/'
@ -1122,7 +1125,7 @@ class State():
if self.source:
source_exists = self.source.path.exists()
if self.destination:
if isinstance(self.destination, hw_obj.Disk):
if isinstance(self.destination, hw_disk.Disk):
dest_exists = self.destination.path.exists()
else:
dest_exists = self.destination.exists()
@ -1485,18 +1488,18 @@ def check_destination_health(destination):
result = ''
# Bail early
if not isinstance(destination, hw_obj.Disk):
if not isinstance(destination, hw_disk.Disk):
# Return empty string
return result
# Run safety checks
try:
destination.safety_checks()
except hw_obj.CriticalHardwareError:
except hw_disk.CriticalHardwareError:
result = 'Critical hardware error detected on destination'
except hw_obj.SMARTSelfTestInProgressError:
except hw_disk.SMARTSelfTestInProgressError:
result = 'SMART self-test in progress on destination'
except hw_obj.SMARTNotSupportedError:
except hw_disk.SMARTNotSupportedError:
pass
# Done
@ -1668,20 +1671,20 @@ def get_object(path):
# Check path
path = pathlib.Path(path).resolve()
if path.is_block_device() or path.is_char_device():
obj = hw_obj.Disk(path)
obj = hw_disk.Disk(path)
# Child/Parent check
parent = obj.details['parent']
if parent:
std.print_warning(f'"{obj.path}" is a child device')
if std.ask(f'Use parent device "{parent}" instead?'):
obj = hw_obj.Disk(parent)
obj = hw_disk.Disk(parent)
elif path.is_dir():
obj = path
elif path.is_file():
# Assuming file is a raw image, mounting
loop_path = mount_raw_image(path)
obj = hw_obj.Disk(loop_path)
obj = hw_disk.Disk(loop_path)
# Abort if obj not set
if not obj:
@ -1830,8 +1833,8 @@ def source_or_destination_changed(state):
elif hasattr(obj, 'exists'):
# Assuming dest path
changed = changed or not obj.exists()
elif isinstance(obj, hw_obj.Disk):
compare_dev = hw_obj.Disk(obj.path)
elif isinstance(obj, hw_disk.Disk):
compare_dev = hw_disk.Disk(obj.path)
for key in ('model', 'serial'):
changed = changed or obj.details[key] != compare_dev.details[key]
@ -2217,7 +2220,7 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True):
def select_disk(prompt, skip_disk=None):
"""Select disk from list, returns Disk()."""
std.print_info('Scanning disks...')
disks = hw_obj.get_disks()
disks = hw_disk.get_disks()
menu = std.Menu(
title=std.color_string(f'ddrescue TUI: {prompt} Selection', 'GREEN'),
)
@ -2327,10 +2330,10 @@ def select_disk_parts(prompt, disk):
# Replace part list with whole disk obj
object_list = [disk.path]
# Convert object_list to hw_obj.Disk() objects
# Convert object_list to hw_disk.Disk() objects
print(' ')
std.print_info('Getting disk/partition details...')
object_list = [hw_obj.Disk(path) for path in object_list]
object_list = [hw_disk.Disk(path) for path in object_list]
# Done
return object_list

View file

@ -15,8 +15,10 @@ from docopt import docopt
from wk import cfg, debug, exe, graph, log, net, std, tmux
from wk import os as wk_os
from wk.hw import obj as hw_obj
from wk.hw import disk as hw_disk
from wk.hw import sensors as hw_sensors
from wk.hw import system as hw_system
from wk.hw.test import Test
# STATIC VARIABLES
@ -100,11 +102,11 @@ class DeviceTooSmallError(RuntimeError):
class State():
"""Object for tracking hardware diagnostic data."""
def __init__(self):
self.cpu = None
self.disks = []
self.layout = cfg.hw.TMUX_LAYOUT.copy()
self.log_dir = None
self.panes = {}
self.system = None
self.tests = OrderedDict({
'CPU & Cooling': {
'Enabled': False,
@ -165,12 +167,12 @@ class State():
disable_tests = False
# Skip already disabled devices
if all(test.disabled for test in disk.tests.values()):
if all(test.disabled for test in disk.tests):
continue
try:
disk.safety_checks()
except hw_obj.CriticalHardwareError:
except hw_disk.CriticalHardwareError:
disable_tests = True
disk.add_note('Critical hardware error detected.', 'RED')
if 'Disk Attributes' in disk.tests:
@ -183,7 +185,7 @@ class State():
'Critical hardware error detected during diagnostics',
'YELLOW',
)
except hw_obj.SMARTSelfTestInProgressError as err:
except hw_disk.SMARTSelfTestInProgressError as err:
if prep:
std.print_warning(f'SMART self-test(s) in progress for {disk.path}')
if std.ask('Continue with all tests disabled for this device?'):
@ -293,8 +295,8 @@ class State():
)
# Add HW Objects
self.cpu = hw_obj.CpuRam()
self.disks = hw_obj.get_disks(skip_kits=True)
self.system = hw_system.System()
self.disks = hw_disk.get_disks(skip_kits=True)
# Add test objects
for name, details in menu.options.items():
@ -304,16 +306,17 @@ class State():
if 'CPU' in name:
# Create two Test objects which will both be used by cpu_stress_tests
# NOTE: Prime95 should be added first
test_mprime_obj = hw_obj.Test(dev=self.cpu, label='Prime95')
test_cooling_obj = hw_obj.Test(dev=self.cpu, label='Cooling')
self.cpu.tests[test_mprime_obj.label] = test_mprime_obj
self.cpu.tests[test_cooling_obj.label] = test_cooling_obj
self.tests[name]['Objects'].append(test_mprime_obj)
self.tests[name]['Objects'].append(test_cooling_obj)
self.system.tests.append(
Test(dev=self.system, label='Prime95', name=name),
)
self.system.tests.append(
Test(dev=self.system, label='Cooling', name=name),
)
self.tests[name]['Objects'].extend(self.system.tests)
elif 'Disk' in name:
for disk in self.disks:
test_obj = hw_obj.Test(dev=disk, label=disk.path.name)
disk.tests[name] = test_obj
test_obj = Test(dev=disk, label=disk.path.name, name=name)
disk.test.append(test_obj)
self.tests[name]['Objects'].append(test_obj)
# Run safety checks
@ -360,14 +363,6 @@ class State():
with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f:
_f.write('\n'.join(debug.generate_object_report(self)))
# CPU/RAM
with open(f'{debug_dir}/cpu.report', 'a', encoding='utf-8') as _f:
_f.write('\n'.join(debug.generate_object_report(self.cpu)))
_f.write('\n\n[Tests]')
for name, test in self.cpu.tests.items():
_f.write(f'\n{name}:\n')
_f.write('\n'.join(debug.generate_object_report(test, indent=1)))
# Disks
for disk in self.disks:
with open(
@ -375,8 +370,8 @@ class State():
encoding='utf-8') as _f:
_f.write('\n'.join(debug.generate_object_report(disk)))
_f.write('\n\n[Tests]')
for name, test in disk.tests.items():
_f.write(f'\n{name}:\n')
for test in disk.tests:
_f.write(f'\n{test.name}:\n')
_f.write('\n'.join(debug.generate_object_report(test, indent=1)))
# SMC
@ -394,6 +389,14 @@ class State():
with open(f'{debug_dir}/smc.data', 'a', encoding='utf-8') as _f:
_f.write('\n'.join(data))
# System
with open(f'{debug_dir}/system.report', 'a', encoding='utf-8') as _f:
_f.write('\n'.join(debug.generate_object_report(self.system)))
_f.write('\n\n[Tests]')
for test in self.system.tests:
_f.write(f'\n{test.name}:\n')
_f.write('\n'.join(debug.generate_object_report(test, indent=1)))
def update_clock(self):
"""Update 'Started' pane following clock sync."""
tmux.respawn_pane(
@ -725,7 +728,7 @@ def cpu_stress_tests(state, test_objects):
return
# Prep
state.update_top_pane(test_mprime_obj.dev.description)
state.update_top_pane(test_mprime_obj.dev.cpu_description)
test_cooling_obj.set_status('Working')
test_mprime_obj.set_status('Working')
@ -1008,7 +1011,7 @@ def disk_self_test(state, test_objects):
except TimeoutError:
test_obj.failed = True
result = 'TimedOut'
except hw_obj.SMARTNotSupportedError:
except hw_disk.SMARTNotSupportedError:
# Pass test since it doesn't apply
test_obj.passed = True
result = 'N/A'
@ -1463,7 +1466,7 @@ def show_results(state):
if name.startswith('CPU')]
if any(cpu_tests_enabled):
std.print_success('CPU:')
std.print_report(state.cpu.generate_report())
std.print_report(state.system.generate_report())
std.print_standard(' ')
# Disk Tests