WizardKit/scripts/wk/hw/disk.py
2Shirt 7499639c5c
Drop sat,auto detection for smartctl
This was needed twofold.  First is that it was not working as expected
for some time.  Second is that it conflicts with the delayed attribute
updating needed for faster WKClone menus.
2023-07-05 14:57:54 -07:00

467 lines
14 KiB
Python

"""WizardKit: Disk object and functions"""
# vim: sts=2 sw=2 ts=2
import copy
import logging
import pathlib
import platform
import plistlib
import re
from dataclasses import dataclass, field
from typing import Any
from wk.cfg.main import KIT_NAME_SHORT
from wk.exe import get_json_from_command, run_program
from wk.hw.test import Test
from wk.hw.smart import (
generate_attribute_report,
get_known_disk_attributes,
)
from wk.std import PLATFORM
from wk.ui import ansi
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
WK_LABEL_REGEX = re.compile(
fr'{KIT_NAME_SHORT}_(LINUX|UFD)',
re.IGNORECASE,
)
# Classes
@dataclass(slots=True)
class Disk:
"""Object for tracking disk specific data."""
attributes: dict[Any, dict] = field(init=False, default_factory=dict)
bus: str = field(init=False)
children: list[dict] = field(init=False, default_factory=list)
description: str = field(init=False)
filesystem: str = field(init=False)
initial_attributes: dict[Any, dict] = field(init=False)
known_attributes: dict[Any, dict] = field(init=False, default_factory=dict)
log_sec: int = field(init=False)
model: str = field(init=False)
name: str = field(init=False)
notes: list[str] = field(init=False, default_factory=list)
path: pathlib.Path = field(init=False)
path_str: pathlib.Path | str
parent: str = field(init=False)
phy_sec: int = field(init=False)
raw_details: dict[str, Any] = field(init=False)
raw_smartctl: dict[str, Any] = field(init=False, default_factory=dict)
serial: str = field(init=False)
size: int = field(init=False)
ssd: bool = field(init=False)
tests: list[Test] = field(init=False, default_factory=list)
trim: bool = field(init=False)
def __post_init__(self):
self.path = pathlib.Path(self.path_str).resolve()
self.update_details()
self.set_description()
self.known_attributes = get_known_disk_attributes(self.model)
self.initial_attributes = copy.deepcopy(self.attributes)
if not self.is_4k_aligned():
self.add_note('One or more partitions are not 4K aligned', 'YELLOW')
def add_note(self, note, color=None) -> None:
"""Add note that will be included in the disk report."""
if color:
note = ansi.color_string(note, color)
if note not in self.notes:
self.notes.append(note)
self.notes.sort()
def contains_note(self, note_str) -> bool:
"""Check if note is already present."""
present = False
for note in self.notes:
if note_str == ansi.strip_colors(note):
present = True
return present
def disable_disk_tests(self) -> None:
"""Disable all tests."""
LOG.warning('Disabling all tests for: %s', self.path)
for test in self.tests:
if test.status in ('Pending', 'Working'):
test.set_status('Denied')
test.disabled = True
def generate_report(self, header=True) -> list[str]:
"""Generate Disk report, returns list."""
report = []
if header:
report.append(ansi.color_string(f'Device ({self.path.name})', 'BLUE'))
report.append(f' {self.description}')
# Attributes
if self.attributes:
if header:
report.append(ansi.color_string('Attributes', 'BLUE'))
report.extend(generate_attribute_report(self))
# Notes
if self.notes:
report.append(ansi.color_string('Notes', 'BLUE'))
for note in self.notes:
report.append(f' {note}')
# Tests
for test in self.tests:
report.extend(test.report)
return report
def get_labels(self) -> list[str]:
"""Build list of labels for this disk, returns list."""
labels = []
# Add all labels from raw_details
for details in [self.raw_details, *self.raw_details.get('children', [])]:
labels.append(details.get('label', ''))
labels.append(details.get('partlabel', ''))
# Remove empty labels
labels = [str(label) for label in labels if label]
# Done
return labels
def is_4k_aligned(self) -> bool:
"""Check that all disk partitions are aligned, returns bool."""
aligned = True
if PLATFORM == 'Darwin':
aligned = is_4k_aligned_macos(self.raw_details)
elif PLATFORM == 'Linux':
aligned = is_4k_aligned_linux(self.path, self.phy_sec)
return aligned
@property
def present(self) -> bool:
"""Verify this device is still present, returns bool."""
if not self.path.exists():
self.add_note('Device disconnected', 'RED')
return False
return True
def set_description(self) -> None:
"""Set disk description from details."""
decimals = 1
suffix = ' '
# Set size_str (try binary scale first)
for scale in (1024, 1000):
size = float(self.size)
units = list('KMGTPEZY')
while units:
if abs(size) < scale:
break
size /= scale
suffix = units.pop(0)
size = ((size * 10) // 1) / 10
if size % 1 == 0:
# Found an exact whole number, drop the decimal
decimals = 0
break
if size % 1 == 0.5:
break
# Done
self.description = (
f'{size:0.{decimals}f} {suffix}B ({self.bus}) {self.model} {self.serial}'
)
def update_details(self, skip_children=True) -> None:
"""Update disk details using OS specific methods.
Required details default to generic descriptions
and are converted to the correct type.
"""
if not self.present:
return
if PLATFORM == 'Darwin':
self.raw_details = get_disk_details_macos(
self.path, skip_children=skip_children,
)
elif PLATFORM == 'Linux':
self.raw_details = get_disk_details_linux(
self.path, skip_children=skip_children,
)
# Set necessary details
self.bus = str(self.raw_details.get('bus', '???')).upper()
self.bus = self.bus.replace('IMAGE', 'Image')
self.bus = self.bus.replace('NVME', 'NVMe')
self.children = self.raw_details.get('children', [])
self.filesystem = self.raw_details.get('fstype', 'Unknown')
self.log_sec = self.raw_details.get('log-sec', 512)
self.model = self.raw_details.get('model', 'Unknown Model')
self.name = self.raw_details.get('name', self.path)
self.parent = self.raw_details.get('parent', None)
self.phy_sec = self.raw_details.get('phy-sec', 512)
self.serial = self.raw_details.get('serial', 'Unknown Serial')
self.size = self.raw_details.get('size', -1)
self.ssd = self.raw_details.get('ssd', False)
self.trim = self.raw_details.get('trim', False)
# Ensure certain attributes types
## NOTE: This is ugly, deal.
for attr in ['bus', 'model', 'name', 'serial']:
setattr(self, attr, str(getattr(self, attr)))
for attr in ['log_sec', 'phy_sec', 'size']:
try:
setattr(self, attr, int(getattr(self, attr)))
except (TypeError, ValueError):
LOG.error('Invalid disk %s: %s', attr, getattr(self, attr))
if attr == 'size':
setattr(self, attr, -1)
# Add TRIM note
if self.trim:
self.add_note('TRIM support detected', 'YELLOW')
# Functions
def get_disk_details_linux(disk_path, skip_children=True) -> dict[Any, Any]:
"""Get disk details using lsblk, returns dict."""
def _flatten_json(dev) -> list:
"""Convert lsblk JSON tree to a flat list of items, returns list."""
devs = [dev]
for child in dev.pop('children', []):
devs.extend(_flatten_json(child))
return devs
# Get lsblk info
cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', disk_path]
if skip_children:
cmd.append('--nodeps')
json_data = get_json_from_command(cmd, check=False)
dev_list = _flatten_json(json_data.get('blockdevices', [{}])[0])
# Fix details
for dev in dev_list:
dev['bus'] = dev.pop('tran', '???')
dev['parent'] = dev.pop('pkname', None)
dev['ssd'] = not dev.pop('rota', True)
dev['trim'] = bool(dev.pop('disc-max', 0))
if 'loop' in str(disk_path) and dev['bus'] is None:
dev['bus'] = 'Image'
dev['model'] = ''
dev['serial'] = ''
dev['trim'] = False # NOTE: This check is just for physical devices
# Convert to dict
details = dev_list.pop(0)
details['children'] = dev_list
# Done
return details
def get_disk_details_macos(disk_path, skip_children=True) -> dict:
"""Get disk details using diskutil, returns dict."""
details = {}
disk_path = pathlib.Path(disk_path)
# Get "list" details
cmd = ['diskutil', 'list', '-plist', disk_path]
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# Invalid / corrupt plist data? return empty dict to avoid crash
LOG.error('Failed to get diskutil list for %s', disk_path)
return details
# Parse "list" details
details = plist_data.get('AllDisksAndPartitions', [{}])[0]
details['path'] = disk_path
if skip_children:
details['children'] = []
else:
details['children'] = details.pop('Partitions', [])
details['children'].extend(details.pop('APFSVolumes', []))
for child in details['children']:
child['path'] = disk_path.with_name(child['DeviceIdentifier'])
# Get "info" details
for dev in [details, *details['children']]:
cmd = ['diskutil', 'info', '-plist', dev['path']]
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
LOG.error('Failed to get diskutil info for %s', dev['path'])
continue
# Parse "info" details
dev.update(plist_data)
dev['bus'] = dev.pop('BusProtocol', '???')
dev['fstype'] = dev.pop('FilesystemType', '')
dev['label'] = dev.pop('VolumeName', '')
dev['model'] = dev.pop('MediaName', 'Unknown')
dev['mountpoint'] = dev.pop('MountPoint', '')
dev['name'] = dev.get('name', str(dev['path']))
dev['phy-sec'] = dev.pop('DeviceBlockSize', 512)
dev['serial'] = get_disk_serial_macos(dev['path'])
dev['size'] = dev.pop('Size', -1)
dev['ssd'] = dev.pop('SolidState', False)
dev['trim'] = False # TODO: ACtually check for TRIM
dev['vendor'] = ''
if dev.get('WholeDisk', True):
dev['parent'] = None
else:
dev['parent'] = dev.pop('ParentWholeDisk', None)
# Fix details if main dev is a child
for child in details['children']:
if disk_path == child['path']:
for key in ('fstype', 'label', 'name', 'size'):
details[key] = child[key]
break
# Done
return details
def get_disk_serial_macos(path) -> str:
"""Get disk serial using system_profiler, returns str."""
cmd = ['sudo', 'smartctl', '--info', '--json', path]
smart_info = get_json_from_command(cmd)
return smart_info.get('serial_number', 'Unknown Serial')
def get_disks(skip_kits=False) -> list[Disk]:
"""Get disks using OS-specific methods, returns list."""
disks = []
if PLATFORM == 'Darwin':
disks = get_disks_macos()
elif PLATFORM == 'Linux':
disks = get_disks_linux()
# Skip WK disks
if skip_kits:
for disk in disks:
disk.update_details(skip_children=False)
disks = [
disk_obj for disk_obj in disks
if not any(
WK_LABEL_REGEX.search(label) for label in disk_obj.get_labels()
)
]
# Done
return disks
def get_disks_linux() -> list[Disk]:
"""Get disks via lsblk, returns list."""
cmd = ['lsblk', '--json', '--nodeps', '--paths']
disks = []
# Add valid disks
json_data = get_json_from_command(cmd)
for disk in json_data.get('blockdevices', []):
disk_obj = Disk(disk['name'])
# Skip loopback devices, optical devices, etc
if disk_obj.raw_details.get('type', '???') != 'disk':
continue
# Skip empty devices (usually card readers)
if disk_obj.size <= 0:
continue
# Add disk
disks.append(disk_obj)
# Done
return disks
def get_disks_macos() -> list[Disk]:
"""Get disks via diskutil, returns list."""
cmd = ['diskutil', 'list', '-plist', 'physical']
disks = []
# El Capitan workaround
if platform.mac_ver()[0].startswith('10.11'):
cmd.pop()
# Get info from diskutil
proc = run_program(cmd, encoding=None, errors=None, check=False)
# Parse plist data
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# Invalid / corrupt plist data? return empty list to avoid crash
LOG.error('Failed to get diskutil list')
return disks
# Add valid disks
for disk in plist_data['AllDisksAndPartitions']:
name = disk['DeviceIdentifier']
if name not in plist_data['WholeDisks']:
# Only check "WholeDisks"
continue
if not disk['Content']:
# This lists GPT or MBR for device, blank should mean it's an image
continue
disks.append(Disk(f'/dev/{name}'))
# Remove virtual disks
disks = [
d for d in disks if d.raw_details.get('VirtualOrPhysical') != 'Virtual'
]
# Done
return disks
def is_4k_aligned_macos(disk_details) -> bool:
"""Check partition alignment using diskutil info, returns bool."""
aligned = True
# Check partitions
for part in disk_details.get('children', []):
offset = part.get('PartitionMapPartitionOffset', 0)
if not offset:
# Assuming offset couldn't be found and it defaulted to 0
# NOTE: Just logging the error, not bailing
LOG.error('Failed to get partition offset for %s', part['path'])
aligned = aligned and offset >= 0 and offset % 4096 == 0
# Done
return aligned
def is_4k_aligned_linux(dev_path, physical_sector_size) -> bool:
"""Check partition alignment using lsblk, returns bool."""
aligned = True
cmd = [
'sudo',
'sfdisk',
'--json',
dev_path,
]
# Get partition details
json_data = get_json_from_command(cmd)
# Check partitions
for part in json_data.get('partitiontable', {}).get('partitions', []):
offset = physical_sector_size * part.get('start', -1)
aligned = aligned and offset >= 0 and offset % 4096 == 0
# Done
return aligned
if __name__ == '__main__':
print("This file is not meant to be called directly.")