"""WizardKit: SMART test functions""" # vim: sts=2 sw=2 ts=2 import copy import logging import re from typing import Any from wk.cfg.hw import ( ATTRIBUTE_COLORS, KEY_NVME, KEY_SMART, KNOWN_DISK_ATTRIBUTES, KNOWN_DISK_MODELS, NVME_WARNING_KEYS, REGEX_POWER_ON_TIME, SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS, ) from wk.exe import get_json_from_command, run_program from wk.std import bytes_to_string, sleep from wk.ui import ansi # STATIC VARIABLES LOG = logging.getLogger(__name__) # Functions def abort_self_test(dev) -> None: """Abort currently running non-captive self-test.""" cmd = ['sudo', 'smartctl', '--abort', dev.path] run_program(cmd, check=False) def build_self_test_report(test_obj, aborted=False) -> None: """Check self-test results and build report (saved to test_obj). NOTE: Not updating SMART data to preserve the result for the report. For instance if the test was aborted the report should include the last known progress instead of just "was aborted by host." """ report = [ansi.color_string('Self-Test', 'BLUE')] test_result = get_smart_self_test_last_result(test_obj.dev) # Build report if test_obj.disabled or test_obj.status == 'Denied': report.append(ansi.color_string(f' {test_obj.status}', 'RED')) elif test_obj.status == 'N/A' or not test_obj.dev.attributes: report.append(ansi.color_string(f' {test_obj.status}', 'YELLOW')) else: # Other cases include self-test result string if test_obj.status == 'TestInProgress': report.append(ansi.color_string(' Failed to stop previous test', 'RED')) test_obj.set_status('Failed') elif test_obj.status == 'TimedOut': report.append(ansi.color_string(' TimedOut', 'YELLOW')) elif aborted and not (test_obj.passed or test_obj.failed): report.append(ansi.color_string(' Aborted', 'YELLOW')) test_obj.set_status('Aborted') report.append(f' {test_result}') # Done test_obj.report.extend(report) def check_attributes(dev, only_blocking=False) -> bool: """Check if any known attributes are failing, returns bool.""" attributes_ok = True for attr, value in dev.attributes.items(): # Skip unknown attributes if attr not in dev.known_attributes: continue # Get thresholds blocking_attribute = dev.known_attributes[attr].get('Blocking', False) err_thresh = dev.known_attributes[attr].get('Error', None) max_thresh = dev.known_attributes[attr].get('Maximum', None) if not max_thresh: max_thresh = float('inf') # Skip non-blocking attributes if necessary if only_blocking and not blocking_attribute: continue # Skip informational attributes if not err_thresh: continue # Check attribute if dev.known_attributes[attr].get('PercentageLife', False): if 0 <= value['raw'] <= err_thresh: attributes_ok = False elif err_thresh <= value['raw'] < max_thresh: attributes_ok = False # Done return attributes_ok def enable_smart(dev) -> None: """Try enabling SMART for this disk.""" cmd = [ 'sudo', 'smartctl', '--device=auto', '--tolerance=permissive', '--smart=on', dev.path, ] run_program(cmd, check=False) def generate_attribute_report(dev, only_failed=False) -> list[str]: """Generate attribute report, returns list.""" report = [] for attr, value in sorted(dev.attributes.items()): # Skip attributes not in our list if attr not in dev.known_attributes: continue # ID / Name label = f'{attr:>3}' if isinstance(attr, int): # Assuming SMART, include hex ID and name label += f' / {str(hex(attr))[2:].upper():0>2}: {value["name"]}' label = f' {label.replace("_", " "):38}' # Color & Note value_color, note = get_attribute_value_color_and_note(dev, attr, value) # Skip non-failing attributes if requested ## NOTE: This is a naive test and will include 'invalid' attributes if only_failed and not note: continue # Build colored string and append to report line = ansi.color_string( [label, get_attribute_value_string(dev, attr), note], [None, value_color, 'YELLOW'], ) report.append(line) # Done return report def get_attribute_value_color_and_note(dev, attr, value) -> tuple[str, str]: """Get attribute color and note based on SMART data.""" value_color = 'GREEN' note = dev.known_attributes[attr].get('Note', '') # Value value_color if dev.known_attributes[attr].get('PercentageLife', False): # PercentageLife values if 0 <= value['raw'] <= dev.known_attributes[attr]['Error']: value_color = 'RED' note = '(failed, % life remaining)' elif value['raw'] < 0 or value['raw'] > 100: value_color = 'PURPLE' note = '(invalid?)' else: for threshold, color in ATTRIBUTE_COLORS: threshold_val = dev.known_attributes[attr].get(threshold, None) if threshold_val and value['raw'] >= threshold_val: value_color = color if threshold == 'Error': note = '(failed)' elif threshold == 'Maximum': note = '(invalid?)' # 199/C7 warning if str(attr) == '199' and value['raw'] > 0: note = '(bad cable?)' # Done return (value_color, note) def get_attribute_value_string(dev, attr) -> str: """Get attribute value string and report if it has changed.""" current_value = dev.attributes.get(attr, {}) initial_value = dev.initial_attributes.get(attr, {}) value_str = current_value.get('raw_str', '') # Compare current value against initial value if ( current_value.get('raw', None) is None or initial_value.get('raw', None) is None ): return value_str if current_value['raw'] != initial_value['raw']: value_str = ( f'{initial_value.get("raw_str", "?")} --> ' f'{current_value.get("raw_str", "?")}' ) # Done return value_str def get_known_disk_attributes(model) -> dict[str | int, dict[str, Any]]: """Get known disk attributes based on the device model.""" known_attributes = copy.deepcopy(KNOWN_DISK_ATTRIBUTES) # Apply model-specific data for regex, data in KNOWN_DISK_MODELS.items(): if not re.search(regex, model): continue for attr, thresholds in data.items(): if attr in known_attributes: known_attributes[attr].update(thresholds) else: known_attributes[attr] = copy.deepcopy(thresholds) # Done return known_attributes def get_smart_self_test_details(dev) -> dict[str, Any]: """Shorthand to get deeply nested self-test details, returns dict.""" details = {} try: details = dev.raw_smartctl['ata_smart_data']['self_test'] except (KeyError, TypeError): # Assuming disk lacks SMART support, ignore and return empty dict. pass # Done return details def get_smart_self_test_last_result(dev) -> str: """Get last SMART self-test result, returns str.""" result = 'Unknown' # Parse SMART data data = dev.raw_smartctl.get( 'ata_smart_self_test_log', {}).get( 'standard', {}).get( 'table', []) try: data = data[0] except IndexError: # No results found return result # Build result string result = ( f'Power-on hours: {data.get("lifetime_hours", "?")}' f', Type: {data.get("type", {}).get("string", "?")}' f', Passed: {data.get("status", {}).get("passed", "?")}' f', Result: {data.get("status", {}).get("string", "?")}' ) # Done return result def monitor_smart_self_test(test_obj, header_str, log_path) -> bool: """Monitor SMART self-test status and update test_obj, returns bool.""" started = False finished = False status_str = 'Starting self-test...' test_details = get_smart_self_test_details(test_obj.dev) test_minutes = 15 # Get real test length test_minutes = test_details.get('polling_minutes', {}).get('short', 5) test_minutes = int(test_minutes) + 10 # Monitor progress (in five second intervals) for _i in range(int(test_minutes*60/5)): sleep(5) # Update log ## NOTE: This is run at least once with the default "Starting..." status with open(log_path, 'w', encoding='utf-8') as _f: _f.write(f'{header_str}\nSMART self-test status:\n {status_str}') # Update status update_smart_details(test_obj.dev) test_details = get_smart_self_test_details(test_obj.dev) # Check if test started started = started or 'remaining_percent' in test_details.get('status', {}) if not started: if _i * 5 >= SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS: # Test didn't start within limit, stop waiting abort_self_test(test_obj.dev) result = get_smart_self_test_last_result(test_obj.dev) if result == 'Unknown': result = 'SMART self-test failed to start' test_obj.failed = True test_obj.set_status('TimedOut') break # Still within starting limit, continue to next loop continue # Check test progress status_str = test_details.get('status', {}).get('string', 'Unknown') status_str = status_str.capitalize() # Check if finished if 'remaining_percent' not in test_details.get('status', {}): finished = True break # Check if timed out if started and not finished: test_obj.failed = True test_obj.set_status('TimedOut') # Done return finished def run_self_test(test_obj, log_path) -> None: """Run disk self-test and update test results. NOTE: This function is here to reserve a place for future NVMe self-tests announced in NVMe spec v1.3. """ run_smart_self_test(test_obj, log_path) def run_smart_self_test(test_obj, log_path) -> None: """Run SMART self-test and check if it passed, returns None. NOTE: An exception will be raised if the disk lacks SMART support. """ finished = False test_details = get_smart_self_test_details(test_obj.dev) size_str = bytes_to_string(test_obj.dev.size, use_binary=False) header_str = ansi.color_string( ['[', test_obj.dev.path.name, ' ', size_str, ']'], [None, 'BLUE', None, 'CYAN', None], sep='', ) # Check if disk supports self-tests if not test_details: # Mark test as passed since it doesn't apply test_obj.passed = True test_obj.set_status('N/A') build_self_test_report(test_obj) return # Update status with open(log_path, 'w', encoding='utf-8') as _f: _f.write(f'{header_str}\nInitializing...') # Check for, and stop, self-test if currently in-progress if self_test_in_progress(test_obj.dev): abort_self_test(test_obj.dev) for _ in range(6): # Wait up to a minute for current test to exit sleep(10) update_smart_details(test_obj.dev) if not self_test_in_progress(test_obj.dev): break # Recheck if self-test is in-progress, bail if so if self_test_in_progress(test_obj.dev): test_obj.failed = True test_obj.set_status('TestInProgress') build_self_test_report(test_obj) return # Start test cmd = [ 'sudo', 'smartctl', '--tolerance=normal', '--test=short', test_obj.dev.path, ] run_program(cmd, check=False) # Monitor progress finished = monitor_smart_self_test(test_obj, header_str, log_path) # Check result if finished: test_details = get_smart_self_test_details(test_obj.dev) test_obj.passed = test_details.get('status', {}).get('passed', False) test_obj.failed = test_obj.failed or not test_obj.passed # Set status if test_obj.status == 'TimedOut': # Preserve TimedOut status pass elif test_obj.failed: test_obj.set_status('Failed') elif test_obj.passed: test_obj.set_status('Passed') else: test_obj.set_status('Unknown') # Build report build_self_test_report(test_obj) def smart_status_ok(dev) -> bool: """Check SMART attributes and overall assessment, returns bool.""" blocking_event_encountered = False update_smart_details(dev) # Attributes if not check_attributes(dev, only_blocking=True): blocking_event_encountered = True LOG.error('%s: Blocked for failing attribute(s)', dev.path) # NVMe status nvme_status = dev.raw_smartctl.get('smart_status', {}).get('nvme', {}) if nvme_status.get('media_read_only', False): blocking_event_encountered = True msg = 'Media has been placed in read-only mode' dev.add_note(msg, 'RED') LOG.error('%s %s', dev.path, msg) for key in NVME_WARNING_KEYS: if nvme_status.get(key, False): msg = key.replace('_', ' ') dev.add_note(msg, 'YELLOW') LOG.warning('%s %s', dev.path, msg) # SMART overall assessment smart_passed = True try: smart_passed = dev.raw_smartctl['smart_status']['passed'] except (KeyError, TypeError): # Assuming disk doesn't support SMART overall assessment pass if not smart_passed: blocking_event_encountered = True msg = 'SMART overall self-assessment: Failed' dev.add_note(msg, 'RED') LOG.error('%s %s', dev.path, msg) # Done return not blocking_event_encountered def self_test_in_progress(dev) -> bool: """Check if SMART self-test is in progress, returns bool.""" test_details = get_smart_self_test_details(dev) return 'remaining_percent' in test_details.get('status', '') def update_smart_details(dev) -> None: """Update SMART details via smartctl.""" updated_attributes = {} # Bail if device was disconnected if not dev.present: dev.add_note('Device disconnected', 'RED') return # Get SMART data cmd = [ 'sudo', 'smartctl', '--device=auto', '--tolerance=verypermissive', '--all', '--json', dev.path, ] dev.raw_smartctl = get_json_from_command(cmd, check=False) # Check for attributes if KEY_NVME in dev.raw_smartctl: for name, value in dev.raw_smartctl[KEY_NVME].items(): try: updated_attributes[name] = { 'name': name, 'raw': int(value), 'raw_str': str(value), } except (TypeError, ValueError): # Ignoring invalid attribute LOG.error('Invalid NVMe attribute: %s %s', name, value) elif KEY_SMART in dev.raw_smartctl: for attribute in dev.raw_smartctl[KEY_SMART].get('table', {}): try: _id = int(attribute['id']) except (KeyError, ValueError): # Ignoring invalid attribute LOG.error('Invalid SMART attribute: %s', attribute) continue name = str(attribute.get('name', 'Unknown')).replace('_', ' ').title() raw = int(attribute.get('raw', {}).get('value', -1)) raw_str = attribute.get('raw', {}).get('string', 'Unknown') # Fix power-on time match = REGEX_POWER_ON_TIME.match(raw_str) if _id == 9 and match: raw = int(match.group(1)) # Add to dict updated_attributes[_id] = { 'name': name, 'raw': raw, 'raw_str': raw_str} # Add note if necessary if not updated_attributes: dev.add_note('No NVMe or SMART data available', 'YELLOW') # Update iniital_attributes if needed if not dev.initial_attributes: dev.initial_attributes = copy.deepcopy(updated_attributes) # Done dev.attributes.update(updated_attributes) if __name__ == '__main__': print("This file is not meant to be called directly.")