"""WizardKit: SMART test functions"""
# vim: sts=2 sw=2 ts=2

import logging
import re

from typing import Any

from wk.cfg.hw import (
  ATTRIBUTE_COLORS,
  KEY_NVME,
  KEY_SMART,
  KNOWN_DISK_ATTRIBUTES,
  KNOWN_DISK_MODELS,
  NVME_WARNING_KEYS,
  REGEX_POWER_ON_TIME,
  SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS,
  )
from wk.exe import get_json_from_command, run_program
from wk.std import bytes_to_string, color_string, sleep


# STATIC VARIABLES
LOG = logging.getLogger(__name__)


# Exception Classes
class CriticalHardwareError(RuntimeError):
  """Exception used for critical hardware failures."""

class SMARTNotSupportedError(TypeError):
  """Exception used for disks lacking SMART support."""

class SMARTSelfTestInProgressError(RuntimeError):
  """Exception used when a SMART self-test is in progress."""


# Functions
def abort_self_test(dev) -> None:
  """Abort currently running non-captive self-test.

  Runs `sudo smartctl --abort` against dev.path. check=False is passed to
  run_program (presumably so a non-zero exit does not raise; confirm against
  wk.exe.run_program).
  """
  cmd = ['sudo', 'smartctl', '--abort', dev.path]
  run_program(cmd, check=False)


def check_attributes(dev, only_blocking=False) -> bool:
  """Check if any known attributes are failing, returns bool.

  Each entry in dev.attributes is compared against the thresholds returned by
  get_known_disk_attributes(dev.model). Attributes that are unknown, or that
  have no (truthy) 'Error' threshold, are skipped.

  If only_blocking is True then only attributes flagged 'Blocking' are
  checked.

  Returns False if at least one checked attribute is failing, else True.
  """
  attributes_ok = True
  known_attributes = get_known_disk_attributes(dev.model)
  for attr, value in dev.attributes.items():
    # Skip unknown attributes
    if attr not in known_attributes:
      continue

    # Get thresholds
    blocking_attribute = known_attributes[attr].get('Blocking', False)
    err_thresh = known_attributes[attr].get('Error', None)
    max_thresh = known_attributes[attr].get('Maximum', None)
    if not max_thresh:
      # No upper bound defined, treat every value >= Error as failing
      max_thresh = float('inf')

    # Skip non-blocking attributes if necessary
    if only_blocking and not blocking_attribute:
      continue

    # Skip informational attributes
    if not err_thresh:
      continue

    # Check attribute
    if known_attributes[attr].get('PercentageLife', False):
      # PercentageLife attributes fail when at or below the threshold
      if 0 <= value['raw'] <= err_thresh:
        attributes_ok = False
    elif err_thresh <= value['raw'] < max_thresh:
      # Values at or above Maximum are not counted as failures here
      attributes_ok = False

  # Done
  return attributes_ok


def check_self_test_results(test_obj, aborted=False) -> None:
  """Check SMART self-test results.

  Appends the self-test section to test_obj.report and, where the test
  actually ran, updates test_obj.failed and the test status.

  If aborted is True and the test neither passed nor failed, the status is
  set to 'Aborted'.
  """
  test_obj.report.append(color_string('Self-Test', 'BLUE'))
  if test_obj.disabled or test_obj.status == 'Denied':
    test_obj.report.append(color_string(f' {test_obj.status}', 'RED'))
  elif test_obj.status == 'N/A' or not test_obj.dev.attributes:
    test_obj.report.append(color_string(f' {test_obj.status}', 'YELLOW'))
  else:
    # Not updating SMART data here to preserve the test status for the report
    # For instance if the test was aborted the report should include the last
    # known progress instead of just "was aborted by host"
    test_details = get_smart_self_test_details(test_obj.dev)
    test_result = test_details.get('status', {}).get('string', 'Unknown')
    test_obj.report.append(f' {test_result.capitalize()}')
    if aborted and not (test_obj.passed or test_obj.failed):
      test_obj.report.append(color_string(' Aborted', 'YELLOW'))
      test_obj.set_status('Aborted')
    elif test_obj.status == 'TimedOut':
      test_obj.report.append(color_string(' TimedOut', 'YELLOW'))
      test_obj.set_status('TimedOut')
    else:
      # Anything that did not explicitly pass is treated as a failure
      test_obj.failed = not test_obj.passed
      if test_obj.failed:
        test_obj.set_status('Failed')


