Refactor SMART self-test sections

This commit is contained in:
2Shirt 2022-04-08 18:47:08 -06:00
parent 41b4ffd9fb
commit af8b2b7dd3
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
3 changed files with 171 additions and 139 deletions

View file

@ -30,7 +30,6 @@ from wk.cfg.ddrescue import (
from wk.hw import disk as hw_disk
from wk.hw.smart import (
CriticalHardwareError,
SMARTNotSupportedError,
SMARTSelfTestInProgressError,
safety_checks,
update_smart_details,
@ -1503,8 +1502,6 @@ def check_destination_health(destination):
result = 'Critical hardware error detected on destination'
except SMARTSelfTestInProgressError:
result = 'SMART self-test in progress on destination'
except SMARTNotSupportedError:
pass
# Done
return result

View file

@ -700,11 +700,8 @@ def disk_self_test(state, test_objects) -> None:
aborted = True
for test in test_objects:
hw_smart.abort_self_test(test.dev)
std.sleep(0.5)
# Save report(s)
for test in test_objects:
hw_smart.check_self_test_results(test, aborted=aborted)
std.sleep(0.5)
hw_smart.build_self_test_report(test, aborted=True)
# Cleanup
state.update_progress_pane()

View file

@ -28,9 +28,6 @@ LOG = logging.getLogger(__name__)
class CriticalHardwareError(RuntimeError):
"""Exception used for critical hardware failures."""
class SMARTNotSupportedError(TypeError):
"""Exception used for disks lacking SMART support."""
class SMARTSelfTestInProgressError(RuntimeError):
"""Exception used when a SMART self-test is in progress."""
@ -42,6 +39,39 @@ def abort_self_test(dev) -> None:
run_program(cmd, check=False)
def build_self_test_report(test_obj, aborted=False) -> None:
"""Check self-test results and build report (saved to test_obj).
NOTE: Not updating SMART data to preserve the result for the report.
For instance if the test was aborted the report should include the
last known progress instead of just "was aborted by host."
"""
report = [color_string('Self-Test', 'BLUE')]
test_details = get_smart_self_test_details(test_obj.dev)
test_result = test_details.get('status', {}).get('string', 'Unknown')
# Build report
if test_obj.disabled or test_obj.status == 'Denied':
report.append(color_string(f' {test_obj.status}', 'RED'))
elif test_obj.status == 'N/A' or not test_obj.dev.attributes:
report.append(color_string(f' {test_obj.status}', 'YELLOW'))
elif test_obj.status == 'TestInProgress':
report.append(color_string(' Failed to stop previous test', 'RED'))
test_obj.set_status('Failed')
else:
# Other cases include self-test result string
report.append(f' {test_result.capitalize()}')
if aborted and not (test_obj.passed or test_obj.failed):
report.append(color_string(' Aborted', 'YELLOW'))
test_obj.set_status('Aborted')
elif test_obj.status == 'TimedOut':
report.append(color_string(' TimedOut', 'YELLOW'))
# Done
test_obj.report.extend(report)
def check_attributes(dev, only_blocking=False) -> bool:
"""Check if any known attributes are failing, returns bool."""
attributes_ok = True
@ -77,32 +107,6 @@ def check_attributes(dev, only_blocking=False) -> bool:
return attributes_ok
def check_self_test_results(test_obj, aborted=False) -> None:
"""Check SMART self-test results."""
test_obj.report.append(color_string('Self-Test', 'BLUE'))
if test_obj.disabled or test_obj.status == 'Denied':
test_obj.report.append(color_string(f' {test_obj.status}', 'RED'))
elif test_obj.status == 'N/A' or not test_obj.dev.attributes:
test_obj.report.append(color_string(f' {test_obj.status}', 'YELLOW'))
else:
# Not updating SMART data here to preserve the test status for the report
# For instance if the test was aborted the report should inlcude the last
# known progress instead of just "was aborted by host"
test_details = get_smart_self_test_details(test_obj.dev)
test_result = test_details.get('status', {}).get('string', 'Unknown')
test_obj.report.append(f' {test_result.capitalize()}')
if aborted and not (test_obj.passed or test_obj.failed):
test_obj.report.append(color_string(' Aborted', 'YELLOW'))
test_obj.set_status('Aborted')
elif test_obj.status == 'TimedOut':
test_obj.report.append(color_string(' TimedOut', 'YELLOW'))
test_obj.set_status('TimedOut')
else:
test_obj.failed = not test_obj.passed
if test_obj.failed:
test_obj.set_status('Failed')
def enable_smart(dev) -> None:
"""Try enabling SMART for this disk."""
cmd = [
@ -202,6 +206,138 @@ def get_smart_self_test_details(dev) -> dict[Any, Any]:
return details
def monitor_smart_self_test(test_obj, header_str, log_path) -> bool:
"""Monitor SMART self-test status and update test_obj, returns bool."""
started = False
finished = False
status_str = 'Starting self-test...'
test_details = get_smart_self_test_details(test_obj.dev)
test_minutes = 15
# Get real test length
test_minutes = test_details.get('polling_minutes', {}).get('short', 5)
test_minutes = int(test_minutes) + 10
# Monitor progress (in five second intervals)
for _i in range(int(test_minutes*60/5)):
sleep(5)
# Update log
## NOTE: This is run at least once with the default "Starting..." status
with open(log_path, 'w', encoding='utf-8') as _f:
_f.write(f'{header_str}\nSMART self-test status:\n {status_str}')
# Update status
update_smart_details(test_obj.dev)
test_details = get_smart_self_test_details(test_obj.dev)
# Check if test started
started = started or 'remaining_percent' in test_details.get('status', {})
if not started:
if _i * 5 >= SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS:
# Test didn't start within limit, stop waiting
abort_self_test(test_obj.dev)
test_obj.failed = True
test_obj.set_status('TimedOut')
break
# Still within starting limit, continue to next loop
continue
# Check test progress
status_str = test_details.get('status', {}).get('string', 'Unknown')
status_str = status_str.capitalize()
# Check if finished
if 'remaining_percent' not in test_details.get('status', {}):
finished = True
break
# Done
return finished
def run_self_test(test_obj, log_path) -> None:
"""Run disk self-test and update test results.
NOTE: This function is here to reserve a place for future
NVMe self-tests announced in NVMe spec v1.3.
"""
run_smart_self_test(test_obj, log_path)
def run_smart_self_test(test_obj, log_path) -> bool:
"""Run SMART self-test and check if it passed, returns bool.
NOTE: An exception will be raised if the disk lacks SMART support.
"""
finished = False
test_details = get_smart_self_test_details(test_obj.dev)
size_str = bytes_to_string(test_obj.dev.size, use_binary=False)
header_str = color_string(
['[', test_obj.dev.path.name, ' ', size_str, ']'],
[None, 'BLUE', None, 'CYAN', None],
sep='',
)
# Check if disk supports self-tests
if not test_details:
# Mark test as passed since it doesn't apply
test_obj.passed = True
test_obj.set_status('N/A')
build_self_test_report(test_obj)
return
# Update status
with open(log_path, 'w', encoding='utf-8') as _f:
_f.write(f'{header_str}\nInitializing...')
# Check for, and stop, self-test if currently in-progress
if self_test_in_progress(test_obj.dev):
abort_self_test(test_obj.dev)
for _ in range(6):
# Wait up to a minute for current test to exit
sleep(10)
update_smart_details(test_obj.dev)
if not self_test_in_progress(test_obj.dev):
break
# Recheck if self-test is in-progress, bail if so
if self_test_in_progress(test_obj.dev):
test_obj.failed = True
test_obj.set_status('TestInProgress')
build_self_test_report(test_obj)
return
# Start test
cmd = [
'sudo',
'smartctl',
'--tolerance=normal',
'--test=short',
test_obj.dev.path,
]
run_program(cmd, check=False)
# Monitor progress
finished = monitor_smart_self_test(test_obj, header_str, log_path)
# Check result
if finished:
test_obj.passed = test_details.get('status', {}).get('passed', False)
test_obj.failed = test_obj.failed or not test_obj.passed
# Set status
if test_obj.failed and test_obj.status != 'TimedOut':
test_obj.set_status('Failed')
elif test_obj.passed:
test_obj.set_status('Passed')
else:
test_obj.set_status('Unknown')
# Build report
build_self_test_report(test_obj)
def safety_checks(dev) -> None:
"""Run safety checks and raise an exception if necessary."""
blocking_event_encountered = False
@ -243,114 +379,16 @@ def safety_checks(dev) -> None:
raise CriticalHardwareError(f'Critical error(s) for: {dev.path}')
# SMART self-test status
test_details = get_smart_self_test_details(dev)
if 'remaining_percent' in test_details.get('status', ''):
if self_test_in_progress(dev):
msg = f'SMART self-test in progress for: {dev.path}'
LOG.error(msg)
raise SMARTSelfTestInProgressError(msg)
def run_self_test(test_obj, log_path) -> None:
"""Run disk self-test and check if it passed, returns bool."""
result = None
try:
test_obj.passed = run_smart_self_test(test_obj.dev, log_path)
except TimeoutError:
test_obj.failed = True
result = 'TimedOut'
except SMARTNotSupportedError:
# Pass test since it doesn't apply
test_obj.passed = True
result = 'N/A'
# Set status
if result:
test_obj.set_status(result)
else:
if test_obj.failed:
test_obj.set_status('Failed')
elif test_obj.passed:
test_obj.set_status('Passed')
else:
test_obj.set_status('Unknown')
def run_smart_self_test(dev, log_path) -> bool:
"""Run SMART self-test and check if it passed, returns bool.
NOTE: An exception will be raised if the disk lacks SMART support.
"""
finished = False
result = None
started = False
status_str = 'Starting self-test...'
def self_test_in_progress(dev) -> bool:
"""Check if SMART self-test is in progress, returns bool."""
test_details = get_smart_self_test_details(dev)
test_minutes = 15
size_str = bytes_to_string(dev.size, use_binary=False)
header_str = color_string(
['[', dev.path.name, ' ', size_str, ']'],
[None, 'BLUE', None, 'CYAN', None],
sep='',
)
# Check if disk supports self-tests
if not test_details:
raise SMARTNotSupportedError(
f'SMART self-test not supported for {dev.path}')
# Get real test length
test_minutes = test_details.get('polling_minutes', {}).get('short', 5)
test_minutes = int(test_minutes) + 10
# Start test
with open(log_path, 'w', encoding='utf-8') as _f:
_f.write(f'{header_str}\nInitializing...')
cmd = [
'sudo',
'smartctl',
'--tolerance=normal',
'--test=short',
dev.path,
]
run_program(cmd, check=False)
# Monitor progress (in five second intervals)
for _i in range(int(test_minutes*60/5)):
sleep(5)
# Update status
update_smart_details(dev)
test_details = get_smart_self_test_details(dev)
# Check test progress
if started:
status_str = test_details.get('status', {}).get('string', 'Unknown')
status_str = status_str.capitalize()
# Update log
with open(log_path, 'w', encoding='utf-8') as _f:
_f.write(f'{header_str}\nSMART self-test status:\n {status_str}')
# Check if finished
if 'remaining_percent' not in test_details.get('status', {}):
finished = True
break
elif 'remaining_percent' in test_details.get('status', {}):
started = True
elif _i * 5 >= SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS:
# Test didn't start within limit, stop waiting
break
# Check result
if finished:
result = test_details.get('status', {}).get('passed', False)
elif started:
raise TimeoutError(f'SMART self-test timed out for {dev.path}')
# Done
return result
return 'remaining_percent' in test_details.get('status', '')
def update_smart_details(dev) -> None: