"""WizardKit: macOS Functions"""
# vim: sts=2 sw=2 ts=2

import logging
import plistlib
import re

from wk import std
from wk.exe import run_program
from wk.hw.obj import Disk


# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Captures the hex byte string from smc output ending in '(bytes XX XX ...)'
REGEX_FANS = re.compile(r'^.*\(bytes (?P<bytes>.*)\)$')


# Functions
def _load_plist(cmd):
  """Run cmd and parse its stdout as a plist, returns dict or None.

  None is returned when stdout is missing or is not valid plist data;
  callers are responsible for logging a context-specific error.
  """
  # encoding=None / errors=None are passed through unchanged from the
  # original call sites; plistlib.loads() needs bytes, not str.
  proc = run_program(cmd, check=False, encoding=None, errors=None)
  try:
    return plistlib.loads(proc.stdout)
  except (TypeError, ValueError):
    # Invalid / corrupt plist data
    return None


def _core_storage_info(uuid):
  """Get 'diskutil coreStorage info' for uuid, returns dict or None."""
  data = _load_plist(['diskutil', 'coreStorage', 'info', '-plist', uuid])
  if data is None:
    LOG.error('failed to get diskutil corestorage info for %s', uuid)
  return data


def decode_smc_bytes(text):
  """Decode SMC bytes, returns int.

  text must match REGEX_FANS, i.e. end in '(bytes XX XX ...)'. The
  space-separated hex bytes are joined and parsed as a single base-16
  number.

  Raises ValueError if text does not match or the bytes are not hex.
  """
  match = REGEX_FANS.match(text)
  if not match:
    LOG.error('Failed to decode smc output: %s', text)
    raise ValueError(f'Failed to decode smc output: {text}')

  # Convert 'XX XX' -> 'XXXX' -> int
  return int(match.group('bytes').replace(' ', ''), 16)


def get_apfs_volumes(device_path):
  """Get APFS volumes contained in device_path, returns list.

  device_path is compared against each container's
  'DesignatedPhysicalStore' value from 'diskutil apfs list -plist'.
  An empty list is returned if the plist data can't be read.
  """
  volumes = []

  # Get APFS details
  plist_data = _load_plist(['diskutil', 'apfs', 'list', '-plist'])
  if plist_data is None:
    # Invalid / corrupt plist data? return empty list to avoid crash
    LOG.error('Failed to get diskutil apfs list for %s', device_path)
    return volumes

  # Add volumes from container(s) relating to device_path
  for container in plist_data['Containers']:
    if container['DesignatedPhysicalStore'] != device_path:
      continue
    for volume in container['Volumes']:
      volumes.append(Disk(f'/dev/{volume["DeviceIdentifier"]}'))

  # Done
  return volumes


def get_core_storage_volumes(device_path):
  """Get CoreStorage volumes contained in device_path, returns list.

  device_path is compared against the 'DeviceIdentifier' of each
  physical volume in every logical volume group. An empty list is
  returned if the CoreStorage list can't be read.
  """
  disks = []
  volumes = []

  # Get CoreStorage details
  plist_data = _load_plist(['diskutil', 'corestorage', 'list', '-plist'])
  if plist_data is None:
    # Invalid / corrupt plist data? return empty list to avoid crash
    LOG.error('failed to get diskutil corestorage list for %s', device_path)
    return volumes

  # Find related virtual disks
  for l_vg in plist_data['CoreStorageLogicalVolumeGroups']:
    related = False

    # Compare parent physical volumes against device_path
    for p_v in l_vg['CoreStoragePhysicalVolumes']:
      uuid_data = _core_storage_info(p_v['CoreStorageUUID'])
      if uuid_data is None:
        continue
      if uuid_data['DeviceIdentifier'] == device_path:
        related = True
        break

    # Move on if no related p_v was found
    if not related:
      continue

    # Add logical disks to list
    for l_v in l_vg['CoreStorageLogicalVolumes']:
      uuid_data = _core_storage_info(l_v['CoreStorageUUID'])
      if uuid_data is None:
        continue
      disks.append(uuid_data['DeviceIdentifier'])

  # Get volumes from logical disks
  for disk in disks:
    disk_data = _load_plist(['diskutil', 'list', '-plist', f'/dev/{disk}'])
    if disk_data is None:
      LOG.error('Failed to get diskutil list for %s', disk)
      continue
    for part in disk_data['AllDisksAndPartitions'][0]['Partitions']:
      volumes.append(Disk(f'/dev/{part["DeviceIdentifier"]}'))

  # Done
  return volumes


