WizardKit/scripts/wk/hw/disk.py
2Shirt 6880a353cc
Set known_attributes when intializing Disk()
This new design uses copy.deepcopy() to avoid erroneous thresholds being
applied to drives during diags.  This also reduces the number of lookups
to one per Disk.
2022-10-08 14:15:32 -07:00

440 lines
13 KiB
Python

"""WizardKit: Disk object and functions"""
# vim: sts=2 sw=2 ts=2
import logging
import pathlib
import platform
import plistlib
import re
from dataclasses import dataclass, field
from typing import Any, Union
from wk.cfg.main import KIT_NAME_SHORT
from wk.cfg.python import DATACLASS_DECORATOR_KWARGS
from wk.exe import get_json_from_command, run_program
from wk.hw.test import Test
from wk.hw.smart import (
enable_smart,
generate_attribute_report,
get_known_disk_attributes,
update_smart_details,
)
from wk.std import PLATFORM, bytes_to_string, color_string, strip_colors
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
WK_LABEL_REGEX = re.compile(
fr'{KIT_NAME_SHORT}_(LINUX|UFD)',
re.IGNORECASE,
)
# Classes
@dataclass(**DATACLASS_DECORATOR_KWARGS)
class Disk:
# pylint: disable=too-many-instance-attributes
"""Object for tracking disk specific data."""
attributes: dict[Any, dict] = field(init=False, default_factory=dict)
bus: str = field(init=False)
children: list[dict] = field(init=False, default_factory=list)
filesystem: str = field(init=False)
known_attributes: dict[Any, dict] = field(init=False, default_factory=dict)
log_sec: int = field(init=False)
model: str = field(init=False)
name: str = field(init=False)
notes: list[str] = field(init=False, default_factory=list)
path: Union[pathlib.Path, str]
parent: str = field(init=False)
phy_sec: int = field(init=False)
raw_details: dict[str, Any] = field(init=False)
raw_smartctl: dict[str, Any] = field(init=False)
serial: str = field(init=False)
size: int = field(init=False)
ssd: bool = field(init=False)
tests: list[Test] = field(init=False, default_factory=list)
use_sat: bool = field(init=False, default=False)
def __post_init__(self) -> None:
self.path = pathlib.Path(self.path).resolve()
self.update_details()
self.known_attributes = get_known_disk_attributes(self.model)
enable_smart(self)
update_smart_details(self)
if not self.attributes and self.bus == 'USB':
# Try using SAT
LOG.warning('Using SAT for smartctl for %s', self.path)
self.notes = []
self.use_sat = True
enable_smart(self)
update_smart_details(self)
if not self.is_4k_aligned():
self.add_note('One or more partitions are not 4K aligned', 'YELLOW')
def add_note(self, note, color=None) -> None:
"""Add note that will be included in the disk report."""
if color:
note = color_string(note, color)
if note not in self.notes:
self.notes.append(note)
self.notes.sort()
def contains_note(self, note_str) -> bool:
"""Check if note is already present."""
present = False
for note in self.notes:
if note_str == strip_colors(note):
present = True
return present
@property
def description(self) -> str:
"""Get disk description from details."""
return (
f'{bytes_to_string(self.size, use_binary=False)}'
f' ({self.bus}) {self.model} {self.serial}'
)
def disable_disk_tests(self) -> None:
"""Disable all tests."""
LOG.warning('Disabling all tests for: %s', self.path)
for test in self.tests:
if test.status in ('Pending', 'Working'):
test.set_status('Denied')
test.disabled = True
def generate_report(self, header=True) -> list[str]:
"""Generate Disk report, returns list."""
report = []
if header:
report.append(color_string(f'Device ({self.path.name})', 'BLUE'))
report.append(f' {self.description}')
# Attributes
if self.attributes:
if header:
report.append(color_string('Attributes', 'BLUE'))
report.extend(generate_attribute_report(self))
# Notes
if self.notes:
report.append(color_string('Notes', 'BLUE'))
for note in self.notes:
report.append(f' {note}')
# Tests
for test in self.tests:
report.extend(test.report)
return report
def get_labels(self) -> list[str]:
"""Build list of labels for this disk, returns list."""
labels = []
# Add all labels from raw_details
for details in [self.raw_details, *self.raw_details.get('children', [])]:
labels.append(details.get('label', ''))
labels.append(details.get('partlabel', ''))
# Remove empty labels
labels = [str(label) for label in labels if label]
# Done
return labels
def is_4k_aligned(self) -> bool:
"""Check that all disk partitions are aligned, returns bool."""
aligned = True
if PLATFORM == 'Darwin':
aligned = is_4k_aligned_macos(self.raw_details)
elif PLATFORM == 'Linux':
aligned = is_4k_aligned_linux(self.path, self.phy_sec)
return aligned
@property
def present(self) -> bool:
"""Verify this device is still present, returns bool."""
if not self.path.exists():
self.add_note('Device disconnected', 'RED')
return False
return True
def update_details(self, skip_children=True) -> None:
"""Update disk details using OS specific methods.
Required details default to generic descriptions
and are converted to the correct type.
"""
if not self.present:
return
if PLATFORM == 'Darwin':
self.raw_details = get_disk_details_macos(
self.path, skip_children=skip_children,
)
elif PLATFORM == 'Linux':
self.raw_details = get_disk_details_linux(
self.path, skip_children=skip_children,
)
# Set necessary details
self.bus = str(self.raw_details.get('bus', '???')).upper()
self.bus = self.bus.replace('IMAGE', 'Image')
self.bus = self.bus.replace('NVME', 'NVMe')
self.children = self.raw_details.get('children', [])
self.filesystem = self.raw_details.get('fstype', 'Unknown')
self.log_sec = self.raw_details.get('log-sec', 512)
self.model = self.raw_details.get('model', 'Unknown Model')
self.name = self.raw_details.get('name', self.path)
self.parent = self.raw_details.get('parent', None)
self.phy_sec = self.raw_details.get('phy-sec', 512)
self.serial = self.raw_details.get('serial', 'Unknown Serial')
self.size = self.raw_details.get('size', -1)
self.ssd = self.raw_details.get('ssd', False)
# Ensure certain attributes types
## NOTE: This is ugly, deal.
for attr in ['bus', 'model', 'name', 'serial']:
setattr(self, attr, str(getattr(self, attr)))
for attr in ['log_sec', 'phy_sec', 'size']:
try:
setattr(self, attr, int(getattr(self, attr)))
except (TypeError, ValueError):
LOG.error('Invalid disk %s: %s', attr, getattr(self, attr))
if attr == 'size':
setattr(self, attr, -1)
# Functions
def get_disk_details_linux(disk_path, skip_children=True) -> dict[Any, Any]:
"""Get disk details using lsblk, returns dict."""
def _flatten_json(dev) -> list:
"""Convert lsblk JSON tree to a flat list of items, returns list."""
devs = [dev]
for child in dev.pop('children', []):
devs.extend(_flatten_json(child))
return devs
# Get lsblk info
cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', disk_path]
if skip_children:
cmd.append('--nodeps')
json_data = get_json_from_command(cmd, check=False)
dev_list = _flatten_json(json_data.get('blockdevices', [{}])[0])
# Fix details
for dev in dev_list:
dev['bus'] = dev.pop('tran', '???')
dev['parent'] = dev.pop('pkname', None)
dev['ssd'] = not dev.pop('rota', True)
if 'loop' in str(disk_path) and dev['bus'] is None:
dev['bus'] = 'Image'
dev['model'] = ''
dev['serial'] = ''
# Convert to dict
details = dev_list.pop(0)
details['children'] = dev_list
# Done
return details
def get_disk_details_macos(disk_path, skip_children=True) -> dict:
"""Get disk details using diskutil, returns dict."""
details = {}
disk_path = pathlib.Path(disk_path)
# Get "list" details
cmd = ['diskutil', 'list', '-plist', disk_path]
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# Invalid / corrupt plist data? return empty dict to avoid crash
LOG.error('Failed to get diskutil list for %s', disk_path)
return details
# Parse "list" details
details = plist_data.get('AllDisksAndPartitions', [{}])[0]
details['path'] = disk_path
if skip_children:
details['children'] = []
else:
details['children'] = details.pop('Partitions', [])
details['children'].extend(details.pop('APFSVolumes', []))
for child in details['children']:
child['path'] = disk_path.with_name(child['DeviceIdentifier'])
# Get "info" details
for dev in [details, *details['children']]:
cmd = ['diskutil', 'info', '-plist', dev['path']]
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
LOG.error('Failed to get diskutil info for %s', dev['path'])
continue
# Parse "info" details
dev.update(plist_data)
dev['bus'] = dev.pop('BusProtocol', '???')
dev['fstype'] = dev.pop('FilesystemType', '')
dev['label'] = dev.pop('VolumeName', '')
dev['model'] = dev.pop('MediaName', 'Unknown')
dev['mountpoint'] = dev.pop('MountPoint', '')
dev['name'] = dev.get('name', str(dev['path']))
dev['phy-sec'] = dev.pop('DeviceBlockSize', 512)
dev['serial'] = get_disk_serial_macos(dev['path'])
dev['size'] = dev.pop('Size', -1)
dev['ssd'] = dev.pop('SolidState', False)
dev['vendor'] = ''
if dev.get('WholeDisk', True):
dev['parent'] = None
else:
dev['parent'] = dev.pop('ParentWholeDisk', None)
# Fix details if main dev is a child
for child in details['children']:
if disk_path == child['path']:
for key in ('fstype', 'label', 'name', 'size'):
details[key] = child[key]
break
# Done
return details
def get_disk_serial_macos(path) -> str:
"""Get disk serial using system_profiler, returns str."""
cmd = ['sudo', 'smartctl', '--info', '--json', path]
smart_info = get_json_from_command(cmd)
return smart_info.get('serial_number', 'Unknown Serial')
def get_disks(skip_kits=False) -> list[Disk]:
"""Get disks using OS-specific methods, returns list."""
disks = []
if PLATFORM == 'Darwin':
disks = get_disks_macos()
elif PLATFORM == 'Linux':
disks = get_disks_linux()
# Skip WK disks
if skip_kits:
disks = [
disk_obj for disk_obj in disks
if not any(
WK_LABEL_REGEX.search(label) for label in disk_obj.get_labels()
)
]
# Done
return disks
def get_disks_linux() -> list[Disk]:
"""Get disks via lsblk, returns list."""
cmd = ['lsblk', '--json', '--nodeps', '--paths']
disks = []
# Add valid disks
json_data = get_json_from_command(cmd)
for disk in json_data.get('blockdevices', []):
disk_obj = Disk(disk['name'])
# Skip loopback devices, optical devices, etc
if disk_obj.raw_details.get('type', '???') != 'disk':
continue
# Add disk
disks.append(disk_obj)
# Done
return disks
def get_disks_macos() -> list[Disk]:
"""Get disks via diskutil, returns list."""
cmd = ['diskutil', 'list', '-plist', 'physical']
disks = []
# El Capitan workaround
if platform.mac_ver()[0].startswith('10.11'):
cmd.pop()
# Get info from diskutil
proc = run_program(cmd, encoding=None, errors=None, check=False)
# Parse plist data
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# Invalid / corrupt plist data? return empty list to avoid crash
LOG.error('Failed to get diskutil list')
return disks
# Add valid disks
for disk in plist_data['AllDisksAndPartitions']:
name = disk['DeviceIdentifier']
if name not in plist_data['WholeDisks']:
# Only check "WholeDisks"
continue
if not disk['Content']:
# This lists GPT or MBR for device, blank should mean it's an image
continue
disks.append(Disk(f'/dev/{name}'))
# Remove virtual disks
disks = [
d for d in disks if d.raw_details.get('VirtualOrPhysical') != 'Virtual'
]
# Done
return disks
def is_4k_aligned_macos(disk_details) -> bool:
"""Check partition alignment using diskutil info, returns bool."""
aligned = True
# Check partitions
for part in disk_details.get('children', []):
offset = part.get('PartitionMapPartitionOffset', 0)
if not offset:
# Assuming offset couldn't be found and it defaulted to 0
# NOTE: Just logging the error, not bailing
LOG.error('Failed to get partition offset for %s', part['path'])
aligned = aligned and offset >= 0 and offset % 4096 == 0
# Done
return aligned
def is_4k_aligned_linux(dev_path, physical_sector_size) -> bool:
"""Check partition alignment using lsblk, returns bool."""
aligned = True
cmd = [
'sudo',
'sfdisk',
'--json',
dev_path,
]
# Get partition details
json_data = get_json_from_command(cmd)
# Check partitions
for part in json_data.get('partitiontable', {}).get('partitions', []):
offset = physical_sector_size * part.get('start', -1)
aligned = aligned and offset >= 0 and offset % 4096 == 0
# Done
return aligned
if __name__ == '__main__':
print("This file is not meant to be called directly.")