diff --git a/scripts/wk/hw/obj.py b/scripts/wk/hw/obj.py index 8df8a459..ddc9d2ab 100644 --- a/scripts/wk/hw/obj.py +++ b/scripts/wk/hw/obj.py @@ -11,7 +11,7 @@ from collections import OrderedDict from wk.cfg.hw import KNOWN_ATTRIBUTES from wk.exe import get_json_from_command, run_program -from wk.std import bytes_to_string, color_string, string_to_bytes +from wk.std import bytes_to_string, color_string, sleep, string_to_bytes # STATIC VARIABLES @@ -43,6 +43,9 @@ REGEX_POWER_ON_TIME = re.compile( class CriticalHardwareError(RuntimeError): """Exception used for critical hardware failures.""" +class SMARTNotSupportedError(TypeError): + """Exception used for disks lacking SMART support.""" + # Classes class CpuRam(): @@ -233,7 +236,6 @@ class Disk(): # Done return report - def generate_report(self): """Generate Disk report, returns list.""" report = [] @@ -307,6 +309,18 @@ class Disk(): # Done return labels + def get_smart_self_test_details(self): + """Shorthand to get deeply nested self-test details, returns dict.""" + details = {} + try: + details = self.smartctl['ata_smart_data']['self_test'] + except (KeyError, TypeError): + # Assuming disk lacks SMART support, ignore and return empty dict. + pass + + # Done + return details + def is_4k_aligned(self): """Check that all disk partitions are aligned, returns bool.""" aligned = True @@ -345,13 +359,8 @@ class Disk(): LOG.error('%s %s', self.path, msg) # SMART self-test status - test_status = '' - try: - test_status = self.smartctl['ata_smart_data']['self_test']['status'] - except (KeyError, TypeError): - # Assuming disk doesn't support SMART self-tests - pass - if 'remaining_percent' in test_status: + test_details = self.get_smart_self_test_details() + if 'remaining_percent' in test_details.get('status', ''): blocking_event_encountered = True msg = 'SMART self-test in progress' self.add_note(msg, 'RED') @@ -361,6 +370,80 @@ class Disk(): if blocking_event_encountered: raise CriticalHardwareError(f'Critical error(s) for: {self.path}') + def run_self_test(self, log_path): + """Run disk self-test and check if it passed, returns bool. + + NOTE: This function is here to reserve a place for future + NVMe self-tests announced in NVMe spec v1.3. + """ + result = self.run_smart_self_test(log_path) + return result + + def run_smart_self_test(self, log_path): + """Run SMART self-test and check if it passed, returns bool. + + NOTE: An exception will be raised if the disk lacks SMART support. + """ + finished = False + result = None + started = False + status_str = 'Starting self-test...' + test_details = self.get_smart_self_test_details() + test_minutes = 15 + + # Check if disk supports self-tests + if not test_details: + raise SMARTNotSupportedError( + f'SMART self-test not supported for {self.path}') + + # Get real test length + test_minutes = test_details.get('polling_minutes', {}).get('short', 5) + test_minutes = int(test_minutes) + 10 + + # Start test + cmd = [ + 'sudo', + 'smartctl', + '--tolerance=normal', + '--test=short', + self.path, + ] + run_program(cmd, check=False) + + # Monitor progress (in five second intervals) + for _i in range(int(test_minutes*60/5)): + sleep(5) + + # Update status + self.update_smart_details() + test_details = self.get_smart_self_test_details() + + # Check test progress + if started: + status_str = test_details.get('status', {}).get('string', 'Unknown') + status_str = status_str.capitalize() + + # Update log + with open(log_path, 'w') as _f: + _f.write(f'SMART self-test status for {self.path}:\n {status_str}') + + # Check if finished + if 'remaining_percent' not in test_details['status']: + finished = True + break + + elif 'remaining_percent' in test_details['status']: + started = True + + # Check result + if finished: + result = test_details.get('status', {}).get('passed', False) + elif started: + raise TimeoutError(f'SMART self-test timed out for {self.path}') + + # Done + return result + def update_smart_details(self): """Update SMART details via smartctl.""" self.attributes = {} @@ -433,9 +516,9 @@ def get_disk_details_macos(path): try: plist_data = plistlib.loads(proc.stdout) except (TypeError, ValueError): + # Invalid / corrupt plist data? return empty dict to avoid crash LOG.error('Failed to get diskutil list for %s', path) - # TODO: Figure this out - return details #Bail + return details # Parse "list" details details = plist_data.get('AllDisksAndPartitions', [{}])[0]