WizardKit/scripts/wk/hw/smart.py

509 lines
15 KiB
Python

"""WizardKit: SMART test functions"""
# vim: sts=2 sw=2 ts=2
import copy
import logging
import re
from typing import Any
from wk.cfg.hw import (
ATTRIBUTE_COLORS,
KEY_NVME,
KEY_SMART,
KNOWN_DISK_ATTRIBUTES,
KNOWN_DISK_MODELS,
NVME_WARNING_KEYS,
REGEX_POWER_ON_TIME,
SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS,
)
from wk.exe import get_json_from_command, run_program
from wk.std import bytes_to_string, sleep
from wk.ui import ansi
# STATIC VARIABLES
# Module-level logger, named after this module via __name__.
LOG = logging.getLogger(__name__)
# Functions
def abort_self_test(dev) -> None:
  """Ask smartctl to abort the self-test running on dev.

  The command is run with check=False and its result is not inspected here.
  """
  run_program(['sudo', 'smartctl', '--abort', dev.path], check=False)
def build_self_test_report(test_obj, aborted=False) -> None:
  """Build the self-test report lines and append them to test_obj.report.

  NOTE: SMART data is deliberately not refreshed here so the report keeps
  the last known result. For instance, if the test was aborted the report
  should include the last known progress instead of just
  "was aborted by host."

  test_obj's status may be updated to 'Failed' or 'Aborted' as a side effect.
  """
  lines = [ansi.color_string('Self-Test', 'BLUE')]
  details = get_smart_self_test_details(test_obj.dev)
  result_str = details.get('status', {}).get('string', 'Unknown')

  if test_obj.disabled or test_obj.status == 'Denied':
    lines.append(ansi.color_string(f' {test_obj.status}', 'RED'))
  elif test_obj.status == 'N/A' or not test_obj.dev.attributes:
    lines.append(ansi.color_string(f' {test_obj.status}', 'YELLOW'))
  elif test_obj.status == 'TestInProgress':
    # A previous test could not be stopped, so this run counts as failed
    lines.append(ansi.color_string(' Failed to stop previous test', 'RED'))
    test_obj.set_status('Failed')
  else:
    # All remaining cases show the drive's own self-test result string
    lines.append(f' {result_str.capitalize()}')
    if aborted and not test_obj.passed and not test_obj.failed:
      lines.append(ansi.color_string(' Aborted', 'YELLOW'))
      test_obj.set_status('Aborted')
    elif test_obj.status == 'TimedOut':
      lines.append(ansi.color_string(' TimedOut', 'YELLOW'))

  test_obj.report.extend(lines)
def check_attributes(dev, only_blocking=False) -> bool:
  """Check the device's known attributes against thresholds, returns bool.

  Returns False if any checked attribute is in its failing range.
  Attributes missing from dev.known_attributes, or lacking an 'Error'
  threshold, are ignored. With only_blocking=True only attributes flagged
  'Blocking' are checked.
  """
  all_ok = True
  for attr, value in dev.attributes.items():
    if attr not in dev.known_attributes:
      # Unknown attribute, nothing to compare against
      continue
    known = dev.known_attributes[attr]

    if only_blocking and not known.get('Blocking', False):
      continue

    error_limit = known.get('Error', None)
    if not error_limit:
      # Informational attribute (no error threshold)
      continue

    # A missing/zero maximum means there is no upper bound
    upper_limit = known.get('Maximum', None) or float('inf')

    if known.get('PercentageLife', False):
      # Remaining-life values fail when at or below the threshold
      failing = 0 <= value['raw'] <= error_limit
    else:
      # Values at/above the maximum are not counted as failures here
      failing = error_limit <= value['raw'] < upper_limit
    if failing:
      all_ok = False

  return all_ok
def enable_smart(dev) -> None:
  """Attempt to turn SMART on for dev via smartctl.

  The command is run with check=False and its result is not inspected here.
  """
  device_type = 'sat,auto' if dev.use_sat else 'auto'
  run_program(
    [
      'sudo',
      'smartctl',
      f'--device={device_type}',
      '--tolerance=permissive',
      '--smart=on',
      dev.path,
    ],
    check=False,
  )
def generate_attribute_report(dev, only_failed=False) -> list[str]:
  """Build one colored report line per known attribute, returns list.

  With only_failed=True, attributes without a note are left out.
  """
  lines = []
  for attr, value in sorted(dev.attributes.items()):
    if attr not in dev.known_attributes:
      # Not in our list, skip
      continue

    # Label: ID, plus hex ID and name for integer (SMART) attributes
    label = f'{attr:>3}'
    if isinstance(attr, int):
      hex_id = hex(attr)[2:].upper()
      label = f'{label} / {hex_id:0>2}: {value["name"]}'
    label = f' {label.replace("_", " "):38}'

    value_color, note = get_attribute_value_color_and_note(dev, attr, value)

    ## NOTE: Filtering on the note is naive; 'invalid' attributes also
    ##       carry a note and so are still included.
    if only_failed and not note:
      continue

    lines.append(
      ansi.color_string(
        [label, get_attribute_value_string(dev, attr), note],
        [None, value_color, 'YELLOW'],
      )
    )

  return lines
def get_attribute_value_color_and_note(dev, attr, value) -> tuple[str, str]:
  """Pick the display color and note for an attribute, returns tuple.

  The color defaults to 'GREEN' and the note to the attribute's 'Note'
  entry (or an empty string) in dev.known_attributes.
  """
  known = dev.known_attributes[attr]
  color = 'GREEN'
  note = known.get('Note', '')

  if not known.get('PercentageLife', False):
    # Threshold-based values: apply each threshold that has been reached,
    # in the order given by ATTRIBUTE_COLORS
    for threshold, threshold_color in ATTRIBUTE_COLORS:
      limit = known.get(threshold, None)
      if not (limit and value['raw'] >= limit):
        continue
      color = threshold_color
      if threshold == 'Error':
        note = '(failed)'
      elif threshold == 'Maximum':
        note = '(invalid?)'
  elif 0 <= value['raw'] <= known['Error']:
    # Remaining life is at or below the error threshold
    color = 'RED'
    note = '(failed, % life remaining)'
  elif value['raw'] < 0 or value['raw'] > 100:
    # Outside of 0-100 is not a sensible percentage
    color = 'PURPLE'
    note = '(invalid?)'

  # 199/C7 warning (takes precedence over any other note)
  if str(attr) == '199' and value['raw'] > 0:
    note = '(bad cable?)'

  return (color, note)
def get_attribute_value_string(dev, attr) -> str:
  """Get the display string for an attribute, showing any change.

  Returns the current 'raw_str' (or an empty string). If both the initial
  and current raw values are present and differ, returns
  '<initial> --> <current>' instead.
  """
  current = dev.attributes.get(attr, {})
  initial = dev.initial_attributes.get(attr, {})
  current_raw = current.get('raw', None)
  initial_raw = initial.get('raw', None)

  changed = (
    current_raw is not None
    and initial_raw is not None
    and current_raw != initial_raw
  )
  if changed:
    return f'{initial.get("raw_str", "?")} --> {current.get("raw_str", "?")}'
  return current.get('raw_str', '')
def get_known_disk_attributes(model) -> dict[str | int, dict[str, Any]]:
  """Build the known-attribute table for a disk model, returns dict.

  Starts from a deep copy of KNOWN_DISK_ATTRIBUTES and layers on the data
  of every KNOWN_DISK_MODELS entry whose regex is found in model.
  """
  merged = copy.deepcopy(KNOWN_DISK_ATTRIBUTES)

  # Model-specific data, in KNOWN_DISK_MODELS order
  matching_models = (
    data for regex, data in KNOWN_DISK_MODELS.items()
    if re.search(regex, model)
  )
  for model_data in matching_models:
    for attr, thresholds in model_data.items():
      if attr not in merged:
        # New attribute, copy so the shared config is never modified
        merged[attr] = copy.deepcopy(thresholds)
      else:
        merged[attr].update(thresholds)

  return merged
def get_smart_self_test_details(dev) -> dict[Any, Any]:
  """Fetch the nested self-test section of dev.raw_smartctl, returns dict.

  An empty dict is returned when the section can't be reached.
  """
  try:
    return dev.raw_smartctl['ata_smart_data']['self_test']
  except (KeyError, TypeError):
    # Assuming the disk lacks SMART support
    return {}
def get_smart_self_test_last_result(dev) -> str:
  """Get last SMART self-test result, returns str.

  Summarizes the first entry of the standard self-test log table found in
  dev.raw_smartctl (presumably the most recent test; confirm against the
  smartctl JSON output). Returns 'Unknown' if the log is missing or empty.
  Fields missing from the entry are shown as '?'.
  """
  # Parse SMART data
  table = dev.raw_smartctl.get(
    'ata_smart_self_test_log', {}).get(
      'standard', {}).get(
        'table', [])
  if not table:
    # No results found
    return 'Unknown'

  # The table is a list of entries, so pick one before reading its fields.
  # A bare dict is still accepted in case a single entry is stored directly.
  entry = table[0] if isinstance(table, (list, tuple)) else table

  # Build result string
  status = entry.get('status', {})
  return (
    f'Power-on hours: {entry.get("lifetime_hours", "?")}'
    f', Type: {entry.get("type", {}).get("string", "?")}'
    f', Passed: {status.get("passed", "?")}'
    f', Result: {status.get("string", "?")}'
  )
