# WizardKit/scripts/wk/hw/disk.py
#
# 736 lines
# 22 KiB
# Python

"""WizardKit: Disk object and functions"""
# vim: sts=2 sw=2 ts=2
import logging
import pathlib
import plistlib
import re
from dataclasses import dataclass, field
from typing import Any, Union
from wk.cfg.hw import (
ATTRIBUTE_COLORS,
KEY_NVME,
KEY_SMART,
KNOWN_DISK_ATTRIBUTES,
KNOWN_DISK_MODELS,
NVME_WARNING_KEYS,
REGEX_POWER_ON_TIME,
SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS,
)
from wk.cfg.main import KIT_NAME_SHORT
from wk.exe import get_json_from_command, run_program
from wk.hw.test import Test
from wk.std import (
PLATFORM,
bytes_to_string,
color_string,
sleep,
)
# STATIC VARIABLES
# Module-level logger named after this module
LOG = logging.getLogger(__name__)
# Case-insensitive pattern for "<KIT_NAME_SHORT>_LINUX" / "<KIT_NAME_SHORT>_UFD"
WK_LABEL_REGEX = re.compile(fr'{KIT_NAME_SHORT}_(LINUX|UFD)', flags=re.IGNORECASE)
# Exception Classes
class CriticalHardwareError(RuntimeError):
  """Raised when a critical hardware failure is detected."""

class SMARTNotSupportedError(TypeError):
  """Raised for disks that lack SMART support."""

class SMARTSelfTestInProgressError(RuntimeError):
  """Raised when a SMART self-test is already in progress."""

