Merge remote-tracking branch 'upstream/dev' into dev
This commit is contained in:
commit
cc4b485a24
7 changed files with 172 additions and 101 deletions
|
|
@ -7,6 +7,7 @@ from . import log
|
||||||
from . import main
|
from . import main
|
||||||
from . import music
|
from . import music
|
||||||
from . import net
|
from . import net
|
||||||
|
from . import python
|
||||||
from . import repairs
|
from . import repairs
|
||||||
from . import setup
|
from . import setup
|
||||||
from . import sources
|
from . import sources
|
||||||
|
|
|
||||||
14
scripts/wk/cfg/python.py
Normal file
14
scripts/wk/cfg/python.py
Normal file
|
|
@ -0,0 +1,14 @@
|
||||||
|
"""WizardKit: Config - Python"""
|
||||||
|
# vim: sts=2 sw=2 ts=2
|
||||||
|
|
||||||
|
from sys import version_info
|
||||||
|
|
||||||
|
DATACLASS_DECORATOR_KWARGS = {}
|
||||||
|
if version_info.major >= 3 and version_info.minor >= 10:
|
||||||
|
DATACLASS_DECORATOR_KWARGS['slots'] = True
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
print("This file is not meant to be called directly.")
|
||||||
|
|
||||||
|
# vim: sts=2 sw=2 ts=2
|
||||||
|
|
@ -548,7 +548,7 @@ class State():
|
||||||
settings['Table Type'] = 'MBR'
|
settings['Table Type'] = 'MBR'
|
||||||
else:
|
else:
|
||||||
# Match source type
|
# Match source type
|
||||||
settings['Table Type'] = get_table_type(self.source)
|
settings['Table Type'] = get_table_type(self.source.path)
|
||||||
if std.ask('Create an empty Windows boot partition on the clone?'):
|
if std.ask('Create an empty Windows boot partition on the clone?'):
|
||||||
settings['Create Boot Partition'] = True
|
settings['Create Boot Partition'] = True
|
||||||
offset = 2 if settings['Table Type'] == 'GPT' else 1
|
offset = 2 if settings['Table Type'] == 'GPT' else 1
|
||||||
|
|
@ -1399,7 +1399,6 @@ def build_directory_report(path):
|
||||||
|
|
||||||
def build_disk_report(dev):
|
def build_disk_report(dev):
|
||||||
"""Build device report, returns list."""
|
"""Build device report, returns list."""
|
||||||
children = dev.raw_details.get('children', [])
|
|
||||||
report = []
|
report = []
|
||||||
|
|
||||||
# Get widths
|
# Get widths
|
||||||
|
|
@ -1408,7 +1407,7 @@ def build_disk_report(dev):
|
||||||
'label': max(5, len(str(dev.raw_details.get('label', '')))),
|
'label': max(5, len(str(dev.raw_details.get('label', '')))),
|
||||||
'name': max(4, len(dev.path.name)),
|
'name': max(4, len(dev.path.name)),
|
||||||
}
|
}
|
||||||
for child in children:
|
for child in dev.children:
|
||||||
widths['fstype'] = max(widths['fstype'], len(str(child['fstype'])))
|
widths['fstype'] = max(widths['fstype'], len(str(child['fstype'])))
|
||||||
widths['label'] = max(widths['label'], len(str(child['label'])))
|
widths['label'] = max(widths['label'], len(str(child['label'])))
|
||||||
widths['name'] = max(
|
widths['name'] = max(
|
||||||
|
|
@ -1430,7 +1429,7 @@ def build_disk_report(dev):
|
||||||
std.color_string(
|
std.color_string(
|
||||||
(
|
(
|
||||||
f'{"NAME":<{widths["name"]}}'
|
f'{"NAME":<{widths["name"]}}'
|
||||||
f'{" " if children else ""}'
|
f'{" " if dev.children else ""}'
|
||||||
f'{"SIZE":<7}'
|
f'{"SIZE":<7}'
|
||||||
f'{"FSTYPE":<{widths["fstype"]}}'
|
f'{"FSTYPE":<{widths["fstype"]}}'
|
||||||
f'{"LABEL":<{widths["label"]}}'
|
f'{"LABEL":<{widths["label"]}}'
|
||||||
|
|
@ -1440,12 +1439,12 @@ def build_disk_report(dev):
|
||||||
)
|
)
|
||||||
report.append(
|
report.append(
|
||||||
f'{dev_name if dev_name else "":<{widths["name"]}}'
|
f'{dev_name if dev_name else "":<{widths["name"]}}'
|
||||||
f'{" " if children else ""}'
|
f'{" " if dev.children else ""}'
|
||||||
f'{dev_size:>6} '
|
f'{dev_size:>6} '
|
||||||
f'{dev_fstype if dev_fstype else "":<{widths["fstype"]}}'
|
f'{dev_fstype if dev_fstype else "":<{widths["fstype"]}}'
|
||||||
f'{dev_label if dev_label else "":<{widths["label"]}}'
|
f'{dev_label if dev_label else "":<{widths["label"]}}'
|
||||||
)
|
)
|
||||||
for child in children:
|
for child in dev.children:
|
||||||
fstype = child['fstype']
|
fstype = child['fstype']
|
||||||
label = child['label']
|
label = child['label']
|
||||||
name = child['name'].replace('/dev/', '')
|
name = child['name'].replace('/dev/', '')
|
||||||
|
|
@ -1458,13 +1457,13 @@ def build_disk_report(dev):
|
||||||
)
|
)
|
||||||
|
|
||||||
# Indent children
|
# Indent children
|
||||||
if len(children) > 1:
|
if len(dev.children) > 1:
|
||||||
report = [
|
report = [
|
||||||
*report[:4],
|
*report[:4],
|
||||||
*[f'├─{line}' for line in report[4:-1]],
|
*[f'├─{line}' for line in report[4:-1]],
|
||||||
f'└─{report[-1]}',
|
f'└─{report[-1]}',
|
||||||
]
|
]
|
||||||
elif len(children) == 1:
|
elif len(dev.children) == 1:
|
||||||
report[-1] = f'└─{report[-1]}'
|
report[-1] = f'└─{report[-1]}'
|
||||||
|
|
||||||
# Done
|
# Done
|
||||||
|
|
@ -1852,14 +1851,36 @@ def get_percent_color(percent):
|
||||||
return color
|
return color
|
||||||
|
|
||||||
|
|
||||||
def get_table_type(disk):
|
def get_table_type(disk_path):
|
||||||
"""Get disk partition table type, returns str.
|
"""Get disk partition table type, returns str.
|
||||||
|
|
||||||
NOTE: If resulting table type is not GPT or MBR
|
NOTE: If resulting table type is not GPT or MBR
|
||||||
then an exception is raised.
|
then an exception is raised.
|
||||||
"""
|
"""
|
||||||
table_type = str(disk.raw_details.get('pttype', '')).upper()
|
disk_path = str(disk_path)
|
||||||
table_type = table_type.replace('DOS', 'MBR')
|
table_type = None
|
||||||
|
|
||||||
|
# Linux
|
||||||
|
if std.PLATFORM == 'Linux':
|
||||||
|
cmd = f'lsblk --json --output=pttype --nodeps {disk_path}'.split()
|
||||||
|
json_data = exe.get_json_from_command(cmd)
|
||||||
|
table_type = json_data['blockdevices'][0].get('pttype', '').upper()
|
||||||
|
table_type = table_type.replace('DOS', 'MBR')
|
||||||
|
|
||||||
|
# macOS
|
||||||
|
if std.PLATFORM == 'Darwin':
|
||||||
|
cmd = ['diskutil', 'list', '-plist', disk_path]
|
||||||
|
proc = exe.run_program(cmd, check=False, encoding=None, errors=None)
|
||||||
|
try:
|
||||||
|
plist_data = plistlib.loads(proc.stdout)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
# Invalid / corrupt plist data? return empty dict to avoid crash
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
disk_details = plist_data.get('AllDisksAndPartitions', [{}])[0]
|
||||||
|
table_type = disk_details['Content']
|
||||||
|
table_type = table_type.replace('FDisk_partition_scheme', 'MBR')
|
||||||
|
table_type = table_type.replace('GUID_partition_scheme', 'GPT')
|
||||||
|
|
||||||
# Check type
|
# Check type
|
||||||
if table_type not in ('GPT', 'MBR'):
|
if table_type not in ('GPT', 'MBR'):
|
||||||
|
|
@ -2403,8 +2424,12 @@ def select_disk(prompt, skip_disk=None):
|
||||||
if 'Quit' in selection:
|
if 'Quit' in selection:
|
||||||
raise std.GenericAbort()
|
raise std.GenericAbort()
|
||||||
|
|
||||||
|
# Update details to include child devices
|
||||||
|
selected_disk = selection[-1]['Object']
|
||||||
|
selected_disk.update_details(skip_children=False)
|
||||||
|
|
||||||
# Done
|
# Done
|
||||||
return selection[-1]['Object']
|
return selected_disk
|
||||||
|
|
||||||
|
|
||||||
def select_disk_parts(prompt, disk):
|
def select_disk_parts(prompt, disk):
|
||||||
|
|
@ -2448,7 +2473,7 @@ def select_disk_parts(prompt, disk):
|
||||||
|
|
||||||
# Add parts
|
# Add parts
|
||||||
whole_disk_str = f'{str(disk.path):<14} (Whole device)'
|
whole_disk_str = f'{str(disk.path):<14} (Whole device)'
|
||||||
for part in disk.raw_details.get('children', []):
|
for part in disk.children:
|
||||||
size = part["size"]
|
size = part["size"]
|
||||||
name = (
|
name = (
|
||||||
f'{str(part["path"]):<14} '
|
f'{str(part["path"]):<14} '
|
||||||
|
|
@ -2471,7 +2496,7 @@ def select_disk_parts(prompt, disk):
|
||||||
object_list.append(option['Path'])
|
object_list.append(option['Path'])
|
||||||
|
|
||||||
# Check if whole disk selected
|
# Check if whole disk selected
|
||||||
if len(object_list) == len(disk.raw_details.get('children', [])):
|
if len(object_list) == len(disk.children):
|
||||||
# NOTE: This is not true if the disk has no partitions
|
# NOTE: This is not true if the disk has no partitions
|
||||||
msg = f'Preserve partition table and unused space in {prompt.lower()}?'
|
msg = f'Preserve partition table and unused space in {prompt.lower()}?'
|
||||||
if std.ask(msg):
|
if std.ask(msg):
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import pathlib
|
import pathlib
|
||||||
|
import platform
|
||||||
import plistlib
|
import plistlib
|
||||||
import re
|
import re
|
||||||
|
|
||||||
|
|
@ -10,6 +11,7 @@ from dataclasses import dataclass, field
|
||||||
from typing import Any, Union
|
from typing import Any, Union
|
||||||
|
|
||||||
from wk.cfg.main import KIT_NAME_SHORT
|
from wk.cfg.main import KIT_NAME_SHORT
|
||||||
|
from wk.cfg.python import DATACLASS_DECORATOR_KWARGS
|
||||||
from wk.exe import get_json_from_command, run_program
|
from wk.exe import get_json_from_command, run_program
|
||||||
from wk.hw.test import Test
|
from wk.hw.test import Test
|
||||||
from wk.hw.smart import (
|
from wk.hw.smart import (
|
||||||
|
|
@ -29,13 +31,13 @@ WK_LABEL_REGEX = re.compile(
|
||||||
|
|
||||||
|
|
||||||
# Classes
|
# Classes
|
||||||
@dataclass(slots=True)
|
@dataclass(**DATACLASS_DECORATOR_KWARGS)
|
||||||
class Disk:
|
class Disk:
|
||||||
# pylint: disable=too-many-instance-attributes
|
# pylint: disable=too-many-instance-attributes
|
||||||
"""Object for tracking disk specific data."""
|
"""Object for tracking disk specific data."""
|
||||||
attributes: dict[Any, dict] = field(init=False, default_factory=dict)
|
attributes: dict[Any, dict] = field(init=False, default_factory=dict)
|
||||||
bus: str = field(init=False)
|
bus: str = field(init=False)
|
||||||
description: str = field(init=False)
|
children: list[dict] = field(init=False, default_factory=list)
|
||||||
filesystem: str = field(init=False)
|
filesystem: str = field(init=False)
|
||||||
log_sec: int = field(init=False)
|
log_sec: int = field(init=False)
|
||||||
model: str = field(init=False)
|
model: str = field(init=False)
|
||||||
|
|
@ -54,8 +56,7 @@ class Disk:
|
||||||
|
|
||||||
def __post_init__(self) -> None:
|
def __post_init__(self) -> None:
|
||||||
self.path = pathlib.Path(self.path).resolve()
|
self.path = pathlib.Path(self.path).resolve()
|
||||||
self.get_details()
|
self.update_details()
|
||||||
self.set_description()
|
|
||||||
enable_smart(self)
|
enable_smart(self)
|
||||||
update_smart_details(self)
|
update_smart_details(self)
|
||||||
if not self.attributes and self.bus == 'USB':
|
if not self.attributes and self.bus == 'USB':
|
||||||
|
|
@ -84,6 +85,14 @@ class Disk:
|
||||||
present = True
|
present = True
|
||||||
return present
|
return present
|
||||||
|
|
||||||
|
@property
|
||||||
|
def description(self) -> str:
|
||||||
|
"""Get disk description from details."""
|
||||||
|
return (
|
||||||
|
f'{bytes_to_string(self.size, use_binary=False)}'
|
||||||
|
f' ({self.bus}) {self.model} {self.serial}'
|
||||||
|
)
|
||||||
|
|
||||||
def disable_disk_tests(self) -> None:
|
def disable_disk_tests(self) -> None:
|
||||||
"""Disable all tests."""
|
"""Disable all tests."""
|
||||||
LOG.warning('Disabling all tests for: %s', self.path)
|
LOG.warning('Disabling all tests for: %s', self.path)
|
||||||
|
|
@ -117,51 +126,6 @@ class Disk:
|
||||||
|
|
||||||
return report
|
return report
|
||||||
|
|
||||||
def get_details(self) -> None:
|
|
||||||
"""Get disk details using OS specific methods.
|
|
||||||
|
|
||||||
Required details default to generic descriptions
|
|
||||||
and are converted to the correct type.
|
|
||||||
"""
|
|
||||||
if PLATFORM == 'Darwin':
|
|
||||||
self.raw_details = get_disk_details_macos(self.path)
|
|
||||||
elif PLATFORM == 'Linux':
|
|
||||||
self.raw_details = get_disk_details_linux(self.path)
|
|
||||||
|
|
||||||
# Set necessary details
|
|
||||||
self.bus = str(self.raw_details.get('bus', '???')).upper()
|
|
||||||
self.bus = self.bus.replace('IMAGE', 'Image')
|
|
||||||
self.bus = self.bus.replace('NVME', 'NVMe')
|
|
||||||
self.filesystem = self.raw_details.get('fstype', 'Unknown')
|
|
||||||
self.log_sec = self.raw_details.get('log-sec', 512)
|
|
||||||
self.model = self.raw_details.get('model', 'Unknown Model')
|
|
||||||
self.name = self.raw_details.get('name', self.path)
|
|
||||||
self.parent = self.raw_details.get('parent', None)
|
|
||||||
self.phy_sec = self.raw_details.get('phy-sec', 512)
|
|
||||||
self.serial = self.raw_details.get('serial', 'Unknown Serial')
|
|
||||||
self.size = self.raw_details.get('size', -1)
|
|
||||||
self.ssd = self.raw_details.get('ssd', False)
|
|
||||||
|
|
||||||
# Ensure certain attributes types
|
|
||||||
## NOTE: This is ugly, deal.
|
|
||||||
for attr in ['bus', 'model', 'name', 'serial']:
|
|
||||||
setattr(self, attr, str(getattr(self, attr)))
|
|
||||||
for attr in ['log_sec', 'phy_sec', 'size']:
|
|
||||||
try:
|
|
||||||
setattr(self, attr, int(getattr(self, attr)))
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
LOG.error('Invalid disk %s: %s', attr, getattr(self, attr))
|
|
||||||
if attr == 'size':
|
|
||||||
setattr(self, attr, -1)
|
|
||||||
|
|
||||||
# Set description
|
|
||||||
self.description = (
|
|
||||||
f'{bytes_to_string(self.size, use_binary=False)}'
|
|
||||||
f' ({self.bus})'
|
|
||||||
f' {self.model}'
|
|
||||||
f' {self.serial}'
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_labels(self) -> list[str]:
|
def get_labels(self) -> list[str]:
|
||||||
"""Build list of labels for this disk, returns list."""
|
"""Build list of labels for this disk, returns list."""
|
||||||
labels = []
|
labels = []
|
||||||
|
|
@ -187,55 +151,109 @@ class Disk:
|
||||||
|
|
||||||
return aligned
|
return aligned
|
||||||
|
|
||||||
def set_description(self) -> None:
|
def update_details(self, skip_children=True) -> None:
|
||||||
"""Set disk description from details."""
|
"""Update disk details using OS specific methods.
|
||||||
self.description = (
|
|
||||||
f'{bytes_to_string(self.size, use_binary=False)}'
|
Required details default to generic descriptions
|
||||||
f' ({self.bus}) {self.model} {self.serial}'
|
and are converted to the correct type.
|
||||||
)
|
"""
|
||||||
|
if PLATFORM == 'Darwin':
|
||||||
|
self.raw_details = get_disk_details_macos(
|
||||||
|
self.path, skip_children=skip_children,
|
||||||
|
)
|
||||||
|
elif PLATFORM == 'Linux':
|
||||||
|
self.raw_details = get_disk_details_linux(
|
||||||
|
self.path, skip_children=skip_children,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Set necessary details
|
||||||
|
self.bus = str(self.raw_details.get('bus', '???')).upper()
|
||||||
|
self.bus = self.bus.replace('IMAGE', 'Image')
|
||||||
|
self.bus = self.bus.replace('NVME', 'NVMe')
|
||||||
|
self.children = self.raw_details.get('children', [])
|
||||||
|
self.filesystem = self.raw_details.get('fstype', 'Unknown')
|
||||||
|
self.log_sec = self.raw_details.get('log-sec', 512)
|
||||||
|
self.model = self.raw_details.get('model', 'Unknown Model')
|
||||||
|
self.name = self.raw_details.get('name', self.path)
|
||||||
|
self.parent = self.raw_details.get('parent', None)
|
||||||
|
self.phy_sec = self.raw_details.get('phy-sec', 512)
|
||||||
|
self.serial = self.raw_details.get('serial', 'Unknown Serial')
|
||||||
|
self.size = self.raw_details.get('size', -1)
|
||||||
|
self.ssd = self.raw_details.get('ssd', False)
|
||||||
|
|
||||||
|
# Ensure certain attributes types
|
||||||
|
## NOTE: This is ugly, deal.
|
||||||
|
for attr in ['bus', 'model', 'name', 'serial']:
|
||||||
|
setattr(self, attr, str(getattr(self, attr)))
|
||||||
|
for attr in ['log_sec', 'phy_sec', 'size']:
|
||||||
|
try:
|
||||||
|
setattr(self, attr, int(getattr(self, attr)))
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
LOG.error('Invalid disk %s: %s', attr, getattr(self, attr))
|
||||||
|
if attr == 'size':
|
||||||
|
setattr(self, attr, -1)
|
||||||
|
|
||||||
|
|
||||||
# Functions
|
# Functions
|
||||||
def get_disk_details_linux(path) -> dict[Any, Any]:
|
def get_disk_details_linux(disk_path, skip_children=True) -> dict[Any, Any]:
|
||||||
"""Get disk details using lsblk, returns dict."""
|
"""Get disk details using lsblk, returns dict."""
|
||||||
cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', path]
|
def _flatten_json(dev) -> list:
|
||||||
|
"""Convert lsblk JSON tree to a flat list of items, returns list."""
|
||||||
|
devs = [dev]
|
||||||
|
for child in dev.pop('children', []):
|
||||||
|
devs.extend(_flatten_json(child))
|
||||||
|
return devs
|
||||||
|
|
||||||
|
# Get lsblk info
|
||||||
|
cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', disk_path]
|
||||||
|
if skip_children:
|
||||||
|
cmd.append('--nodeps')
|
||||||
json_data = get_json_from_command(cmd, check=False)
|
json_data = get_json_from_command(cmd, check=False)
|
||||||
details = json_data.get('blockdevices', [{}])[0]
|
dev_list = _flatten_json(json_data.get('blockdevices', [{}])[0])
|
||||||
|
|
||||||
# Fix details
|
# Fix details
|
||||||
for dev in [details, *details.get('children', [])]:
|
for dev in dev_list:
|
||||||
dev['bus'] = dev.pop('tran', '???')
|
dev['bus'] = dev.pop('tran', '???')
|
||||||
dev['parent'] = dev.pop('pkname', None)
|
dev['parent'] = dev.pop('pkname', None)
|
||||||
dev['ssd'] = not dev.pop('rota', True)
|
dev['ssd'] = not dev.pop('rota', True)
|
||||||
if 'loop' in str(path) and dev['bus'] is None:
|
if 'loop' in str(disk_path) and dev['bus'] is None:
|
||||||
dev['bus'] = 'Image'
|
dev['bus'] = 'Image'
|
||||||
dev['model'] = ''
|
dev['model'] = ''
|
||||||
dev['serial'] = ''
|
dev['serial'] = ''
|
||||||
|
|
||||||
|
# Convert to dict
|
||||||
|
details = dev_list.pop(0)
|
||||||
|
details['children'] = dev_list
|
||||||
|
|
||||||
# Done
|
# Done
|
||||||
return details
|
return details
|
||||||
|
|
||||||
|
|
||||||
def get_disk_details_macos(path) -> dict[Any, Any]:
|
def get_disk_details_macos(disk_path, skip_children=True) -> dict:
|
||||||
"""Get disk details using diskutil, returns dict."""
|
"""Get disk details using diskutil, returns dict."""
|
||||||
details = {}
|
details = {}
|
||||||
|
disk_path = pathlib.Path(disk_path)
|
||||||
|
|
||||||
# Get "list" details
|
# Get "list" details
|
||||||
cmd = ['diskutil', 'list', '-plist', path]
|
cmd = ['diskutil', 'list', '-plist', disk_path]
|
||||||
proc = run_program(cmd, check=False, encoding=None, errors=None)
|
proc = run_program(cmd, check=False, encoding=None, errors=None)
|
||||||
try:
|
try:
|
||||||
plist_data = plistlib.loads(proc.stdout)
|
plist_data = plistlib.loads(proc.stdout)
|
||||||
except (TypeError, ValueError):
|
except (TypeError, ValueError):
|
||||||
# Invalid / corrupt plist data? return empty dict to avoid crash
|
# Invalid / corrupt plist data? return empty dict to avoid crash
|
||||||
LOG.error('Failed to get diskutil list for %s', path)
|
LOG.error('Failed to get diskutil list for %s', disk_path)
|
||||||
return details
|
return details
|
||||||
|
|
||||||
# Parse "list" details
|
# Parse "list" details
|
||||||
details = plist_data.get('AllDisksAndPartitions', [{}])[0]
|
details = plist_data.get('AllDisksAndPartitions', [{}])[0]
|
||||||
details['children'] = details.pop('Partitions', [])
|
details['path'] = disk_path
|
||||||
details['path'] = path
|
if skip_children:
|
||||||
for child in details['children']:
|
details['children'] = []
|
||||||
child['path'] = path.with_name(child.get('DeviceIdentifier', 'null'))
|
else:
|
||||||
|
details['children'] = details.pop('Partitions', [])
|
||||||
|
details['children'].extend(details.pop('APFSVolumes', []))
|
||||||
|
for child in details['children']:
|
||||||
|
child['path'] = disk_path.with_name(child['DeviceIdentifier'])
|
||||||
|
|
||||||
# Get "info" details
|
# Get "info" details
|
||||||
for dev in [details, *details['children']]:
|
for dev in [details, *details['children']]:
|
||||||
|
|
@ -244,8 +262,8 @@ def get_disk_details_macos(path) -> dict[Any, Any]:
|
||||||
try:
|
try:
|
||||||
plist_data = plistlib.loads(proc.stdout)
|
plist_data = plistlib.loads(proc.stdout)
|
||||||
except (TypeError, ValueError):
|
except (TypeError, ValueError):
|
||||||
LOG.error('Failed to get diskutil info for %s', path)
|
LOG.error('Failed to get diskutil info for %s', dev['path'])
|
||||||
continue #Skip
|
continue
|
||||||
|
|
||||||
# Parse "info" details
|
# Parse "info" details
|
||||||
dev.update(plist_data)
|
dev.update(plist_data)
|
||||||
|
|
@ -267,7 +285,7 @@ def get_disk_details_macos(path) -> dict[Any, Any]:
|
||||||
|
|
||||||
# Fix details if main dev is a child
|
# Fix details if main dev is a child
|
||||||
for child in details['children']:
|
for child in details['children']:
|
||||||
if path == child['path']:
|
if disk_path == child['path']:
|
||||||
for key in ('fstype', 'label', 'name', 'size'):
|
for key in ('fstype', 'label', 'name', 'size'):
|
||||||
details[key] = child[key]
|
details[key] = child[key]
|
||||||
break
|
break
|
||||||
|
|
@ -330,12 +348,12 @@ def get_disks_macos() -> list[Disk]:
|
||||||
cmd = ['diskutil', 'list', '-plist', 'physical']
|
cmd = ['diskutil', 'list', '-plist', 'physical']
|
||||||
disks = []
|
disks = []
|
||||||
|
|
||||||
|
# El Capitan workaround
|
||||||
|
if platform.mac_ver()[0].startswith('10.11'):
|
||||||
|
cmd.pop()
|
||||||
|
|
||||||
# Get info from diskutil
|
# Get info from diskutil
|
||||||
proc = run_program(cmd, encoding=None, errors=None, check=False)
|
proc = run_program(cmd, encoding=None, errors=None, check=False)
|
||||||
if proc.returncode != 0:
|
|
||||||
# Assuming we're running on an older macOS version
|
|
||||||
cmd.pop(-1)
|
|
||||||
proc = run_program(cmd, encoding=None, errors=None, check=False)
|
|
||||||
|
|
||||||
# Parse plist data
|
# Parse plist data
|
||||||
try:
|
try:
|
||||||
|
|
@ -346,13 +364,19 @@ def get_disks_macos() -> list[Disk]:
|
||||||
return disks
|
return disks
|
||||||
|
|
||||||
# Add valid disks
|
# Add valid disks
|
||||||
for disk in plist_data['WholeDisks']:
|
for disk in plist_data['AllDisksAndPartitions']:
|
||||||
disks.append(Disk(f'/dev/{disk}'))
|
name = disk['DeviceIdentifier']
|
||||||
|
if name not in plist_data['WholeDisks']:
|
||||||
|
# Only check "WholeDisks"
|
||||||
|
continue
|
||||||
|
if not disk['Content']:
|
||||||
|
# This lists GPT or MBR for device, blank should mean it's an image
|
||||||
|
continue
|
||||||
|
disks.append(Disk(f'/dev/{name}'))
|
||||||
|
|
||||||
# Remove virtual disks
|
# Remove virtual disks
|
||||||
# TODO: Test more to figure out why some drives are being marked 'Unknown'
|
|
||||||
disks = [
|
disks = [
|
||||||
d for d in disks if d.details.get('VirtualOrPhysical') != 'Virtual'
|
d for d in disks if d.raw_details.get('VirtualOrPhysical') != 'Virtual'
|
||||||
]
|
]
|
||||||
|
|
||||||
# Done
|
# Done
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ from dataclasses import dataclass, field
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from wk.cfg.hw import KNOWN_RAM_VENDOR_IDS
|
from wk.cfg.hw import KNOWN_RAM_VENDOR_IDS
|
||||||
|
from wk.cfg.python import DATACLASS_DECORATOR_KWARGS
|
||||||
from wk.exe import get_json_from_command, run_program
|
from wk.exe import get_json_from_command, run_program
|
||||||
from wk.hw.test import Test
|
from wk.hw.test import Test
|
||||||
from wk.std import (
|
from wk.std import (
|
||||||
|
|
@ -23,7 +24,7 @@ from wk.std import (
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@dataclass(slots=True)
|
@dataclass(**DATACLASS_DECORATOR_KWARGS)
|
||||||
class System:
|
class System:
|
||||||
"""Object for tracking system specific hardware data."""
|
"""Object for tracking system specific hardware data."""
|
||||||
cpu_description: str = field(init=False)
|
cpu_description: str = field(init=False)
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,9 @@
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from typing import Any, Callable
|
from typing import Any, Callable
|
||||||
|
|
||||||
@dataclass(slots=True)
|
from wk.cfg.python import DATACLASS_DECORATOR_KWARGS
|
||||||
|
|
||||||
|
@dataclass(**DATACLASS_DECORATOR_KWARGS)
|
||||||
class Test:
|
class Test:
|
||||||
# pylint: disable=too-many-instance-attributes
|
# pylint: disable=too-many-instance-attributes
|
||||||
"""Object for tracking test specific data."""
|
"""Object for tracking test specific data."""
|
||||||
|
|
@ -27,7 +29,7 @@ class Test:
|
||||||
self.status = status
|
self.status = status
|
||||||
|
|
||||||
|
|
||||||
@dataclass(slots=True)
|
@dataclass(**DATACLASS_DECORATOR_KWARGS)
|
||||||
class TestGroup:
|
class TestGroup:
|
||||||
"""Object for tracking groups of tests."""
|
"""Object for tracking groups of tests."""
|
||||||
name: str
|
name: str
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ UUID_CORESTORAGE = '53746f72-6167-11aa-aa11-00306543ecac'
|
||||||
|
|
||||||
|
|
||||||
# Functions
|
# Functions
|
||||||
def build_volume_report(device_path=None) -> str:
|
def build_volume_report(device_path=None) -> list:
|
||||||
"""Build volume report using lsblk, returns list.
|
"""Build volume report using lsblk, returns list.
|
||||||
|
|
||||||
If device_path is provided the report is limited to that device.
|
If device_path is provided the report is limited to that device.
|
||||||
|
|
@ -183,6 +183,12 @@ def mount_volumes(device_path=None, read_write=False, scan_corestorage=False):
|
||||||
NOTE: If device_path is specified then only volumes
|
NOTE: If device_path is specified then only volumes
|
||||||
under that path will be mounted.
|
under that path will be mounted.
|
||||||
"""
|
"""
|
||||||
|
def _get_volumes(dev) -> list:
|
||||||
|
"""Convert lsblk JSON tree to a flat list of items, returns list."""
|
||||||
|
volumes = [dev]
|
||||||
|
for child in dev.get('children', []):
|
||||||
|
volumes.extend(_get_volumes(child))
|
||||||
|
return volumes
|
||||||
json_data = {}
|
json_data = {}
|
||||||
volumes = []
|
volumes = []
|
||||||
containers = []
|
containers = []
|
||||||
|
|
@ -199,12 +205,10 @@ def mount_volumes(device_path=None, read_write=False, scan_corestorage=False):
|
||||||
json_data = get_json_from_command(cmd)
|
json_data = get_json_from_command(cmd)
|
||||||
|
|
||||||
# Build list of volumes
|
# Build list of volumes
|
||||||
for dev in json_data.get('blockdevices', []):
|
for dev in _get_volumes(json_data.get('blockdevices', [{}])[0]):
|
||||||
volumes.append(dev)
|
volumes.append(dev)
|
||||||
for child in dev.get('children', []):
|
if dev.get('parttype', '') == UUID_CORESTORAGE:
|
||||||
volumes.append(child)
|
containers.append(dev['name'])
|
||||||
if child['parttype'] == UUID_CORESTORAGE:
|
|
||||||
containers.append(child['name'])
|
|
||||||
|
|
||||||
# Scan CoreStorage containers
|
# Scan CoreStorage containers
|
||||||
if scan_corestorage:
|
if scan_corestorage:
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue