WizardKit/scripts/wk/os/mac.py
2Shirt 207c52663b Allow mounting of protected macOS partitions
Renamed mount_volumes() to mount_disk() to better match diskutil naming.
Dropped read_write from mount_disk() since it isn't used
2021-03-18 22:23:07 -06:00

269 lines
7.9 KiB
Python

"""WizardKit: macOS Functions"""
# vim: sts=2 sw=2 ts=2
import logging
import plistlib
import re
from wk import std
from wk.exe import run_program
from wk.hw.obj import Disk
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
REGEX_FANS = re.compile(r'^.*\(bytes (?P<bytes>.*)\)$')
# Functions
def decode_smc_bytes(text):
"""Decode SMC bytes, returns int."""
result = None
# Get bytes
match = REGEX_FANS.match(text)
if not match:
LOG.error('Failed to decode smc output: %s', text)
raise ValueError(f'Failed to decocde smc output: {text}')
# Convert to text
result = match.group('bytes')
result = result.replace(' ', '')
result = int(result, 16)
# Done
return result
def get_apfs_volumes(device_path):
"""Get APFS volumes contained in device_path, returns list."""
volumes = []
containers = []
# Get APFS details
cmd = ['diskutil', 'apfs', 'list', '-plist']
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# Invalid / corrupt plist data? return empty dict to avoid crash
LOG.error('Failed to get diskutil apfs list for %s', device_path)
# Find container(s) relating to device_path
for container in plist_data['Containers']:
if container['DesignatedPhysicalStore'] == device_path:
containers.append(container)
# Add volumes
for container in containers:
for volume in container['Volumes']:
mount_volume(volume['DeviceIdentifier'])
volumes.append(Disk(f'/dev/{volume["DeviceIdentifier"]}'))
# Done
return volumes
def get_core_storage_volumes(device_path):
# pylint: disable=too-many-branches
"""Get CoreStorage volumes contained in device_path, returns list."""
disks = []
volumes = []
# Get coreStorage details
cmd = ['diskutil', 'corestorage', 'list', '-plist']
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# invalid / corrupt plist data? return empty dict to avoid crash
LOG.error('failed to get diskutil corestorage list for %s', device_path)
# Find related virtual disks
for l_vg in plist_data['CoreStorageLogicalVolumeGroups']:
related = False
# Compare parent physical volumes againt device_path
for p_v in l_vg['CoreStoragePhysicalVolumes']:
uuid = p_v['CoreStorageUUID']
cmd = ['diskutil', 'coreStorage', 'info', '-plist', uuid]
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
LOG.error('failed to get diskutil corestorage info for %s', uuid)
continue
if plist_data['DeviceIdentifier'] == device_path:
related = True
break
# Move on if no related p_v was found
if not related:
continue
# Add logical disks to list
for l_vf in l_vg['CoreStorageLogicalVolumeFamilies']:
for l_v in l_vf['CoreStorageLogicalVolumes']:
uuid = l_v['CoreStorageUUID']
cmd = ['diskutil', 'coreStorage', 'info', '-plist', uuid]
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
LOG.error('failed to get diskutil corestorage info for %s', uuid)
continue
disks.append(plist_data['DeviceIdentifier'])
# Get volumes from logical disks
for disk in disks:
cmd = ['diskutil', 'list', '-plist', f'/dev/{disk}']
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
disk_data = plist_data['AllDisksAndPartitions'][0]
except (TypeError, ValueError):
LOG.error('Failed to get diskutil list for %s', disk)
continue
if 'Partitions' in disk_data:
for part in disk_data['Partitions']:
mount_volume(part['DeviceIdentifier'])
volumes.append(Disk(f'/dev/{part["DeviceIdentifier"]}'))
elif 'VolumeName' in disk_data:
mount_volume(disk_data['DeviceIdentifier'])
volumes.append(Disk(f'/dev/{disk_data["DeviceIdentifier"]}'))
# Done
return volumes
def mount_volume(volume_path, read_only=True):
"""Try to mount a volume, returns subprocess.CompletedProcess."""
cmd = ['sudo', 'diskutil', 'mount']
if read_only:
cmd.append('readOnly')
cmd.append(volume_path)
proc = run_program(cmd, check=False)
# Done
return proc
def mount_disk(device_path=None):
"""Mount all detected volumes, returns list.
NOTE: If device_path is specified then only volumes
under that path will be mounted.
"""
report = []
volumes = []
# Get device details
cmd = [
'diskutil',
'list',
'-plist',
]
if device_path:
cmd.append(device_path)
proc = run_program(cmd, check=False, encoding=None, errors=None)
try:
plist_data = plistlib.loads(proc.stdout)
except (TypeError, ValueError):
# Invalid / corrupt plist data? return empty dict to avoid crash
LOG.error('Failed to get diskutil list for %s', device_path)
return report
# Mount and add volumes
for part in plist_data['AllDisksAndPartitions'][0]['Partitions']:
proc = mount_volume(part['DeviceIdentifier'])
# Add volume
volumes.append(Disk(f'/dev/{part["DeviceIdentifier"]}'))
# Add sub-volumes
if proc.returncode:
if part['Content'] == 'Apple_CoreStorage':
volumes.extend(get_core_storage_volumes(part['DeviceIdentifier']))
elif part['Content'] == 'Apple_APFS':
volumes.extend(get_apfs_volumes(part['DeviceIdentifier']))
# Build report from volume list
for vol in volumes:
result = f'{vol.details["name"]:<20}'
# Containers
if vol.details['Content'] in ('Apple_APFS', 'Apple_CoreStorage'):
result += f'{vol.details["Content"].replace("Apple_", "")} container'
report.append(std.color_string(result, 'BLUE'))
continue
# Unknown partitions
if not vol.details['mountpoint']:
result += 'Failed to mount'
report.append(std.color_string(result, 'RED'))
continue
# Volumes
result += f'{"Mounted on "+vol.details.get("mountpoint", "?"):<40}'
size = vol.details.get('TotalSize', -1)
free = vol.details.get('FreeSpace', -1)
used = size - free
result = (
f'{result} ({vol.details.get("fstype", "Unknown FS")+",":<5} '
f'{std.bytes_to_string(used, decimals=1):>9} used, '
f'{std.bytes_to_string(free, decimals=1):>9} free) '
)
report.append(result)
# Done
return report
def set_fans(mode):
"""Set fans to auto or max."""
if mode == 'auto':
set_fans_auto()
elif mode == 'max':
set_fans_max()
else:
raise RuntimeError(f'Invalid fan mode: {mode}')
def set_fans_auto():
"""Set fans to auto."""
LOG.info('Setting fans to auto')
cmd = ['sudo', 'smc', '-k', 'FS! ', '-w', '0000']
run_program(cmd)
def set_fans_max():
"""Set fans to their max speeds."""
LOG.info('Setting fans to max')
num_fans = 0
# Get number of fans
cmd = ['smc', '-k', 'FNum', '-r']
proc = run_program(cmd)
num_fans = decode_smc_bytes(proc.stdout)
LOG.info('Found %s fans', num_fans)
# Set all fans to forced speed
## NOTE: mask is bit mask from right to left enabling fans
## e.g. bit 1 is fan 0, bit 2 is fan 1, etc
## So the mask for two fans is 0b11 or 0x3, four would be 0b111 or 0x7
mask = f'{hex(2**num_fans - 1)[2:]:0>4}'
cmd = ['sudo', 'smc', '-k', 'FS! ', '-w', mask]
run_program(cmd)
# Set all fans to their max speed
for fan in range(num_fans):
cmd = ['smc', '-k', f'F{fan}Mx', '-r']
proc = run_program(cmd)
max_temp = decode_smc_bytes(proc.stdout)
LOG.info('Setting fan #%s to %s RPM', fan, str(max_temp >> 2))
cmd = ['sudo', 'smc', '-k', f'F{fan}Tg', '-w', hex(max_temp)[2:]]
run_program(cmd)
if __name__ == '__main__':
print("This file is not meant to be called directly.")