From 46eb737dc8dd2305569e10a86076647fb08c5811 Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Wed, 6 Apr 2022 16:22:58 -0600 Subject: [PATCH] Move SMART functions to their own file --- scripts/wk/hw/__init__.py | 1 + scripts/wk/hw/ddrescue.py | 21 +- scripts/wk/hw/diags.py | 77 ++----- scripts/wk/hw/disk.py | 364 +-------------------------------- scripts/wk/hw/smart.py | 414 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 457 insertions(+), 420 deletions(-) create mode 100644 scripts/wk/hw/smart.py diff --git a/scripts/wk/hw/__init__.py b/scripts/wk/hw/__init__.py index 821c9761..fca26622 100644 --- a/scripts/wk/hw/__init__.py +++ b/scripts/wk/hw/__init__.py @@ -10,6 +10,7 @@ from . import keyboard from . import network from . import screensavers from . import sensors +from . import smart from . import surface_scan from . import system from . import test diff --git a/scripts/wk/hw/ddrescue.py b/scripts/wk/hw/ddrescue.py index 9267e83f..4d757c27 100644 --- a/scripts/wk/hw/ddrescue.py +++ b/scripts/wk/hw/ddrescue.py @@ -28,6 +28,13 @@ from wk.cfg.ddrescue import ( DDRESCUE_SPECIFIC_PASS_SETTINGS, ) from wk.hw import disk as hw_disk +from wk.hw.smart import ( + CriticalHardwareError, + SMARTNotSupportedError, + SMARTSelfTestInProgressError, + safety_checks, + update_smart_details, + ) # STATIC VARIABLES @@ -947,8 +954,8 @@ class State(): def safety_check_destination(self): """Run safety checks for destination and abort if necessary.""" try: - self.destination.safety_checks() - except hw_disk.CriticalHardwareError as err: + safety_checks(self.destination) + except CriticalHardwareError as err: std.print_error( f'Critical error(s) detected for: {self.destination.path}', ) @@ -1491,12 +1498,12 @@ def check_destination_health(destination): # Run safety checks try: - destination.safety_checks() - except hw_disk.CriticalHardwareError: + safety_checks(destination) + except CriticalHardwareError: result = 'Critical hardware error detected on destination' - except hw_disk.SMARTSelfTestInProgressError: + except SMARTSelfTestInProgressError: result = 'SMART self-test in progress on destination' - except hw_disk.SMARTNotSupportedError: + except SMARTNotSupportedError: pass # Done @@ -2031,7 +2038,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True): def _update_smart_pane(): """Update SMART pane every 30 seconds.""" - state.source.update_smart_details() + update_smart_details(state.source) now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z') with open(f'{state.log_dir}/smart.out', 'w', encoding='utf-8') as _f: _f.write( diff --git a/scripts/wk/hw/diags.py b/scripts/wk/hw/diags.py index 64fcabf9..051ab228 100644 --- a/scripts/wk/hw/diags.py +++ b/scripts/wk/hw/diags.py @@ -21,6 +21,16 @@ from wk.hw import system as hw_system from wk.hw.audio import audio_test from wk.hw.keyboard import keyboard_test from wk.hw.network import network_test +from wk.hw.smart import ( + CriticalHardwareError, + SMARTSelfTestInProgressError, + abort_self_test, + check_attributes, + check_self_test_results, + generate_attribute_report, + run_self_test, + safety_checks, + ) from wk.hw.screensavers import screensaver from wk.hw.test import Test, TestGroup @@ -121,8 +131,8 @@ class State(): continue try: - disk.safety_checks() - except hw_disk.CriticalHardwareError: + safety_checks(disk) + except CriticalHardwareError: disable_tests = True disk.add_note('Critical hardware error detected.', 'RED') if 'Disk Attributes' in disk.tests: @@ -135,7 +145,7 @@ class State(): 'Critical hardware error detected during diagnostics', 'YELLOW', ) - except hw_disk.SMARTSelfTestInProgressError as err: + except SMARTSelfTestInProgressError as err: if prep: std.print_warning(f'SMART self-test(s) in progress for {disk.path}') if std.ask('Continue with all tests disabled for this device?'): @@ -160,7 +170,7 @@ class State(): if ( 'Disk Attributes' in disk.tests and not disk.tests['Disk Attributes'].failed - and not disk.check_attributes(only_blocking=False) + and not check_attributes(disk, only_blocking=False) ): # No blocking errors encountered, but found minor attribute failures if not prep: @@ -449,32 +459,6 @@ def build_menu(cli_mode=False, quick_mode=False): return menu -def check_self_test_results(test_obj, aborted=False): - """Check SMART self-test results.""" - test_obj.report.append(std.color_string('Self-Test', 'BLUE')) - if test_obj.disabled or test_obj.status == 'Denied': - test_obj.report.append(std.color_string(f' {test_obj.status}', 'RED')) - elif test_obj.status == 'N/A' or not test_obj.dev.attributes: - test_obj.report.append(std.color_string(f' {test_obj.status}', 'YELLOW')) - else: - # Not updating SMART data here to preserve the test status for the report - # For instance if the test was aborted the report should inlcude the last - # known progress instead of just "was aborted buy host" - test_details = test_obj.dev.get_smart_self_test_details() - test_result = test_details.get('status', {}).get('string', 'Unknown') - test_obj.report.append(f' {test_result.capitalize()}') - if aborted and not (test_obj.passed or test_obj.failed): - test_obj.report.append(std.color_string(' Aborted', 'YELLOW')) - test_obj.set_status('Aborted') - elif test_obj.status == 'TimedOut': - test_obj.report.append(std.color_string(' TimedOut', 'YELLOW')) - test_obj.set_status('TimedOut') - else: - test_obj.failed = not test_obj.passed - if test_obj.failed: - test_obj.set_status('Failed') - - def cpu_stress_tests(state, test_objects): # pylint: disable=too-many-statements """CPU & cooling check using Prime95 and Sysbench.""" @@ -612,7 +596,7 @@ def disk_attribute_check(state, test_objects): test.set_status('N/A') continue - if test.dev.check_attributes(): + if check_attributes(test.dev): test.passed = True test.set_status('Passed') else: @@ -695,31 +679,6 @@ def disk_self_test(state, test_objects): threads = [] state.panes['SMART'] = [] - def _run_self_test(test_obj, log_path): - """Run self-test and handle exceptions.""" - result = None - - try: - test_obj.passed = test_obj.dev.run_self_test(log_path) - except TimeoutError: - test_obj.failed = True - result = 'TimedOut' - except hw_disk.SMARTNotSupportedError: - # Pass test since it doesn't apply - test_obj.passed = True - result = 'N/A' - - # Set status - if result: - test_obj.set_status(result) - else: - if test_obj.failed: - test_obj.set_status('Failed') - elif test_obj.passed: - test_obj.set_status('Passed') - else: - test_obj.set_status('Unknown') - # Run self-tests state.update_top_pane( f'Disk self-test{"s" if len(test_objects) > 1 else ""}', @@ -733,7 +692,7 @@ def disk_self_test(state, test_objects): # Start thread test.set_status('Working') test_log = f'{state.log_dir}/{test.dev.path.name}_selftest.log' - threads.append(exe.start_thread(_run_self_test, args=(test, test_log))) + threads.append(exe.start_thread(run_self_test, args=(test, test_log))) # Show progress if threads[-1].is_alive(): @@ -752,7 +711,7 @@ def disk_self_test(state, test_objects): except KeyboardInterrupt: aborted = True for test in test_objects: - test.dev.abort_self_test() + abort_self_test(test.dev) std.sleep(0.5) # Save report(s) @@ -787,7 +746,7 @@ def disk_surface_scan(state, test_objects): ) for disk in state.disks: failed_attributes = [ - line for line in disk.generate_attribute_report() if 'failed' in line + line for line in generate_attribute_report(disk) if 'failed' in line ] if failed_attributes: size_str = std.bytes_to_string(disk.size, use_binary=False) diff --git a/scripts/wk/hw/disk.py b/scripts/wk/hw/disk.py index 5703e1d2..471edc31 100644 --- a/scripts/wk/hw/disk.py +++ b/scripts/wk/hw/disk.py @@ -9,25 +9,15 @@ import re from dataclasses import dataclass, field from typing import Any, Union -from wk.cfg.hw import ( - ATTRIBUTE_COLORS, - KEY_NVME, - KEY_SMART, - KNOWN_DISK_ATTRIBUTES, - KNOWN_DISK_MODELS, - NVME_WARNING_KEYS, - REGEX_POWER_ON_TIME, - SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS, - ) from wk.cfg.main import KIT_NAME_SHORT from wk.exe import get_json_from_command, run_program from wk.hw.test import Test -from wk.std import ( - PLATFORM, - bytes_to_string, - color_string, - sleep, +from wk.hw.smart import ( + enable_smart, + generate_attribute_report, + update_smart_details, ) +from wk.std import PLATFORM, bytes_to_string, color_string # STATIC VARIABLES @@ -38,17 +28,6 @@ WK_LABEL_REGEX = re.compile( ) -# Exception Classes -class CriticalHardwareError(RuntimeError): - """Exception used for critical hardware failures.""" - -class SMARTNotSupportedError(TypeError): - """Exception used for disks lacking SMART support.""" - -class SMARTSelfTestInProgressError(RuntimeError): - """Exception used when a SMART self-test is in progress.""" - - # Classes @dataclass(slots=True) class Disk: @@ -76,23 +55,18 @@ class Disk: self.path = pathlib.Path(self.path).resolve() self.get_details() self.set_description() - self.enable_smart() - self.update_smart_details() + enable_smart(self) + update_smart_details(self) if not self.attributes and self.bus == 'USB': # Try using SAT LOG.warning('Using SAT for smartctl for %s', self.path) self.notes = [] self.use_sat = True - self.enable_smart() - self.update_smart_details() + enable_smart(self) + update_smart_details(self) if not self.is_4k_aligned(): self.add_note('One or more partitions are not 4K aligned', 'YELLOW') - def abort_self_test(self) -> None: - """Abort currently running non-captive self-test.""" - cmd = ['sudo', 'smartctl', '--abort', self.path] - run_program(cmd, check=False) - def add_note(self, note, color=None) -> None: """Add note that will be included in the disk report.""" if color: @@ -101,40 +75,6 @@ class Disk: self.notes.append(note) self.notes.sort() - def check_attributes(self, only_blocking=False) -> bool: - """Check if any known attributes are failing, returns bool.""" - attributes_ok = True - known_attributes = get_known_disk_attributes(self.model) - for attr, value in self.attributes.items(): - # Skip unknown attributes - if attr not in known_attributes: - continue - - # Get thresholds - blocking_attribute = known_attributes[attr].get('Blocking', False) - err_thresh = known_attributes[attr].get('Error', None) - max_thresh = known_attributes[attr].get('Maximum', None) - if not max_thresh: - max_thresh = float('inf') - - # Skip non-blocking attributes if necessary - if only_blocking and not blocking_attribute: - continue - - # Skip informational attributes - if not err_thresh: - continue - - # Check attribute - if known_attributes[attr].get('PercentageLife', False): - if 0 <= value['raw'] <= err_thresh: - attributes_ok = False - elif err_thresh <= value['raw'] < max_thresh: - attributes_ok = False - - # Done - return attributes_ok - def disable_disk_tests(self) -> None: """Disable all tests.""" LOG.warning('Disabling all tests for: %s', self.path) @@ -143,73 +83,6 @@ class Disk: test.set_status('Denied') test.disabled = True - def enable_smart(self) -> None: - """Try enabling SMART for this disk.""" - cmd = [ - 'sudo', - 'smartctl', - f'--device={"sat,auto" if self.use_sat else "auto"}', - '--tolerance=permissive', - '--smart=on', - self.path, - ] - run_program(cmd, check=False) - - def generate_attribute_report(self) -> list[str]: - """Generate attribute report, returns list.""" - known_attributes = get_known_disk_attributes(self.model) - report = [] - for attr, value in sorted(self.attributes.items()): - note = '' - value_color = 'GREEN' - - # Skip attributes not in our list - if attr not in known_attributes: - continue - - # Check for attribute note - note = known_attributes[attr].get('Note', '') - - # ID / Name - label = f'{attr:>3}' - if isinstance(attr, int): - # Assuming SMART, include hex ID and name - label += f' / {str(hex(attr))[2:].upper():0>2}: {value["name"]}' - label = f' {label.replace("_", " "):38}' - - # Value color - if known_attributes[attr].get('PercentageLife', False): - # PercentageLife values - if 0 <= value['raw'] <= known_attributes[attr]['Error']: - value_color = 'RED' - note = '(failed, % life remaining)' - elif value['raw'] < 0 or value['raw'] > 100: - value_color = 'PURPLE' - note = '(invalid?)' - else: - for threshold, color in ATTRIBUTE_COLORS: - threshold_val = known_attributes[attr].get(threshold, None) - if threshold_val and value['raw'] >= threshold_val: - value_color = color - if threshold == 'Error': - note = '(failed)' - elif threshold == 'Maximum': - note = '(invalid?)' - - # 199/C7 warning - if str(attr) == '199' and value['raw'] > 0: - note = '(bad cable?)' - - # Build colored string and append to report - line = color_string( - [label, value['raw_str'], note], - [None, value_color, 'YELLOW'], - ) - report.append(line) - - # Done - return report - def generate_report(self, header=True) -> list[str]: """Generate Disk report, returns list.""" report = [] @@ -221,7 +94,7 @@ class Disk: if self.attributes: if header: report.append(color_string('Attributes', 'BLUE')) - report.extend(self.generate_attribute_report()) + report.extend(generate_attribute_report(self)) # Notes if self.notes: @@ -294,18 +167,6 @@ class Disk: # Done return labels - def get_smart_self_test_details(self) -> dict[Any, Any]: - """Shorthand to get deeply nested self-test details, returns dict.""" - details = {} - try: - details = self.raw_smartctl['ata_smart_data']['self_test'] - except (KeyError, TypeError): - # Assuming disk lacks SMART support, ignore and return empty dict. - pass - - # Done - return details - def is_4k_aligned(self) -> bool: """Check that all disk partitions are aligned, returns bool.""" aligned = True @@ -316,138 +177,6 @@ class Disk: return aligned - def safety_checks(self) -> None: - """Run safety checks and raise an exception if necessary.""" - blocking_event_encountered = False - self.update_smart_details() - - # Attributes - if not self.check_attributes(only_blocking=True): - blocking_event_encountered = True - LOG.error('%s: Blocked for failing attribute(s)', self.path) - - # NVMe status - nvme_status = self.raw_smartctl.get('smart_status', {}).get('nvme', {}) - if nvme_status.get('media_read_only', False): - blocking_event_encountered = True - msg = 'Media has been placed in read-only mode' - self.add_note(msg, 'RED') - LOG.error('%s %s', self.path, msg) - for key in NVME_WARNING_KEYS: - if nvme_status.get(key, False): - msg = key.replace('_', ' ') - self.add_note(msg, 'YELLOW') - LOG.warning('%s %s', self.path, msg) - - # SMART overall assessment - smart_passed = True - try: - smart_passed = self.raw_smartctl['smart_status']['passed'] - except (KeyError, TypeError): - # Assuming disk doesn't support SMART overall assessment - pass - if not smart_passed: - blocking_event_encountered = True - msg = 'SMART overall self-assessment: Failed' - self.add_note(msg, 'RED') - LOG.error('%s %s', self.path, msg) - - # Raise blocking exception if necessary - if blocking_event_encountered: - raise CriticalHardwareError(f'Critical error(s) for: {self.path}') - - # SMART self-test status - test_details = self.get_smart_self_test_details() - if 'remaining_percent' in test_details.get('status', ''): - msg = f'SMART self-test in progress for: {self.path}' - LOG.error(msg) - raise SMARTSelfTestInProgressError(msg) - - def run_self_test(self, log_path) -> bool: - """Run disk self-test and check if it passed, returns bool. - - NOTE: This function is here to reserve a place for future - NVMe self-tests announced in NVMe spec v1.3. - """ - result = self.run_smart_self_test(log_path) - return result - - def run_smart_self_test(self, log_path) -> bool: - """Run SMART self-test and check if it passed, returns bool. - - NOTE: An exception will be raised if the disk lacks SMART support. - """ - finished = False - result = None - started = False - status_str = 'Starting self-test...' - test_details = self.get_smart_self_test_details() - test_minutes = 15 - size_str = bytes_to_string(self.size, use_binary=False) - header_str = color_string( - ['[', self.path.name, ' ', size_str, ']'], - [None, 'BLUE', None, 'CYAN', None], - sep='', - ) - - # Check if disk supports self-tests - if not test_details: - raise SMARTNotSupportedError( - f'SMART self-test not supported for {self.path}') - - # Get real test length - test_minutes = test_details.get('polling_minutes', {}).get('short', 5) - test_minutes = int(test_minutes) + 10 - - # Start test - with open(log_path, 'w', encoding='utf-8') as _f: - _f.write(f'{header_str}\nInitializing...') - cmd = [ - 'sudo', - 'smartctl', - '--tolerance=normal', - '--test=short', - self.path, - ] - run_program(cmd, check=False) - - # Monitor progress (in five second intervals) - for _i in range(int(test_minutes*60/5)): - sleep(5) - - # Update status - self.update_smart_details() - test_details = self.get_smart_self_test_details() - - # Check test progress - if started: - status_str = test_details.get('status', {}).get('string', 'Unknown') - status_str = status_str.capitalize() - - # Update log - with open(log_path, 'w', encoding='utf-8') as _f: - _f.write(f'{header_str}\nSMART self-test status:\n {status_str}') - - # Check if finished - if 'remaining_percent' not in test_details.get('status', {}): - finished = True - break - - elif 'remaining_percent' in test_details.get('status', {}): - started = True - elif _i * 5 >= SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS: - # Test didn't start within limit, stop waiting - break - - # Check result - if finished: - result = test_details.get('status', {}).get('passed', False) - elif started: - raise TimeoutError(f'SMART self-test timed out for {self.path}') - - # Done - return result - def set_description(self) -> None: """Set disk description from details.""" self.description = ( @@ -455,62 +184,6 @@ class Disk: f' ({self.bus}) {self.model} {self.serial}' ) - def update_smart_details(self) -> None: - """Update SMART details via smartctl.""" - updated_attributes = {} - - # Get SMART data - cmd = [ - 'sudo', - 'smartctl', - f'--device={"sat,auto" if self.use_sat else "auto"}', - '--tolerance=verypermissive', - '--all', - '--json', - self.path, - ] - self.raw_smartctl = get_json_from_command(cmd, check=False) - - # Check for attributes - if KEY_NVME in self.raw_smartctl: - for name, value in self.raw_smartctl[KEY_NVME].items(): - try: - updated_attributes[name] = { - 'name': name, - 'raw': int(value), - 'raw_str': str(value), - } - except (TypeError, ValueError): - # Ignoring invalid attribute - LOG.error('Invalid NVMe attribute: %s %s', name, value) - elif KEY_SMART in self.raw_smartctl: - for attribute in self.raw_smartctl[KEY_SMART].get('table', {}): - try: - _id = int(attribute['id']) - except (KeyError, ValueError): - # Ignoring invalid attribute - LOG.error('Invalid SMART attribute: %s', attribute) - continue - name = str(attribute.get('name', 'Unknown')).replace('_', ' ').title() - raw = int(attribute.get('raw', {}).get('value', -1)) - raw_str = attribute.get('raw', {}).get('string', 'Unknown') - - # Fix power-on time - match = REGEX_POWER_ON_TIME.match(raw_str) - if _id == 9 and match: - raw = int(match.group(1)) - - # Add to dict - updated_attributes[_id] = { - 'name': name, 'raw': raw, 'raw_str': raw_str} - - # Add note if necessary - if not updated_attributes: - self.add_note('No NVMe or SMART data available', 'YELLOW') - - # Done - self.attributes.update(updated_attributes) - # Functions def get_disk_details_linux(path) -> dict[Any, Any]: @@ -676,23 +349,6 @@ def get_disks_macos() -> list[Disk]: return disks -def get_known_disk_attributes(model) -> dict[Any, dict]: - """Get known NVMe/SMART attributes (model specific), returns dict.""" - known_attributes = KNOWN_DISK_ATTRIBUTES.copy() - - # Apply model-specific data - for regex, data in KNOWN_DISK_MODELS.items(): - if re.search(regex, model): - for attr, thresholds in data.items(): - if attr in known_attributes: - known_attributes[attr].update(thresholds) - else: - known_attributes[attr] = thresholds - - # Done - return known_attributes - - def is_4k_aligned_macos(disk_details) -> bool: """Check partition alignment using diskutil info, returns bool.""" aligned = True diff --git a/scripts/wk/hw/smart.py b/scripts/wk/hw/smart.py new file mode 100644 index 00000000..6e1a1ab5 --- /dev/null +++ b/scripts/wk/hw/smart.py @@ -0,0 +1,414 @@ +"""WizardKit: SMART test functions""" +# vim: sts=2 sw=2 ts=2 + +import logging +import re + +from typing import Any + +from wk.cfg.hw import ( + ATTRIBUTE_COLORS, + KEY_NVME, + KEY_SMART, + KNOWN_DISK_ATTRIBUTES, + KNOWN_DISK_MODELS, + NVME_WARNING_KEYS, + REGEX_POWER_ON_TIME, + SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS, + ) +from wk.exe import get_json_from_command, run_program +from wk.std import bytes_to_string, color_string, sleep + + +# STATIC VARIABLES +LOG = logging.getLogger(__name__) + + +# Exception Classes +class CriticalHardwareError(RuntimeError): + """Exception used for critical hardware failures.""" + +class SMARTNotSupportedError(TypeError): + """Exception used for disks lacking SMART support.""" + +class SMARTSelfTestInProgressError(RuntimeError): + """Exception used when a SMART self-test is in progress.""" + + +# Functions +def abort_self_test(dev) -> None: + """Abort currently running non-captive self-test.""" + cmd = ['sudo', 'smartctl', '--abort', dev.path] + run_program(cmd, check=False) + + +def check_attributes(dev, only_blocking=False) -> bool: + """Check if any known attributes are failing, returns bool.""" + attributes_ok = True + known_attributes = get_known_disk_attributes(dev.model) + for attr, value in dev.attributes.items(): + # Skip unknown attributes + if attr not in known_attributes: + continue + + # Get thresholds + blocking_attribute = known_attributes[attr].get('Blocking', False) + err_thresh = known_attributes[attr].get('Error', None) + max_thresh = known_attributes[attr].get('Maximum', None) + if not max_thresh: + max_thresh = float('inf') + + # Skip non-blocking attributes if necessary + if only_blocking and not blocking_attribute: + continue + + # Skip informational attributes + if not err_thresh: + continue + + # Check attribute + if known_attributes[attr].get('PercentageLife', False): + if 0 <= value['raw'] <= err_thresh: + attributes_ok = False + elif err_thresh <= value['raw'] < max_thresh: + attributes_ok = False + + # Done + return attributes_ok + + +def check_self_test_results(test_obj, aborted=False): + """Check SMART self-test results.""" + test_obj.report.append(color_string('Self-Test', 'BLUE')) + if test_obj.disabled or test_obj.status == 'Denied': + test_obj.report.append(color_string(f' {test_obj.status}', 'RED')) + elif test_obj.status == 'N/A' or not test_obj.dev.attributes: + test_obj.report.append(color_string(f' {test_obj.status}', 'YELLOW')) + else: + # Not updating SMART data here to preserve the test status for the report + # For instance if the test was aborted the report should inlcude the last + # known progress instead of just "was aborted by host" + test_details = get_smart_self_test_details(test_obj.dev) + test_result = test_details.get('status', {}).get('string', 'Unknown') + test_obj.report.append(f' {test_result.capitalize()}') + if aborted and not (test_obj.passed or test_obj.failed): + test_obj.report.append(color_string(' Aborted', 'YELLOW')) + test_obj.set_status('Aborted') + elif test_obj.status == 'TimedOut': + test_obj.report.append(color_string(' TimedOut', 'YELLOW')) + test_obj.set_status('TimedOut') + else: + test_obj.failed = not test_obj.passed + if test_obj.failed: + test_obj.set_status('Failed') + + +def enable_smart(dev) -> None: + """Try enabling SMART for this disk.""" + cmd = [ + 'sudo', + 'smartctl', + f'--device={"sat,auto" if dev.use_sat else "auto"}', + '--tolerance=permissive', + '--smart=on', + dev.path, + ] + run_program(cmd, check=False) + + +def generate_attribute_report(dev) -> list[str]: + """Generate attribute report, returns list.""" + known_attributes = get_known_disk_attributes(dev.model) + report = [] + for attr, value in sorted(dev.attributes.items()): + note = '' + value_color = 'GREEN' + + # Skip attributes not in our list + if attr not in known_attributes: + continue + + # Check for attribute note + note = known_attributes[attr].get('Note', '') + + # ID / Name + label = f'{attr:>3}' + if isinstance(attr, int): + # Assuming SMART, include hex ID and name + label += f' / {str(hex(attr))[2:].upper():0>2}: {value["name"]}' + label = f' {label.replace("_", " "):38}' + + # Value color + if known_attributes[attr].get('PercentageLife', False): + # PercentageLife values + if 0 <= value['raw'] <= known_attributes[attr]['Error']: + value_color = 'RED' + note = '(failed, % life remaining)' + elif value['raw'] < 0 or value['raw'] > 100: + value_color = 'PURPLE' + note = '(invalid?)' + else: + for threshold, color in ATTRIBUTE_COLORS: + threshold_val = known_attributes[attr].get(threshold, None) + if threshold_val and value['raw'] >= threshold_val: + value_color = color + if threshold == 'Error': + note = '(failed)' + elif threshold == 'Maximum': + note = '(invalid?)' + + # 199/C7 warning + if str(attr) == '199' and value['raw'] > 0: + note = '(bad cable?)' + + # Build colored string and append to report + line = color_string( + [label, value['raw_str'], note], + [None, value_color, 'YELLOW'], + ) + report.append(line) + + # Done + return report + + +def get_known_disk_attributes(model) -> dict[Any, dict]: + """Get known NVMe/SMART attributes (model specific), returns dict.""" + known_attributes = KNOWN_DISK_ATTRIBUTES.copy() + + # Apply model-specific data + for regex, data in KNOWN_DISK_MODELS.items(): + if re.search(regex, model): + for attr, thresholds in data.items(): + if attr in known_attributes: + known_attributes[attr].update(thresholds) + else: + known_attributes[attr] = thresholds + + # Done + return known_attributes + + +def get_smart_self_test_details(dev) -> dict[Any, Any]: + """Shorthand to get deeply nested self-test details, returns dict.""" + details = {} + try: + details = dev.raw_smartctl['ata_smart_data']['self_test'] + except (KeyError, TypeError): + # Assuming disk lacks SMART support, ignore and return empty dict. + pass + + # Done + return details + + +def safety_checks(dev) -> None: + """Run safety checks and raise an exception if necessary.""" + blocking_event_encountered = False + update_smart_details(dev) + + # Attributes + if not check_attributes(dev, only_blocking=True): + blocking_event_encountered = True + LOG.error('%s: Blocked for failing attribute(s)', dev.path) + + # NVMe status + nvme_status = dev.raw_smartctl.get('smart_status', {}).get('nvme', {}) + if nvme_status.get('media_read_only', False): + blocking_event_encountered = True + msg = 'Media has been placed in read-only mode' + dev.add_note(msg, 'RED') + LOG.error('%s %s', dev.path, msg) + for key in NVME_WARNING_KEYS: + if nvme_status.get(key, False): + msg = key.replace('_', ' ') + dev.add_note(msg, 'YELLOW') + LOG.warning('%s %s', dev.path, msg) + + # SMART overall assessment + smart_passed = True + try: + smart_passed = dev.raw_smartctl['smart_status']['passed'] + except (KeyError, TypeError): + # Assuming disk doesn't support SMART overall assessment + pass + if not smart_passed: + blocking_event_encountered = True + msg = 'SMART overall self-assessment: Failed' + dev.add_note(msg, 'RED') + LOG.error('%s %s', dev.path, msg) + + # Raise blocking exception if necessary + if blocking_event_encountered: + raise CriticalHardwareError(f'Critical error(s) for: {dev.path}') + + # SMART self-test status + test_details = get_smart_self_test_details(dev) + if 'remaining_percent' in test_details.get('status', ''): + msg = f'SMART self-test in progress for: {dev.path}' + LOG.error(msg) + raise SMARTSelfTestInProgressError(msg) + + +def run_self_test(test_obj, log_path) -> None: + """Run disk self-test and check if it passed, returns bool.""" + result = None + + try: + test_obj.passed = run_smart_self_test(test_obj.dev, log_path) + except TimeoutError: + test_obj.failed = True + result = 'TimedOut' + except SMARTNotSupportedError: + # Pass test since it doesn't apply + test_obj.passed = True + result = 'N/A' + + # Set status + if result: + test_obj.set_status(result) + else: + if test_obj.failed: + test_obj.set_status('Failed') + elif test_obj.passed: + test_obj.set_status('Passed') + else: + test_obj.set_status('Unknown') + + +def run_smart_self_test(dev, log_path) -> bool: + """Run SMART self-test and check if it passed, returns bool. + + NOTE: An exception will be raised if the disk lacks SMART support. + """ + finished = False + result = None + started = False + status_str = 'Starting self-test...' + test_details = get_smart_self_test_details(dev) + test_minutes = 15 + size_str = bytes_to_string(dev.size, use_binary=False) + header_str = color_string( + ['[', dev.path.name, ' ', size_str, ']'], + [None, 'BLUE', None, 'CYAN', None], + sep='', + ) + + # Check if disk supports self-tests + if not test_details: + raise SMARTNotSupportedError( + f'SMART self-test not supported for {dev.path}') + + # Get real test length + test_minutes = test_details.get('polling_minutes', {}).get('short', 5) + test_minutes = int(test_minutes) + 10 + + # Start test + with open(log_path, 'w', encoding='utf-8') as _f: + _f.write(f'{header_str}\nInitializing...') + cmd = [ + 'sudo', + 'smartctl', + '--tolerance=normal', + '--test=short', + dev.path, + ] + run_program(cmd, check=False) + + # Monitor progress (in five second intervals) + for _i in range(int(test_minutes*60/5)): + sleep(5) + + # Update status + update_smart_details(dev) + test_details = get_smart_self_test_details(dev) + + # Check test progress + if started: + status_str = test_details.get('status', {}).get('string', 'Unknown') + status_str = status_str.capitalize() + + # Update log + with open(log_path, 'w', encoding='utf-8') as _f: + _f.write(f'{header_str}\nSMART self-test status:\n {status_str}') + + # Check if finished + if 'remaining_percent' not in test_details.get('status', {}): + finished = True + break + + elif 'remaining_percent' in test_details.get('status', {}): + started = True + elif _i * 5 >= SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS: + # Test didn't start within limit, stop waiting + break + + # Check result + if finished: + result = test_details.get('status', {}).get('passed', False) + elif started: + raise TimeoutError(f'SMART self-test timed out for {dev.path}') + + # Done + return result + + +def update_smart_details(dev) -> None: + """Update SMART details via smartctl.""" + updated_attributes = {} + + # Get SMART data + cmd = [ + 'sudo', + 'smartctl', + f'--device={"sat,auto" if dev.use_sat else "auto"}', + '--tolerance=verypermissive', + '--all', + '--json', + dev.path, + ] + dev.raw_smartctl = get_json_from_command(cmd, check=False) + + # Check for attributes + if KEY_NVME in dev.raw_smartctl: + for name, value in dev.raw_smartctl[KEY_NVME].items(): + try: + updated_attributes[name] = { + 'name': name, + 'raw': int(value), + 'raw_str': str(value), + } + except (TypeError, ValueError): + # Ignoring invalid attribute + LOG.error('Invalid NVMe attribute: %s %s', name, value) + elif KEY_SMART in dev.raw_smartctl: + for attribute in dev.raw_smartctl[KEY_SMART].get('table', {}): + try: + _id = int(attribute['id']) + except (KeyError, ValueError): + # Ignoring invalid attribute + LOG.error('Invalid SMART attribute: %s', attribute) + continue + name = str(attribute.get('name', 'Unknown')).replace('_', ' ').title() + raw = int(attribute.get('raw', {}).get('value', -1)) + raw_str = attribute.get('raw', {}).get('string', 'Unknown') + + # Fix power-on time + match = REGEX_POWER_ON_TIME.match(raw_str) + if _id == 9 and match: + raw = int(match.group(1)) + + # Add to dict + updated_attributes[_id] = { + 'name': name, 'raw': raw, 'raw_str': raw_str} + + # Add note if necessary + if not updated_attributes: + dev.add_note('No NVMe or SMART data available', 'YELLOW') + + # Done + dev.attributes.update(updated_attributes) + + +if __name__ == '__main__': + print("This file is not meant to be called directly.")