def mount_volumes(device_path=None, read_write=False):
  """Mount all detected volumes, returns list.

  NOTE: If device_path is specified then only volumes under that path
        will be mounted.

  The returned list is a report with one formatted string per volume.
  An empty list is returned if the diskutil output can't be read.
  """
  report = []
  volumes = []

  # Get device details
  cmd = ['diskutil', 'list', '-plist']
  if device_path:
    cmd.append(device_path)
  plist_data = _load_plist(cmd)
  if plist_data is None:
    # Invalid / corrupt plist data? return empty list to avoid crash
    LOG.error('Failed to get diskutil list for %s', device_path)
    return report

  # Mount and add volumes
  # NOTE(review): only the first entry of AllDisksAndPartitions is
  #   processed, so without device_path only one disk is handled.
  #   Confirm whether that is intended.
  for part in plist_data['AllDisksAndPartitions'][0]['Partitions']:
    dev_id = part['DeviceIdentifier']
    cmd = ['diskutil', 'mount']
    if not read_write:
      # NOTE: read_write unused 2021-03
      cmd.append('readOnly')
    cmd.append(dev_id)
    proc = run_program(cmd, check=False)

    # Add volume
    volumes.append(Disk(f'/dev/{dev_id}'))

    # Add sub-volumes (only looked up when the mount command failed)
    if proc.returncode:
      content = part.get('Content')
      if content == 'Apple_CoreStorage':
        volumes.extend(get_core_storage_volumes(dev_id))
      elif content == 'Apple_APFS':
        volumes.extend(get_apfs_volumes(dev_id))

  # Build report from volume list
  for vol in volumes:
    result = f'{vol.details["name"]:<20}'

    # Containers
    if vol.details['Content'] in ('Apple_APFS', 'Apple_CoreStorage'):
      result += f'{vol.details["Content"].replace("Apple_", "")} container'
      report.append(std.color_string(result, 'BLUE'))
      continue

    # Unknown partitions
    if not vol.details.get('mountpoint'):
      result += 'Failed to mount'
      report.append(std.color_string(result, 'RED'))
      continue

    # Volumes
    result += f'{"Mounted on "+vol.details.get("mountpoint", "?"):<40}'
    size = vol.details.get('VolumeSize', -1)
    free = vol.details.get('FreeSpace', -1)
    used = size - free
    result = (
      f'{result} ({vol.details.get("fstype", "Unknown FS")+",":<5} '
      f'{std.bytes_to_string(used, decimals=1):>9} used, '
      f'{std.bytes_to_string(size, decimals=1):>9} size)'
      )
    report.append(result)

  # Done
  return report


def set_fans(mode):
  """Set fans to auto or max.

  Raises RuntimeError if mode is neither 'auto' nor 'max'.
  """
  if mode == 'auto':
    set_fans_auto()
  elif mode == 'max':
    set_fans_max()
  else:
    raise RuntimeError(f'Invalid fan mode: {mode}')


def set_fans_auto():
  """Set fans to auto."""
  LOG.info('Setting fans to auto')
  # An all-zero mask for the 'FS! ' key means no fans are forced
  cmd = ['sudo', 'smc', '-k', 'FS! ', '-w', '0000']
  run_program(cmd)


def set_fans_max():
  """Set fans to their max speeds."""
  LOG.info('Setting fans to max')

  # Get number of fans
  cmd = ['smc', '-k', 'FNum', '-r']
  proc = run_program(cmd)
  num_fans = decode_smc_bytes(proc.stdout)
  LOG.info('Found %s fans', num_fans)

  # Set all fans to forced speed
  ## NOTE: mask is bit mask from right to left enabling fans
  ##       e.g. bit 1 is fan 0, bit 2 is fan 1, etc
  ##       So the mask for two fans is 0b11 or 0x3,
  ##       four would be 0b1111 or 0xf
  mask = f'{(1 << num_fans) - 1:04x}'
  cmd = ['sudo', 'smc', '-k', 'FS! ', '-w', mask]
  run_program(cmd)

  # Set all fans to their max speed
  for fan in range(num_fans):
    cmd = ['smc', '-k', f'F{fan}Mx', '-r']
    proc = run_program(cmd)
    max_speed_raw = decode_smc_bytes(proc.stdout)
    # The raw value is shifted right by two bits to log it as RPM
    # (presumably a fixed-point value with two fractional bits; confirm)
    LOG.info('Setting fan #%s to %s RPM', fan, str(max_speed_raw >> 2))
    # Zero-pad to four hex digits, same as the mask above
    cmd = ['sudo', 'smc', '-k', f'F{fan}Tg', '-w', f'{max_speed_raw:04x}']
    run_program(cmd)


if __name__ == '__main__':
  print("This file is not meant to be called directly.")