From 7abd4c21c399eaa4803a6e381f79b75295b85f4a Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Sat, 21 May 2022 15:03:44 -0700 Subject: [PATCH] Refactor Disk children sections Child devices are not added by default to improve performance. Disk.children is always present to avoid overly cautious lookups. --- scripts/wk/clone/ddrescue.py | 23 +++--- scripts/wk/hw/disk.py | 132 +++++++++++++++++++++-------------- 2 files changed, 91 insertions(+), 64 deletions(-) diff --git a/scripts/wk/clone/ddrescue.py b/scripts/wk/clone/ddrescue.py index 93724443..a5f4676d 100644 --- a/scripts/wk/clone/ddrescue.py +++ b/scripts/wk/clone/ddrescue.py @@ -1310,7 +1310,6 @@ def build_directory_report(path): def build_disk_report(dev): """Build device report, returns list.""" - children = dev.raw_details.get('children', []) report = [] # Get widths @@ -1319,7 +1318,7 @@ def build_disk_report(dev): 'label': max(5, len(str(dev.raw_details.get('label', '')))), 'name': max(4, len(dev.path.name)), } - for child in children: + for child in dev.children: widths['fstype'] = max(widths['fstype'], len(str(child['fstype']))) widths['label'] = max(widths['label'], len(str(child['label']))) widths['name'] = max( @@ -1341,7 +1340,7 @@ def build_disk_report(dev): std.color_string( ( f'{"NAME":<{widths["name"]}}' - f'{" " if children else ""}' + f'{" " if dev.children else ""}' f'{"SIZE":<7}' f'{"FSTYPE":<{widths["fstype"]}}' f'{"LABEL":<{widths["label"]}}' @@ -1351,12 +1350,12 @@ def build_disk_report(dev): ) report.append( f'{dev_name if dev_name else "":<{widths["name"]}}' - f'{" " if children else ""}' + f'{" " if dev.children else ""}' f'{dev_size:>6} ' f'{dev_fstype if dev_fstype else "":<{widths["fstype"]}}' f'{dev_label if dev_label else "":<{widths["label"]}}' ) - for child in children: + for child in dev.children: fstype = child['fstype'] label = child['label'] name = child['name'].replace('/dev/', '') @@ -1369,13 +1368,13 @@ def build_disk_report(dev): ) # Indent children - if len(children) > 1: + if len(dev.children) > 1: report = [ *report[:4], *[f'├─{line}' for line in report[4:-1]], f'└─{report[-1]}', ] - elif len(children) == 1: + elif len(dev.children) == 1: report[-1] = f'└─{report[-1]}' # Done @@ -2284,8 +2283,12 @@ def select_disk(prompt, skip_disk=None): if 'Quit' in selection: raise std.GenericAbort() + # Update details to include child devices + selected_disk = selection[-1]['Object'] + selected_disk.update_details(skip_children=False) + # Done - return selection[-1]['Object'] + return selected_disk def select_disk_parts(prompt, disk): @@ -2329,7 +2332,7 @@ def select_disk_parts(prompt, disk): # Add parts whole_disk_str = f'{str(disk.path):<14} (Whole device)' - for part in disk.raw_details.get('children', []): + for part in disk.children: size = part["size"] name = ( f'{str(part["path"]):<14} ' @@ -2352,7 +2355,7 @@ def select_disk_parts(prompt, disk): object_list.append(option['Path']) # Check if whole disk selected - if len(object_list) == len(disk.raw_details.get('children', [])): + if len(object_list) == len(disk.children): # NOTE: This is not true if the disk has no partitions msg = f'Preserve partition table and unused space in {prompt.lower()}?' if std.ask(msg): diff --git a/scripts/wk/hw/disk.py b/scripts/wk/hw/disk.py index 3daa5aef..45cbcce2 100644 --- a/scripts/wk/hw/disk.py +++ b/scripts/wk/hw/disk.py @@ -37,6 +37,7 @@ class Disk: """Object for tracking disk specific data.""" attributes: dict[Any, dict] = field(init=False, default_factory=dict) bus: str = field(init=False) + children: list[dict] = field(init=False, default_factory=list) filesystem: str = field(init=False) log_sec: int = field(init=False) model: str = field(init=False) @@ -55,7 +56,7 @@ class Disk: def __post_init__(self) -> None: self.path = pathlib.Path(self.path).resolve() - self.get_details() + self.update_details() enable_smart(self) update_smart_details(self) if not self.attributes and self.bus == 'USB': @@ -125,43 +126,6 @@ class Disk: return report - def get_details(self) -> None: - """Get disk details using OS specific methods. - - Required details default to generic descriptions - and are converted to the correct type. - """ - if PLATFORM == 'Darwin': - self.raw_details = get_disk_details_macos(self.path) - elif PLATFORM == 'Linux': - self.raw_details = get_disk_details_linux(self.path) - - # Set necessary details - self.bus = str(self.raw_details.get('bus', '???')).upper() - self.bus = self.bus.replace('IMAGE', 'Image') - self.bus = self.bus.replace('NVME', 'NVMe') - self.filesystem = self.raw_details.get('fstype', 'Unknown') - self.log_sec = self.raw_details.get('log-sec', 512) - self.model = self.raw_details.get('model', 'Unknown Model') - self.name = self.raw_details.get('name', self.path) - self.parent = self.raw_details.get('parent', None) - self.phy_sec = self.raw_details.get('phy-sec', 512) - self.serial = self.raw_details.get('serial', 'Unknown Serial') - self.size = self.raw_details.get('size', -1) - self.ssd = self.raw_details.get('ssd', False) - - # Ensure certain attributes types - ## NOTE: This is ugly, deal. - for attr in ['bus', 'model', 'name', 'serial']: - setattr(self, attr, str(getattr(self, attr))) - for attr in ['log_sec', 'phy_sec', 'size']: - try: - setattr(self, attr, int(getattr(self, attr))) - except (TypeError, ValueError): - LOG.error('Invalid disk %s: %s', attr, getattr(self, attr)) - if attr == 'size': - setattr(self, attr, -1) - def get_labels(self) -> list[str]: """Build list of labels for this disk, returns list.""" labels = [] @@ -187,48 +151,109 @@ class Disk: return aligned + def update_details(self, skip_children=True) -> None: + """Update disk details using OS specific methods. + + Required details default to generic descriptions + and are converted to the correct type. + """ + if PLATFORM == 'Darwin': + self.raw_details = get_disk_details_macos( + self.path, skip_children=skip_children, + ) + elif PLATFORM == 'Linux': + self.raw_details = get_disk_details_linux( + self.path, skip_children=skip_children, + ) + + # Set necessary details + self.bus = str(self.raw_details.get('bus', '???')).upper() + self.bus = self.bus.replace('IMAGE', 'Image') + self.bus = self.bus.replace('NVME', 'NVMe') + self.children = self.raw_details.get('children', []) + self.filesystem = self.raw_details.get('fstype', 'Unknown') + self.log_sec = self.raw_details.get('log-sec', 512) + self.model = self.raw_details.get('model', 'Unknown Model') + self.name = self.raw_details.get('name', self.path) + self.parent = self.raw_details.get('parent', None) + self.phy_sec = self.raw_details.get('phy-sec', 512) + self.serial = self.raw_details.get('serial', 'Unknown Serial') + self.size = self.raw_details.get('size', -1) + self.ssd = self.raw_details.get('ssd', False) + + # Ensure certain attributes types + ## NOTE: This is ugly, deal. + for attr in ['bus', 'model', 'name', 'serial']: + setattr(self, attr, str(getattr(self, attr))) + for attr in ['log_sec', 'phy_sec', 'size']: + try: + setattr(self, attr, int(getattr(self, attr))) + except (TypeError, ValueError): + LOG.error('Invalid disk %s: %s', attr, getattr(self, attr)) + if attr == 'size': + setattr(self, attr, -1) + # Functions -def get_disk_details_linux(path) -> dict[Any, Any]: +def get_disk_details_linux(disk_path, skip_children=True) -> dict[Any, Any]: """Get disk details using lsblk, returns dict.""" - cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', path] + def _flatten_json(dev) -> list: + """Convert lsblk JSON tree to a flat list of items, returns list.""" + devs = [dev] + for child in dev.pop('children', []): + devs.extend(_flatten_json(child)) + return devs + + # Get lsblk info + cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', disk_path] + if skip_children: + cmd.append('--nodeps') json_data = get_json_from_command(cmd, check=False) - details = json_data.get('blockdevices', [{}])[0] + dev_list = _flatten_json(json_data.get('blockdevices', [{}])[0]) # Fix details - for dev in [details, *details.get('children', [])]: + for dev in dev_list: dev['bus'] = dev.pop('tran', '???') dev['parent'] = dev.pop('pkname', None) dev['ssd'] = not dev.pop('rota', True) - if 'loop' in str(path) and dev['bus'] is None: + if 'loop' in str(disk_path) and dev['bus'] is None: dev['bus'] = 'Image' dev['model'] = '' dev['serial'] = '' + # Convert to dict + details = dev_list.pop(0) + details['children'] = dev_list + # Done return details -def get_disk_details_macos(path) -> dict[Any, Any]: +def get_disk_details_macos(disk_path, skip_children=True) -> dict: """Get disk details using diskutil, returns dict.""" details = {} + disk_path = pathlib.Path(disk_path) # Get "list" details - cmd = ['diskutil', 'list', '-plist', path] + cmd = ['diskutil', 'list', '-plist', disk_path] proc = run_program(cmd, check=False, encoding=None, errors=None) try: plist_data = plistlib.loads(proc.stdout) except (TypeError, ValueError): # Invalid / corrupt plist data? return empty dict to avoid crash - LOG.error('Failed to get diskutil list for %s', path) + LOG.error('Failed to get diskutil list for %s', disk_path) return details # Parse "list" details details = plist_data.get('AllDisksAndPartitions', [{}])[0] - details['children'] = details.pop('Partitions', []) - details['path'] = path - for child in details['children']: - child['path'] = path.with_name(child.get('DeviceIdentifier', 'null')) + details['path'] = disk_path + if skip_children: + details['children'] = [] + else: + details['children'] = details.pop('Partitions', []) + details['children'].extend(details.pop('APFSVolumes', [])) + for child in details['children']: + child['path'] = disk_path.with_name(child['DeviceIdentifier']) # Get "info" details for dev in [details, *details['children']]: @@ -237,8 +262,8 @@ def get_disk_details_macos(path) -> dict[Any, Any]: try: plist_data = plistlib.loads(proc.stdout) except (TypeError, ValueError): - LOG.error('Failed to get diskutil info for %s', path) - continue #Skip + LOG.error('Failed to get diskutil info for %s', dev['path']) + continue # Parse "info" details dev.update(plist_data) @@ -260,7 +285,7 @@ def get_disk_details_macos(path) -> dict[Any, Any]: # Fix details if main dev is a child for child in details['children']: - if path == child['path']: + if disk_path == child['path']: for key in ('fstype', 'label', 'name', 'size'): details[key] = child[key] break @@ -350,7 +375,6 @@ def get_disks_macos() -> list[Disk]: disks.append(Disk(f'/dev/{name}')) # Remove virtual disks - # TODO: Test more to figure out why some drives are being marked 'Unknown' disks = [ d for d in disks if d.raw_details.get('VirtualOrPhysical') != 'Virtual' ]