WizardKit/scripts/wk/hw/smart.py

414 lines
12 KiB
Python

"""WizardKit: SMART test functions"""
# vim: sts=2 sw=2 ts=2
import logging
import re
from typing import Any
from wk.cfg.hw import (
ATTRIBUTE_COLORS,
KEY_NVME,
KEY_SMART,
KNOWN_DISK_ATTRIBUTES,
KNOWN_DISK_MODELS,
NVME_WARNING_KEYS,
REGEX_POWER_ON_TIME,
SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS,
)
from wk.exe import get_json_from_command, run_program
from wk.std import bytes_to_string, color_string, sleep
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Exception Classes
class CriticalHardwareError(RuntimeError):
  """Raised when a disk has one or more critical (blocking) hardware errors."""
class SMARTNotSupportedError(TypeError):
  """Raised when a disk does not support SMART (e.g. no self-test data)."""
class SMARTSelfTestInProgressError(RuntimeError):
  """Raised when a SMART self-test is already running on the disk."""
# Functions
def abort_self_test(dev) -> None:
  """Abort the non-captive self-test currently running on dev.

  Runs `smartctl --abort` against dev.path. The command is started with
  check=False (presumably so a failing smartctl call is not treated as an
  error here -- confirm against run_program).
  """
  run_program(['sudo', 'smartctl', '--abort', dev.path], check=False)
def check_attributes(dev, only_blocking=False) -> bool:
  """Return True if no known attribute of dev is failing.

  Only attributes present in the (model specific) known-attribute table are
  considered. If only_blocking is set, attributes not flagged 'Blocking' are
  ignored. Attributes without an 'Error' threshold are informational and are
  never treated as failing.
  """
  thresholds_by_attr = get_known_disk_attributes(dev.model)
  all_ok = True

  for attr_id, details in dev.attributes.items():
    # Unknown attributes can't be judged
    if attr_id not in thresholds_by_attr:
      continue
    limits = thresholds_by_attr[attr_id]

    # A missing/zero 'Maximum' means there is no upper bound
    error_at = limits.get('Error', None)
    ceiling = limits.get('Maximum', None) or float('inf')

    # Honor the only_blocking filter
    if only_blocking and not limits.get('Blocking', False):
      continue

    # Informational attribute, nothing to compare against
    if not error_at:
      continue

    # PercentageLife values fail when they drop to the threshold or below,
    # all other values fail once they reach the threshold (values at or
    # above 'Maximum' are not counted as failures here).
    if limits.get('PercentageLife', False):
      failing = 0 <= details['raw'] <= error_at
    else:
      failing = error_at <= details['raw'] < ceiling
    if failing:
      all_ok = False

  return all_ok
def check_self_test_results(test_obj, aborted=False) -> None:
  """Append the SMART self-test outcome to test_obj.report and set its status.

  Disabled/denied and not-applicable tests only get a colored status line.
  Otherwise the last known self-test status string is reported and the test
  is marked Aborted, TimedOut or Failed as appropriate.
  """
  test_obj.report.append(color_string('Self-Test', 'BLUE'))

  # Test was never allowed to run
  if test_obj.disabled or test_obj.status == 'Denied':
    test_obj.report.append(color_string(f' {test_obj.status}', 'RED'))
    return

  # Test doesn't apply, or there is no attribute data for this disk
  if test_obj.status == 'N/A' or not test_obj.dev.attributes:
    test_obj.report.append(color_string(f' {test_obj.status}', 'YELLOW'))
    return

  # SMART data is deliberately NOT refreshed here so the report keeps the
  # status from the test itself. For instance, if the test was aborted the
  # report should include the last known progress instead of just
  # "was aborted by host".
  status_info = get_smart_self_test_details(test_obj.dev).get('status', {})
  summary = status_info.get('string', 'Unknown')
  test_obj.report.append(f' {summary.capitalize()}')

  if aborted and not test_obj.passed and not test_obj.failed:
    test_obj.report.append(color_string(' Aborted', 'YELLOW'))
    test_obj.set_status('Aborted')
  elif test_obj.status == 'TimedOut':
    test_obj.report.append(color_string(' TimedOut', 'YELLOW'))
    test_obj.set_status('TimedOut')
  else:
    # Anything that didn't pass is treated as a failure
    test_obj.failed = not test_obj.passed
    if test_obj.failed:
      test_obj.set_status('Failed')
def enable_smart(dev) -> None:
  """Attempt to turn SMART on for dev via smartctl.

  Uses the 'sat,auto' device type when dev.use_sat is set, 'auto' otherwise.
  The command is run with check=False, i.e. best-effort.
  """
  device_type = 'sat,auto' if dev.use_sat else 'auto'
  run_program(
    [
      'sudo',
      'smartctl',
      f'--device={device_type}',
      '--tolerance=permissive',
      '--smart=on',
      dev.path,
    ],
    check=False,
    )
def generate_attribute_report(dev) -> list[str]:
  """Build one colored report line per known attribute of dev, returns list.

  Attributes are listed in sorted order; attributes missing from the (model
  specific) known-attribute table are left out. Each line holds a label, the
  raw value string (colored by severity) and an optional note.
  """
  known = get_known_disk_attributes(dev.model)
  lines = []

  for attr, value in sorted(dev.attributes.items()):
    # Only report attributes we know how to interpret
    if attr not in known:
      continue
    info = known[attr]
    severity_color = 'GREEN'
    note = info.get('Note', '')

    # Label: integer IDs are assumed to be SMART attributes and get their
    # hex ID and name appended
    label = f'{attr:>3}'
    if isinstance(attr, int):
      label += f' / {str(hex(attr))[2:].upper():0>2}: {value["name"]}'
    label = f' {label.replace("_", " "):38}'

    # Severity color and note
    if info.get('PercentageLife', False):
      # Remaining-life values: low is bad, outside 0-100 is suspicious
      if 0 <= value['raw'] <= info['Error']:
        severity_color = 'RED'
        note = '(failed, % life remaining)'
      elif not 0 <= value['raw'] <= 100:
        severity_color = 'PURPLE'
        note = '(invalid?)'
    else:
      # Later entries in ATTRIBUTE_COLORS override earlier ones when the
      # raw value reaches their threshold
      for level, level_color in ATTRIBUTE_COLORS:
        limit = info.get(level, None)
        if not limit or value['raw'] < limit:
          continue
        severity_color = level_color
        if level == 'Error':
          note = '(failed)'
        elif level == 'Maximum':
          note = '(invalid?)'

    # 199/C7 warning
    if str(attr) == '199' and value['raw'] > 0:
      note = '(bad cable?)'

    lines.append(
      color_string(
        [label, value['raw_str'], note],
        [None, severity_color, 'YELLOW'],
        ),
      )

  return lines
def get_known_disk_attributes(model) -> dict[Any, dict]:
  """Get known NVMe/SMART attributes (model specific), returns dict.

  Starts from KNOWN_DISK_ATTRIBUTES and, for every regex in KNOWN_DISK_MODELS
  that matches model, merges that entry's per-attribute thresholds on top.

  The returned dict (and every per-attribute dict inside it) is a private
  copy: neither KNOWN_DISK_ATTRIBUTES nor KNOWN_DISK_MODELS is modified, so
  overrides for one model never leak into lookups for another.
  """
  # Copy each per-attribute dict as well. A plain .copy() of the outer dict
  # would still share the inner dicts with the module-level table, and the
  # update() below would then permanently rewrite the defaults.
  known_attributes = {
    attr: dict(thresholds)
    for attr, thresholds in KNOWN_DISK_ATTRIBUTES.items()
    }

  # Apply model-specific data
  for regex, data in KNOWN_DISK_MODELS.items():
    if not re.search(regex, model):
      continue
    for attr, thresholds in data.items():
      # setdefault() gives new attributes their own dict, so the model table
      # isn't aliased (and later mutated) either.
      known_attributes.setdefault(attr, {}).update(thresholds)

  # Done
  return known_attributes
def get_smart_self_test_details(dev) -> dict[Any, Any]:
  """Return dev.raw_smartctl['ata_smart_data']['self_test'], or {} if absent.

  A missing key (or a non-subscriptable value along the way) is taken to mean
  the disk lacks SMART support and results in an empty dict.
  """
  try:
    return dev.raw_smartctl['ata_smart_data']['self_test']
  except (KeyError, TypeError):
    # Assuming disk lacks SMART support
    return {}
def safety_checks(dev) -> None:
  """Refresh SMART data for dev and raise if it is unsafe to continue.

  Raises CriticalHardwareError if a blocking attribute is failing, the NVMe
  media is read-only, or the SMART overall assessment failed. Raises
  SMARTSelfTestInProgressError if a self-test is currently running.
  NVMe warning flags only add a note to dev and are not blocking.
  """
  update_smart_details(dev)
  blocked = False

  # Attributes
  if not check_attributes(dev, only_blocking=True):
    blocked = True
    LOG.error('%s: Blocked for failing attribute(s)', dev.path)

  # NVMe status
  nvme_status = dev.raw_smartctl.get('smart_status', {}).get('nvme', {})
  if nvme_status.get('media_read_only', False):
    blocked = True
    read_only_msg = 'Media has been placed in read-only mode'
    dev.add_note(read_only_msg, 'RED')
    LOG.error('%s %s', dev.path, read_only_msg)
  for key in NVME_WARNING_KEYS:
    if not nvme_status.get(key, False):
      continue
    warning = key.replace('_', ' ')
    dev.add_note(warning, 'YELLOW')
    LOG.warning('%s %s', dev.path, warning)

  # SMART overall assessment (treated as passed if the disk doesn't report it)
  try:
    smart_passed = dev.raw_smartctl['smart_status']['passed']
  except (KeyError, TypeError):
    smart_passed = True
  if not smart_passed:
    blocked = True
    failed_msg = 'SMART overall self-assessment: Failed'
    dev.add_note(failed_msg, 'RED')
    LOG.error('%s %s', dev.path, failed_msg)

  # Blocking errors take priority over a running self-test
  if blocked:
    raise CriticalHardwareError(f'Critical error(s) for: {dev.path}')

  # SMART self-test status
  self_test = get_smart_self_test_details(dev)
  if 'remaining_percent' in self_test.get('status', ''):
    in_progress_msg = f'SMART self-test in progress for: {dev.path}'
    LOG.error(in_progress_msg)
    raise SMARTSelfTestInProgressError(in_progress_msg)
def run_self_test(test_obj, log_path) -> None:
  """Run the SMART self-test for test_obj.dev and record the outcome.

  Nothing is returned; the result is stored on test_obj (passed / failed and
  its status). A timeout marks the test as failed ('TimedOut'), a disk
  without SMART support counts as passed ('N/A').
  """
  status = None
  try:
    test_obj.passed = run_smart_self_test(test_obj.dev, log_path)
  except TimeoutError:
    test_obj.failed = True
    status = 'TimedOut'
  except SMARTNotSupportedError:
    # Pass test since it doesn't apply
    test_obj.passed = True
    status = 'N/A'

  # No special-case status, derive it from the flags
  if not status:
    if test_obj.failed:
      status = 'Failed'
    elif test_obj.passed:
      status = 'Passed'
    else:
      status = 'Unknown'
  test_obj.set_status(status)
def run_smart_self_test(dev, log_path) -> bool:
  """Start a short SMART self-test on dev and poll until it finishes.

  Progress is written to log_path (the file is overwritten on each update).
  Returns the 'passed' value reported for the finished test. If the test
  never started within SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS (or the
  polling window ran out without it starting), None is returned.

  Raises SMARTNotSupportedError if the disk has no self-test details and
  TimeoutError if the test started but did not finish in time.
  """
  started = False
  finished = False
  test_details = get_smart_self_test_details(dev)
  size_str = bytes_to_string(dev.size, use_binary=False)
  header_str = color_string(
    ['[', dev.path.name, ' ', size_str, ']'],
    [None, 'BLUE', None, 'CYAN', None],
    sep='',
    )

  # Check if disk supports self-tests
  if not test_details:
    raise SMARTNotSupportedError(
      f'SMART self-test not supported for {dev.path}')

  # Polling window: the reported short-test length (default 5) plus 10
  # minutes, checked in five second intervals
  short_minutes = test_details.get('polling_minutes', {}).get('short', 5)
  poll_count = int((int(short_minutes) + 10) * 60 / 5)

  # Start test
  with open(log_path, 'w', encoding='utf-8') as log_file:
    log_file.write(f'{header_str}\nInitializing...')
  run_program(
    ['sudo', 'smartctl', '--tolerance=normal', '--test=short', dev.path],
    check=False,
    )

  # Monitor progress
  for tick in range(poll_count):
    sleep(5)

    # Update status
    update_smart_details(dev)
    test_details = get_smart_self_test_details(dev)

    if not started:
      # Still waiting for the disk to report a running test
      if 'remaining_percent' in test_details.get('status', {}):
        started = True
      elif tick * 5 >= SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS:
        # Test didn't start within limit, stop waiting
        break
      continue

    # Test is (or was) running, update log
    status_str = test_details.get('status', {}).get('string', 'Unknown')
    status_str = status_str.capitalize()
    with open(log_path, 'w', encoding='utf-8') as log_file:
      log_file.write(f'{header_str}\nSMART self-test status:\n {status_str}')

    # No remaining percentage means the test is done
    if 'remaining_percent' not in test_details.get('status', {}):
      finished = True
      break

  # Check result
  if finished:
    return test_details.get('status', {}).get('passed', False)
  if started:
    raise TimeoutError(f'SMART self-test timed out for {dev.path}')
  return None
def update_smart_details(dev) -> None:
  """Re-read SMART/NVMe data via smartctl and merge it into dev.

  Stores the parsed smartctl JSON in dev.raw_smartctl, then updates
  dev.attributes with the attributes found under KEY_NVME (keyed by name) or,
  failing that, under KEY_SMART (keyed by integer ID). If neither yields any
  attribute, a note is added to dev.
  """
  device_type = 'sat,auto' if dev.use_sat else 'auto'
  dev.raw_smartctl = get_json_from_command(
    [
      'sudo',
      'smartctl',
      f'--device={device_type}',
      '--tolerance=verypermissive',
      '--all',
      '--json',
      dev.path,
    ],
    check=False,
    )

  fresh_attributes = {}
  if KEY_NVME in dev.raw_smartctl:
    for name, value in dev.raw_smartctl[KEY_NVME].items():
      try:
        entry = {
          'name': name,
          'raw': int(value),
          'raw_str': str(value),
        }
      except (TypeError, ValueError):
        # Ignoring invalid attribute
        LOG.error('Invalid NVMe attribute: %s %s', name, value)
      else:
        fresh_attributes[name] = entry
  elif KEY_SMART in dev.raw_smartctl:
    for attribute in dev.raw_smartctl[KEY_SMART].get('table', {}):
      try:
        attr_id = int(attribute['id'])
      except (KeyError, ValueError):
        # Ignoring invalid attribute
        LOG.error('Invalid SMART attribute: %s', attribute)
        continue
      name = str(attribute.get('name', 'Unknown')).replace('_', ' ').title()
      raw = int(attribute.get('raw', {}).get('value', -1))
      raw_str = attribute.get('raw', {}).get('string', 'Unknown')

      # Attribute 9 (power-on time): take the value from the raw string
      # when it matches REGEX_POWER_ON_TIME
      power_on = REGEX_POWER_ON_TIME.match(raw_str)
      if attr_id == 9 and power_on:
        raw = int(power_on.group(1))

      fresh_attributes[attr_id] = {
        'name': name, 'raw': raw, 'raw_str': raw_str}

  # Add note if necessary
  if not fresh_attributes:
    dev.add_note('No NVMe or SMART data available', 'YELLOW')

  # Existing attributes are kept; only the ones just read are overwritten
  dev.attributes.update(fresh_attributes)
# This module only provides functions for import; when executed directly it
# just prints a notice.
if __name__ == '__main__':
  print("This file is not meant to be called directly.")