def enable_smart(dev) -> None:
  """Try enabling SMART for this disk.

  Runs `smartctl --smart=on` with permissive tolerance. The device type is
  'sat,auto' when dev.use_sat is set, otherwise 'auto'.
  """
  cmd = [
    'sudo',
    'smartctl',
    f'--device={"sat,auto" if dev.use_sat else "auto"}',
    '--tolerance=permissive',
    '--smart=on',
    dev.path,
    ]
  run_program(cmd, check=False)


def generate_attribute_report(dev) -> list[str]:
  """Generate attribute report, returns list.

  Builds one colored line per known attribute in dev.attributes (sorted by
  attribute key). Attributes not returned by get_known_disk_attributes() are
  omitted.
  """
  known_attributes = get_known_disk_attributes(dev.model)
  report = []
  for attr, value in sorted(dev.attributes.items()):
    note = ''
    value_color = 'GREEN'

    # Skip attributes not in our list
    if attr not in known_attributes:
      continue

    # Check for attribute note
    note = known_attributes[attr].get('Note', '')

    # ID / Name
    label = f'{attr:>3}'
    if isinstance(attr, int):
      # Assuming SMART, include hex ID and name
      label += f' / {str(hex(attr))[2:].upper():0>2}: {value["name"]}'
    label = f' {label.replace("_", " "):38}'

    # Value color
    if known_attributes[attr].get('PercentageLife', False):
      # PercentageLife values
      # Using .get() so a missing 'Error' threshold can't raise KeyError
      # (check_attributes() tolerates a missing threshold the same way)
      err_thresh = known_attributes[attr].get('Error', None)
      if err_thresh is not None and 0 <= value['raw'] <= err_thresh:
        value_color = 'RED'
        note = '(failed, % life remaining)'
      elif value['raw'] < 0 or value['raw'] > 100:
        value_color = 'PURPLE'
        note = '(invalid?)'
    else:
      # Every matching threshold overwrites the color/note, so the last
      # matching entry in ATTRIBUTE_COLORS wins
      for threshold, color in ATTRIBUTE_COLORS:
        threshold_val = known_attributes[attr].get(threshold, None)
        if threshold_val and value['raw'] >= threshold_val:
          value_color = color
          if threshold == 'Error':
            note = '(failed)'
          elif threshold == 'Maximum':
            note = '(invalid?)'

    # 199/C7 warning
    if str(attr) == '199' and value['raw'] > 0:
      note = '(bad cable?)'

    # Build colored string and append to report
    line = color_string(
      [label, value['raw_str'], note],
      [None, value_color, 'YELLOW'],
      )
    report.append(line)

  # Done
  return report


def get_known_disk_attributes(model) -> dict[Any, dict]:
  """Get known NVMe/SMART attributes (model specific), returns dict.

  Starts from KNOWN_DISK_ATTRIBUTES and overlays the thresholds of every
  KNOWN_DISK_MODELS entry whose regex matches model (via re.search).

  The returned dict and its per-attribute dicts are fresh copies, the
  module-level constants are never modified.
  """
  # Copy each per-attribute dict: a plain .copy() is shallow, so the update()
  # below would otherwise write model-specific thresholds into the shared
  # KNOWN_DISK_ATTRIBUTES constant and affect every later call.
  known_attributes = {
    attr: dict(thresholds)
    for attr, thresholds in KNOWN_DISK_ATTRIBUTES.items()
    }

  # Apply model-specific data
  for regex, data in KNOWN_DISK_MODELS.items():
    if re.search(regex, model):
      for attr, thresholds in data.items():
        if attr in known_attributes:
          known_attributes[attr].update(thresholds)
        else:
          # Copy so a later matching regex can't modify KNOWN_DISK_MODELS
          known_attributes[attr] = dict(thresholds)

  # Done
  return known_attributes


def get_smart_self_test_details(dev) -> dict[Any, Any]:
  """Shorthand to get deeply nested self-test details, returns dict.

  Returns dev.raw_smartctl['ata_smart_data']['self_test'], or an empty dict
  if that path is not available.
  """
  details = {}
  try:
    details = dev.raw_smartctl['ata_smart_data']['self_test']
  except (KeyError, TypeError):
    # Assuming disk lacks SMART support, ignore and return empty dict.
    pass

  # Done
  return details


