Added SMART self-test sections

This commit is contained in:
2Shirt 2019-11-01 18:51:02 -06:00
parent 93102b5144
commit e634d1691f
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C

View file

@ -11,7 +11,7 @@ from collections import OrderedDict
from wk.cfg.hw import KNOWN_ATTRIBUTES from wk.cfg.hw import KNOWN_ATTRIBUTES
from wk.exe import get_json_from_command, run_program from wk.exe import get_json_from_command, run_program
from wk.std import bytes_to_string, color_string, string_to_bytes from wk.std import bytes_to_string, color_string, sleep, string_to_bytes
# STATIC VARIABLES # STATIC VARIABLES
@ -43,6 +43,9 @@ REGEX_POWER_ON_TIME = re.compile(
class CriticalHardwareError(RuntimeError): class CriticalHardwareError(RuntimeError):
"""Exception used for critical hardware failures.""" """Exception used for critical hardware failures."""
class SMARTNotSupportedError(TypeError):
"""Exception used for disks lacking SMART support."""
# Classes # Classes
class CpuRam(): class CpuRam():
@ -233,7 +236,6 @@ class Disk():
# Done # Done
return report return report
def generate_report(self): def generate_report(self):
"""Generate Disk report, returns list.""" """Generate Disk report, returns list."""
report = [] report = []
@ -307,6 +309,18 @@ class Disk():
# Done # Done
return labels return labels
def get_smart_self_test_details(self):
"""Shorthand to get deeply nested self-test details, returns dict."""
details = {}
try:
details = self.smartctl['ata_smart_data']['self_test']
except (KeyError, TypeError):
# Assuming disk lacks SMART support, ignore and return empty dict.
pass
# Done
return details
def is_4k_aligned(self): def is_4k_aligned(self):
"""Check that all disk partitions are aligned, returns bool.""" """Check that all disk partitions are aligned, returns bool."""
aligned = True aligned = True
@ -345,13 +359,8 @@ class Disk():
LOG.error('%s %s', self.path, msg) LOG.error('%s %s', self.path, msg)
# SMART self-test status # SMART self-test status
test_status = '' test_details = self.get_smart_self_test_details()
try: if 'remaining_percent' in test_details.get('status', ''):
test_status = self.smartctl['ata_smart_data']['self_test']['status']
except (KeyError, TypeError):
# Assuming disk doesn't support SMART self-tests
pass
if 'remaining_percent' in test_status:
blocking_event_encountered = True blocking_event_encountered = True
msg = 'SMART self-test in progress' msg = 'SMART self-test in progress'
self.add_note(msg, 'RED') self.add_note(msg, 'RED')
@ -361,6 +370,80 @@ class Disk():
if blocking_event_encountered: if blocking_event_encountered:
raise CriticalHardwareError(f'Critical error(s) for: {self.path}') raise CriticalHardwareError(f'Critical error(s) for: {self.path}')
def run_self_test(self, log_path):
"""Run disk self-test and check if it passed, returns bool.
NOTE: This function is here to reserve a place for future
NVMe self-tests announced in NVMe spec v1.3.
"""
result = self.run_smart_self_test(log_path)
return result
def run_smart_self_test(self, log_path):
"""Run SMART self-test and check if it passed, returns bool.
NOTE: An exception will be raised if the disk lacks SMART support.
"""
finished = False
result = None
started = False
status_str = 'Starting self-test...'
test_details = self.get_smart_self_test_details()
test_minutes = 15
# Check if disk supports self-tests
if not test_details:
raise SMARTNotSupportedError(
f'SMART self-test not supported for {self.path}')
# Get real test length
test_minutes = test_details.get('polling_minutes', {}).get('short', 5)
test_minutes = int(test_minutes) + 10
# Start test
cmd = [
'sudo',
'smartctl',
'--tolerance=normal',
'--test=short',
self.path,
]
run_program(cmd, check=False)
# Monitor progress (in five second intervals)
for _i in range(int(test_minutes*60/5)):
sleep(5)
# Update status
self.update_smart_details()
test_details = self.get_smart_self_test_details()
# Check test progress
if started:
status_str = test_details.get('status', {}).get('string', 'Unknown')
status_str = status_str.capitalize()
# Update log
with open(log_path, 'w') as _f:
_f.write(f'SMART self-test status for {self.path}:\n {status_str}')
# Check if finished
if 'remaining_percent' not in test_details['status']:
finished = True
break
elif 'remaining_percent' in test_details['status']:
started = True
# Check result
if finished:
result = test_details.get('status', {}).get('passed', False)
elif started:
raise TimeoutError(f'SMART self-test timed out for {self.path}')
# Done
return result
def update_smart_details(self): def update_smart_details(self):
"""Update SMART details via smartctl.""" """Update SMART details via smartctl."""
self.attributes = {} self.attributes = {}
@ -433,9 +516,9 @@ def get_disk_details_macos(path):
try: try:
plist_data = plistlib.loads(proc.stdout) plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError): except (TypeError, ValueError):
# Invalid / corrupt plist data? return empty dict to avoid crash
LOG.error('Failed to get diskutil list for %s', path) LOG.error('Failed to get diskutil list for %s', path)
# TODO: Figure this out return details
return details #Bail
# Parse "list" details # Parse "list" details
details = plist_data.get('AllDisksAndPartitions', [{}])[0] details = plist_data.get('AllDisksAndPartitions', [{}])[0]