509 lines
15 KiB
Python
509 lines
15 KiB
Python
"""WizardKit: SMART test functions"""
|
|
# vim: sts=2 sw=2 ts=2
|
|
|
|
import copy
|
|
import logging
|
|
import re
|
|
|
|
from typing import Any
|
|
|
|
from wk.cfg.hw import (
|
|
ATTRIBUTE_COLORS,
|
|
KEY_NVME,
|
|
KEY_SMART,
|
|
KNOWN_DISK_ATTRIBUTES,
|
|
KNOWN_DISK_MODELS,
|
|
NVME_WARNING_KEYS,
|
|
REGEX_POWER_ON_TIME,
|
|
SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS,
|
|
)
|
|
from wk.exe import get_json_from_command, run_program
|
|
from wk.std import bytes_to_string, sleep
|
|
from wk.ui import ansi
|
|
|
|
|
|
# STATIC VARIABLES
# Module-level logger, named after this module via __name__.
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
# Functions
|
|
def abort_self_test(dev) -> None:
  """Ask smartctl to abort the self-test currently running on dev.

  Runs `sudo smartctl --abort <dev.path>` through run_program() with
  check=False (presumably so a non-zero exit status is not raised; confirm
  against wk.exe.run_program).
  """
  run_program(['sudo', 'smartctl', '--abort', dev.path], check=False)
|
|
|
|
|
|
def build_self_test_report(test_obj, aborted=False) -> None:
  """Build the self-test section of the report and append it to test_obj.

  The SMART data is deliberately NOT refreshed here so the report reflects
  the last known state. For example, an aborted test should show its last
  known progress rather than just "was aborted by host."

  Side effects: extends test_obj.report and may call test_obj.set_status()
  ('Failed' when a previous test could not be stopped, 'Aborted' when
  aborted is set and the test neither passed nor failed).
  """
  lines = [ansi.color_string('Self-Test', 'BLUE')]
  details = get_smart_self_test_details(test_obj.dev)
  result_str = details.get('status', {}).get('string', 'Unknown')

  # Pick the report body based on the test's current state
  if test_obj.disabled or test_obj.status == 'Denied':
    lines.append(ansi.color_string(f' {test_obj.status}', 'RED'))
  elif test_obj.status == 'N/A' or not test_obj.dev.attributes:
    lines.append(ansi.color_string(f' {test_obj.status}', 'YELLOW'))
  elif test_obj.status == 'TestInProgress':
    lines.append(ansi.color_string(' Failed to stop previous test', 'RED'))
    test_obj.set_status('Failed')
  else:
    # Every remaining case includes the drive's own self-test result string
    lines.append(f' {result_str.capitalize()}')
    if aborted and not test_obj.passed and not test_obj.failed:
      lines.append(ansi.color_string(' Aborted', 'YELLOW'))
      test_obj.set_status('Aborted')
    elif test_obj.status == 'TimedOut':
      lines.append(ansi.color_string(' TimedOut', 'YELLOW'))

  # Done
  test_obj.report.extend(lines)
|
|
|
|
|
|
def check_attributes(dev, only_blocking=False) -> bool:
  """Return True if no known attribute of dev is in a failing state.

  Only attributes present in dev.known_attributes are considered, and only
  those with a truthy 'Error' threshold (others are informational). When
  only_blocking is set, attributes not flagged 'Blocking' are ignored.

  A 'PercentageLife' attribute fails when 0 <= raw <= Error. Any other
  attribute fails when Error <= raw < Maximum (no Maximum means no upper
  bound).
  """
  all_ok = True
  for attr_id, attr_data in dev.attributes.items():
    # Unknown attributes are never evaluated
    if attr_id not in dev.known_attributes:
      continue
    limits = dev.known_attributes[attr_id]

    # Optionally restrict the check to blocking attributes
    if only_blocking and not limits.get('Blocking', False):
      continue

    # No error threshold means the attribute is informational only
    error_limit = limits.get('Error', None)
    if not error_limit:
      continue
    upper_limit = limits.get('Maximum', None) or float('inf')

    # Evaluate the raw value against the thresholds
    if limits.get('PercentageLife', False):
      failing = 0 <= attr_data['raw'] <= error_limit
    else:
      failing = error_limit <= attr_data['raw'] < upper_limit
    if failing:
      all_ok = False

  # Done
  return all_ok
|
|
|
|
|
|
def enable_smart(dev) -> None:
  """Attempt to turn SMART on for dev via smartctl.

  The device type passed to smartctl is 'sat,auto' when dev.use_sat is
  truthy and 'auto' otherwise. The command is run with check=False.
  """
  device_type = 'sat,auto' if dev.use_sat else 'auto'
  run_program(
    [
      'sudo',
      'smartctl',
      f'--device={device_type}',
      '--tolerance=permissive',
      '--smart=on',
      dev.path,
    ],
    check=False,
  )
|
|
|
|
|
|
def generate_attribute_report(dev, only_failed=False) -> list[str]:
  """Build one colored report line per known attribute, returns list.

  Attributes are visited in sorted key order and those missing from
  dev.known_attributes are skipped. With only_failed set, lines without a
  note are dropped (a naive filter: 'invalid' attributes carry a note too
  and are therefore still included).
  """
  lines = []
  for attr_id, attr_data in sorted(dev.attributes.items()):
    # Only report attributes we know about
    if attr_id not in dev.known_attributes:
      continue

    # ID / Name column
    name_col = f'{attr_id:>3}'
    if isinstance(attr_id, int):
      # Integer IDs are assumed to be SMART: add the hex ID and the name
      hex_id = str(hex(attr_id))[2:].upper()
      name_col += f' / {hex_id:0>2}: {attr_data["name"]}'
    name_col = f' {name_col.replace("_", " "):38}'

    # Color & Note
    color, note = get_attribute_value_color_and_note(dev, attr_id, attr_data)

    # Optionally keep only attributes that carry a note
    if only_failed and not note:
      continue

    # Assemble the colored line
    lines.append(
      ansi.color_string(
        [name_col, get_attribute_value_string(dev, attr_id), note],
        [None, color, 'YELLOW'],
      )
    )

  # Done
  return lines
|
|
|
|
|
|
def get_attribute_value_color_and_note(dev, attr, value) -> tuple[str, str]:
  """Return (color, note) for an attribute's raw value.

  The color starts as 'GREEN' and the note as the attribute's configured
  'Note' (or ''). 'PercentageLife' attributes are 'RED' at or below their
  'Error' threshold and 'PURPLE' outside 0..100. Other attributes take the
  color of each ATTRIBUTE_COLORS threshold that the raw value reaches, with
  later entries overriding earlier ones. Attribute 199 with a non-zero raw
  value always gets the '(bad cable?)' note.
  """
  known = dev.known_attributes[attr]
  color = 'GREEN'
  note = known.get('Note', '')
  threshold_notes = {'Error': '(failed)', 'Maximum': '(invalid?)'}

  if known.get('PercentageLife', False):
    # Remaining-life percentage: low is bad, outside 0..100 is suspect
    if 0 <= value['raw'] <= known['Error']:
      color = 'RED'
      note = '(failed, % life remaining)'
    elif value['raw'] < 0 or value['raw'] > 100:
      color = 'PURPLE'
      note = '(invalid?)'
  else:
    # Counter-style attribute: walk each configured threshold in order
    for level, level_color in ATTRIBUTE_COLORS:
      limit = known.get(level, None)
      if not (limit and value['raw'] >= limit):
        continue
      color = level_color
      note = threshold_notes.get(level, note)

  # 199/C7 warning
  if str(attr) == '199' and value['raw'] > 0:
    note = '(bad cable?)'

  # Done
  return (color, note)
|
|
|
|
|
|
def get_attribute_value_string(dev, attr) -> str:
  """Return the display string for an attribute, showing any change.

  If both dev.attributes and dev.initial_attributes hold a raw value for
  attr and the two differ, the result is '<initial> --> <current>' built
  from the raw strings ('?' when one is missing). Otherwise the current raw
  string is returned ('' if there is none).
  """
  now = dev.attributes.get(attr, {})
  before = dev.initial_attributes.get(attr, {})
  raw_now = now.get('raw', None)
  raw_before = before.get('raw', None)

  # Only report a transition when both raw values exist and differ
  changed = (
    raw_now is not None
    and raw_before is not None
    and raw_now != raw_before
  )
  if changed:
    return f'{before.get("raw_str", "?")} --> {now.get("raw_str", "?")}'

  # Done
  return now.get('raw_str', '')