def monitor_smart_self_test(test_obj, header_str, log_path) -> bool:
  """Poll a SMART self-test until it ends or the time limit is reached.

  Returns True if the test was seen to start and then finish. If the test
  doesn't start within SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS it is
  aborted, a note is added to the device, and test_obj is marked failed
  with the 'TimedOut' status.
  """
  poll_seconds = 5
  has_started = False
  is_finished = False
  status_str = 'Starting self-test...'

  # Time limit: the drive's short-test estimate (default 5) plus 10 minutes
  details = get_smart_self_test_details(test_obj.dev)
  limit_minutes = int(details.get('polling_minutes', {}).get('short', 5)) + 10

  for poll in range(int(limit_minutes * 60 / poll_seconds)):
    sleep(poll_seconds)

    # Update log
    ## NOTE: The first pass always writes the default "Starting..." status
    with open(log_path, 'w', encoding='utf-8') as log_file:
      log_file.write(f'{header_str}\nSMART self-test status:\n {status_str}')

    # Refresh SMART data
    update_smart_details(test_obj.dev)
    details = get_smart_self_test_details(test_obj.dev)
    status = details.get('status', {})

    # A test counts as started once the drive reports remaining progress
    if 'remaining_percent' in status:
      has_started = True
    if not has_started:
      if poll * poll_seconds < SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS:
        # Still within the start limit, keep waiting
        continue
      # Test didn't start within the limit, stop waiting
      abort_self_test(test_obj.dev)
      result = get_smart_self_test_last_result(test_obj.dev)
      if result == 'Unknown':
        result = 'SMART self-test failed to start'
      test_obj.dev.add_note(result)
      test_obj.failed = True
      test_obj.set_status('TimedOut')
      break

    # Track progress for the next log update
    status_str = status.get('string', 'Unknown').capitalize()

    # Started and no longer reporting progress means it's done
    if 'remaining_percent' not in status:
      is_finished = True
      break

  return is_finished
def run_self_test(test_obj, log_path) -> None:
  """Run the disk's self-test and record the results in test_obj.

  NOTE: Currently this only hands off to the SMART self-test. It exists
  to reserve a place for the future NVMe self-tests announced in
  NVMe spec v1.3.
  """
  run_smart_self_test(test_obj=test_obj, log_path=log_path)
def run_smart_self_test(test_obj, log_path) -> None:
  """Run a short SMART self-test and record the outcome in test_obj.

  If no self-test details are available the test is marked passed with
  the 'N/A' status. If a running test can't be stopped within about a
  minute, test_obj is marked failed with the 'TestInProgress' status.
  A report is built before returning in every case.
  """
  dev = test_obj.dev
  details = get_smart_self_test_details(dev)
  size_str = bytes_to_string(dev.size, use_binary=False)
  header_str = ansi.color_string(
    ['[', dev.path.name, ' ', size_str, ']'],
    [None, 'BLUE', None, 'CYAN', None],
    sep='',
  )

  # No self-test details: the test doesn't apply, so count it as passed
  if not details:
    test_obj.passed = True
    test_obj.set_status('N/A')
    build_self_test_report(test_obj)
    return

  # Update status
  with open(log_path, 'w', encoding='utf-8') as log_file:
    log_file.write(f'{header_str}\nInitializing...')

  # Stop any test already running, waiting up to a minute for it to exit
  if self_test_in_progress(dev):
    abort_self_test(dev)
    for _ in range(6):
      sleep(10)
      update_smart_details(dev)
      if not self_test_in_progress(dev):
        break

  # Bail if the previous test is still running
  if self_test_in_progress(dev):
    test_obj.failed = True
    test_obj.set_status('TestInProgress')
    build_self_test_report(test_obj)
    return

  # Start test
  run_program(
    ['sudo', 'smartctl', '--tolerance=normal', '--test=short', dev.path],
    check=False,
  )

  # Monitor progress and check result
  if monitor_smart_self_test(test_obj, header_str, log_path):
    details = get_smart_self_test_details(dev)
    test_obj.passed = details.get('status', {}).get('passed', False)
    test_obj.failed = test_obj.failed or not test_obj.passed

  # Set status (an existing TimedOut status is preserved)
  if test_obj.status != 'TimedOut':
    if test_obj.failed:
      test_obj.set_status('Failed')
    elif test_obj.passed:
      test_obj.set_status('Passed')
    else:
      test_obj.set_status('Unknown')

  # Build report
  build_self_test_report(test_obj)
