464 lines
14 KiB
Python
464 lines
14 KiB
Python
"""WizardKit: Disk object and functions"""
|
|
# vim: sts=2 sw=2 ts=2
|
|
|
|
import copy
|
|
import logging
|
|
import pathlib
|
|
import platform
|
|
import plistlib
|
|
import re
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Union
|
|
|
|
from wk.cfg.main import KIT_NAME_SHORT
|
|
from wk.cfg.python import DATACLASS_DECORATOR_KWARGS
|
|
from wk.exe import get_json_from_command, run_program
|
|
from wk.hw.test import Test
|
|
from wk.hw.smart import (
|
|
generate_attribute_report,
|
|
get_known_disk_attributes,
|
|
)
|
|
from wk.std import PLATFORM
|
|
from wk.ui import ansi
|
|
|
|
|
|
# STATIC VARIABLES
|
|
# Module-level logger, named after this module
LOG = logging.getLogger(__name__)

# Matches volume/partition labels of the form "<KIT_NAME_SHORT>_LINUX" or
# "<KIT_NAME_SHORT>_UFD" (case-insensitive); used by get_disks() to skip kits
WK_LABEL_REGEX = re.compile(fr'{KIT_NAME_SHORT}_(LINUX|UFD)', flags=re.IGNORECASE)
|
|
|
|
|
|
# Classes
|
|
@dataclass(**DATACLASS_DECORATOR_KWARGS)
class Disk:
  """Object for tracking disk specific data.

  Only `path` is accepted by the constructor; all other fields are
  populated by __post_init__() / update_details().
  """
  # SMART attributes for this disk (not populated by this class itself)
  attributes: dict[Any, dict] = field(init=False, default_factory=dict)
  # Bus/transport name, upper-cased (except 'Image' and 'NVMe')
  bus: str = field(init=False)
  # Raw detail dicts for child devices (partitions/volumes)
  children: list[dict] = field(init=False, default_factory=list)
  # Human readable summary built by set_description()
  description: str = field(init=False)
  filesystem: str = field(init=False)
  # Deep copy of `attributes` taken at the end of __post_init__()
  initial_attributes: dict[Any, dict] = field(init=False)
  known_attributes: dict[Any, dict] = field(init=False, default_factory=dict)
  # Logical sector size in bytes
  log_sec: int = field(init=False)
  model: str = field(init=False)
  name: str = field(init=False)
  # Notes included in the report (may contain ANSI color codes)
  notes: list[str] = field(init=False, default_factory=list)
  path: Union[pathlib.Path, str]
  parent: str = field(init=False)
  # Physical sector size in bytes
  phy_sec: int = field(init=False)
  # Unprocessed details dict returned by the OS-specific helper
  raw_details: dict[str, Any] = field(init=False)
  raw_smartctl: dict[str, Any] = field(init=False)
  serial: str = field(init=False)
  # Size in bytes, -1 if unknown/invalid
  size: int = field(init=False)
  ssd: bool = field(init=False)
  tests: list[Test] = field(init=False, default_factory=list)
  use_sat: bool = field(init=False, default=False)

  def __post_init__(self):
    """Resolve the path and load all details for this disk."""
    self.path = pathlib.Path(self.path).resolve()
    self.update_details()
    self.set_description()
    self.known_attributes = get_known_disk_attributes(self.model)
    if not self.attributes and self.bus == 'USB':
      # Try using SAT
      LOG.warning('Using SAT for smartctl for %s', self.path)
      self.notes = []
      self.use_sat = True
    self.initial_attributes = copy.deepcopy(self.attributes)
    if not self.is_4k_aligned():
      self.add_note('One or more partitions are not 4K aligned', 'YELLOW')

  def add_note(self, note, color=None) -> None:
    """Add note that will be included in the disk report.

    The note is colorized first (if color is given); duplicates of the
    resulting string are ignored and the notes list is kept sorted.
    """
    if color:
      note = ansi.color_string(note, color)
    if note not in self.notes:
      self.notes.append(note)
      self.notes.sort()

  def contains_note(self, note_str) -> bool:
    """Check if note is already present (ignoring colors), returns bool."""
    return any(note_str == ansi.strip_colors(note) for note in self.notes)

  def disable_disk_tests(self) -> None:
    """Disable all tests.

    Tests that are still 'Pending' or 'Working' are also marked 'Denied'.
    """
    LOG.warning('Disabling all tests for: %s', self.path)
    for test in self.tests:
      if test.status in ('Pending', 'Working'):
        test.set_status('Denied')
      test.disabled = True

  def generate_report(self, header=True) -> list[str]:
    """Generate Disk report, returns list.

    If header is False the device/description lines and the 'Attributes'
    heading are omitted.
    """
    report = []
    if header:
      report.append(ansi.color_string(f'Device ({self.path.name})', 'BLUE'))
      report.append(f' {self.description}')

    # Attributes
    if self.attributes:
      if header:
        report.append(ansi.color_string('Attributes', 'BLUE'))
      report.extend(generate_attribute_report(self))

    # Notes
    if self.notes:
      report.append(ansi.color_string('Notes', 'BLUE'))
      for note in self.notes:
        report.append(f' {note}')

    # Tests
    for test in self.tests:
      report.extend(test.report)

    return report

  def get_labels(self) -> list[str]:
    """Build list of labels for this disk, returns list."""
    labels = []

    # Add all labels from raw_details
    for details in [self.raw_details, *self.raw_details.get('children', [])]:
      labels.append(details.get('label', ''))
      labels.append(details.get('partlabel', ''))

    # Remove empty labels
    labels = [str(label) for label in labels if label]

    # Done
    return labels

  def is_4k_aligned(self) -> bool:
    """Check that all disk partitions are aligned, returns bool.

    Platforms other than Darwin/Linux are reported as aligned.
    """
    aligned = True
    if PLATFORM == 'Darwin':
      aligned = is_4k_aligned_macos(self.raw_details)
    elif PLATFORM == 'Linux':
      # NOTE: The Linux helper multiplies this value by the partition start
      #       sectors from sfdisk, which are counted in logical sectors.
      #       Passing phy_sec made every offset on a 512e disk (512 logical,
      #       4096 physical) a multiple of 4096, hiding misalignment.
      aligned = is_4k_aligned_linux(self.path, self.log_sec)

    return aligned

  @property
  def present(self) -> bool:
    """Verify this device is still present, returns bool.

    Adds a 'Device disconnected' note when the path no longer exists.
    """
    if not self.path.exists():
      self.add_note('Device disconnected', 'RED')
      return False
    return True

  def set_description(self) -> None:
    """Set disk description from details."""
    decimals = 1
    suffix = ' '

    # Set size_str (try binary scale first)
    for scale in (1024, 1000):
      size = float(self.size)
      units = list('KMGTPEZY')
      while units:
        if abs(size) < scale:
          break
        size /= scale
        suffix = units.pop(0)
      # Truncate (not round) to one decimal place
      size = ((size * 10) // 1) / 10
      if size % 1 == 0:
        # Found an exact whole number, drop the decimal
        decimals = 0
        break
      if size % 1 == 0.5:
        # x.5 is accepted as-is; otherwise retry with the decimal scale
        break

    # Done
    self.description = (
      f'{size:0.{decimals}f} {suffix}B ({self.bus}) {self.model} {self.serial}'
    )

  def update_details(self, skip_children=True) -> None:
    """Update disk details using OS specific methods.

    Required details default to generic descriptions
    and are converted to the correct type.

    Nothing is updated if the device is no longer present.
    """
    if not self.present:
      return

    if PLATFORM == 'Darwin':
      self.raw_details = get_disk_details_macos(
        self.path, skip_children=skip_children,
      )
    elif PLATFORM == 'Linux':
      self.raw_details = get_disk_details_linux(
        self.path, skip_children=skip_children,
      )

    # Set necessary details
    self.bus = str(self.raw_details.get('bus', '???')).upper()
    self.bus = self.bus.replace('IMAGE', 'Image')
    self.bus = self.bus.replace('NVME', 'NVMe')
    self.children = self.raw_details.get('children', [])
    self.filesystem = self.raw_details.get('fstype', 'Unknown')
    self.log_sec = self.raw_details.get('log-sec', 512)
    self.model = self.raw_details.get('model', 'Unknown Model')
    self.name = self.raw_details.get('name', self.path)
    self.parent = self.raw_details.get('parent', None)
    self.phy_sec = self.raw_details.get('phy-sec', 512)
    self.serial = self.raw_details.get('serial', 'Unknown Serial')
    self.size = self.raw_details.get('size', -1)
    self.ssd = self.raw_details.get('ssd', False)

    # Ensure certain attributes types
    ## NOTE: This is ugly, deal.
    for attr in ['bus', 'model', 'name', 'serial']:
      setattr(self, attr, str(getattr(self, attr)))
    for attr in ['log_sec', 'phy_sec', 'size']:
      try:
        setattr(self, attr, int(getattr(self, attr)))
      except (TypeError, ValueError):
        LOG.error('Invalid disk %s: %s', attr, getattr(self, attr))
        # Always leave an int behind: -1 marks an unknown size and 512 is
        # the same default used above when a sector size is missing.
        # (Leaving e.g. None here breaks the alignment offset math later.)
        setattr(self, attr, -1 if attr == 'size' else 512)
|
|
|
|
|
|
# Functions
|
|
def get_disk_details_linux(disk_path, skip_children=True) -> dict[Any, Any]:
  """Get disk details using lsblk, returns dict.

  The first device reported by lsblk becomes the returned dict and all
  remaining (flattened) devices are stored under its 'children' key.
  If skip_children is True, lsblk is called with --nodeps.
  """
  def _flatten_json(dev) -> list:
    """Convert lsblk JSON tree to a flat list of items, returns list."""
    devs = [dev]
    for child in dev.pop('children', []):
      devs.extend(_flatten_json(child))
    return devs

  # Get lsblk info
  cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', disk_path]
  if skip_children:
    cmd.append('--nodeps')
  json_data = get_json_from_command(cmd, check=False)

  # Use an empty placeholder device if nothing was reported.
  # NOTE: `or` also covers an empty 'blockdevices' list, which would
  #       otherwise raise an IndexError below.
  block_devices = json_data.get('blockdevices') or [{}]
  dev_list = _flatten_json(block_devices[0])

  # Fix details
  for dev in dev_list:
    # Rename lsblk columns to the key names used by Disk
    dev['bus'] = dev.pop('tran', '???')
    dev['parent'] = dev.pop('pkname', None)
    dev['ssd'] = not dev.pop('rota', True)
    if 'loop' in str(disk_path) and dev['bus'] is None:
      # Loop devices without a transport are treated as images
      dev['bus'] = 'Image'
      dev['model'] = ''
      dev['serial'] = ''

  # Convert to dict
  details = dev_list.pop(0)
  details['children'] = dev_list

  # Done
  return details
|
|
|
|
|
|
def get_disk_details_macos(disk_path, skip_children=True) -> dict:
  """Get disk details using diskutil, returns dict.

  Runs `diskutil list -plist` for the device and then
  `diskutil info -plist` for the device and each child, renaming the
  diskutil keys to the names used by Disk (bus, fstype, label, ...).
  An empty dict is returned if the "list" output can't be parsed.
  """
  details = {}
  disk_path = pathlib.Path(disk_path)

  # Get "list" details
  cmd = ['diskutil', 'list', '-plist', disk_path]
  proc = run_program(cmd, check=False, encoding=None, errors=None)
  try:
    plist_data = plistlib.loads(proc.stdout)
  except (TypeError, ValueError):
    # Invalid / corrupt plist data? return empty dict to avoid crash
    LOG.error('Failed to get diskutil list for %s', disk_path)
    return details

  # Parse "list" details
  # NOTE: `or` also covers an empty 'AllDisksAndPartitions' list, which
  #       would otherwise raise an IndexError.
  details = (plist_data.get('AllDisksAndPartitions') or [{}])[0]
  details['path'] = disk_path
  if skip_children:
    details['children'] = []
  else:
    details['children'] = details.pop('Partitions', [])
    details['children'].extend(details.pop('APFSVolumes', []))
    for child in details['children']:
      child['path'] = disk_path.with_name(child['DeviceIdentifier'])

  # Get "info" details
  for dev in [details, *details['children']]:
    cmd = ['diskutil', 'info', '-plist', dev['path']]
    proc = run_program(cmd, check=False, encoding=None, errors=None)
    try:
      plist_data = plistlib.loads(proc.stdout)
    except (TypeError, ValueError):
      LOG.error('Failed to get diskutil info for %s', dev['path'])
      continue

    # Parse "info" details
    dev.update(plist_data)
    dev['bus'] = dev.pop('BusProtocol', '???')
    dev['fstype'] = dev.pop('FilesystemType', '')
    dev['label'] = dev.pop('VolumeName', '')
    dev['model'] = dev.pop('MediaName', 'Unknown')
    dev['mountpoint'] = dev.pop('MountPoint', '')
    dev['name'] = dev.get('name', str(dev['path']))
    dev['phy-sec'] = dev.pop('DeviceBlockSize', 512)
    dev['serial'] = get_disk_serial_macos(dev['path'])
    dev['size'] = dev.pop('Size', -1)
    dev['ssd'] = dev.pop('SolidState', False)
    dev['vendor'] = ''
    if dev.get('WholeDisk', True):
      dev['parent'] = None
    else:
      dev['parent'] = dev.pop('ParentWholeDisk', None)

  # Fix details if main dev is a child
  for child in details['children']:
    if disk_path == child['path']:
      for key in ('fstype', 'label', 'name', 'size'):
        # These keys are missing if the "info" parse failed for this child
        # (see `continue` above); keep the existing value in that case
        if key in child:
          details[key] = child[key]
      break

  # Done
  return details
|
|
|
|
|
|
def get_disk_serial_macos(path) -> str:
  """Get disk serial using smartctl, returns str.

  Falls back to 'Unknown Serial' if smartctl reports no serial_number.
  """
  smartctl_json = get_json_from_command(
    ['sudo', 'smartctl', '--info', '--json', path],
  )
  return smartctl_json.get('serial_number', 'Unknown Serial')
|
|
|
|
|
|
def get_disks(skip_kits=False) -> list[Disk]:
  """Get disks using OS-specific methods, returns list.

  If skip_kits is True, every disk is refreshed (including children) and
  disks with any label matching WK_LABEL_REGEX are dropped.
  """
  if PLATFORM == 'Darwin':
    found = get_disks_macos()
  elif PLATFORM == 'Linux':
    found = get_disks_linux()
  else:
    found = []

  # Nothing to filter
  if not skip_kits:
    return found

  # Child details are needed to see the partition labels
  for disk_obj in found:
    disk_obj.update_details(skip_children=False)

  # Skip WK disks
  kept = []
  for disk_obj in found:
    is_kit = False
    for label in disk_obj.get_labels():
      if WK_LABEL_REGEX.search(label):
        is_kit = True
        break
    if not is_kit:
      kept.append(disk_obj)

  # Done
  return kept
|
|
|
|
|
|
def get_disks_linux() -> list[Disk]:
  """Get disks via lsblk, returns list.

  Only devices of type 'disk' with a size above zero are included.
  """
  found = []
  lsblk_json = get_json_from_command(
    ['lsblk', '--json', '--nodeps', '--paths'],
  )

  for entry in lsblk_json.get('blockdevices', []):
    candidate = Disk(entry['name'])

    # Loopback devices, optical devices, etc are not type 'disk'
    is_disk = candidate.raw_details.get('type', '???') == 'disk'

    # Empty devices (usually card readers) have no usable size
    if is_disk and candidate.size > 0:
      found.append(candidate)

  return found
|
|
|
|
|
|
def get_disks_macos() -> list[Disk]:
  """Get disks via diskutil, returns list.

  Only whole disks with a non-empty 'Content' value are considered and
  disks reported as 'Virtual' are removed from the result.
  """
  found = []

  # El Capitan workaround: the 'physical' argument is left off on 10.11
  cmd = ['diskutil', 'list', '-plist']
  if not platform.mac_ver()[0].startswith('10.11'):
    cmd.append('physical')

  # Get info from diskutil and parse the plist data
  proc = run_program(cmd, encoding=None, errors=None, check=False)
  try:
    plist_data = plistlib.loads(proc.stdout)
  except (TypeError, ValueError):
    # Invalid / corrupt plist data? return empty list to avoid crash
    LOG.error('Failed to get diskutil list')
    return found

  # Add valid disks
  for entry in plist_data['AllDisksAndPartitions']:
    identifier = entry['DeviceIdentifier']
    is_whole_disk = identifier in plist_data['WholeDisks']
    # 'Content' lists GPT or MBR for device, blank should mean it's an image
    if is_whole_disk and entry['Content']:
      found.append(Disk(f'/dev/{identifier}'))

  # Remove virtual disks
  physical = []
  for disk_obj in found:
    if disk_obj.raw_details.get('VirtualOrPhysical') != 'Virtual':
      physical.append(disk_obj)

  return physical
|
|
|
|
|
|
def is_4k_aligned_macos(disk_details) -> bool:
  """Check partition alignment using diskutil info, returns bool.

  Every entry under 'children' must have a non-negative
  'PartitionMapPartitionOffset' that is a multiple of 4096.
  """
  all_aligned = True

  for child in disk_details.get('children', []):
    start = child.get('PartitionMapPartitionOffset', 0)
    if not start:
      # Assuming offset couldn't be found and it defaulted to 0
      # NOTE: Just logging the error, not bailing
      LOG.error('Failed to get partition offset for %s', child['path'])

    # Once a misaligned partition is found the result stays False and the
    # remaining offsets are no longer evaluated (only logged above)
    if all_aligned and (start < 0 or start % 4096 != 0):
      all_aligned = False

  return all_aligned
|
|
|
|
|
|
def is_4k_aligned_linux(dev_path, physical_sector_size) -> bool:
  """Check partition alignment using sfdisk, returns bool.

  Each partition's start sector (from `sfdisk --json`) is multiplied by a
  sector size to get its byte offset, which must be a non-negative
  multiple of 4096.

  The sector size reported by sfdisk ('sectorsize') is preferred;
  physical_sector_size is only used when sfdisk doesn't report one.
  """
  aligned = True
  cmd = ['sudo', 'sfdisk', '--json', dev_path]

  # Get partition details
  json_data = get_json_from_command(cmd)
  table = json_data.get('partitiontable', {})

  # NOTE: sfdisk start sectors are in units of the table's own sector size.
  #       Multiplying by a 4096 byte physical sector size on a 512e disk
  #       makes every offset a multiple of 4096 and hides misalignment.
  sector_size = table.get('sectorsize') or physical_sector_size

  # Check partitions
  for part in table.get('partitions', []):
    # A missing start results in a negative offset (i.e. not aligned)
    offset = sector_size * part.get('start', -1)
    aligned = aligned and offset >= 0 and offset % 4096 == 0

  # Done
  return aligned
|
|
|
|
|
|
# This module only provides definitions; print a hint if it's run as a script
if __name__ == '__main__':
  print("This file is not meant to be called directly.")
|