# Classes
@dataclass(slots=True)
class Disk:
# pylint: disable=too-many-instance-attributes
"""Object for tracking disk specific data."""
attributes: dict[Any, dict] = field(init=False, default_factory=dict)
bus: str = field(init=False)
description: str = field(init=False)
filesystem: str = field(init=False)
log_sec: int = field(init=False)
model: str = field(init=False)
name: str = field(init=False)
notes: list[str] = field(init=False, default_factory=list)
path: Union[pathlib.Path, str]
phy_sec: int = field(init=False)
raw_details: dict[str, Any] = field(init=False)
raw_smartctl: dict[str, Any] = field(init=False)
serial: str = field(init=False)
size: int = field(init=False)
ssd: bool = field(init=False)
tests: list[Test] = field(init=False, default_factory=list)
use_sat: bool = field(init=False, default=False)
def __post_init__(self) -> None:
self.path = pathlib.Path(self.path).resolve()
self.get_details()
self.set_description()
self.enable_smart()
self.update_smart_details()
if not self.attributes and self.bus == 'USB':
# Try using SAT
LOG.warning('Using SAT for smartctl for %s', self.path)
self.notes = []
self.use_sat = True
self.enable_smart()
self.update_smart_details()
if not self.is_4k_aligned():
self.add_note('One or more partitions are not 4K aligned', 'YELLOW')
def abort_self_test(self) -> None:
"""Abort currently running non-captive self-test."""
cmd = ['sudo', 'smartctl', '--abort', self.path]
run_program(cmd, check=False)
def add_note(self, note, color=None) -> None:
"""Add note that will be included in the disk report."""
if color:
note = color_string(note, color)
if note not in self.notes:
self.notes.append(note)
self.notes.sort()
def check_attributes(self, only_blocking=False) -> bool:
"""Check if any known attributes are failing, returns bool."""
attributes_ok = True
known_attributes = get_known_disk_attributes(self.model)
for attr, value in self.attributes.items():
# Skip unknown attributes
if attr not in known_attributes:
continue
# Get thresholds
blocking_attribute = known_attributes[attr].get('Blocking', False)
err_thresh = known_attributes[attr].get('Error', None)
max_thresh = known_attributes[attr].get('Maximum', None)
if not max_thresh:
max_thresh = float('inf')
# Skip non-blocking attributes if necessary
if only_blocking and not blocking_attribute:
continue
# Skip informational attributes
if not err_thresh:
continue
# Check attribute
if known_attributes[attr].get('PercentageLife', False):
if 0 <= value['raw'] <= err_thresh:
attributes_ok = False
elif err_thresh <= value['raw'] < max_thresh:
attributes_ok = False
# Done
return attributes_ok
def disable_disk_tests(self) -> None:
"""Disable all tests."""
LOG.warning('Disabling all tests for: %s', self.path)
for test in self.tests:
if test.status in ('Pending', 'Working'):
test.set_status('Denied')
test.disabled = True
def enable_smart(self) -> None:
"""Try enabling SMART for this disk."""
cmd = [
'sudo',
'smartctl',
f'--device={"sat,auto" if self.use_sat else "auto"}',
'--tolerance=permissive',
'--smart=on',
self.path,
]
run_program(cmd, check=False)
def generate_attribute_report(self) -> list[str]:
"""Generate attribute report, returns list."""
known_attributes = get_known_disk_attributes(self.model)
report = []
for attr, value in sorted(self.attributes.items()):
note = ''
value_color = 'GREEN'
# Skip attributes not in our list
if attr not in known_attributes:
continue
# Check for attribute note
note = known_attributes[attr].get('Note', '')
# ID / Name
label = f'{attr:>3}'
if isinstance(attr, int):
# Assuming SMART, include hex ID and name
label += f' / {str(hex(attr))[2:].upper():0>2}: {value["name"]}'
label = f' {label.replace("_", " "):38}'
# Value color
if known_attributes[attr].get('PercentageLife', False):
# PercentageLife values
if 0 <= value['raw'] <= known_attributes[attr]['Error']:
value_color = 'RED'
note = '(failed, % life remaining)'
elif value['raw'] < 0 or value['raw'] > 100:
value_color = 'PURPLE'
note = '(invalid?)'
else:
for threshold, color in ATTRIBUTE_COLORS:
threshold_val = known_attributes[attr].get(threshold, None)
if threshold_val and value['raw'] >= threshold_val:
value_color = color
if threshold == 'Error':
note = '(failed)'
elif threshold == 'Maximum':
note = '(invalid?)'
# 199/C7 warning
if str(attr) == '199' and value['raw'] > 0:
note = '(bad cable?)'
# Build colored string and append to report
line = color_string(
[label, value['raw_str'], note],
[None, value_color, 'YELLOW'],
)
report.append(line)
# Done
return report
def generate_report(self, header=True) -> list[str]:
"""Generate Disk report, returns list."""
report = []
if header:
report.append(color_string(f'Device ({self.path.name})', 'BLUE'))
report.append(f' {self.description}')
# Attributes
if self.attributes:
if header:
report.append(color_string('Attributes', 'BLUE'))
report.extend(self.generate_attribute_report())
# Notes
if self.notes:
report.append(color_string('Notes', 'BLUE'))
for note in self.notes:
report.append(f' {note}')
# Tests
for test in self.tests:
report.extend(test.report)
return report
def get_details(self) -> None:
"""Get disk details using OS specific methods.
Required details default to generic descriptions
and are converted to the correct type.
"""
if PLATFORM == 'Darwin':
self.raw_details = get_disk_details_macos(self.path)
elif PLATFORM == 'Linux':
self.raw_details = get_disk_details_linux(self.path)
# Set necessary details
self.bus = str(self.raw_details.get('bus', '???')).upper()
self.bus = self.bus.replace('IMAGE', 'Image')
self.bus = self.bus.replace('NVME', 'NVMe')
self.filesystem = self.raw_details.get('fstype', 'Unknown')
self.log_sec = self.raw_details.get('log-sec', 512)
self.model = self.raw_details.get('model', 'Unknown Model')
self.name = self.raw_details.get('name', self.path)
self.phy_sec = self.raw_details.get('phy-sec', 512)
self.serial = self.raw_details.get('serial', 'Unknown Serial')
self.size = self.raw_details.get('size', -1)
self.ssd = self.raw_details.get('ssd', False)
# Ensure certain attributes types
## NOTE: This is ugly, deal.
for attr in ['bus', 'model', 'name', 'serial']:
setattr(self, attr, str(getattr(self, attr)))
for attr in ['log_sec', 'phy_sec', 'size']:
try:
setattr(self, attr, int(getattr(self, attr)))
except (TypeError, ValueError):
LOG.error('Invalid disk %s: %s', attr, getattr(self, attr))
if attr == 'size':
setattr(self, attr, -1)
# Set description
self.description = (
f'{bytes_to_string(self.size, use_binary=False)}'
f' ({self.bus})'
f' {self.model}'
f' {self.serial}'
)
def get_labels(self) -> list[str]:
"""Build list of labels for this disk, returns list."""
labels = []
# Add all labels from raw_details
for details in [self.raw_details, *self.raw_details.get('children', [])]:
labels.append(details.get('label', ''))
labels.append(details.get('partlabel', ''))
# Remove empty labels
labels = [str(label) for label in labels if label]
# Done
return labels
def get_smart_self_test_details(self) -> dict[Any, Any]:
"""Shorthand to get deeply nested self-test details, returns dict."""
details = {}
try:
details = self.raw_smartctl['ata_smart_data']['self_test']
except (KeyError, TypeError):
# Assuming disk lacks SMART support, ignore and return empty dict.
pass
# Done
return details
def is_4k_aligned(self) -> bool:
"""Check that all disk partitions are aligned, returns bool."""
aligned = True
if PLATFORM == 'Darwin':
aligned = is_4k_aligned_macos(self.raw_details)
elif PLATFORM == 'Linux':
aligned = is_4k_aligned_linux(self.path, self.phy_sec)
return aligned
def safety_checks(self) -> None:
"""Run safety checks and raise an exception if necessary."""
blocking_event_encountered = False
self.update_smart_details()
# Attributes
if not self.check_attributes(only_blocking=True):
blocking_event_encountered = True
LOG.error('%s: Blocked for failing attribute(s)', self.path)
# NVMe status
nvme_status = self.raw_smartctl.get('smart_status', {}).get('nvme', {})
if nvme_status.get('media_read_only', False):
blocking_event_encountered = True
msg = 'Media has been placed in read-only mode'
self.add_note(msg, 'RED')
LOG.error('%s %s', self.path, msg)
for key in NVME_WARNING_KEYS:
if nvme_status.get(key, False):
msg = key.replace('_', ' ')
self.add_note(msg, 'YELLOW')
LOG.warning('%s %s', self.path, msg)
# SMART overall assessment
smart_passed = True
try:
smart_passed = self.raw_smartctl['smart_status']['passed']
except (KeyError, TypeError):
# Assuming disk doesn't support SMART overall assessment
pass
if not smart_passed:
blocking_event_encountered = True
msg = 'SMART overall self-assessment: Failed'
self.add_note(msg, 'RED')
LOG.error('%s %s', self.path, msg)
# Raise blocking exception if necessary
if blocking_event_encountered:
raise CriticalHardwareError(f'Critical error(s) for: {self.path}')
# SMART self-test status
test_details = self.get_smart_self_test_details()
if 'remaining_percent' in test_details.get('status', ''):
msg = f'SMART self-test in progress for: {self.path}'
LOG.error(msg)
raise SMARTSelfTestInProgressError(msg)
def run_self_test(self, log_path) -> bool:
"""Run disk self-test and check if it passed, returns bool.
NOTE: This function is here to reserve a place for future
NVMe self-tests announced in NVMe spec v1.3.
"""
result = self.run_smart_self_test(log_path)
return result
def run_smart_self_test(self, log_path) -> bool:
"""Run SMART self-test and check if it passed, returns bool.
NOTE: An exception will be raised if the disk lacks SMART support.
"""
finished = False
result = None
started = False
status_str = 'Starting self-test...'
test_details = self.get_smart_self_test_details()
test_minutes = 15
size_str = bytes_to_string(self.size, use_binary=False)
header_str = color_string(
['[', self.path.name, ' ', size_str, ']'],
[None, 'BLUE', None, 'CYAN', None],
sep='',
)
# Check if disk supports self-tests
if not test_details:
raise SMARTNotSupportedError(
f'SMART self-test not supported for {self.path}')
# Get real test length
test_minutes = test_details.get('polling_minutes', {}).get('short', 5)
test_minutes = int(test_minutes) + 10
# Start test
with open(log_path, 'w', encoding='utf-8') as _f:
_f.write(f'{header_str}\nInitializing...')
cmd = [
'sudo',
'smartctl',
'--tolerance=normal',
'--test=short',
self.path,
]
run_program(cmd, check=False)
# Monitor progress (in five second intervals)
for _i in range(int(test_minutes*60/5)):
sleep(5)
# Update status
self.update_smart_details()
test_details = self.get_smart_self_test_details()
# Check test progress
if started:
status_str = test_details.get('status', {}).get('string', 'Unknown')
status_str = status_str.capitalize()
# Update log
with open(log_path, 'w', encoding='utf-8') as _f:
_f.write(f'{header_str}\nSMART self-test status:\n {status_str}')
# Check if finished
if 'remaining_percent' not in test_details.get('status', {}):
finished = True
break
elif 'remaining_percent' in test_details.get('status', {}):
started = True
elif _i * 5 >= SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS:
# Test didn't start within limit, stop waiting
break
# Check result
if finished:
result = test_details.get('status', {}).get('passed', False)
elif started:
raise TimeoutError(f'SMART self-test timed out for {self.path}')
# Done
return result
def set_description(self) -> None:
"""Set disk description from details."""
self.description = (
f'{bytes_to_string(self.size, use_binary=False)}'
f' ({self.bus}) {self.model} {self.serial}'
)
def update_smart_details(self) -> None:
"""Update SMART details via smartctl."""
updated_attributes = {}
# Get SMART data
cmd = [
'sudo',
'smartctl',
f'--device={"sat,auto" if self.use_sat else "auto"}',
'--tolerance=verypermissive',
'--all',
'--json',
self.path,
]
self.raw_smartctl = get_json_from_command(cmd, check=False)
# Check for attributes
if KEY_NVME in self.raw_smartctl:
for name, value in self.raw_smartctl[KEY_NVME].items():
try:
updated_attributes[name] = {
'name': name,
'raw': int(value),
'raw_str': str(value),
}
except (TypeError, ValueError):
# Ignoring invalid attribute
LOG.error('Invalid NVMe attribute: %s %s', name, value)
elif KEY_SMART in self.raw_smartctl:
for attribute in self.raw_smartctl[KEY_SMART].get('table', {}):
try:
_id = int(attribute['id'])
except (KeyError, ValueError):
# Ignoring invalid attribute
LOG.error('Invalid SMART attribute: %s', attribute)
continue
name = str(attribute.get('name', 'Unknown')).replace('_', ' ').title()
raw = int(attribute.get('raw', {}).get('value', -1))
raw_str = attribute.get('raw', {}).get('string', 'Unknown')
# Fix power-on time
match = REGEX_POWER_ON_TIME.match(raw_str)
if _id == 9 and match:
raw = int(match.group(1))
# Add to dict
updated_attributes[_id] = {
'name': name, 'raw': raw, 'raw_str': raw_str}
# Add note if necessary
if not updated_attributes:
self.add_note('No NVMe or SMART data available', 'YELLOW')
# Done
self.attributes.update(updated_attributes)
# Functions
def get_disk_details_linux(path) -> dict[Any, Any]:
  """Get disk details using lsblk, returns dict.

  The device dict (and each of its children) is adjusted in place:
  'tran' is renamed to 'bus', 'pkname' to 'parent', and 'rota' is
  inverted into 'ssd'.
  """
  cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', path]
  json_data = get_json_from_command(cmd, check=False)

  # NOTE: Also covers an empty 'blockdevices' list, which would
  #       otherwise raise an IndexError
  devices = json_data.get('blockdevices') or [{}]
  details = devices[0]

  # Fix details
  is_loop = 'loop' in str(path)
  for dev in [details, *details.get('children', [])]:
    dev['bus'] = dev.pop('tran', '???')
    dev['parent'] = dev.pop('pkname', None)
    dev['ssd'] = not dev.pop('rota', True)
    if is_loop and dev['bus'] is None:
      # Loop device without a transport, label it as an image
      dev['bus'] = 'Image'
      dev['model'] = ''
      dev['serial'] = ''

  # Done
  return details

def get_disk_details_macos(path) -> dict[Any, Any]:
  """Get disk details using diskutil, returns dict.

  Combines 'diskutil list' (disk and partition layout) with
  'diskutil info' for the disk and each partition. An empty dict is
  returned if the list output can't be parsed.

  NOTE: path is expected to be a pathlib.Path (with_name() is used).
  """
  details = {}

  # Get "list" details
  cmd = ['diskutil', 'list', '-plist', path]
  proc = run_program(cmd, check=False, encoding=None, errors=None)
  try:
    plist_data = plistlib.loads(proc.stdout)
  except (TypeError, ValueError):
    # Invalid / corrupt plist data? return empty dict to avoid crash
    LOG.error('Failed to get diskutil list for %s', path)
    return details

  # Parse "list" details
  # NOTE: Also covers an empty list, which would otherwise raise IndexError
  all_disks = plist_data.get('AllDisksAndPartitions') or [{}]
  details = all_disks[0]
  details['children'] = details.pop('Partitions', [])
  details['path'] = path
  for child in details['children']:
    child['path'] = path.with_name(child.get('DeviceIdentifier', 'null'))

  # Get "info" details
  for dev in [details, *details['children']]:
    cmd = ['diskutil', 'info', '-plist', dev['path']]
    proc = run_program(cmd, check=False, encoding=None, errors=None)
    try:
      plist_data = plistlib.loads(proc.stdout)
    except (TypeError, ValueError):
      # Log the device that was queried (may be a child, not the disk)
      LOG.error('Failed to get diskutil info for %s', dev['path'])
      continue #Skip

    # Parse "info" details
    dev.update(plist_data)
    dev['bus'] = dev.pop('BusProtocol', '???')
    dev['fstype'] = dev.pop('FilesystemType', '')
    dev['label'] = dev.pop('VolumeName', '')
    dev['model'] = dev.pop('MediaName', 'Unknown')
    dev['mountpoint'] = dev.pop('MountPoint', '')
    dev['name'] = dev.get('name', str(dev['path']))
    dev['phy-sec'] = dev.pop('DeviceBlockSize', 512)
    dev['serial'] = get_disk_serial_macos(dev['path'])
    dev['size'] = dev.pop('Size', -1)
    dev['ssd'] = dev.pop('SolidState', False)
    dev['vendor'] = ''
    if dev.get('WholeDisk', True):
      dev['parent'] = None
    else:
      dev['parent'] = dev.pop('ParentWholeDisk', None)

  # Fix details if main dev is a child
  for child in details['children']:
    if path == child['path']:
      for key in ('fstype', 'label', 'name', 'size'):
        # Keys are missing if the "info" lookup failed for this child
        if key in child:
          details[key] = child[key]
      break

  # Done
  return details

