From 908ffdc99900a80890884a9f9ca2460c03b09391 Mon Sep 17 00:00:00 2001 From: 2Shirt Date: Thu, 18 Mar 2021 03:23:09 -0600 Subject: [PATCH] Add mount_volumes() to wk.os.mac Supports both CoreStorage and APFS containers --- scripts/wk/hw/diags.py | 21 ++++-- scripts/wk/os/mac.py | 168 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 182 insertions(+), 7 deletions(-) diff --git a/scripts/wk/hw/diags.py b/scripts/wk/hw/diags.py index 8c8abfad..3e506103 100644 --- a/scripts/wk/hw/diags.py +++ b/scripts/wk/hw/diags.py @@ -1384,18 +1384,25 @@ def ost_convert_report(original_report, start_index): def ost_generate_volume_report(dev): """Generate volume report for dev, returns list.""" report = [] + vol_report = None # OS Check - if PLATFORM != 'Linux': - # TODO: Add macOS volume report + if PLATFORM == 'Darwin': + vol_report = wk_os.mac.mount_volumes( + device_path=dev.path, + read_write=False, + ) + elif PLATFORM == 'Linux': + vol_report = wk_os.linux.mount_volumes( + device_path=dev.path, + read_write=False, + scan_corestorage=not dev.any_test_failed(), + ) + else: + # Volume report unavailable return report # Convert mount_volume report - vol_report = wk_os.linux.mount_volumes( - device_path=dev.path, - read_write=False, - scan_corestorage=not dev.any_test_failed(), - ) for line in vol_report: line = std.strip_colors(line) match = VOLUME_REGEX.match(line) diff --git a/scripts/wk/os/mac.py b/scripts/wk/os/mac.py index 5c812826..31b1f5de 100644 --- a/scripts/wk/os/mac.py +++ b/scripts/wk/os/mac.py @@ -2,9 +2,12 @@ # vim: sts=2 sw=2 ts=2 import logging +import plistlib import re +from wk import std from wk.exe import run_program +from wk.hw.obj import Disk # STATIC VARIABLES @@ -32,6 +35,171 @@ def decode_smc_bytes(text): return result +def get_apfs_volumes(device_path): + """Get APFS volumes contained in device_path, returns list.""" + volumes = [] + containers = [] + + # Get APFS details + cmd = ['diskutil', 'apfs', 'list', '-plist'] + proc = run_program(cmd, check=False, encoding=None, errors=None) + try: + plist_data = plistlib.loads(proc.stdout) + except (TypeError, ValueError): + # Invalid / corrupt plist data? return empty dict to avoid crash + LOG.error('Failed to get diskutil apfs list for %s', device_path) + + # Find container(s) relating to device_path + for container in plist_data['Containers']: + if container['DesignatedPhysicalStore'] == device_path: + containers.append(container) + + # Add volumes + for container in containers: + for volume in container['Volumes']: + volumes.append(Disk(f'/dev/{volume["DeviceIdentifier"]}')) + + # Done + return volumes + + +def get_core_storage_volumes(device_path): + """Get CoreStorage volumes contained in device_path, returns list.""" + disks = [] + volumes = [] + + # Get coreStorage details + cmd = ['diskutil', 'corestorage', 'list', '-plist'] + proc = run_program(cmd, check=False, encoding=None, errors=None) + try: + plist_data = plistlib.loads(proc.stdout) + except (TypeError, ValueError): + # invalid / corrupt plist data? return empty dict to avoid crash + LOG.error('failed to get diskutil corestorage list for %s', device_path) + + # Find related virtual disks + for l_vg in plist_data['CoreStorageLogicalVolumeGroups']: + related = False + + # Compare parent physical volumes againt device_path + for p_v in l_vg['CoreStoragePhysicalVolumes']: + uuid = p_v['CoreStorageUUID'] + cmd = ['diskutil', 'coreStorage', 'info', '-plist', uuid] + try: + uuid_data = plistlib.loads(proc.stdout) + except (TypeError, ValueError): + LOG.error('failed to get diskutil corestorage info for %s', uuid) + continue + if uuid_data['DeviceIdentifier'] == device_path: + related = True + break + + # Move on if no related p_v was found + if not related: + continue + + # Add logical disks to list + for l_v in l_vg['CoreStorageLogicalVolumes']: + uuid = l_v['CoreStorageUUID'] + cmd = ['diskutil', 'coreStorage', 'info', '-plist', uuid] + try: + uuid_data = plistlib.loads(proc.stdout) + except (TypeError, ValueError): + LOG.error('failed to get diskutil corestorage info for %s', uuid) + continue + disks.append(uuid_data['DeviceIdentifier']) + + # Get volumes from logical disks + for disk in disks: + cmd = ['diskutil', 'list', '-plist', f'/dev/{disk}'] + proc = run_program(cmd, check=False, encoding=None, errors=None) + try: + disk_data = plistlib.loads(proc.stdout) + except (TypeError, ValueError): + LOG.error('Failed to get diskutil list for %s', disk) + continue + for part in disk_data['AllDisksAndPartitions'][0]['Partitions']: + volumes.append(Disk(f'/dev/{part["DeviceIdentifier"]}')) + + # Done + return volumes + + +def mount_volumes(device_path=None, read_write=False): + """Mount all detected volumes, returns list. + + NOTE: If device_path is specified then only volumes + under that path will be mounted. + """ + report = [] + volumes = [] + + # Get device details + cmd = [ + 'diskutil', + 'list', + '-plist', + ] + if device_path: + cmd.append(device_path) + proc = run_program(cmd, check=False, encoding=None, errors=None) + try: + plist_data = plistlib.loads(proc.stdout) + except (TypeError, ValueError): + # Invalid / corrupt plist data? return empty dict to avoid crash + LOG.error('Failed to get diskutil list for %s', device_path) + return report + + # Mount and add volumes + for part in plist_data['AllDisksAndPartitions'][0]['Partitions']: + cmd = ['diskutil', 'mount', 'readOnly', part['DeviceIdentifier']] + if read_write: + # NOTE: Unused 2021-03 + cmd.pop(2) + proc = run_program(cmd, check=False) + + # Add volume + volumes.append(Disk(f'/dev/{part["DeviceIdentifier"]}')) + + # Add sub-volumes + if proc.returncode: + if part['Content'] == 'Apple_CoreStorage': + volumes.extend(get_core_storage_volumes(part['DeviceIdentifier'])) + elif part['Content'] == 'Apple_APFS': + volumes.extend(get_apfs_volumes(part['DeviceIdentifier'])) + + # Build report from volume list + for vol in volumes: + result = f'{vol.details["name"]:<20}' + + # Containers + if vol.details['Content'] in ('Apple_APFS', 'Apple_CoreStorage'): + result += f'{vol.details["Content"].replace("Apple_", "")} container' + report.append(std.color_string(result, 'BLUE')) + continue + + # Unknown partitions + if not vol.details['mountpoint']: + result += 'Failed to mount' + report.append(std.color_string(result, 'RED')) + continue + + # Volumes + result += f'{"Mounted on "+vol.details.get("mountpoint", "?"):<40}' + size = vol.details.get('VolumeSize', -1) + free = vol.details.get('FreeSpace', -1) + used = size - free + result = ( + f'{result} ({vol.details.get("fstype", "Unknown FS")+",":<5} ' + f'{std.bytes_to_string(used, decimals=1):>9} used, ' + f'{std.bytes_to_string(size, decimals=1):>9} size, ' + ) + report.append(result) + + # Done + return report + + def set_fans(mode): """Set fans to auto or max.""" if mode == 'auto':