"""WizardKit: SMART test functions"""
# vim: sts=2 sw=2 ts=2

import copy
import logging
import re

from typing import Any

from wk.cfg.hw import (
  ATTRIBUTE_COLORS,
  KEY_NVME,
  KEY_SMART,
  KNOWN_DISK_ATTRIBUTES,
  KNOWN_DISK_MODELS,
  NVME_WARNING_KEYS,
  REGEX_POWER_ON_TIME,
  SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS,
  )
from wk.exe import get_json_from_command, run_program
from wk.std import bytes_to_string, color_string, sleep


# STATIC VARIABLES
LOG = logging.getLogger(__name__)


# Functions
def abort_self_test(dev) -> None:
  """Abort currently running non-captive self-test."""
  cmd = ['sudo', 'smartctl', '--abort', dev.path]
  run_program(cmd, check=False)


def build_self_test_report(test_obj, aborted=False) -> None:
  """Check self-test results and build report (saved to test_obj).

  NOTE: Not updating SMART data to preserve the result for the report.
        For instance if the test was aborted the report should include the
        last known progress instead of just "was aborted by host."

  The report lines are appended to test_obj.report. The status of test_obj
  may be changed to 'Failed' (previous test could not be stopped) or
  'Aborted' (aborted before a pass/fail result was recorded).
  """
  report = [color_string('Self-Test', 'BLUE')]
  test_details = get_smart_self_test_details(test_obj.dev)
  test_result = test_details.get('status', {}).get('string', 'Unknown')

  # Build report
  if test_obj.disabled or test_obj.status == 'Denied':
    report.append(color_string(f' {test_obj.status}', 'RED'))
  elif test_obj.status == 'N/A' or not test_obj.dev.attributes:
    report.append(color_string(f' {test_obj.status}', 'YELLOW'))
  elif test_obj.status == 'TestInProgress':
    report.append(color_string(' Failed to stop previous test', 'RED'))
    test_obj.set_status('Failed')
  else:
    # Other cases include self-test result string
    report.append(f' {test_result.capitalize()}')
    if aborted and not (test_obj.passed or test_obj.failed):
      report.append(color_string(' Aborted', 'YELLOW'))
      test_obj.set_status('Aborted')
    elif test_obj.status == 'TimedOut':
      report.append(color_string(' TimedOut', 'YELLOW'))

  # Done
  test_obj.report.extend(report)


def check_attributes(dev, only_blocking=False) -> bool:
  """Check if any known attributes are failing, returns bool.

  Returns True if no checked attribute is failing. If only_blocking is set
  then only attributes marked 'Blocking' in dev.known_attributes are checked.
  """
  attributes_ok = True
  for attr, value in dev.attributes.items():
    # Skip unknown attributes
    if attr not in dev.known_attributes:
      continue

    # Get thresholds
    blocking_attribute = dev.known_attributes[attr].get('Blocking', False)
    err_thresh = dev.known_attributes[attr].get('Error', None)
    max_thresh = dev.known_attributes[attr].get('Maximum', None)
    if not max_thresh:
      # No upper bound, any value at or above the error threshold fails
      max_thresh = float('inf')

    # Skip non-blocking attributes if necessary
    if only_blocking and not blocking_attribute:
      continue

    # Skip informational attributes
    if not err_thresh:
      continue

    # Check attribute
    if dev.known_attributes[attr].get('PercentageLife', False):
      # PercentageLife fails when the value is at or below the threshold
      if 0 <= value['raw'] <= err_thresh:
        attributes_ok = False
    elif err_thresh <= value['raw'] < max_thresh:
      # Values at or above the maximum are not counted as failures here
      attributes_ok = False

  # Done
  return attributes_ok


def enable_smart(dev) -> None:
  """Try enabling SMART for this disk."""
  cmd = [
    'sudo',
    'smartctl',
    f'--device={"sat,auto" if dev.use_sat else "auto"}',
    '--tolerance=permissive',
    '--smart=on',
    dev.path,
    ]
  run_program(cmd, check=False)


def generate_attribute_report(dev) -> list[str]:
  """Generate attribute report, returns list.

  Only attributes present in dev.known_attributes are included. Each line
  holds the label, the raw value string (colored by threshold), and a note.
  """
  report = []
  for attr, value in sorted(dev.attributes.items()):
    value_color = 'GREEN'

    # Skip attributes not in our list
    if attr not in dev.known_attributes:
      continue

    # Check for attribute note
    note = dev.known_attributes[attr].get('Note', '')

    # ID / Name
    label = f'{attr:>3}'
    if isinstance(attr, int):
      # Assuming SMART, include hex ID and name
      label += f' / {str(hex(attr))[2:].upper():0>2}: {value["name"]}'
    label = f' {label.replace("_", " "):38}'

    # Value color
    if dev.known_attributes[attr].get('PercentageLife', False):
      # PercentageLife values
      if 0 <= value['raw'] <= dev.known_attributes[attr]['Error']:
        value_color = 'RED'
        note = '(failed, % life remaining)'
      elif value['raw'] < 0 or value['raw'] > 100:
        value_color = 'PURPLE'
        note = '(invalid?)'
    else:
      # The last matching threshold in ATTRIBUTE_COLORS wins
      for threshold, color in ATTRIBUTE_COLORS:
        threshold_val = dev.known_attributes[attr].get(threshold, None)
        if threshold_val and value['raw'] >= threshold_val:
          value_color = color
          if threshold == 'Error':
            note = '(failed)'
          elif threshold == 'Maximum':
            note = '(invalid?)'

    # 199/C7 warning
    if str(attr) == '199' and value['raw'] > 0:
      note = '(bad cable?)'

    # Build colored string and append to report
    line = color_string(
      [label, value['raw_str'], note],
      [None, value_color, 'YELLOW'],
      )
    report.append(line)

  # Done
  return report


def get_known_disk_attributes(model) -> dict[Any, dict]:
  """Get known disk attributes based on the device model, returns dict.

  Starts from a copy of KNOWN_DISK_ATTRIBUTES and then applies the data of
  every KNOWN_DISK_MODELS entry whose regex matches the model.
  """
  known_attributes = copy.deepcopy(KNOWN_DISK_ATTRIBUTES)

  # Apply model-specific data
  for regex, data in KNOWN_DISK_MODELS.items():
    if not re.search(regex, model):
      continue
    for attr, thresholds in data.items():
      if attr in known_attributes:
        known_attributes[attr].update(thresholds)
      else:
        known_attributes[attr] = copy.deepcopy(thresholds)

  # Done
  return known_attributes


def get_smart_self_test_details(dev) -> dict[Any, Any]:
  """Shorthand to get deeply nested self-test details, returns dict."""
  details = {}
  try:
    details = dev.raw_smartctl['ata_smart_data']['self_test']
  except (KeyError, TypeError):
    # Assuming disk lacks SMART support, ignore and return empty dict.
    pass

  # Done
  return details