def get_disk_serial_macos(path) -> str:
  """Get disk serial using smartctl, returns str.

  Returns 'Unknown Serial' if smartctl's info lacks a 'serial_number'.
  """
  smart_info = get_json_from_command(
    ['sudo', 'smartctl', '--info', '--json', path],
    )
  return smart_info.get('serial_number', 'Unknown Serial')

def get_disks(skip_kits=False) -> list[Disk]:
  """Get disks using OS-specific methods, returns list.

  If skip_kits is True, disks with any label matching WK_LABEL_REGEX
  are left out of the result.
  """
  if PLATFORM == 'Darwin':
    found = get_disks_macos()
  elif PLATFORM == 'Linux':
    found = get_disks_linux()
  else:
    # No disk detection for other platforms
    found = []

  if not skip_kits:
    return found

  # Skip WK disks
  kept = []
  for disk_obj in found:
    is_kit = any(
      WK_LABEL_REGEX.search(label) for label in disk_obj.get_labels()
      )
    if not is_kit:
      kept.append(disk_obj)
  return kept

def get_disks_linux() -> list[Disk]:
  """Get disks via lsblk, returns list.

  Only devices of type 'disk' are returned.
  """
  cmd = ['lsblk', '--json', '--nodeps', '--paths']
  disks = []

  # Add valid disks
  json_data = get_json_from_command(cmd)
  for disk in json_data.get('blockdevices', []):
    if 'name' not in disk:
      LOG.error('Skipping lsblk device without a name: %s', disk)
      continue

    # Skip loopback devices, optical devices, etc
    # NOTE: Checked before creating the Disk object since creating one
    #       runs smartctl against the device. If the listing lacks the
    #       type then the check below still applies.
    if disk.get('type', 'disk') != 'disk':
      continue
    disk_obj = Disk(disk['name'])
    if disk_obj.raw_details.get('type', '???') != 'disk':
      continue

    # Add disk
    disks.append(disk_obj)

  # Done
  return disks