def safety_checks(dev) -> None:
  """Run safety checks and raise an exception if necessary.

  Refreshes the SMART data first, then checks blocking attributes, the NVMe
  status flags, and the SMART overall assessment. Notes are added to dev for
  each problem found.

  Raises:
    CriticalHardwareError: a blocking attribute is failing, the media is
      read-only, or the SMART overall assessment failed.
    SMARTSelfTestInProgressError: a self-test is currently running.
  """
  blocking_event_encountered = False
  update_smart_details(dev)

  # Attributes
  if not check_attributes(dev, only_blocking=True):
    blocking_event_encountered = True
    LOG.error('%s: Blocked for failing attribute(s)', dev.path)

  # NVMe status
  nvme_status = dev.raw_smartctl.get('smart_status', {}).get('nvme', {})
  if nvme_status.get('media_read_only', False):
    blocking_event_encountered = True
    msg = 'Media has been placed in read-only mode'
    dev.add_note(msg, 'RED')
    LOG.error('%s %s', dev.path, msg)
  for key in NVME_WARNING_KEYS:
    # These are warnings only, they do not block testing
    if nvme_status.get(key, False):
      msg = key.replace('_', ' ')
      dev.add_note(msg, 'YELLOW')
      LOG.warning('%s %s', dev.path, msg)

  # SMART overall assessment
  smart_passed = True
  try:
    smart_passed = dev.raw_smartctl['smart_status']['passed']
  except (KeyError, TypeError):
    # Assuming disk doesn't support SMART overall assessment
    pass
  if not smart_passed:
    blocking_event_encountered = True
    msg = 'SMART overall self-assessment: Failed'
    dev.add_note(msg, 'RED')
    LOG.error('%s %s', dev.path, msg)

  # Raise blocking exception if necessary
  if blocking_event_encountered:
    raise CriticalHardwareError(f'Critical error(s) for: {dev.path}')

  # SMART self-test status
  test_details = get_smart_self_test_details(dev)
  if 'remaining_percent' in test_details.get('status', {}):
    msg = f'SMART self-test in progress for: {dev.path}'
    LOG.error(msg)
    raise SMARTSelfTestInProgressError(msg)


def run_self_test(test_obj, log_path) -> None:
  """Run disk self-test and record the outcome on test_obj.

  Sets test_obj.passed / test_obj.failed and the test status. Nothing is
  returned.

  Resulting status:
    'TimedOut' if run_smart_self_test() raised TimeoutError,
    'N/A'      if the disk lacks SMART self-test support (counted as passed),
    otherwise 'Failed', 'Passed', or 'Unknown' based on the test_obj flags.
  """
  result = None

  try:
    test_obj.passed = run_smart_self_test(test_obj.dev, log_path)
  except TimeoutError:
    test_obj.failed = True
    result = 'TimedOut'
  except SMARTNotSupportedError:
    # Pass test since it doesn't apply
    test_obj.passed = True
    result = 'N/A'

  # Set status
  if result:
    test_obj.set_status(result)
  else:
    if test_obj.failed:
      test_obj.set_status('Failed')
    elif test_obj.passed:
      test_obj.set_status('Passed')
    else:
      test_obj.set_status('Unknown')


def run_smart_self_test(dev, log_path) -> bool:
  """Run SMART self-test and check if it passed, returns bool.

  Starts a short self-test, then polls the SMART data every five seconds
  while writing the current status to log_path (the file is overwritten on
  each update).

  Returns the 'passed' value reported for the finished test (False if the
  key is missing). If the test never reports progress, i.e. it didn't start,
  None is returned instead of a bool.

  NOTE: An exception will be raised if the disk lacks SMART support.

  Raises:
    SMARTNotSupportedError: no self-test details are available for the disk.
    TimeoutError: the test started but did not finish in the time allowed.
  """
  finished = False
  result = None
  started = False
  test_details = get_smart_self_test_details(dev)
  size_str = bytes_to_string(dev.size, use_binary=False)
  header_str = color_string(
    ['[', dev.path.name, ' ', size_str, ']'],
    [None, 'BLUE', None, 'CYAN', None],
    sep='',
    )

  # Check if disk supports self-tests
  if not test_details:
    raise SMARTNotSupportedError(
      f'SMART self-test not supported for {dev.path}')

  # Get real test length
  # (reported short test duration, default 5, plus a 10 minute margin)
  test_minutes = test_details.get('polling_minutes', {}).get('short', 5)
  test_minutes = int(test_minutes) + 10

  # Start test
  with open(log_path, 'w', encoding='utf-8') as _f:
    _f.write(f'{header_str}\nInitializing...')
  cmd = [
    'sudo',
    'smartctl',
    '--tolerance=normal',
    '--test=short',
    dev.path,
    ]
  run_program(cmd, check=False)

  # Monitor progress (in five second intervals)
  for _i in range(int(test_minutes*60/5)):
    sleep(5)

    # Update status
    update_smart_details(dev)
    test_details = get_smart_self_test_details(dev)

    # Check test progress
    if started:
      status_str = test_details.get('status', {}).get('string', 'Unknown')
      status_str = status_str.capitalize()

      # Update log
      with open(log_path, 'w', encoding='utf-8') as _f:
        _f.write(f'{header_str}\nSMART self-test status:\n {status_str}')

      # Check if finished
      if 'remaining_percent' not in test_details.get('status', {}):
        finished = True
        break

    elif 'remaining_percent' in test_details.get('status', {}):
      # 'remaining_percent' is only checked for after the test was requested,
      # so its first appearance is treated as the test having started
      started = True
    elif _i * 5 >= SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS:
      # Test didn't start within limit, stop waiting
      break

  # Check result
  if finished:
    result = test_details.get('status', {}).get('passed', False)
  elif started:
    raise TimeoutError(f'SMART self-test timed out for {dev.path}')

  # Done
  return result


def update_smart_details(dev) -> None:
  """Update SMART details via smartctl.

  Stores the parsed `smartctl --all --json` output in dev.raw_smartctl and
  merges the parsed attributes into dev.attributes. NVMe attributes are keyed
  by name, SMART attributes by integer ID.

  NOTE: dev.attributes is updated in place, keys missing from the new data
  are left untouched.

  A note is added to dev if no attributes could be read.
  """
  updated_attributes = {}

  # Get SMART data
  cmd = [
    'sudo',
    'smartctl',
    f'--device={"sat,auto" if dev.use_sat else "auto"}',
    '--tolerance=verypermissive',
    '--all',
    '--json',
    dev.path,
    ]
  dev.raw_smartctl = get_json_from_command(cmd, check=False)

  # Check for attributes
  if KEY_NVME in dev.raw_smartctl:
    for name, value in dev.raw_smartctl[KEY_NVME].items():
      try:
        updated_attributes[name] = {
          'name': name,
          'raw': int(value),
          'raw_str': str(value),
          }
      except (TypeError, ValueError):
        # Ignoring invalid attribute
        LOG.error('Invalid NVMe attribute: %s %s', name, value)
  elif KEY_SMART in dev.raw_smartctl:
    for attribute in dev.raw_smartctl[KEY_SMART].get('table', {}):
      try:
        _id = int(attribute['id'])
      except (KeyError, TypeError, ValueError):
        # Ignoring invalid attribute
        # (TypeError covers a null ID or an entry that isn't a mapping)
        LOG.error('Invalid SMART attribute: %s', attribute)
        continue
      name = str(attribute.get('name', 'Unknown')).replace('_', ' ').title()
      raw = int(attribute.get('raw', {}).get('value', -1))
      raw_str = attribute.get('raw', {}).get('string', 'Unknown')

      # Fix power-on time
      # (for ID 9 the first regex group of the raw string replaces raw value)
      match = REGEX_POWER_ON_TIME.match(raw_str)
      if _id == 9 and match:
        raw = int(match.group(1))

      # Add to dict
      updated_attributes[_id] = {
        'name': name, 'raw': raw, 'raw_str': raw_str}

  # Add note if necessary
  if not updated_attributes:
    dev.add_note('No NVMe or SMART data available', 'YELLOW')

  # Done
  dev.attributes.update(updated_attributes)


if __name__ == '__main__':
  print("This file is not meant to be called directly.")