def smart_status_ok(dev) -> bool:
  """Refresh SMART data and look for blocking problems, returns bool.

  Returns False if a blocking attribute is failing, the NVMe media is
  read-only, or the SMART overall assessment did not pass. NVMe warning
  flags only add notes to the device.
  """
  status_ok = True
  update_smart_details(dev)

  # Attributes
  if not check_attributes(dev, only_blocking=True):
    status_ok = False
    LOG.error('%s: Blocked for failing attribute(s)', dev.path)

  # NVMe status
  nvme_status = dev.raw_smartctl.get('smart_status', {}).get('nvme', {})
  if nvme_status.get('media_read_only', False):
    status_ok = False
    msg = 'Media has been placed in read-only mode'
    dev.add_note(msg, 'RED')
    LOG.error('%s %s', dev.path, msg)
  for key in NVME_WARNING_KEYS:
    if not nvme_status.get(key, False):
      continue
    msg = key.replace('_', ' ')
    dev.add_note(msg, 'YELLOW')
    LOG.warning('%s %s', dev.path, msg)

  # SMART overall assessment
  try:
    overall_passed = dev.raw_smartctl['smart_status']['passed']
  except (KeyError, TypeError):
    # Assuming the disk doesn't support the overall assessment
    overall_passed = True
  if not overall_passed:
    status_ok = False
    msg = 'SMART overall self-assessment: Failed'
    dev.add_note(msg, 'RED')
    LOG.error('%s %s', dev.path, msg)

  return status_ok
def self_test_in_progress(dev) -> bool:
  """Report whether a SMART self-test is currently running, returns bool.

  A test is considered running while its status includes
  'remaining_percent'.
  """
  status = get_smart_self_test_details(dev).get('status', '')
  return 'remaining_percent' in status
def update_smart_details(dev) -> None:
  """Refresh dev.raw_smartctl and dev.attributes using smartctl.

  If the device is no longer present a note is added and nothing else is
  done. NVMe attributes are keyed by name and SMART attributes by integer
  ID. A note is added when neither kind of data is found.
  """
  fresh = {}

  # Bail if device was disconnected
  if not dev.present:
    dev.add_note('Device disconnected', 'RED')
    return

  # Get SMART data
  device_type = 'sat,auto' if dev.use_sat else 'auto'
  dev.raw_smartctl = get_json_from_command(
    [
      'sudo',
      'smartctl',
      f'--device={device_type}',
      '--tolerance=verypermissive',
      '--all',
      '--json',
      dev.path,
    ],
    check=False,
  )

  if KEY_NVME in dev.raw_smartctl:
    # NVMe: every entry that converts to an int becomes an attribute
    for name, value in dev.raw_smartctl[KEY_NVME].items():
      try:
        entry = {
          'name': name,
          'raw': int(value),
          'raw_str': str(value),
        }
      except (TypeError, ValueError):
        # Ignoring invalid attribute
        LOG.error('Invalid NVMe attribute: %s %s', name, value)
      else:
        fresh[name] = entry
  elif KEY_SMART in dev.raw_smartctl:
    for row in dev.raw_smartctl[KEY_SMART].get('table', {}):
      try:
        attr_id = int(row['id'])
      except (KeyError, ValueError):
        # Ignoring invalid attribute
        LOG.error('Invalid SMART attribute: %s', row)
        continue
      attr_name = str(row.get('name', 'Unknown')).replace('_', ' ').title()
      raw_data = row.get('raw', {})
      raw = int(raw_data.get('value', -1))
      raw_str = raw_data.get('string', 'Unknown')

      # Fix power-on time (ID 9): use the value matched in the raw string
      power_on_match = REGEX_POWER_ON_TIME.match(raw_str)
      if attr_id == 9 and power_on_match:
        raw = int(power_on_match.group(1))

      fresh[attr_id] = {'name': attr_name, 'raw': raw, 'raw_str': raw_str}

  # Add note if necessary
  if not fresh:
    dev.add_note('No NVMe or SMART data available', 'YELLOW')

  # Merge into the existing attributes (absent ones keep their old values)
  dev.attributes.update(fresh)
# This module only provides functions; running it directly just prints a notice.
if __name__ == '__main__':
  print("This file is not meant to be called directly.")