def get_disks_macos() -> list[Disk]:
  """Get disks via diskutil, returns list.

  Disks reported as 'Virtual' are excluded. An empty list is returned
  if the diskutil output can't be parsed.
  """
  cmd = ['diskutil', 'list', '-plist', 'physical']
  disks = []

  # Get info from diskutil
  proc = run_program(cmd, encoding=None, errors=None, check=False)
  if proc.returncode != 0:
    # Assuming we're running on an older macOS version
    cmd.pop(-1)
    proc = run_program(cmd, encoding=None, errors=None, check=False)

  # Parse plist data
  try:
    plist_data = plistlib.loads(proc.stdout)
  except (TypeError, ValueError):
    # Invalid / corrupt plist data? return empty list to avoid crash
    LOG.error('Failed to get diskutil list')
    return disks

  # Add valid disks
  for disk in plist_data.get('WholeDisks', []):
    disks.append(Disk(f'/dev/{disk}'))

  # Remove virtual disks
  # NOTE: Disk stores the diskutil data in raw_details
  # TODO: Test more to figure out why some drives are being marked 'Unknown'
  disks = [
    d for d in disks
    if d.raw_details.get('VirtualOrPhysical') != 'Virtual'
    ]

  # Done
  return disks

def get_known_disk_attributes(model) -> dict[Any, dict]:
  """Get known NVMe/SMART attributes (model specific), returns dict.

  Starts from KNOWN_DISK_ATTRIBUTES and applies the thresholds of every
  KNOWN_DISK_MODELS entry whose regex matches model.
  """
  # Copy the per-attribute dicts as well, otherwise the model-specific
  # updates below would modify the shared KNOWN_DISK_ATTRIBUTES
  known_attributes = {
    attr: dict(thresholds)
    for attr, thresholds in KNOWN_DISK_ATTRIBUTES.items()
    }

  # Apply model-specific data
  for regex, data in KNOWN_DISK_MODELS.items():
    if not re.search(regex, model):
      continue
    for attr, thresholds in data.items():
      # New attributes get their own dict instead of sharing the config's
      known_attributes.setdefault(attr, {}).update(thresholds)

  # Done
  return known_attributes

