Move SMART functions to their own file
This commit is contained in:
parent
99dd7661d4
commit
46eb737dc8
5 changed files with 457 additions and 420 deletions
|
|
@ -10,6 +10,7 @@ from . import keyboard
|
||||||
from . import network
|
from . import network
|
||||||
from . import screensavers
|
from . import screensavers
|
||||||
from . import sensors
|
from . import sensors
|
||||||
|
from . import smart
|
||||||
from . import surface_scan
|
from . import surface_scan
|
||||||
from . import system
|
from . import system
|
||||||
from . import test
|
from . import test
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,13 @@ from wk.cfg.ddrescue import (
|
||||||
DDRESCUE_SPECIFIC_PASS_SETTINGS,
|
DDRESCUE_SPECIFIC_PASS_SETTINGS,
|
||||||
)
|
)
|
||||||
from wk.hw import disk as hw_disk
|
from wk.hw import disk as hw_disk
|
||||||
|
from wk.hw.smart import (
|
||||||
|
CriticalHardwareError,
|
||||||
|
SMARTNotSupportedError,
|
||||||
|
SMARTSelfTestInProgressError,
|
||||||
|
safety_checks,
|
||||||
|
update_smart_details,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# STATIC VARIABLES
|
# STATIC VARIABLES
|
||||||
|
|
@ -947,8 +954,8 @@ class State():
|
||||||
def safety_check_destination(self):
|
def safety_check_destination(self):
|
||||||
"""Run safety checks for destination and abort if necessary."""
|
"""Run safety checks for destination and abort if necessary."""
|
||||||
try:
|
try:
|
||||||
self.destination.safety_checks()
|
safety_checks(self.destination)
|
||||||
except hw_disk.CriticalHardwareError as err:
|
except CriticalHardwareError as err:
|
||||||
std.print_error(
|
std.print_error(
|
||||||
f'Critical error(s) detected for: {self.destination.path}',
|
f'Critical error(s) detected for: {self.destination.path}',
|
||||||
)
|
)
|
||||||
|
|
@ -1491,12 +1498,12 @@ def check_destination_health(destination):
|
||||||
|
|
||||||
# Run safety checks
|
# Run safety checks
|
||||||
try:
|
try:
|
||||||
destination.safety_checks()
|
safety_checks(destination)
|
||||||
except hw_disk.CriticalHardwareError:
|
except CriticalHardwareError:
|
||||||
result = 'Critical hardware error detected on destination'
|
result = 'Critical hardware error detected on destination'
|
||||||
except hw_disk.SMARTSelfTestInProgressError:
|
except SMARTSelfTestInProgressError:
|
||||||
result = 'SMART self-test in progress on destination'
|
result = 'SMART self-test in progress on destination'
|
||||||
except hw_disk.SMARTNotSupportedError:
|
except SMARTNotSupportedError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# Done
|
# Done
|
||||||
|
|
@ -2031,7 +2038,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
|
||||||
|
|
||||||
def _update_smart_pane():
|
def _update_smart_pane():
|
||||||
"""Update SMART pane every 30 seconds."""
|
"""Update SMART pane every 30 seconds."""
|
||||||
state.source.update_smart_details()
|
update_smart_details(state.source)
|
||||||
now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z')
|
now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z')
|
||||||
with open(f'{state.log_dir}/smart.out', 'w', encoding='utf-8') as _f:
|
with open(f'{state.log_dir}/smart.out', 'w', encoding='utf-8') as _f:
|
||||||
_f.write(
|
_f.write(
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,16 @@ from wk.hw import system as hw_system
|
||||||
from wk.hw.audio import audio_test
|
from wk.hw.audio import audio_test
|
||||||
from wk.hw.keyboard import keyboard_test
|
from wk.hw.keyboard import keyboard_test
|
||||||
from wk.hw.network import network_test
|
from wk.hw.network import network_test
|
||||||
|
from wk.hw.smart import (
|
||||||
|
CriticalHardwareError,
|
||||||
|
SMARTSelfTestInProgressError,
|
||||||
|
abort_self_test,
|
||||||
|
check_attributes,
|
||||||
|
check_self_test_results,
|
||||||
|
generate_attribute_report,
|
||||||
|
run_self_test,
|
||||||
|
safety_checks,
|
||||||
|
)
|
||||||
from wk.hw.screensavers import screensaver
|
from wk.hw.screensavers import screensaver
|
||||||
from wk.hw.test import Test, TestGroup
|
from wk.hw.test import Test, TestGroup
|
||||||
|
|
||||||
|
|
@ -121,8 +131,8 @@ class State():
|
||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
disk.safety_checks()
|
safety_checks(disk)
|
||||||
except hw_disk.CriticalHardwareError:
|
except CriticalHardwareError:
|
||||||
disable_tests = True
|
disable_tests = True
|
||||||
disk.add_note('Critical hardware error detected.', 'RED')
|
disk.add_note('Critical hardware error detected.', 'RED')
|
||||||
if 'Disk Attributes' in disk.tests:
|
if 'Disk Attributes' in disk.tests:
|
||||||
|
|
@ -135,7 +145,7 @@ class State():
|
||||||
'Critical hardware error detected during diagnostics',
|
'Critical hardware error detected during diagnostics',
|
||||||
'YELLOW',
|
'YELLOW',
|
||||||
)
|
)
|
||||||
except hw_disk.SMARTSelfTestInProgressError as err:
|
except SMARTSelfTestInProgressError as err:
|
||||||
if prep:
|
if prep:
|
||||||
std.print_warning(f'SMART self-test(s) in progress for {disk.path}')
|
std.print_warning(f'SMART self-test(s) in progress for {disk.path}')
|
||||||
if std.ask('Continue with all tests disabled for this device?'):
|
if std.ask('Continue with all tests disabled for this device?'):
|
||||||
|
|
@ -160,7 +170,7 @@ class State():
|
||||||
if (
|
if (
|
||||||
'Disk Attributes' in disk.tests
|
'Disk Attributes' in disk.tests
|
||||||
and not disk.tests['Disk Attributes'].failed
|
and not disk.tests['Disk Attributes'].failed
|
||||||
and not disk.check_attributes(only_blocking=False)
|
and not check_attributes(disk, only_blocking=False)
|
||||||
):
|
):
|
||||||
# No blocking errors encountered, but found minor attribute failures
|
# No blocking errors encountered, but found minor attribute failures
|
||||||
if not prep:
|
if not prep:
|
||||||
|
|
@ -449,32 +459,6 @@ def build_menu(cli_mode=False, quick_mode=False):
|
||||||
return menu
|
return menu
|
||||||
|
|
||||||
|
|
||||||
def check_self_test_results(test_obj, aborted=False):
|
|
||||||
"""Check SMART self-test results."""
|
|
||||||
test_obj.report.append(std.color_string('Self-Test', 'BLUE'))
|
|
||||||
if test_obj.disabled or test_obj.status == 'Denied':
|
|
||||||
test_obj.report.append(std.color_string(f' {test_obj.status}', 'RED'))
|
|
||||||
elif test_obj.status == 'N/A' or not test_obj.dev.attributes:
|
|
||||||
test_obj.report.append(std.color_string(f' {test_obj.status}', 'YELLOW'))
|
|
||||||
else:
|
|
||||||
# Not updating SMART data here to preserve the test status for the report
|
|
||||||
# For instance if the test was aborted the report should inlcude the last
|
|
||||||
# known progress instead of just "was aborted buy host"
|
|
||||||
test_details = test_obj.dev.get_smart_self_test_details()
|
|
||||||
test_result = test_details.get('status', {}).get('string', 'Unknown')
|
|
||||||
test_obj.report.append(f' {test_result.capitalize()}')
|
|
||||||
if aborted and not (test_obj.passed or test_obj.failed):
|
|
||||||
test_obj.report.append(std.color_string(' Aborted', 'YELLOW'))
|
|
||||||
test_obj.set_status('Aborted')
|
|
||||||
elif test_obj.status == 'TimedOut':
|
|
||||||
test_obj.report.append(std.color_string(' TimedOut', 'YELLOW'))
|
|
||||||
test_obj.set_status('TimedOut')
|
|
||||||
else:
|
|
||||||
test_obj.failed = not test_obj.passed
|
|
||||||
if test_obj.failed:
|
|
||||||
test_obj.set_status('Failed')
|
|
||||||
|
|
||||||
|
|
||||||
def cpu_stress_tests(state, test_objects):
|
def cpu_stress_tests(state, test_objects):
|
||||||
# pylint: disable=too-many-statements
|
# pylint: disable=too-many-statements
|
||||||
"""CPU & cooling check using Prime95 and Sysbench."""
|
"""CPU & cooling check using Prime95 and Sysbench."""
|
||||||
|
|
@ -612,7 +596,7 @@ def disk_attribute_check(state, test_objects):
|
||||||
test.set_status('N/A')
|
test.set_status('N/A')
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if test.dev.check_attributes():
|
if check_attributes(test.dev):
|
||||||
test.passed = True
|
test.passed = True
|
||||||
test.set_status('Passed')
|
test.set_status('Passed')
|
||||||
else:
|
else:
|
||||||
|
|
@ -695,31 +679,6 @@ def disk_self_test(state, test_objects):
|
||||||
threads = []
|
threads = []
|
||||||
state.panes['SMART'] = []
|
state.panes['SMART'] = []
|
||||||
|
|
||||||
def _run_self_test(test_obj, log_path):
|
|
||||||
"""Run self-test and handle exceptions."""
|
|
||||||
result = None
|
|
||||||
|
|
||||||
try:
|
|
||||||
test_obj.passed = test_obj.dev.run_self_test(log_path)
|
|
||||||
except TimeoutError:
|
|
||||||
test_obj.failed = True
|
|
||||||
result = 'TimedOut'
|
|
||||||
except hw_disk.SMARTNotSupportedError:
|
|
||||||
# Pass test since it doesn't apply
|
|
||||||
test_obj.passed = True
|
|
||||||
result = 'N/A'
|
|
||||||
|
|
||||||
# Set status
|
|
||||||
if result:
|
|
||||||
test_obj.set_status(result)
|
|
||||||
else:
|
|
||||||
if test_obj.failed:
|
|
||||||
test_obj.set_status('Failed')
|
|
||||||
elif test_obj.passed:
|
|
||||||
test_obj.set_status('Passed')
|
|
||||||
else:
|
|
||||||
test_obj.set_status('Unknown')
|
|
||||||
|
|
||||||
# Run self-tests
|
# Run self-tests
|
||||||
state.update_top_pane(
|
state.update_top_pane(
|
||||||
f'Disk self-test{"s" if len(test_objects) > 1 else ""}',
|
f'Disk self-test{"s" if len(test_objects) > 1 else ""}',
|
||||||
|
|
@ -733,7 +692,7 @@ def disk_self_test(state, test_objects):
|
||||||
# Start thread
|
# Start thread
|
||||||
test.set_status('Working')
|
test.set_status('Working')
|
||||||
test_log = f'{state.log_dir}/{test.dev.path.name}_selftest.log'
|
test_log = f'{state.log_dir}/{test.dev.path.name}_selftest.log'
|
||||||
threads.append(exe.start_thread(_run_self_test, args=(test, test_log)))
|
threads.append(exe.start_thread(run_self_test, args=(test, test_log)))
|
||||||
|
|
||||||
# Show progress
|
# Show progress
|
||||||
if threads[-1].is_alive():
|
if threads[-1].is_alive():
|
||||||
|
|
@ -752,7 +711,7 @@ def disk_self_test(state, test_objects):
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
aborted = True
|
aborted = True
|
||||||
for test in test_objects:
|
for test in test_objects:
|
||||||
test.dev.abort_self_test()
|
abort_self_test(test.dev)
|
||||||
std.sleep(0.5)
|
std.sleep(0.5)
|
||||||
|
|
||||||
# Save report(s)
|
# Save report(s)
|
||||||
|
|
@ -787,7 +746,7 @@ def disk_surface_scan(state, test_objects):
|
||||||
)
|
)
|
||||||
for disk in state.disks:
|
for disk in state.disks:
|
||||||
failed_attributes = [
|
failed_attributes = [
|
||||||
line for line in disk.generate_attribute_report() if 'failed' in line
|
line for line in generate_attribute_report(disk) if 'failed' in line
|
||||||
]
|
]
|
||||||
if failed_attributes:
|
if failed_attributes:
|
||||||
size_str = std.bytes_to_string(disk.size, use_binary=False)
|
size_str = std.bytes_to_string(disk.size, use_binary=False)
|
||||||
|
|
|
||||||
|
|
@ -9,25 +9,15 @@ import re
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from typing import Any, Union
|
from typing import Any, Union
|
||||||
|
|
||||||
from wk.cfg.hw import (
|
|
||||||
ATTRIBUTE_COLORS,
|
|
||||||
KEY_NVME,
|
|
||||||
KEY_SMART,
|
|
||||||
KNOWN_DISK_ATTRIBUTES,
|
|
||||||
KNOWN_DISK_MODELS,
|
|
||||||
NVME_WARNING_KEYS,
|
|
||||||
REGEX_POWER_ON_TIME,
|
|
||||||
SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS,
|
|
||||||
)
|
|
||||||
from wk.cfg.main import KIT_NAME_SHORT
|
from wk.cfg.main import KIT_NAME_SHORT
|
||||||
from wk.exe import get_json_from_command, run_program
|
from wk.exe import get_json_from_command, run_program
|
||||||
from wk.hw.test import Test
|
from wk.hw.test import Test
|
||||||
from wk.std import (
|
from wk.hw.smart import (
|
||||||
PLATFORM,
|
enable_smart,
|
||||||
bytes_to_string,
|
generate_attribute_report,
|
||||||
color_string,
|
update_smart_details,
|
||||||
sleep,
|
|
||||||
)
|
)
|
||||||
|
from wk.std import PLATFORM, bytes_to_string, color_string
|
||||||
|
|
||||||
|
|
||||||
# STATIC VARIABLES
|
# STATIC VARIABLES
|
||||||
|
|
@ -38,17 +28,6 @@ WK_LABEL_REGEX = re.compile(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# Exception Classes
|
|
||||||
class CriticalHardwareError(RuntimeError):
|
|
||||||
"""Exception used for critical hardware failures."""
|
|
||||||
|
|
||||||
class SMARTNotSupportedError(TypeError):
|
|
||||||
"""Exception used for disks lacking SMART support."""
|
|
||||||
|
|
||||||
class SMARTSelfTestInProgressError(RuntimeError):
|
|
||||||
"""Exception used when a SMART self-test is in progress."""
|
|
||||||
|
|
||||||
|
|
||||||
# Classes
|
# Classes
|
||||||
@dataclass(slots=True)
|
@dataclass(slots=True)
|
||||||
class Disk:
|
class Disk:
|
||||||
|
|
@ -76,23 +55,18 @@ class Disk:
|
||||||
self.path = pathlib.Path(self.path).resolve()
|
self.path = pathlib.Path(self.path).resolve()
|
||||||
self.get_details()
|
self.get_details()
|
||||||
self.set_description()
|
self.set_description()
|
||||||
self.enable_smart()
|
enable_smart(self)
|
||||||
self.update_smart_details()
|
update_smart_details(self)
|
||||||
if not self.attributes and self.bus == 'USB':
|
if not self.attributes and self.bus == 'USB':
|
||||||
# Try using SAT
|
# Try using SAT
|
||||||
LOG.warning('Using SAT for smartctl for %s', self.path)
|
LOG.warning('Using SAT for smartctl for %s', self.path)
|
||||||
self.notes = []
|
self.notes = []
|
||||||
self.use_sat = True
|
self.use_sat = True
|
||||||
self.enable_smart()
|
enable_smart(self)
|
||||||
self.update_smart_details()
|
update_smart_details(self)
|
||||||
if not self.is_4k_aligned():
|
if not self.is_4k_aligned():
|
||||||
self.add_note('One or more partitions are not 4K aligned', 'YELLOW')
|
self.add_note('One or more partitions are not 4K aligned', 'YELLOW')
|
||||||
|
|
||||||
def abort_self_test(self) -> None:
|
|
||||||
"""Abort currently running non-captive self-test."""
|
|
||||||
cmd = ['sudo', 'smartctl', '--abort', self.path]
|
|
||||||
run_program(cmd, check=False)
|
|
||||||
|
|
||||||
def add_note(self, note, color=None) -> None:
|
def add_note(self, note, color=None) -> None:
|
||||||
"""Add note that will be included in the disk report."""
|
"""Add note that will be included in the disk report."""
|
||||||
if color:
|
if color:
|
||||||
|
|
@ -101,40 +75,6 @@ class Disk:
|
||||||
self.notes.append(note)
|
self.notes.append(note)
|
||||||
self.notes.sort()
|
self.notes.sort()
|
||||||
|
|
||||||
def check_attributes(self, only_blocking=False) -> bool:
|
|
||||||
"""Check if any known attributes are failing, returns bool."""
|
|
||||||
attributes_ok = True
|
|
||||||
known_attributes = get_known_disk_attributes(self.model)
|
|
||||||
for attr, value in self.attributes.items():
|
|
||||||
# Skip unknown attributes
|
|
||||||
if attr not in known_attributes:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Get thresholds
|
|
||||||
blocking_attribute = known_attributes[attr].get('Blocking', False)
|
|
||||||
err_thresh = known_attributes[attr].get('Error', None)
|
|
||||||
max_thresh = known_attributes[attr].get('Maximum', None)
|
|
||||||
if not max_thresh:
|
|
||||||
max_thresh = float('inf')
|
|
||||||
|
|
||||||
# Skip non-blocking attributes if necessary
|
|
||||||
if only_blocking and not blocking_attribute:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Skip informational attributes
|
|
||||||
if not err_thresh:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Check attribute
|
|
||||||
if known_attributes[attr].get('PercentageLife', False):
|
|
||||||
if 0 <= value['raw'] <= err_thresh:
|
|
||||||
attributes_ok = False
|
|
||||||
elif err_thresh <= value['raw'] < max_thresh:
|
|
||||||
attributes_ok = False
|
|
||||||
|
|
||||||
# Done
|
|
||||||
return attributes_ok
|
|
||||||
|
|
||||||
def disable_disk_tests(self) -> None:
|
def disable_disk_tests(self) -> None:
|
||||||
"""Disable all tests."""
|
"""Disable all tests."""
|
||||||
LOG.warning('Disabling all tests for: %s', self.path)
|
LOG.warning('Disabling all tests for: %s', self.path)
|
||||||
|
|
@ -143,73 +83,6 @@ class Disk:
|
||||||
test.set_status('Denied')
|
test.set_status('Denied')
|
||||||
test.disabled = True
|
test.disabled = True
|
||||||
|
|
||||||
def enable_smart(self) -> None:
|
|
||||||
"""Try enabling SMART for this disk."""
|
|
||||||
cmd = [
|
|
||||||
'sudo',
|
|
||||||
'smartctl',
|
|
||||||
f'--device={"sat,auto" if self.use_sat else "auto"}',
|
|
||||||
'--tolerance=permissive',
|
|
||||||
'--smart=on',
|
|
||||||
self.path,
|
|
||||||
]
|
|
||||||
run_program(cmd, check=False)
|
|
||||||
|
|
||||||
def generate_attribute_report(self) -> list[str]:
|
|
||||||
"""Generate attribute report, returns list."""
|
|
||||||
known_attributes = get_known_disk_attributes(self.model)
|
|
||||||
report = []
|
|
||||||
for attr, value in sorted(self.attributes.items()):
|
|
||||||
note = ''
|
|
||||||
value_color = 'GREEN'
|
|
||||||
|
|
||||||
# Skip attributes not in our list
|
|
||||||
if attr not in known_attributes:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Check for attribute note
|
|
||||||
note = known_attributes[attr].get('Note', '')
|
|
||||||
|
|
||||||
# ID / Name
|
|
||||||
label = f'{attr:>3}'
|
|
||||||
if isinstance(attr, int):
|
|
||||||
# Assuming SMART, include hex ID and name
|
|
||||||
label += f' / {str(hex(attr))[2:].upper():0>2}: {value["name"]}'
|
|
||||||
label = f' {label.replace("_", " "):38}'
|
|
||||||
|
|
||||||
# Value color
|
|
||||||
if known_attributes[attr].get('PercentageLife', False):
|
|
||||||
# PercentageLife values
|
|
||||||
if 0 <= value['raw'] <= known_attributes[attr]['Error']:
|
|
||||||
value_color = 'RED'
|
|
||||||
note = '(failed, % life remaining)'
|
|
||||||
elif value['raw'] < 0 or value['raw'] > 100:
|
|
||||||
value_color = 'PURPLE'
|
|
||||||
note = '(invalid?)'
|
|
||||||
else:
|
|
||||||
for threshold, color in ATTRIBUTE_COLORS:
|
|
||||||
threshold_val = known_attributes[attr].get(threshold, None)
|
|
||||||
if threshold_val and value['raw'] >= threshold_val:
|
|
||||||
value_color = color
|
|
||||||
if threshold == 'Error':
|
|
||||||
note = '(failed)'
|
|
||||||
elif threshold == 'Maximum':
|
|
||||||
note = '(invalid?)'
|
|
||||||
|
|
||||||
# 199/C7 warning
|
|
||||||
if str(attr) == '199' and value['raw'] > 0:
|
|
||||||
note = '(bad cable?)'
|
|
||||||
|
|
||||||
# Build colored string and append to report
|
|
||||||
line = color_string(
|
|
||||||
[label, value['raw_str'], note],
|
|
||||||
[None, value_color, 'YELLOW'],
|
|
||||||
)
|
|
||||||
report.append(line)
|
|
||||||
|
|
||||||
# Done
|
|
||||||
return report
|
|
||||||
|
|
||||||
def generate_report(self, header=True) -> list[str]:
|
def generate_report(self, header=True) -> list[str]:
|
||||||
"""Generate Disk report, returns list."""
|
"""Generate Disk report, returns list."""
|
||||||
report = []
|
report = []
|
||||||
|
|
@ -221,7 +94,7 @@ class Disk:
|
||||||
if self.attributes:
|
if self.attributes:
|
||||||
if header:
|
if header:
|
||||||
report.append(color_string('Attributes', 'BLUE'))
|
report.append(color_string('Attributes', 'BLUE'))
|
||||||
report.extend(self.generate_attribute_report())
|
report.extend(generate_attribute_report(self))
|
||||||
|
|
||||||
# Notes
|
# Notes
|
||||||
if self.notes:
|
if self.notes:
|
||||||
|
|
@ -294,18 +167,6 @@ class Disk:
|
||||||
# Done
|
# Done
|
||||||
return labels
|
return labels
|
||||||
|
|
||||||
def get_smart_self_test_details(self) -> dict[Any, Any]:
|
|
||||||
"""Shorthand to get deeply nested self-test details, returns dict."""
|
|
||||||
details = {}
|
|
||||||
try:
|
|
||||||
details = self.raw_smartctl['ata_smart_data']['self_test']
|
|
||||||
except (KeyError, TypeError):
|
|
||||||
# Assuming disk lacks SMART support, ignore and return empty dict.
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Done
|
|
||||||
return details
|
|
||||||
|
|
||||||
def is_4k_aligned(self) -> bool:
|
def is_4k_aligned(self) -> bool:
|
||||||
"""Check that all disk partitions are aligned, returns bool."""
|
"""Check that all disk partitions are aligned, returns bool."""
|
||||||
aligned = True
|
aligned = True
|
||||||
|
|
@ -316,138 +177,6 @@ class Disk:
|
||||||
|
|
||||||
return aligned
|
return aligned
|
||||||
|
|
||||||
def safety_checks(self) -> None:
|
|
||||||
"""Run safety checks and raise an exception if necessary."""
|
|
||||||
blocking_event_encountered = False
|
|
||||||
self.update_smart_details()
|
|
||||||
|
|
||||||
# Attributes
|
|
||||||
if not self.check_attributes(only_blocking=True):
|
|
||||||
blocking_event_encountered = True
|
|
||||||
LOG.error('%s: Blocked for failing attribute(s)', self.path)
|
|
||||||
|
|
||||||
# NVMe status
|
|
||||||
nvme_status = self.raw_smartctl.get('smart_status', {}).get('nvme', {})
|
|
||||||
if nvme_status.get('media_read_only', False):
|
|
||||||
blocking_event_encountered = True
|
|
||||||
msg = 'Media has been placed in read-only mode'
|
|
||||||
self.add_note(msg, 'RED')
|
|
||||||
LOG.error('%s %s', self.path, msg)
|
|
||||||
for key in NVME_WARNING_KEYS:
|
|
||||||
if nvme_status.get(key, False):
|
|
||||||
msg = key.replace('_', ' ')
|
|
||||||
self.add_note(msg, 'YELLOW')
|
|
||||||
LOG.warning('%s %s', self.path, msg)
|
|
||||||
|
|
||||||
# SMART overall assessment
|
|
||||||
smart_passed = True
|
|
||||||
try:
|
|
||||||
smart_passed = self.raw_smartctl['smart_status']['passed']
|
|
||||||
except (KeyError, TypeError):
|
|
||||||
# Assuming disk doesn't support SMART overall assessment
|
|
||||||
pass
|
|
||||||
if not smart_passed:
|
|
||||||
blocking_event_encountered = True
|
|
||||||
msg = 'SMART overall self-assessment: Failed'
|
|
||||||
self.add_note(msg, 'RED')
|
|
||||||
LOG.error('%s %s', self.path, msg)
|
|
||||||
|
|
||||||
# Raise blocking exception if necessary
|
|
||||||
if blocking_event_encountered:
|
|
||||||
raise CriticalHardwareError(f'Critical error(s) for: {self.path}')
|
|
||||||
|
|
||||||
# SMART self-test status
|
|
||||||
test_details = self.get_smart_self_test_details()
|
|
||||||
if 'remaining_percent' in test_details.get('status', ''):
|
|
||||||
msg = f'SMART self-test in progress for: {self.path}'
|
|
||||||
LOG.error(msg)
|
|
||||||
raise SMARTSelfTestInProgressError(msg)
|
|
||||||
|
|
||||||
def run_self_test(self, log_path) -> bool:
|
|
||||||
"""Run disk self-test and check if it passed, returns bool.
|
|
||||||
|
|
||||||
NOTE: This function is here to reserve a place for future
|
|
||||||
NVMe self-tests announced in NVMe spec v1.3.
|
|
||||||
"""
|
|
||||||
result = self.run_smart_self_test(log_path)
|
|
||||||
return result
|
|
||||||
|
|
||||||
def run_smart_self_test(self, log_path) -> bool:
|
|
||||||
"""Run SMART self-test and check if it passed, returns bool.
|
|
||||||
|
|
||||||
NOTE: An exception will be raised if the disk lacks SMART support.
|
|
||||||
"""
|
|
||||||
finished = False
|
|
||||||
result = None
|
|
||||||
started = False
|
|
||||||
status_str = 'Starting self-test...'
|
|
||||||
test_details = self.get_smart_self_test_details()
|
|
||||||
test_minutes = 15
|
|
||||||
size_str = bytes_to_string(self.size, use_binary=False)
|
|
||||||
header_str = color_string(
|
|
||||||
['[', self.path.name, ' ', size_str, ']'],
|
|
||||||
[None, 'BLUE', None, 'CYAN', None],
|
|
||||||
sep='',
|
|
||||||
)
|
|
||||||
|
|
||||||
# Check if disk supports self-tests
|
|
||||||
if not test_details:
|
|
||||||
raise SMARTNotSupportedError(
|
|
||||||
f'SMART self-test not supported for {self.path}')
|
|
||||||
|
|
||||||
# Get real test length
|
|
||||||
test_minutes = test_details.get('polling_minutes', {}).get('short', 5)
|
|
||||||
test_minutes = int(test_minutes) + 10
|
|
||||||
|
|
||||||
# Start test
|
|
||||||
with open(log_path, 'w', encoding='utf-8') as _f:
|
|
||||||
_f.write(f'{header_str}\nInitializing...')
|
|
||||||
cmd = [
|
|
||||||
'sudo',
|
|
||||||
'smartctl',
|
|
||||||
'--tolerance=normal',
|
|
||||||
'--test=short',
|
|
||||||
self.path,
|
|
||||||
]
|
|
||||||
run_program(cmd, check=False)
|
|
||||||
|
|
||||||
# Monitor progress (in five second intervals)
|
|
||||||
for _i in range(int(test_minutes*60/5)):
|
|
||||||
sleep(5)
|
|
||||||
|
|
||||||
# Update status
|
|
||||||
self.update_smart_details()
|
|
||||||
test_details = self.get_smart_self_test_details()
|
|
||||||
|
|
||||||
# Check test progress
|
|
||||||
if started:
|
|
||||||
status_str = test_details.get('status', {}).get('string', 'Unknown')
|
|
||||||
status_str = status_str.capitalize()
|
|
||||||
|
|
||||||
# Update log
|
|
||||||
with open(log_path, 'w', encoding='utf-8') as _f:
|
|
||||||
_f.write(f'{header_str}\nSMART self-test status:\n {status_str}')
|
|
||||||
|
|
||||||
# Check if finished
|
|
||||||
if 'remaining_percent' not in test_details.get('status', {}):
|
|
||||||
finished = True
|
|
||||||
break
|
|
||||||
|
|
||||||
elif 'remaining_percent' in test_details.get('status', {}):
|
|
||||||
started = True
|
|
||||||
elif _i * 5 >= SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS:
|
|
||||||
# Test didn't start within limit, stop waiting
|
|
||||||
break
|
|
||||||
|
|
||||||
# Check result
|
|
||||||
if finished:
|
|
||||||
result = test_details.get('status', {}).get('passed', False)
|
|
||||||
elif started:
|
|
||||||
raise TimeoutError(f'SMART self-test timed out for {self.path}')
|
|
||||||
|
|
||||||
# Done
|
|
||||||
return result
|
|
||||||
|
|
||||||
def set_description(self) -> None:
|
def set_description(self) -> None:
|
||||||
"""Set disk description from details."""
|
"""Set disk description from details."""
|
||||||
self.description = (
|
self.description = (
|
||||||
|
|
@ -455,62 +184,6 @@ class Disk:
|
||||||
f' ({self.bus}) {self.model} {self.serial}'
|
f' ({self.bus}) {self.model} {self.serial}'
|
||||||
)
|
)
|
||||||
|
|
||||||
def update_smart_details(self) -> None:
|
|
||||||
"""Update SMART details via smartctl."""
|
|
||||||
updated_attributes = {}
|
|
||||||
|
|
||||||
# Get SMART data
|
|
||||||
cmd = [
|
|
||||||
'sudo',
|
|
||||||
'smartctl',
|
|
||||||
f'--device={"sat,auto" if self.use_sat else "auto"}',
|
|
||||||
'--tolerance=verypermissive',
|
|
||||||
'--all',
|
|
||||||
'--json',
|
|
||||||
self.path,
|
|
||||||
]
|
|
||||||
self.raw_smartctl = get_json_from_command(cmd, check=False)
|
|
||||||
|
|
||||||
# Check for attributes
|
|
||||||
if KEY_NVME in self.raw_smartctl:
|
|
||||||
for name, value in self.raw_smartctl[KEY_NVME].items():
|
|
||||||
try:
|
|
||||||
updated_attributes[name] = {
|
|
||||||
'name': name,
|
|
||||||
'raw': int(value),
|
|
||||||
'raw_str': str(value),
|
|
||||||
}
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
# Ignoring invalid attribute
|
|
||||||
LOG.error('Invalid NVMe attribute: %s %s', name, value)
|
|
||||||
elif KEY_SMART in self.raw_smartctl:
|
|
||||||
for attribute in self.raw_smartctl[KEY_SMART].get('table', {}):
|
|
||||||
try:
|
|
||||||
_id = int(attribute['id'])
|
|
||||||
except (KeyError, ValueError):
|
|
||||||
# Ignoring invalid attribute
|
|
||||||
LOG.error('Invalid SMART attribute: %s', attribute)
|
|
||||||
continue
|
|
||||||
name = str(attribute.get('name', 'Unknown')).replace('_', ' ').title()
|
|
||||||
raw = int(attribute.get('raw', {}).get('value', -1))
|
|
||||||
raw_str = attribute.get('raw', {}).get('string', 'Unknown')
|
|
||||||
|
|
||||||
# Fix power-on time
|
|
||||||
match = REGEX_POWER_ON_TIME.match(raw_str)
|
|
||||||
if _id == 9 and match:
|
|
||||||
raw = int(match.group(1))
|
|
||||||
|
|
||||||
# Add to dict
|
|
||||||
updated_attributes[_id] = {
|
|
||||||
'name': name, 'raw': raw, 'raw_str': raw_str}
|
|
||||||
|
|
||||||
# Add note if necessary
|
|
||||||
if not updated_attributes:
|
|
||||||
self.add_note('No NVMe or SMART data available', 'YELLOW')
|
|
||||||
|
|
||||||
# Done
|
|
||||||
self.attributes.update(updated_attributes)
|
|
||||||
|
|
||||||
|
|
||||||
# Functions
|
# Functions
|
||||||
def get_disk_details_linux(path) -> dict[Any, Any]:
|
def get_disk_details_linux(path) -> dict[Any, Any]:
|
||||||
|
|
@ -676,23 +349,6 @@ def get_disks_macos() -> list[Disk]:
|
||||||
return disks
|
return disks
|
||||||
|
|
||||||
|
|
||||||
def get_known_disk_attributes(model) -> dict[Any, dict]:
  """Get known NVMe/SMART attributes (model specific), returns dict.

  Starts from KNOWN_DISK_ATTRIBUTES and layers on the thresholds of every
  KNOWN_DISK_MODELS entry whose regex matches model.

  The returned mapping and its per-attribute dicts are fresh copies, so
  applying model-specific thresholds never alters the shared config dicts.
  (A plain dict.copy() is shallow; updating the nested dicts through it
  would leak one model's thresholds into every later lookup.)
  """
  # Copy one level down: the per-attribute dicts are what gets updated below
  known_attributes = {
    attr: dict(thresholds)
    for attr, thresholds in KNOWN_DISK_ATTRIBUTES.items()
  }

  # Apply model-specific data
  for regex, data in KNOWN_DISK_MODELS.items():
    if not re.search(regex, model):
      continue
    for attr, thresholds in data.items():
      # Merge into the existing entry, or start a new private one
      known_attributes.setdefault(attr, {}).update(thresholds)

  # Done
  return known_attributes
|
|
||||||
|
|
||||||
|
|
||||||
def is_4k_aligned_macos(disk_details) -> bool:
|
def is_4k_aligned_macos(disk_details) -> bool:
|
||||||
"""Check partition alignment using diskutil info, returns bool."""
|
"""Check partition alignment using diskutil info, returns bool."""
|
||||||
aligned = True
|
aligned = True
|
||||||
|
|
|
||||||
414
scripts/wk/hw/smart.py
Normal file
414
scripts/wk/hw/smart.py
Normal file
|
|
@ -0,0 +1,414 @@
|
||||||
|
"""WizardKit: SMART test functions"""
|
||||||
|
# vim: sts=2 sw=2 ts=2
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from wk.cfg.hw import (
|
||||||
|
ATTRIBUTE_COLORS,
|
||||||
|
KEY_NVME,
|
||||||
|
KEY_SMART,
|
||||||
|
KNOWN_DISK_ATTRIBUTES,
|
||||||
|
KNOWN_DISK_MODELS,
|
||||||
|
NVME_WARNING_KEYS,
|
||||||
|
REGEX_POWER_ON_TIME,
|
||||||
|
SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS,
|
||||||
|
)
|
||||||
|
from wk.exe import get_json_from_command, run_program
|
||||||
|
from wk.std import bytes_to_string, color_string, sleep
|
||||||
|
|
||||||
|
|
||||||
|
# STATIC VARIABLES
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# Exception Classes
|
||||||
|
class CriticalHardwareError(RuntimeError):
  """Raised to signal a critical hardware failure on a device."""
|
||||||
|
|
||||||
|
class SMARTNotSupportedError(TypeError):
  """Raised for a disk that lacks SMART support."""
|
||||||
|
|
||||||
|
class SMARTSelfTestInProgressError(RuntimeError):
  """Raised when a SMART self-test is already in progress."""
|
||||||
|
|
||||||
|
|
||||||
|
# Functions
|
||||||
|
def abort_self_test(dev) -> None:
  """Abort the non-captive self-test currently running on dev.

  Runs `smartctl --abort` against dev.path; the command is invoked with
  check=False.
  """
  run_program(['sudo', 'smartctl', '--abort', dev.path], check=False)
|
||||||
|
|
||||||
|
|
||||||
|
def check_attributes(dev, only_blocking=False) -> bool:
  """Check dev.attributes against the known thresholds, returns bool.

  Returns True when no known attribute is failing, False otherwise.
  Attributes missing from the known list, and known attributes with no
  'Error' threshold (informational only), are ignored. With only_blocking
  set, attributes not flagged 'Blocking' are ignored as well.

  Failure rules:
    'PercentageLife' attributes fail when 0 <= raw <= Error.
    All other attributes fail when Error <= raw < Maximum
    (no Maximum means no upper bound).
  """
  known = get_known_disk_attributes(dev.model)
  all_ok = True

  for attr_id, attr_data in dev.attributes.items():
    # Unknown attribute, nothing to compare against
    if attr_id not in known:
      continue
    spec = known[attr_id]

    # Caller only cares about blocking attributes
    if only_blocking and not spec.get('Blocking', False):
      continue

    # Informational attribute (no error threshold)
    error_limit = spec.get('Error', None)
    if not error_limit:
      continue

    upper_limit = spec.get('Maximum', None) or float('inf')
    raw_value = attr_data['raw']
    if spec.get('PercentageLife', False):
      failing = 0 <= raw_value <= error_limit
    else:
      failing = error_limit <= raw_value < upper_limit

    if failing:
      all_ok = False

  return all_ok
|
||||||
|
|
||||||
|
|
||||||
|
def check_self_test_results(test_obj, aborted=False):
    """Add the SMART self-test outcome to the report of test_obj.

    A 'Self-Test' header is always appended. Disabled or denied tests and
    tests that do not apply (status 'N/A' or no attributes on the device)
    only get their current status appended. Otherwise the last known
    self-test status string is appended and the status / failed flag of
    test_obj is updated (Aborted, TimedOut or Failed).
    """
    test_obj.report.append(color_string('Self-Test', 'BLUE'))

    # Nothing was run: just show the current status
    if test_obj.disabled or test_obj.status == 'Denied':
        test_obj.report.append(color_string(f' {test_obj.status}', 'RED'))
        return
    if test_obj.status == 'N/A' or not test_obj.dev.attributes:
        test_obj.report.append(color_string(f' {test_obj.status}', 'YELLOW'))
        return

    # The SMART data is intentionally not refreshed here so the report
    # keeps the last known test status. For example, an aborted test
    # should include the last known progress rather than only
    # "was aborted by host".
    details = get_smart_self_test_details(test_obj.dev)
    result_str = details.get('status', {}).get('string', 'Unknown')
    test_obj.report.append(f' {result_str.capitalize()}')

    if aborted and not test_obj.passed and not test_obj.failed:
        test_obj.report.append(color_string(' Aborted', 'YELLOW'))
        test_obj.set_status('Aborted')
        return
    if test_obj.status == 'TimedOut':
        test_obj.report.append(color_string(' TimedOut', 'YELLOW'))
        test_obj.set_status('TimedOut')
        return

    # Anything that did not pass counts as a failure at this point
    test_obj.failed = not test_obj.passed
    if test_obj.failed:
        test_obj.set_status('Failed')
|
||||||
|
|
||||||
|
|
||||||
|
def enable_smart(dev) -> None:
    """Attempt to switch SMART on for dev using smartctl.

    The device type passed to smartctl is 'sat,auto' when dev.use_sat is
    set and 'auto' otherwise.
    """
    device_type = 'sat,auto' if dev.use_sat else 'auto'
    run_program(
        [
            'sudo',
            'smartctl',
            f'--device={device_type}',
            '--tolerance=permissive',
            '--smart=on',
            dev.path,
        ],
        check=False,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def generate_attribute_report(dev) -> list[str]:
    """Build one colored report line per known attribute of dev.

    Attributes are processed in sorted order; attributes missing from the
    known-attribute table are left out. Each line holds a label, the raw
    value string (colored by severity) and an optional note.

    Returns the list of report lines.
    """
    known = get_known_disk_attributes(dev.model)
    threshold_notes = {'Error': '(failed)', 'Maximum': '(invalid?)'}
    lines = []

    for attr_id, reading in sorted(dev.attributes.items()):
        # Only report attributes from our list
        if attr_id not in known:
            continue
        limits = known[attr_id]
        shade = 'GREEN'
        note = limits.get('Note', '')

        # Label: ID, plus hex ID and name for integer (SMART) IDs
        label = f'{attr_id:>3}'
        if isinstance(attr_id, int):
            hex_id = hex(attr_id)[2:].upper()
            label = f'{label} / {hex_id:0>2}: {reading["name"]}'
        label = f' {label.replace("_", " "):38}'

        # Pick the value color (and possibly replace the note)
        if limits.get('PercentageLife', False):
            if 0 <= reading['raw'] <= limits['Error']:
                shade = 'RED'
                note = '(failed, % life remaining)'
            elif reading['raw'] < 0 or reading['raw'] > 100:
                shade = 'PURPLE'
                note = '(invalid?)'
        else:
            for threshold, threshold_color in ATTRIBUTE_COLORS:
                limit = limits.get(threshold, None)
                if limit and reading['raw'] >= limit:
                    shade = threshold_color
                    note = threshold_notes.get(threshold, note)

        # Attribute 199 / C7: any non-zero value gets a cable warning
        if str(attr_id) == '199' and reading['raw'] > 0:
            note = '(bad cable?)'

        lines.append(color_string(
            [label, reading['raw_str'], note],
            [None, shade, 'YELLOW'],
        ))

    return lines
|
||||||
|
|
||||||
|
|
||||||
|
def get_known_disk_attributes(model) -> dict[Any, dict]:
    """Get known NVMe/SMART attributes (model specific), returns dict.

    Starts from KNOWN_DISK_ATTRIBUTES and layers on the thresholds of
    every KNOWN_DISK_MODELS entry whose regex matches model.

    The returned dict and its per-attribute threshold dicts are fresh
    copies, so neither this function nor its callers alter the
    module-level tables.
    """
    # Copy each threshold dict as well: dict.copy() alone is shallow, so
    # updating an entry in place would rewrite KNOWN_DISK_ATTRIBUTES and
    # leak one model's overrides into every later lookup.
    known_attributes = {
        attr: dict(thresholds)
        for attr, thresholds in KNOWN_DISK_ATTRIBUTES.items()
    }

    # Apply model-specific data
    for regex, data in KNOWN_DISK_MODELS.items():
        if not re.search(regex, model):
            continue
        for attr, thresholds in data.items():
            # setdefault() gives a new attribute its own dict, so a second
            # matching model cannot modify KNOWN_DISK_MODELS either
            known_attributes.setdefault(attr, {}).update(thresholds)

    # Done
    return known_attributes
|
||||||
|
|
||||||
|
|
||||||
|
def get_smart_self_test_details(dev) -> dict[Any, Any]:
    """Return the nested self-test section of dev.raw_smartctl.

    An empty dict is returned when the section cannot be reached.
    """
    try:
        return dev.raw_smartctl['ata_smart_data']['self_test']
    except (KeyError, TypeError):
        # Section unavailable; the disk is assumed to lack SMART support
        return {}
|
||||||
|
|
||||||
|
|
||||||
|
def safety_checks(dev) -> None:
    """Refresh the SMART data of dev and raise if it is unsafe to use.

    Raises CriticalHardwareError when a blocking attribute is failing,
    the NVMe media is read-only or the SMART overall assessment did not
    pass. Raises SMARTSelfTestInProgressError when none of those apply
    but a self-test is still reporting remaining progress. NVMe warning
    flags only add a note and a log entry.
    """
    update_smart_details(dev)

    # Attributes
    blocked = not check_attributes(dev, only_blocking=True)
    if blocked:
        LOG.error('%s: Blocked for failing attribute(s)', dev.path)

    # NVMe status
    nvme_status = dev.raw_smartctl.get('smart_status', {}).get('nvme', {})
    if nvme_status.get('media_read_only', False):
        blocked = True
        read_only_msg = 'Media has been placed in read-only mode'
        dev.add_note(read_only_msg, 'RED')
        LOG.error('%s %s', dev.path, read_only_msg)
    for warning_key in NVME_WARNING_KEYS:
        if not nvme_status.get(warning_key, False):
            continue
        warning_msg = warning_key.replace('_', ' ')
        dev.add_note(warning_msg, 'YELLOW')
        LOG.warning('%s %s', dev.path, warning_msg)

    # SMART overall assessment
    try:
        overall_ok = dev.raw_smartctl['smart_status']['passed']
    except (KeyError, TypeError):
        # Assuming the disk has no overall assessment to report
        overall_ok = True
    if not overall_ok:
        blocked = True
        assessment_msg = 'SMART overall self-assessment: Failed'
        dev.add_note(assessment_msg, 'RED')
        LOG.error('%s %s', dev.path, assessment_msg)

    # Critical problems take precedence over a running self-test
    if blocked:
        raise CriticalHardwareError(f'Critical error(s) for: {dev.path}')

    # SMART self-test status
    self_test = get_smart_self_test_details(dev)
    if 'remaining_percent' in self_test.get('status', ''):
        in_progress_msg = f'SMART self-test in progress for: {dev.path}'
        LOG.error(in_progress_msg)
        raise SMARTSelfTestInProgressError(in_progress_msg)
|
||||||
|
|
||||||
|
|
||||||
|
def run_self_test(test_obj, log_path) -> None:
    """Run the SMART self-test for test_obj.dev and record the outcome.

    The outcome is stored on test_obj (passed / failed flags plus a
    set_status() call); nothing is returned. A timeout marks the test as
    failed with status 'TimedOut'; a disk without SMART support is marked
    as passed with status 'N/A'.
    """
    try:
        test_obj.passed = run_smart_self_test(test_obj.dev, log_path)
    except TimeoutError:
        test_obj.failed = True
        outcome = 'TimedOut'
    except SMARTNotSupportedError:
        # The test does not apply to this disk, so count it as passed
        test_obj.passed = True
        outcome = 'N/A'
    else:
        if test_obj.failed:
            outcome = 'Failed'
        elif test_obj.passed:
            outcome = 'Passed'
        else:
            outcome = 'Unknown'

    test_obj.set_status(outcome)
|
||||||
|
|
||||||
|
|
||||||
|
def run_smart_self_test(dev, log_path) -> bool:
    """Start a short SMART self-test on dev and wait for its result.

    The current state is (re)written to log_path while waiting. The wait
    limit is the short-test polling time reported by the disk (5 minutes
    if not reported) plus 10 minutes, checked in five second steps.

    Returns the 'passed' value reported for the finished test. None is
    returned if the test was never seen running.

    Raises SMARTNotSupportedError if no self-test details are available
    for dev, and TimeoutError if a started test did not finish in time.
    """
    started = False
    finished = False
    test_details = get_smart_self_test_details(dev)
    header_str = color_string(
        ['[', dev.path.name, ' ', bytes_to_string(dev.size, use_binary=False), ']'],
        [None, 'BLUE', None, 'CYAN', None],
        sep='',
    )

    # Without self-test details the disk cannot run the test
    if not test_details:
        raise SMARTNotSupportedError(
            f'SMART self-test not supported for {dev.path}')

    # Real test length, with some extra time on top
    test_minutes = test_details.get('polling_minutes', {}).get('short', 5)
    test_minutes = int(test_minutes) + 10

    # Start the test
    with open(log_path, 'w', encoding='utf-8') as log_file:
        log_file.write(f'{header_str}\nInitializing...')
    run_program(
        ['sudo', 'smartctl', '--tolerance=normal', '--test=short', dev.path],
        check=False,
    )

    # Poll every five seconds until done or out of time
    for poll in range(int(test_minutes*60/5)):
        sleep(5)
        update_smart_details(dev)
        test_details = get_smart_self_test_details(dev)

        if not started:
            if 'remaining_percent' in test_details.get('status', {}):
                started = True
            elif poll * 5 >= SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS:
                # The test did not start within the limit, stop waiting
                break
            continue

        # Test is known to have started: log its current status
        status_str = test_details.get('status', {}).get('string', 'Unknown')
        status_str = status_str.capitalize()
        with open(log_path, 'w', encoding='utf-8') as log_file:
            log_file.write(
                f'{header_str}\nSMART self-test status:\n {status_str}')

        # No remaining percentage reported means the test is over
        if 'remaining_percent' not in test_details.get('status', {}):
            finished = True
            break

    if finished:
        return test_details.get('status', {}).get('passed', False)
    if started:
        raise TimeoutError(f'SMART self-test timed out for {dev.path}')
    return None
|
||||||
|
|
||||||
|
|
||||||
|
def update_smart_details(dev) -> None:
    """Refresh dev.raw_smartctl and dev.attributes from smartctl.

    The JSON output of smartctl is stored in dev.raw_smartctl. NVMe
    attributes are keyed by name, SMART attributes by integer ID; each
    entry holds 'name', 'raw' and 'raw_str'. Invalid entries are logged
    and skipped. A note is added to dev when no attributes were found.
    """
    device_type = 'sat,auto' if dev.use_sat else 'auto'
    dev.raw_smartctl = get_json_from_command(
        [
            'sudo',
            'smartctl',
            f'--device={device_type}',
            '--tolerance=verypermissive',
            '--all',
            '--json',
            dev.path,
        ],
        check=False,
    )
    fresh = {}

    if KEY_NVME in dev.raw_smartctl:
        for name, value in dev.raw_smartctl[KEY_NVME].items():
            try:
                raw, raw_str = int(value), str(value)
            except (TypeError, ValueError):
                # Values that are not integers are skipped
                LOG.error('Invalid NVMe attribute: %s %s', name, value)
                continue
            fresh[name] = {'name': name, 'raw': raw, 'raw_str': raw_str}
    elif KEY_SMART in dev.raw_smartctl:
        for entry in dev.raw_smartctl[KEY_SMART].get('table', {}):
            try:
                attr_id = int(entry['id'])
            except (KeyError, ValueError):
                # Entries without a usable ID are skipped
                LOG.error('Invalid SMART attribute: %s', entry)
                continue
            raw_info = entry.get('raw', {})
            raw = int(raw_info.get('value', -1))
            raw_str = raw_info.get('string', 'Unknown')

            # For attribute 9 (power-on time) take the value from the
            # raw string when it matches the expected pattern
            power_on_match = REGEX_POWER_ON_TIME.match(raw_str)
            if attr_id == 9 and power_on_match:
                raw = int(power_on_match.group(1))

            fresh[attr_id] = {
                'name': str(entry.get('name', 'Unknown')).replace('_', ' ').title(),
                'raw': raw,
                'raw_str': raw_str,
            }

    # Let the user know when nothing could be read
    if not fresh:
        dev.add_note('No NVMe or SMART data available', 'YELLOW')

    dev.attributes.update(fresh)
|
||||||
|
|
||||||
|
|
||||||
|
# This module only provides functions; tell the user when it is run directly
if __name__ == '__main__':
    print('This file is not meant to be called directly.')
|
||||||
Loading…
Reference in a new issue