"""WizardKit: Disk object and functions"""
# vim: sts=2 sw=2 ts=2

import copy
import logging
import pathlib
import platform
import plistlib
import re

from dataclasses import dataclass, field
from typing import Any, Union

from wk.cfg.main import KIT_NAME_SHORT
from wk.cfg.python import DATACLASS_DECORATOR_KWARGS
from wk.exe import get_json_from_command, run_program
from wk.hw.test import Test
from wk.hw.smart import (
  generate_attribute_report,
  get_known_disk_attributes,
  )
from wk.std import PLATFORM
from wk.ui import ansi


# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Matches volume/partition labels used by the kit's own boot media
WK_LABEL_REGEX = re.compile(
  fr'{KIT_NAME_SHORT}_(LINUX|UFD)',
  re.IGNORECASE,
  )


# Classes
@dataclass(**DATACLASS_DECORATOR_KWARGS)
class Disk:
  """Object for tracking disk specific data.

  Only `path` is passed to the constructor; every other field is filled in
  by __post_init__() / update_details() from the OS-specific detail dict.
  """
  attributes: dict[Any, dict] = field(init=False, default_factory=dict)
  bus: str = field(init=False)
  children: list[dict] = field(init=False, default_factory=list)
  description: str = field(init=False)
  filesystem: str = field(init=False)
  initial_attributes: dict[Any, dict] = field(init=False)
  known_attributes: dict[Any, dict] = field(init=False, default_factory=dict)
  log_sec: int = field(init=False)
  model: str = field(init=False)
  name: str = field(init=False)
  notes: list[str] = field(init=False, default_factory=list)
  path: Union[pathlib.Path, str]
  parent: str = field(init=False)
  phy_sec: int = field(init=False)
  raw_details: dict[str, Any] = field(init=False)
  raw_smartctl: dict[str, Any] = field(init=False)
  serial: str = field(init=False)
  size: int = field(init=False)
  ssd: bool = field(init=False)
  tests: list[Test] = field(init=False, default_factory=list)
  use_sat: bool = field(init=False, default=False)

  def __post_init__(self) -> None:
    """Resolve the path and load details, description and alignment note."""
    self.path = pathlib.Path(self.path).resolve()
    self.update_details()
    self.set_description()
    self.known_attributes = get_known_disk_attributes(self.model)
    if not self.attributes and self.bus == 'USB':
      # Try using SAT
      LOG.warning('Using SAT for smartctl for %s', self.path)
      self.notes = []
      self.use_sat = True
    # Keep a snapshot so later attribute values can be compared against it
    self.initial_attributes = copy.deepcopy(self.attributes)
    if not self.is_4k_aligned():
      self.add_note('One or more partitions are not 4K aligned', 'YELLOW')

  def add_note(self, note, color=None) -> None:
    """Add note that will be included in the disk report.

    The note is colored first (if a color is given) and duplicates of the
    resulting string are ignored. Notes are kept sorted.
    """
    if color:
      note = ansi.color_string(note, color)
    if note not in self.notes:
      self.notes.append(note)
      self.notes.sort()

  def contains_note(self, note_str) -> bool:
    """Check if note is already present (ignoring colors), returns bool."""
    return any(note_str == ansi.strip_colors(note) for note in self.notes)

  def disable_disk_tests(self) -> None:
    """Disable all tests.

    Tests that have not finished ('Pending' or 'Working') are also marked
    as 'Denied'.
    """
    LOG.warning('Disabling all tests for: %s', self.path)
    for test in self.tests:
      if test.status in ('Pending', 'Working'):
        test.set_status('Denied')
      test.disabled = True

  def generate_report(self, header=True) -> list[str]:
    """Generate Disk report, returns list.

    With header=True the device name/description and the 'Attributes'
    heading are included; the attribute, note and test sections themselves
    are emitted either way when they have content.
    """
    report = []
    if header:
      report.append(ansi.color_string(f'Device ({self.path.name})', 'BLUE'))
      report.append(f' {self.description}')

    # Attributes
    if self.attributes:
      if header:
        report.append(ansi.color_string('Attributes', 'BLUE'))
      report.extend(generate_attribute_report(self))

    # Notes
    if self.notes:
      report.append(ansi.color_string('Notes', 'BLUE'))
    for note in self.notes:
      report.append(f' {note}')

    # Tests
    for test in self.tests:
      report.extend(test.report)

    return report

  def get_labels(self) -> list[str]:
    """Build list of labels for this disk, returns list.

    Collects 'label' and 'partlabel' from the disk itself and from every
    child in raw_details; empty values are dropped.
    """
    labels = []

    # Add all labels from raw_details
    for details in [self.raw_details, *self.raw_details.get('children', [])]:
      labels.append(details.get('label', ''))
      labels.append(details.get('partlabel', ''))

    # Remove empty labels
    return [str(label) for label in labels if label]

  def is_4k_aligned(self) -> bool:
    """Check that all disk partitions are aligned, returns bool.

    Platforms other than Darwin and Linux are reported as aligned.
    """
    aligned = True
    if PLATFORM == 'Darwin':
      aligned = is_4k_aligned_macos(self.raw_details)
    elif PLATFORM == 'Linux':
      aligned = is_4k_aligned_linux(self.path, self.phy_sec)
    return aligned

  @property
  def present(self) -> bool:
    """Verify this device is still present, returns bool.

    NOTE: Adds a 'Device disconnected' note when the path no longer exists.
    """
    if not self.path.exists():
      self.add_note('Device disconnected', 'RED')
      return False
    return True

  def set_description(self) -> None:
    """Set disk description from details.

    The size is scaled with 1024 first and 1000 second; the first scale
    that yields a whole number or an exact half is used, otherwise the
    result of the last scale tried is kept.
    """
    decimals = 1
    suffix = ' '

    # Set size_str (try binary scale first)
    for scale in (1024, 1000):
      size = float(self.size)
      units = list('KMGTPEZY')
      while units:
        if abs(size) < scale:
          break
        size /= scale
        suffix = units.pop(0)
      # Truncate (not round) to one decimal place
      size = ((size * 10) // 1) / 10
      if size % 1 == 0:
        # Found an exact whole number, drop the decimal
        decimals = 0
        break
      if size % 1 == 0.5:
        break

    # Done
    self.description = (
      f'{size:0.{decimals}f} {suffix}B ({self.bus}) {self.model} {self.serial}'
      )

  def update_details(self, skip_children=True) -> None:
    """Update disk details using OS specific methods.

    Required details default to generic descriptions and are converted to
    the correct type.

    Nothing is updated if the device is no longer present.
    """
    if not self.present:
      return

    if PLATFORM == 'Darwin':
      self.raw_details = get_disk_details_macos(
        self.path, skip_children=skip_children,
        )
    elif PLATFORM == 'Linux':
      self.raw_details = get_disk_details_linux(
        self.path, skip_children=skip_children,
        )

    # Set necessary details
    self.bus = str(self.raw_details.get('bus', '???')).upper()
    self.bus = self.bus.replace('IMAGE', 'Image')
    self.bus = self.bus.replace('NVME', 'NVMe')
    self.children = self.raw_details.get('children', [])
    self.filesystem = self.raw_details.get('fstype', 'Unknown')
    self.log_sec = self.raw_details.get('log-sec', 512)
    self.model = self.raw_details.get('model', 'Unknown Model')
    self.name = self.raw_details.get('name', self.path)
    self.parent = self.raw_details.get('parent', None)
    self.phy_sec = self.raw_details.get('phy-sec', 512)
    self.serial = self.raw_details.get('serial', 'Unknown Serial')
    self.size = self.raw_details.get('size', -1)
    self.ssd = self.raw_details.get('ssd', False)

    # Ensure certain attributes types
    ## NOTE: This is ugly, deal.
    for attr in ['bus', 'model', 'name', 'serial']:
      setattr(self, attr, str(getattr(self, attr)))
    for attr in ['log_sec', 'phy_sec', 'size']:
      try:
        setattr(self, attr, int(getattr(self, attr)))
      except (TypeError, ValueError):
        LOG.error('Invalid disk %s: %s', attr, getattr(self, attr))
        if attr == 'size':
          # Only size is reset; sector sizes keep the reported value
          setattr(self, attr, -1)


# Functions
def get_disk_details_linux(disk_path, skip_children=True) -> dict[Any, Any]:
  """Get disk details using lsblk, returns dict.

  The first device reported becomes the returned dict and all remaining
  (flattened) devices are stored under its 'children' key.
  """
  def _flatten_json(dev) -> list:
    """Convert lsblk JSON tree to a flat list of items, returns list."""
    devs = [dev]
    for child in dev.pop('children', []):
      devs.extend(_flatten_json(child))
    return devs

  # Get lsblk info
  cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', disk_path]
  if skip_children:
    cmd.append('--nodeps')
  json_data = get_json_from_command(cmd, check=False)
  # Fall back to a single empty device if the list is missing OR empty
  blockdevices = json_data.get('blockdevices') or [{}]
  dev_list = _flatten_json(blockdevices[0])

  # Fix details
  for dev in dev_list:
    dev['bus'] = dev.pop('tran', '???')
    dev['parent'] = dev.pop('pkname', None)
    dev['ssd'] = not dev.pop('rota', True)
    if 'loop' in str(disk_path) and dev['bus'] is None:
      dev['bus'] = 'Image'
      dev['model'] = ''
      dev['serial'] = ''

  # Convert to dict
  details = dev_list.pop(0)
  details['children'] = dev_list

  # Done
  return details


def get_disk_details_macos(disk_path, skip_children=True) -> dict:
  """Get disk details using diskutil, returns dict.

  An empty dict is returned if the "diskutil list" output can't be parsed.
  Devices whose "diskutil info" output can't be parsed are left without
  the normalized keys (bus, fstype, label, etc).
  """
  details = {}
  disk_path = pathlib.Path(disk_path)

  # Get "list" details
  cmd = ['diskutil', 'list', '-plist', disk_path]
  proc = run_program(cmd, check=False, encoding=None, errors=None)
  try:
    plist_data = plistlib.loads(proc.stdout)
  except (TypeError, ValueError):
    # Invalid / corrupt plist data? return empty dict to avoid crash
    LOG.error('Failed to get diskutil list for %s', disk_path)
    return details

  # Parse "list" details
  # Fall back to an empty entry if the list is missing OR empty
  all_disks = plist_data.get('AllDisksAndPartitions') or [{}]
  details = all_disks[0]
  details['path'] = disk_path
  if skip_children:
    details['children'] = []
  else:
    details['children'] = details.pop('Partitions', [])
    details['children'].extend(details.pop('APFSVolumes', []))
    for child in details['children']:
      child['path'] = disk_path.with_name(child['DeviceIdentifier'])

  # Get "info" details
  for dev in [details, *details['children']]:
    cmd = ['diskutil', 'info', '-plist', dev['path']]
    proc = run_program(cmd, check=False, encoding=None, errors=None)
    try:
      plist_data = plistlib.loads(proc.stdout)
    except (TypeError, ValueError):
      LOG.error('Failed to get diskutil info for %s', dev['path'])
      continue

    # Parse "info" details
    dev.update(plist_data)
    dev['bus'] = dev.pop('BusProtocol', '???')
    dev['fstype'] = dev.pop('FilesystemType', '')
    dev['label'] = dev.pop('VolumeName', '')
    dev['model'] = dev.pop('MediaName', 'Unknown')
    dev['mountpoint'] = dev.pop('MountPoint', '')
    dev['name'] = dev.get('name', str(dev['path']))
    dev['phy-sec'] = dev.pop('DeviceBlockSize', 512)
    dev['serial'] = get_disk_serial_macos(dev['path'])
    dev['size'] = dev.pop('Size', -1)
    dev['ssd'] = dev.pop('SolidState', False)
    dev['vendor'] = ''
    if dev.get('WholeDisk', True):
      dev['parent'] = None
    else:
      dev['parent'] = dev.pop('ParentWholeDisk', None)

  # Fix details if main dev is a child
  for child in details['children']:
    if disk_path == child['path']:
      for key in ('fstype', 'label', 'name', 'size'):
        # Keys are missing if "diskutil info" failed for this child
        if key in child:
          details[key] = child[key]
      break

  # Done
  return details


def get_disk_serial_macos(path) -> str:
  """Get disk serial using smartctl, returns str."""
  cmd = ['sudo', 'smartctl', '--info', '--json', path]
  smart_info = get_json_from_command(cmd)
  return smart_info.get('serial_number', 'Unknown Serial')


def get_disks(skip_kits=False) -> list[Disk]:
  """Get disks using OS-specific methods, returns list.

  If skip_kits is True then any disk with a label matching WK_LABEL_REGEX
  (on the disk itself or one of its children) is excluded.
  """
  disks = []
  if PLATFORM == 'Darwin':
    disks = get_disks_macos()
  elif PLATFORM == 'Linux':
    disks = get_disks_linux()

  # Skip WK disks
  if skip_kits:
    for disk in disks:
      # Child details are needed to see partition labels
      disk.update_details(skip_children=False)
    disks = [
      disk_obj for disk_obj in disks
      if not any(
        WK_LABEL_REGEX.search(label) for label in disk_obj.get_labels()
        )
      ]

  # Done
  return disks


def get_disks_linux() -> list[Disk]:
  """Get disks via lsblk, returns list."""
  cmd = ['lsblk', '--json', '--nodeps', '--paths']
  disks = []

  # Add valid disks
  json_data = get_json_from_command(cmd)
  for disk in json_data.get('blockdevices', []):
    disk_obj = Disk(disk['name'])

    # Skip loopback devices, optical devices, etc
    if disk_obj.raw_details.get('type', '???') != 'disk':
      continue

    # Skip empty devices (usually card readers)
    if disk_obj.size <= 0:
      continue

    # Add disk
    disks.append(disk_obj)

  # Done
  return disks


def get_disks_macos() -> list[Disk]:
  """Get disks via diskutil, returns list.

  An empty list is returned if the diskutil output can't be parsed.
  """
  cmd = ['diskutil', 'list', '-plist', 'physical']
  disks = []

  # El Capitan workaround
  if platform.mac_ver()[0].startswith('10.11'):
    cmd.pop()

  # Get info from diskutil
  proc = run_program(cmd, encoding=None, errors=None, check=False)

  # Parse plist data
  try:
    plist_data = plistlib.loads(proc.stdout)
  except (TypeError, ValueError):
    # Invalid / corrupt plist data? return empty list to avoid crash
    LOG.error('Failed to get diskutil list')
    return disks

  # Add valid disks
  for disk in plist_data['AllDisksAndPartitions']:
    name = disk['DeviceIdentifier']
    if name not in plist_data['WholeDisks']:
      # Only check "WholeDisks"
      continue
    if not disk['Content']:
      # This lists GPT or MBR for device, blank should mean it's an image
      continue
    disks.append(Disk(f'/dev/{name}'))

  # Remove virtual disks
  disks = [
    d for d in disks if d.raw_details.get('VirtualOrPhysical') != 'Virtual'
    ]

  # Done
  return disks


def is_4k_aligned_macos(disk_details) -> bool:
  """Check partition alignment using diskutil info, returns bool.

  A missing (or zero) 'PartitionMapPartitionOffset' is logged but is still
  treated as aligned.
  """
  aligned = True

  # Check partitions
  for part in disk_details.get('children', []):
    offset = part.get('PartitionMapPartitionOffset', 0)
    if not offset:
      # Assuming offset couldn't be found and it defaulted to 0
      # NOTE: Just logging the error, not bailing
      LOG.error('Failed to get partition offset for %s', part['path'])
    aligned = aligned and offset >= 0 and offset % 4096 == 0

  # Done
  return aligned


def is_4k_aligned_linux(dev_path, physical_sector_size) -> bool:
  """Check partition alignment using sfdisk, returns bool.

  Partition starts are reported by sfdisk in sectors, so the sector size
  from the partition table ('sectorsize') is used to convert them to bytes
  when sfdisk reports it. physical_sector_size is only used as a fallback.

  A partition without a 'start' value is treated as unaligned.
  """
  cmd = [
    'sudo',
    'sfdisk',
    '--json',
    dev_path,
    ]

  # Get partition details
  json_data = get_json_from_command(cmd)
  table = json_data.get('partitiontable', {})

  # NOTE: Using the physical sector size here would make every offset a
  #       multiple of 4096 on 512e disks and hide misaligned partitions.
  sector_size = table.get('sectorsize') or physical_sector_size

  # Check partitions
  for part in table.get('partitions', []):
    offset = sector_size * part.get('start', -1)
    if offset < 0 or offset % 4096 != 0:
      return False

  # Done
  return True


if __name__ == '__main__':
  print("This file is not meant to be called directly.")