def is_4k_aligned_macos(disk_details) -> bool:
  """Check partition alignment using diskutil info, returns bool.

  Every entry in disk_details['children'] must have a non-negative
  'PartitionMapPartitionOffset' that is a multiple of 4096.
  """
  misaligned = False

  # Check partitions
  for part in disk_details.get('children', []):
    offset = part.get('PartitionMapPartitionOffset', 0)
    if not offset:
      # Assuming offset couldn't be found and it defaulted to 0
      # NOTE: Just logging the error, not bailing
      LOG.error('Failed to get partition offset for %s', part['path'])
    if not misaligned:
      misaligned = not (offset >= 0 and offset % 4096 == 0)

  # Done
  return not misaligned

def is_4k_aligned_linux(dev_path, physical_sector_size) -> bool:
  """Check partition alignment using sfdisk, returns bool.

  Partition offsets are calculated from the 'start' sectors reported by
  sfdisk. The sector size reported by sfdisk is used if available,
  otherwise physical_sector_size is used.
  """
  cmd = [
    'sudo',
    'sfdisk',
    '--json',
    dev_path,
    ]

  # Get partition details
  json_data = get_json_from_command(cmd)
  table = json_data.get('partitiontable', {})

  # NOTE: 'start' is expected to be counted in sectors of the size sfdisk
  #       reports as 'sectorsize'. Multiplying by a 4096 byte physical
  #       sector size would make every offset look aligned.
  sector_size = table.get('sectorsize') or physical_sector_size

  # Check partitions
  for part in table.get('partitions', []):
    offset = sector_size * part.get('start', -1)
    if offset < 0 or offset % 4096 != 0:
      return False

  # Done
  return True

# Library module, only print a hint if executed as a script
if __name__ == '__main__':
  print('This file is not meant to be called directly.')