def monitor_smart_self_test(test_obj, header_str, log_path) -> bool:
  """Monitor SMART self-test status and update test_obj, returns bool.

  Returns True if the test was seen running and then seen finished. If the
  test does not start within SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS it is
  aborted and test_obj is marked as failed with the status 'TimedOut'.
  """
  started = False
  finished = False
  status_str = 'Starting self-test...'
  test_details = get_smart_self_test_details(test_obj.dev)

  # Get real test length (plus ten minutes of leeway)
  test_minutes = test_details.get('polling_minutes', {}).get('short', 5)
  test_minutes = int(test_minutes) + 10

  # Monitor progress (in five second intervals)
  for _i in range(int(test_minutes*60/5)):
    sleep(5)

    # Update log
    ## NOTE: This is run at least once with the default "Starting..." status
    with open(log_path, 'w', encoding='utf-8') as _f:
      _f.write(f'{header_str}\nSMART self-test status:\n {status_str}')

    # Update status
    update_smart_details(test_obj.dev)
    test_details = get_smart_self_test_details(test_obj.dev)

    # Check if test started
    started = started or 'remaining_percent' in test_details.get('status', {})
    if not started:
      if _i * 5 >= SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS:
        # Test didn't start within limit, stop waiting
        abort_self_test(test_obj.dev)
        test_obj.failed = True
        test_obj.set_status('TimedOut')
        break
      # Still within starting limit, continue to next loop
      continue

    # Check test progress
    status_str = test_details.get('status', {}).get('string', 'Unknown')
    status_str = status_str.capitalize()

    # Check if finished
    if 'remaining_percent' not in test_details.get('status', {}):
      finished = True
      break

  # Done
  return finished


def run_self_test(test_obj, log_path) -> None:
  """Run disk self-test and update test results.

  NOTE: This function is here to reserve a place for future
        NVMe self-tests announced in NVMe spec v1.3.
  """
  run_smart_self_test(test_obj, log_path)


def run_smart_self_test(test_obj, log_path) -> None:
  """Run SMART self-test and check if it passed.

  The result is saved to test_obj (passed, failed, status, and report),
  nothing is returned.

  NOTE: If no self-test details are available for the disk the test is
        marked as passed with the status 'N/A' since it doesn't apply.
  """
  finished = False
  test_details = get_smart_self_test_details(test_obj.dev)
  size_str = bytes_to_string(test_obj.dev.size, use_binary=False)
  header_str = color_string(
    ['[', test_obj.dev.path.name, ' ', size_str, ']'],
    [None, 'BLUE', None, 'CYAN', None],
    sep='',
    )

  # Check if disk supports self-tests
  if not test_details:
    # Mark test as passed since it doesn't apply
    test_obj.passed = True
    test_obj.set_status('N/A')
    build_self_test_report(test_obj)
    return

  # Update status
  with open(log_path, 'w', encoding='utf-8') as _f:
    _f.write(f'{header_str}\nInitializing...')

  # Check for, and stop, self-test if currently in-progress
  if self_test_in_progress(test_obj.dev):
    abort_self_test(test_obj.dev)
    for _ in range(6):
      # Wait up to a minute for current test to exit
      sleep(10)
      update_smart_details(test_obj.dev)
      if not self_test_in_progress(test_obj.dev):
        break

  # Recheck if self-test is in-progress, bail if so
  if self_test_in_progress(test_obj.dev):
    test_obj.failed = True
    test_obj.set_status('TestInProgress')
    build_self_test_report(test_obj)
    return

  # Start test
  cmd = [
    'sudo',
    'smartctl',
    '--tolerance=normal',
    '--test=short',
    test_obj.dev.path,
    ]
  run_program(cmd, check=False)

  # Monitor progress
  finished = monitor_smart_self_test(test_obj, header_str, log_path)

  # Check result
  if finished:
    # The details must be fetched again since the SMART data was replaced
    # while monitoring; the details from before the test are out of date.
    test_details = get_smart_self_test_details(test_obj.dev)
    test_obj.passed = test_details.get('status', {}).get('passed', False)
    test_obj.failed = test_obj.failed or not test_obj.passed

  # Set status
  if test_obj.status == 'TimedOut':
    # Keep the TimedOut status so it can be included in the report
    pass
  elif test_obj.failed:
    test_obj.set_status('Failed')
  elif test_obj.passed:
    test_obj.set_status('Passed')
  else:
    test_obj.set_status('Unknown')

  # Build report
  build_self_test_report(test_obj)


def smart_status_ok(dev) -> bool:
  """Check SMART attributes and overall assessment, returns bool.

  Returns False if a blocking event was encountered: a failing blocking
  attribute, NVMe media in read-only mode, or a failed overall assessment.
  NVMe warnings are added as notes but are not blocking.
  """
  blocking_event_encountered = False
  update_smart_details(dev)

  # Attributes
  if not check_attributes(dev, only_blocking=True):
    blocking_event_encountered = True
    LOG.error('%s: Blocked for failing attribute(s)', dev.path)

  # NVMe status
  nvme_status = dev.raw_smartctl.get('smart_status', {}).get('nvme', {})
  if nvme_status.get('media_read_only', False):
    blocking_event_encountered = True
    msg = 'Media has been placed in read-only mode'
    dev.add_note(msg, 'RED')
    LOG.error('%s %s', dev.path, msg)
  for key in NVME_WARNING_KEYS:
    if nvme_status.get(key, False):
      msg = key.replace('_', ' ')
      dev.add_note(msg, 'YELLOW')
      LOG.warning('%s %s', dev.path, msg)

  # SMART overall assessment
  smart_passed = True
  try:
    smart_passed = dev.raw_smartctl['smart_status']['passed']
  except (KeyError, TypeError):
    # Assuming disk doesn't support SMART overall assessment
    pass
  if not smart_passed:
    blocking_event_encountered = True
    msg = 'SMART overall self-assessment: Failed'
    dev.add_note(msg, 'RED')
    LOG.error('%s %s', dev.path, msg)

  # Done
  return not blocking_event_encountered


def self_test_in_progress(dev) -> bool:
  """Check if SMART self-test is in progress, returns bool."""
  test_details = get_smart_self_test_details(dev)
  return 'remaining_percent' in test_details.get('status', {})


def update_smart_details(dev) -> None:
  """Update SMART details via smartctl.

  Replaces dev.raw_smartctl with the new smartctl output and merges the
  parsed NVMe or SMART attributes into dev.attributes. Nothing is updated
  if the device is no longer present.
  """
  updated_attributes = {}

  # Bail if device was disconnected
  if not dev.present:
    dev.add_note('Device disconnected', 'RED')
    return

  # Get SMART data
  cmd = [
    'sudo',
    'smartctl',
    f'--device={"sat,auto" if dev.use_sat else "auto"}',
    '--tolerance=verypermissive',
    '--all',
    '--json',
    dev.path,
    ]
  dev.raw_smartctl = get_json_from_command(cmd, check=False)

  # Check for attributes
  if KEY_NVME in dev.raw_smartctl:
    for name, value in dev.raw_smartctl[KEY_NVME].items():
      try:
        updated_attributes[name] = {
          'name': name,
          'raw': int(value),
          'raw_str': str(value),
          }
      except (TypeError, ValueError):
        # Ignoring invalid attribute
        LOG.error('Invalid NVMe attribute: %s %s', name, value)
  elif KEY_SMART in dev.raw_smartctl:
    for attribute in dev.raw_smartctl[KEY_SMART].get('table', {}):
      try:
        _id = int(attribute['id'])
      except (KeyError, TypeError, ValueError):
        # Ignoring invalid attribute
        LOG.error('Invalid SMART attribute: %s', attribute)
        continue
      name = str(attribute.get('name', 'Unknown')).replace('_', ' ').title()
      raw = int(attribute.get('raw', {}).get('value', -1))
      raw_str = attribute.get('raw', {}).get('string', 'Unknown')

      # Fix power-on time
      match = REGEX_POWER_ON_TIME.match(raw_str)
      if _id == 9 and match:
        raw = int(match.group(1))

      # Add to dict
      updated_attributes[_id] = {
        'name': name, 'raw': raw, 'raw_str': raw_str}

  # Add note if necessary
  if not updated_attributes:
    dev.add_note('No NVMe or SMART data available', 'YELLOW')

  # Done
  dev.attributes.update(updated_attributes)


if __name__ == '__main__':
  print("This file is not meant to be called directly.")