|
|
|
|
|
|
def get_known_disk_attributes(model) -> dict[Any, Any]:
  """Get known disk attributes based on the device model, returns dict.

  Starts from a deep copy of KNOWN_DISK_ATTRIBUTES, then applies the data of
  every KNOWN_DISK_MODELS entry whose regex matches model (re.search).
  Thresholds for an attribute that already exists are merged into it; new
  attributes are added as deep copies so the shared configuration is never
  mutated.

  NOTE: The return annotation was previously `None` although the merged
  dict has always been returned.
  """
  known_attributes = copy.deepcopy(KNOWN_DISK_ATTRIBUTES)

  # Apply model-specific data
  for regex, data in KNOWN_DISK_MODELS.items():
    if not re.search(regex, model):
      continue
    for attr, thresholds in data.items():
      if attr in known_attributes:
        # Override/extend the generic thresholds for this attribute
        known_attributes[attr].update(thresholds)
      else:
        known_attributes[attr] = copy.deepcopy(thresholds)

  # Done
  return known_attributes
|
|
|
|
|
|
def get_smart_self_test_details(dev) -> dict[Any, Any]:
  """Fetch dev.raw_smartctl['ata_smart_data']['self_test'], returns dict.

  An empty dict is returned when either key is missing or when the lookup
  raises TypeError (e.g. raw_smartctl is None). The disk is then assumed to
  lack SMART self-test support.
  """
  try:
    return dev.raw_smartctl['ata_smart_data']['self_test']
  except (KeyError, TypeError):
    # No self-test data available; callers treat {} as "not supported"
    return {}
|
|
|
|
|
|
def get_smart_self_test_last_result(dev) -> str:
  """Get last SMART self-test result, returns str.

  Reads the standard ATA self-test log table from dev.raw_smartctl and
  summarizes its first entry (presumably the most recent test; verify
  against smartctl's JSON output). Returns 'Unknown' when the log is
  missing, empty, or its first entry is not a dict.
  """
  result = 'Unknown'

  # Parse SMART data
  table = dev.raw_smartctl.get(
    'ata_smart_self_test_log', {}).get(
      'standard', {}).get(
        'table', [])
  if not table:
    # No results found
    return result

  # The table is a list of log entries. Calling .get() on the list itself
  # raised AttributeError, so select the first entry before reading fields.
  # A bare dict is still accepted for backward compatibility.
  entry = table[0] if isinstance(table, (list, tuple)) else table
  if not isinstance(entry, dict):
    return result

  # Build result string
  status = entry.get('status', {})
  result = (
    f'Power-on hours: {entry.get("lifetime_hours", "?")}'
    f', Type: {entry.get("type", {}).get("string", "?")}'
    f', Passed: {status.get("passed", "?")}'
    f', Result: {status.get("string", "?")}'
  )

  # Done
  return result
|
|
|
|
|
|
def monitor_smart_self_test(test_obj, header_str, log_path) -> bool:
  """Poll the running SMART self-test until it ends, returns bool.

  Returns True once a test that was seen running no longer reports a
  'remaining_percent' value. Returns False if the test never started within
  SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS or if the polling window ran out.

  The polling window is the drive's advertised short-test polling time
  (default 5 minutes) plus 10 minutes, checked every five seconds. The log
  file at log_path is rewritten on every poll with the latest status.

  If the test fails to start in time it is aborted, a note is added to the
  device, test_obj.failed is set and the status becomes 'TimedOut'.
  """
  test_started = False
  status_str = 'Starting self-test...'

  # Get real test length
  details = get_smart_self_test_details(test_obj.dev)
  test_minutes = int(details.get('polling_minutes', {}).get('short', 5)) + 10

  # Monitor progress (in five second intervals)
  for poll in range(int(test_minutes*60/5)):
    sleep(5)

    # Update log
    ## NOTE: This is run at least once with the default "Starting..." status
    with open(log_path, 'w', encoding='utf-8') as _f:
      _f.write(f'{header_str}\nSMART self-test status:\n {status_str}')

    # Refresh SMART data and grab the self-test status section
    update_smart_details(test_obj.dev)
    status = get_smart_self_test_details(test_obj.dev).get('status', {})

    # A test counts as started once a remaining percentage has been seen
    if not test_started:
      test_started = 'remaining_percent' in status
    if not test_started:
      if poll * 5 < SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS:
        # Still within starting limit, continue to next loop
        continue

      # Test didn't start within limit, stop waiting
      abort_self_test(test_obj.dev)
      last_result = get_smart_self_test_last_result(test_obj.dev)
      if last_result == 'Unknown':
        last_result = 'SMART self-test failed to start'
      test_obj.dev.add_note(last_result)
      test_obj.failed = True
      test_obj.set_status('TimedOut')
      return False

    # Record the progress string for the next log update
    status_str = status.get('string', 'Unknown').capitalize()

    # The test is done once it stops reporting a remaining percentage
    if 'remaining_percent' not in status:
      return True

  # Polling window exhausted without the test finishing
  return False
|
|
|
|
|
|
def run_self_test(test_obj, log_path) -> None:
  """Run the disk self-test for test_obj and record its results.

  Currently this only delegates to run_smart_self_test(). The wrapper is
  kept as the entry point so that NVMe self-tests (announced in NVMe spec
  v1.3) can be added here later.
  """
  return run_smart_self_test(test_obj, log_path)
|
|
|
|
|
|
def run_smart_self_test(test_obj, log_path) -> None:
  """Run a short SMART self-test and record the outcome, returns None.

  Steps:
    1. If the disk has no self-test data, mark the test as passed with
       status 'N/A' and stop.
    2. If a self-test is already running, abort it and wait up to a minute
       for it to exit. If it is still running, mark the test failed with
       status 'TestInProgress' and stop.
    3. Start a short test via smartctl and monitor it.
    4. Set passed/failed and the final status ('Passed', 'Failed' or
       'Unknown'; an existing 'TimedOut' status is preserved).

  The report is built via build_self_test_report() in every case.
  """
  details = get_smart_self_test_details(test_obj.dev)
  size_str = bytes_to_string(test_obj.dev.size, use_binary=False)
  header_str = ansi.color_string(
    ['[', test_obj.dev.path.name, ' ', size_str, ']'],
    [None, 'BLUE', None, 'CYAN', None],
    sep='',
  )

  # Disks without self-test data can't be tested
  if not details:
    # Mark test as passed since it doesn't apply
    test_obj.passed = True
    test_obj.set_status('N/A')
    build_self_test_report(test_obj)
    return

  # Update status
  with open(log_path, 'w', encoding='utf-8') as _f:
    _f.write(f'{header_str}\nInitializing...')

  # Stop any self-test that is already running
  if self_test_in_progress(test_obj.dev):
    abort_self_test(test_obj.dev)
    # Wait up to a minute (6 x 10s) for the current test to exit
    for _attempt in range(6):
      sleep(10)
      update_smart_details(test_obj.dev)
      if not self_test_in_progress(test_obj.dev):
        break

  # Bail if the previous test could not be stopped
  if self_test_in_progress(test_obj.dev):
    test_obj.failed = True
    test_obj.set_status('TestInProgress')
    build_self_test_report(test_obj)
    return

  # Start test
  run_program(
    [
      'sudo',
      'smartctl',
      '--tolerance=normal',
      '--test=short',
      test_obj.dev.path,
    ],
    check=False,
  )

  # Monitor progress and, if the test ran to completion, read the result
  if monitor_smart_self_test(test_obj, header_str, log_path):
    details = get_smart_self_test_details(test_obj.dev)
    test_obj.passed = details.get('status', {}).get('passed', False)
    test_obj.failed = test_obj.failed or not test_obj.passed

  # Set status (a TimedOut status set by the monitor is preserved)
  if test_obj.status != 'TimedOut':
    if test_obj.failed:
      final_status = 'Failed'
    elif test_obj.passed:
      final_status = 'Passed'
    else:
      final_status = 'Unknown'
    test_obj.set_status(final_status)

  # Build report
  build_self_test_report(test_obj)
|
|
|
|
|
|
def smart_status_ok(dev) -> bool:
  """Refresh SMART data and check for blocking problems, returns bool.

  Returns False if any of these hold:
    - a blocking attribute is failing (see check_attributes()),
    - the NVMe status reports 'media_read_only',
    - the SMART overall assessment did not pass.

  NVMe warning flags listed in NVME_WARNING_KEYS only add a note and a log
  entry; they do not affect the result. A missing overall assessment is
  treated as passed.
  """
  update_smart_details(dev)
  status_ok = True

  # Attributes
  if not check_attributes(dev, only_blocking=True):
    status_ok = False
    LOG.error('%s: Blocked for failing attribute(s)', dev.path)

  # NVMe status
  nvme_status = dev.raw_smartctl.get('smart_status', {}).get('nvme', {})
  if nvme_status.get('media_read_only', False):
    status_ok = False
    read_only_msg = 'Media has been placed in read-only mode'
    dev.add_note(read_only_msg, 'RED')
    LOG.error('%s %s', dev.path, read_only_msg)
  for warning_key in NVME_WARNING_KEYS:
    if not nvme_status.get(warning_key, False):
      continue
    warning_msg = warning_key.replace('_', ' ')
    dev.add_note(warning_msg, 'YELLOW')
    LOG.warning('%s %s', dev.path, warning_msg)

  # SMART overall assessment
  try:
    overall_passed = dev.raw_smartctl['smart_status']['passed']
  except (KeyError, TypeError):
    # Assuming disk doesn't support SMART overall assessment
    overall_passed = True
  if not overall_passed:
    status_ok = False
    failed_msg = 'SMART overall self-assessment: Failed'
    dev.add_note(failed_msg, 'RED')
    LOG.error('%s %s', dev.path, failed_msg)

  # Done
  return status_ok
|
|
|
|
|
|
def self_test_in_progress(dev) -> bool:
  """Return True if dev's self-test status reports a remaining percentage.

  NOTE: A missing 'status' entry falls back to '' so the membership test is
  simply False.
  """
  status = get_smart_self_test_details(dev).get('status', '')
  return 'remaining_percent' in status
|
|
|
|
|
|
def update_smart_details(dev) -> None:
  """Re-read SMART/NVMe data via smartctl and merge it into dev.attributes.

  If dev.present is falsy a 'Device disconnected' note is added and nothing
  else happens. Otherwise dev.raw_smartctl is replaced with smartctl's JSON
  output and attributes are parsed from the NVMe section (KEY_NVME) if
  present, else from the SMART attribute table (KEY_SMART).

  Each parsed attribute is a dict with 'name', 'raw' (int) and 'raw_str'.
  Existing entries in dev.attributes are updated, not cleared. A note is
  added when no attributes could be parsed.
  """
  # Bail if device was disconnected
  if not dev.present:
    dev.add_note('Device disconnected', 'RED')
    return

  # Get SMART data
  device_type = 'sat,auto' if dev.use_sat else 'auto'
  dev.raw_smartctl = get_json_from_command(
    [
      'sudo',
      'smartctl',
      f'--device={device_type}',
      '--tolerance=verypermissive',
      '--all',
      '--json',
      dev.path,
    ],
    check=False,
  )

  # Check for attributes
  parsed = {}
  if KEY_NVME in dev.raw_smartctl:
    for key, reading in dev.raw_smartctl[KEY_NVME].items():
      try:
        entry = {
          'name': key,
          'raw': int(reading),
          'raw_str': str(reading),
        }
      except (TypeError, ValueError):
        # Ignoring invalid attribute
        LOG.error('Invalid NVMe attribute: %s %s', key, reading)
      else:
        parsed[key] = entry
  elif KEY_SMART in dev.raw_smartctl:
    for row in dev.raw_smartctl[KEY_SMART].get('table', {}):
      try:
        attr_id = int(row['id'])
      except (KeyError, ValueError):
        # Ignoring invalid attribute
        LOG.error('Invalid SMART attribute: %s', row)
        continue
      attr_name = str(row.get('name', 'Unknown')).replace('_', ' ').title()
      raw_section = row.get('raw', {})
      raw_value = int(raw_section.get('value', -1))
      raw_string = raw_section.get('string', 'Unknown')

      # Fix power-on time: for attribute 9 use the number captured from the
      # raw string by REGEX_POWER_ON_TIME instead of the raw value
      power_on_match = REGEX_POWER_ON_TIME.match(raw_string)
      if attr_id == 9 and power_on_match:
        raw_value = int(power_on_match.group(1))

      # Add to dict
      parsed[attr_id] = {
        'name': attr_name, 'raw': raw_value, 'raw_str': raw_string}

  # Add note if necessary
  if not parsed:
    dev.add_note('No NVMe or SMART data available', 'YELLOW')

  # Done
  dev.attributes.update(parsed)
|
|
|
|
|
|
# This module only provides functions; print a hint if it is run as a script.
if __name__ == '__main__':
  print("This file is not meant to be called directly.")
|