Refactor Disk children sections
Child devices are not added by default to improve performance. Disk.children is always present to avoid overly cautious lookups.
This commit is contained in:
parent
193d207d5b
commit
7abd4c21c3
2 changed files with 91 additions and 64 deletions
|
|
@ -1310,7 +1310,6 @@ def build_directory_report(path):
|
|||
|
||||
def build_disk_report(dev):
|
||||
"""Build device report, returns list."""
|
||||
children = dev.raw_details.get('children', [])
|
||||
report = []
|
||||
|
||||
# Get widths
|
||||
|
|
@ -1319,7 +1318,7 @@ def build_disk_report(dev):
|
|||
'label': max(5, len(str(dev.raw_details.get('label', '')))),
|
||||
'name': max(4, len(dev.path.name)),
|
||||
}
|
||||
for child in children:
|
||||
for child in dev.children:
|
||||
widths['fstype'] = max(widths['fstype'], len(str(child['fstype'])))
|
||||
widths['label'] = max(widths['label'], len(str(child['label'])))
|
||||
widths['name'] = max(
|
||||
|
|
@ -1341,7 +1340,7 @@ def build_disk_report(dev):
|
|||
std.color_string(
|
||||
(
|
||||
f'{"NAME":<{widths["name"]}}'
|
||||
f'{" " if children else ""}'
|
||||
f'{" " if dev.children else ""}'
|
||||
f'{"SIZE":<7}'
|
||||
f'{"FSTYPE":<{widths["fstype"]}}'
|
||||
f'{"LABEL":<{widths["label"]}}'
|
||||
|
|
@ -1351,12 +1350,12 @@ def build_disk_report(dev):
|
|||
)
|
||||
report.append(
|
||||
f'{dev_name if dev_name else "":<{widths["name"]}}'
|
||||
f'{" " if children else ""}'
|
||||
f'{" " if dev.children else ""}'
|
||||
f'{dev_size:>6} '
|
||||
f'{dev_fstype if dev_fstype else "":<{widths["fstype"]}}'
|
||||
f'{dev_label if dev_label else "":<{widths["label"]}}'
|
||||
)
|
||||
for child in children:
|
||||
for child in dev.children:
|
||||
fstype = child['fstype']
|
||||
label = child['label']
|
||||
name = child['name'].replace('/dev/', '')
|
||||
|
|
@ -1369,13 +1368,13 @@ def build_disk_report(dev):
|
|||
)
|
||||
|
||||
# Indent children
|
||||
if len(children) > 1:
|
||||
if len(dev.children) > 1:
|
||||
report = [
|
||||
*report[:4],
|
||||
*[f'├─{line}' for line in report[4:-1]],
|
||||
f'└─{report[-1]}',
|
||||
]
|
||||
elif len(children) == 1:
|
||||
elif len(dev.children) == 1:
|
||||
report[-1] = f'└─{report[-1]}'
|
||||
|
||||
# Done
|
||||
|
|
@ -2284,8 +2283,12 @@ def select_disk(prompt, skip_disk=None):
|
|||
if 'Quit' in selection:
|
||||
raise std.GenericAbort()
|
||||
|
||||
# Update details to include child devices
|
||||
selected_disk = selection[-1]['Object']
|
||||
selected_disk.update_details(skip_children=False)
|
||||
|
||||
# Done
|
||||
return selection[-1]['Object']
|
||||
return selected_disk
|
||||
|
||||
|
||||
def select_disk_parts(prompt, disk):
|
||||
|
|
@ -2329,7 +2332,7 @@ def select_disk_parts(prompt, disk):
|
|||
|
||||
# Add parts
|
||||
whole_disk_str = f'{str(disk.path):<14} (Whole device)'
|
||||
for part in disk.raw_details.get('children', []):
|
||||
for part in disk.children:
|
||||
size = part["size"]
|
||||
name = (
|
||||
f'{str(part["path"]):<14} '
|
||||
|
|
@ -2352,7 +2355,7 @@ def select_disk_parts(prompt, disk):
|
|||
object_list.append(option['Path'])
|
||||
|
||||
# Check if whole disk selected
|
||||
if len(object_list) == len(disk.raw_details.get('children', [])):
|
||||
if len(object_list) == len(disk.children):
|
||||
# NOTE: This is not true if the disk has no partitions
|
||||
msg = f'Preserve partition table and unused space in {prompt.lower()}?'
|
||||
if std.ask(msg):
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ class Disk:
|
|||
"""Object for tracking disk specific data."""
|
||||
attributes: dict[Any, dict] = field(init=False, default_factory=dict)
|
||||
bus: str = field(init=False)
|
||||
children: list[dict] = field(init=False, default_factory=list)
|
||||
filesystem: str = field(init=False)
|
||||
log_sec: int = field(init=False)
|
||||
model: str = field(init=False)
|
||||
|
|
@ -55,7 +56,7 @@ class Disk:
|
|||
|
||||
def __post_init__(self) -> None:
|
||||
self.path = pathlib.Path(self.path).resolve()
|
||||
self.get_details()
|
||||
self.update_details()
|
||||
enable_smart(self)
|
||||
update_smart_details(self)
|
||||
if not self.attributes and self.bus == 'USB':
|
||||
|
|
@ -125,43 +126,6 @@ class Disk:
|
|||
|
||||
return report
|
||||
|
||||
def get_details(self) -> None:
|
||||
"""Get disk details using OS specific methods.
|
||||
|
||||
Required details default to generic descriptions
|
||||
and are converted to the correct type.
|
||||
"""
|
||||
if PLATFORM == 'Darwin':
|
||||
self.raw_details = get_disk_details_macos(self.path)
|
||||
elif PLATFORM == 'Linux':
|
||||
self.raw_details = get_disk_details_linux(self.path)
|
||||
|
||||
# Set necessary details
|
||||
self.bus = str(self.raw_details.get('bus', '???')).upper()
|
||||
self.bus = self.bus.replace('IMAGE', 'Image')
|
||||
self.bus = self.bus.replace('NVME', 'NVMe')
|
||||
self.filesystem = self.raw_details.get('fstype', 'Unknown')
|
||||
self.log_sec = self.raw_details.get('log-sec', 512)
|
||||
self.model = self.raw_details.get('model', 'Unknown Model')
|
||||
self.name = self.raw_details.get('name', self.path)
|
||||
self.parent = self.raw_details.get('parent', None)
|
||||
self.phy_sec = self.raw_details.get('phy-sec', 512)
|
||||
self.serial = self.raw_details.get('serial', 'Unknown Serial')
|
||||
self.size = self.raw_details.get('size', -1)
|
||||
self.ssd = self.raw_details.get('ssd', False)
|
||||
|
||||
# Ensure certain attributes types
|
||||
## NOTE: This is ugly, deal.
|
||||
for attr in ['bus', 'model', 'name', 'serial']:
|
||||
setattr(self, attr, str(getattr(self, attr)))
|
||||
for attr in ['log_sec', 'phy_sec', 'size']:
|
||||
try:
|
||||
setattr(self, attr, int(getattr(self, attr)))
|
||||
except (TypeError, ValueError):
|
||||
LOG.error('Invalid disk %s: %s', attr, getattr(self, attr))
|
||||
if attr == 'size':
|
||||
setattr(self, attr, -1)
|
||||
|
||||
def get_labels(self) -> list[str]:
|
||||
"""Build list of labels for this disk, returns list."""
|
||||
labels = []
|
||||
|
|
@ -187,48 +151,109 @@ class Disk:
|
|||
|
||||
return aligned
|
||||
|
||||
def update_details(self, skip_children=True) -> None:
|
||||
"""Update disk details using OS specific methods.
|
||||
|
||||
Required details default to generic descriptions
|
||||
and are converted to the correct type.
|
||||
"""
|
||||
if PLATFORM == 'Darwin':
|
||||
self.raw_details = get_disk_details_macos(
|
||||
self.path, skip_children=skip_children,
|
||||
)
|
||||
elif PLATFORM == 'Linux':
|
||||
self.raw_details = get_disk_details_linux(
|
||||
self.path, skip_children=skip_children,
|
||||
)
|
||||
|
||||
# Set necessary details
|
||||
self.bus = str(self.raw_details.get('bus', '???')).upper()
|
||||
self.bus = self.bus.replace('IMAGE', 'Image')
|
||||
self.bus = self.bus.replace('NVME', 'NVMe')
|
||||
self.children = self.raw_details.get('children', [])
|
||||
self.filesystem = self.raw_details.get('fstype', 'Unknown')
|
||||
self.log_sec = self.raw_details.get('log-sec', 512)
|
||||
self.model = self.raw_details.get('model', 'Unknown Model')
|
||||
self.name = self.raw_details.get('name', self.path)
|
||||
self.parent = self.raw_details.get('parent', None)
|
||||
self.phy_sec = self.raw_details.get('phy-sec', 512)
|
||||
self.serial = self.raw_details.get('serial', 'Unknown Serial')
|
||||
self.size = self.raw_details.get('size', -1)
|
||||
self.ssd = self.raw_details.get('ssd', False)
|
||||
|
||||
# Ensure certain attributes types
|
||||
## NOTE: This is ugly, deal.
|
||||
for attr in ['bus', 'model', 'name', 'serial']:
|
||||
setattr(self, attr, str(getattr(self, attr)))
|
||||
for attr in ['log_sec', 'phy_sec', 'size']:
|
||||
try:
|
||||
setattr(self, attr, int(getattr(self, attr)))
|
||||
except (TypeError, ValueError):
|
||||
LOG.error('Invalid disk %s: %s', attr, getattr(self, attr))
|
||||
if attr == 'size':
|
||||
setattr(self, attr, -1)
|
||||
|
||||
|
||||
# Functions
|
||||
def get_disk_details_linux(path) -> dict[Any, Any]:
|
||||
def get_disk_details_linux(disk_path, skip_children=True) -> dict[Any, Any]:
|
||||
"""Get disk details using lsblk, returns dict."""
|
||||
cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', path]
|
||||
def _flatten_json(dev) -> list:
|
||||
"""Convert lsblk JSON tree to a flat list of items, returns list."""
|
||||
devs = [dev]
|
||||
for child in dev.pop('children', []):
|
||||
devs.extend(_flatten_json(child))
|
||||
return devs
|
||||
|
||||
# Get lsblk info
|
||||
cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', disk_path]
|
||||
if skip_children:
|
||||
cmd.append('--nodeps')
|
||||
json_data = get_json_from_command(cmd, check=False)
|
||||
details = json_data.get('blockdevices', [{}])[0]
|
||||
dev_list = _flatten_json(json_data.get('blockdevices', [{}])[0])
|
||||
|
||||
# Fix details
|
||||
for dev in [details, *details.get('children', [])]:
|
||||
for dev in dev_list:
|
||||
dev['bus'] = dev.pop('tran', '???')
|
||||
dev['parent'] = dev.pop('pkname', None)
|
||||
dev['ssd'] = not dev.pop('rota', True)
|
||||
if 'loop' in str(path) and dev['bus'] is None:
|
||||
if 'loop' in str(disk_path) and dev['bus'] is None:
|
||||
dev['bus'] = 'Image'
|
||||
dev['model'] = ''
|
||||
dev['serial'] = ''
|
||||
|
||||
# Convert to dict
|
||||
details = dev_list.pop(0)
|
||||
details['children'] = dev_list
|
||||
|
||||
# Done
|
||||
return details
|
||||
|
||||
|
||||
def get_disk_details_macos(path) -> dict[Any, Any]:
|
||||
def get_disk_details_macos(disk_path, skip_children=True) -> dict:
|
||||
"""Get disk details using diskutil, returns dict."""
|
||||
details = {}
|
||||
disk_path = pathlib.Path(disk_path)
|
||||
|
||||
# Get "list" details
|
||||
cmd = ['diskutil', 'list', '-plist', path]
|
||||
cmd = ['diskutil', 'list', '-plist', disk_path]
|
||||
proc = run_program(cmd, check=False, encoding=None, errors=None)
|
||||
try:
|
||||
plist_data = plistlib.loads(proc.stdout)
|
||||
except (TypeError, ValueError):
|
||||
# Invalid / corrupt plist data? return empty dict to avoid crash
|
||||
LOG.error('Failed to get diskutil list for %s', path)
|
||||
LOG.error('Failed to get diskutil list for %s', disk_path)
|
||||
return details
|
||||
|
||||
# Parse "list" details
|
||||
details = plist_data.get('AllDisksAndPartitions', [{}])[0]
|
||||
details['children'] = details.pop('Partitions', [])
|
||||
details['path'] = path
|
||||
for child in details['children']:
|
||||
child['path'] = path.with_name(child.get('DeviceIdentifier', 'null'))
|
||||
details['path'] = disk_path
|
||||
if skip_children:
|
||||
details['children'] = []
|
||||
else:
|
||||
details['children'] = details.pop('Partitions', [])
|
||||
details['children'].extend(details.pop('APFSVolumes', []))
|
||||
for child in details['children']:
|
||||
child['path'] = disk_path.with_name(child['DeviceIdentifier'])
|
||||
|
||||
# Get "info" details
|
||||
for dev in [details, *details['children']]:
|
||||
|
|
@ -237,8 +262,8 @@ def get_disk_details_macos(path) -> dict[Any, Any]:
|
|||
try:
|
||||
plist_data = plistlib.loads(proc.stdout)
|
||||
except (TypeError, ValueError):
|
||||
LOG.error('Failed to get diskutil info for %s', path)
|
||||
continue #Skip
|
||||
LOG.error('Failed to get diskutil info for %s', dev['path'])
|
||||
continue
|
||||
|
||||
# Parse "info" details
|
||||
dev.update(plist_data)
|
||||
|
|
@ -260,7 +285,7 @@ def get_disk_details_macos(path) -> dict[Any, Any]:
|
|||
|
||||
# Fix details if main dev is a child
|
||||
for child in details['children']:
|
||||
if path == child['path']:
|
||||
if disk_path == child['path']:
|
||||
for key in ('fstype', 'label', 'name', 'size'):
|
||||
details[key] = child[key]
|
||||
break
|
||||
|
|
@ -350,7 +375,6 @@ def get_disks_macos() -> list[Disk]:
|
|||
disks.append(Disk(f'/dev/{name}'))
|
||||
|
||||
# Remove virtual disks
|
||||
# TODO: Test more to figure out why some drives are being marked 'Unknown'
|
||||
disks = [
|
||||
d for d in disks if d.raw_details.get('VirtualOrPhysical') != 'Virtual'
|
||||
]
|
||||
|
|
|
|||
Loading…
Reference in a new issue