"""WizardKit: Linux Functions""" # vim: sts=2 sw=2 ts=2 import logging import os import pathlib import re import subprocess from wk.cfg.hw import VOLUME_FAILURE_THRESHOLD, VOLUME_WARNING_THRESHOLD from wk.exe import get_json_from_command, popen_program, run_program from wk.log import format_log_path from wk.std import bytes_to_string from wk.ui import ansi # STATIC VARIABLES LOG = logging.getLogger(__name__) UUID_CORESTORAGE = '53746f72-6167-11aa-aa11-00306543ecac' # Functions def build_volume_report(device_path=None) -> list[str]: """Build volume report using lsblk, returns list. If device_path is provided the report is limited to that device. """ def _get_volumes(dev, indent=0) -> list[dict]: """Convert lsblk JSON tree to a flat list of items, returns list.""" dev['name'] = f'{" "*indent}{dev["name"]}' volumes = [dev] for child in dev.get('children', []): volumes.extend(_get_volumes(child, indent=indent+1)) return volumes json_data = None report = [] vol_data = [] m_width = 40 # Get details from lsblk cmd = [ 'lsblk', '--bytes', '--json', '--output=fsavail,fstype,fsused,mountpoint,name,size', ] if device_path: cmd.append(device_path) json_data = get_json_from_command(cmd) # Build list of volumes for dev in json_data.get('blockdevices', []): vol_data.extend(_get_volumes(dev)) # Set mountpoint width based on current data ## NOTE: We add 11 for the 'Mounted on ' prefix and a 1-space buffer m_width = min( m_width, max(map(len, [str(v['mountpoint']) for v in vol_data]))+11, ) # Build report for vol in vol_data: percent_used = 0 size_color = None # Set size color if vol['fsused']: percent_used = (int(vol['fsused']) / int(vol['size'])) * 100 if percent_used >= VOLUME_FAILURE_THRESHOLD: size_color = 'RED' elif percent_used >= VOLUME_WARNING_THRESHOLD: size_color = 'YELLOW' # Clean data for key, value in vol.items(): if value is None: value = "" value = str(value) if value.isnumeric(): value = bytes_to_string(value, decimals=1) vol[key] = value if vol['mountpoint']: vol['mountpoint'] = f'Mounted on {vol["mountpoint"]}' # Name and size line = ansi.color_string( [f'{vol["name"]:<20}', f'{vol["size"]:>9}'], [None, 'CYAN'], ) # Mountpoint and type line = ansi.color_string( [line, f'{vol["mountpoint"]:<{m_width}}', f'{vol["fstype"]:<11}'], [None, None, 'BLUE'], ) # Used and free if any([vol['fsused'], vol['fsavail']]): line = ansi.color_string( [line, f'({vol["fsused"]:>9} used, {vol["fsavail"]:>9} free)'], [None, size_color], ) # Add line to report report.append(line) # Done return report def get_user_home(user) -> pathlib.Path: """Get path to user's home dir, returns pathlib.Path obj.""" home = None # Get path from user details cmd = ['getent', 'passwd', user] proc = run_program(cmd, check=False) try: home = proc.stdout.split(':')[5] except IndexError: # Try using environment variable home = os.environ.get('HOME') # Raise exception if necessary if not home: raise RuntimeError(f'Failed to find home for: {user}') # Done return pathlib.Path(home) def get_user_name() -> str: """Get real user name, returns str.""" user = None # Query environment user = os.environ.get('SUDO_USER') if not user: user = os.environ.get('USER') # Raise exception if necessary if not user: raise RuntimeError('Failed to determine user') # Done return user def make_temp_file(suffix=None) -> pathlib.Path: """Make temporary file, returns pathlib.Path() obj.""" cmd = ['mktemp'] if suffix: cmd.append(f'--suffix={suffix}') proc = run_program(cmd) return pathlib.Path(proc.stdout.strip()) def mount(source, mount_point=None, read_write=False) -> None: """Mount source (on mount_point if provided). NOTE: If not running_as_root() then udevil will be used. """ cmd = [ 'mount', '-o', 'rw' if read_write else 'ro', source, ] if not running_as_root(): cmd.insert(0, 'udevil') if mount_point: mount_point = pathlib.Path(mount_point).resolve() mount_point.mkdir(parents=True, exist_ok=True) cmd.append(mount_point) # Run mount command proc = run_program(cmd, check=False) if not proc.returncode == 0: raise RuntimeError(f'Failed to mount: {source} on {mount_point}') def mount_volumes(device_path=None, read_write=False, scan_corestorage=False) -> None: """Mount all detected volumes. NOTE: If device_path is specified then only volumes under that path will be mounted. """ def _get_volumes(dev) -> list[dict]: """Convert lsblk JSON tree to a flat list of items, returns list.""" volumes = [dev] for child in dev.get('children', []): volumes.extend(_get_volumes(child)) return volumes json_data = {} volumes = [] containers = [] # Get list of volumes cmd = [ 'lsblk', '--output=mountpoint,name,parttype', '--paths', '--json', ] if device_path: cmd.append(device_path) json_data = get_json_from_command(cmd) # Bail if json_data is empty if not json_data: return # Build list of volumes for dev in json_data.get('blockdevices', [{}]): volumes.extend(_get_volumes(dev)) if dev.get('parttype', '') == UUID_CORESTORAGE: containers.append(dev['name']) # Scan CoreStorage containers if scan_corestorage: for container in containers: LOG.info( 'Scanning CoreStorage container for inner volumes (%s)', container, ) volumes.extend(scan_corestorage_container(container)) # Mount volumes for vol in volumes: if not vol['mountpoint']: try: mount(vol['name'], read_write=read_write) except RuntimeError: # Couldn't mount the volume pass def running_as_root() -> bool: """Check if running with effective UID of 0, returns bool.""" return os.geteuid() == 0 def scan_corestorage_container(container, timeout=300) -> list[dict]: """Scan CoreStorage container for inner volumes, returns list.""" container_path = pathlib.Path(container) detected_volumes = {} inner_volumes = [] log_path = format_log_path(log_name=f'{container_path.name}_testdisk') # Run scan via TestDisk cmd = [ 'sudo', 'testdisk', '/logname', log_path, '/debug', '/log', '/cmd', container_path, 'partition_none,analyze', ] proc = popen_program(cmd, pipe=True) try: proc.wait(timeout=timeout) except subprocess.TimeoutExpired: # Failed to find any volumes, stop scan run_program(['sudo', 'kill', str(proc.pid)], check=False) # Check results if proc.returncode == 0 and log_path.exists(): results = log_path.read_text(encoding='utf-8', errors='ignore') for line in results.splitlines(): line = line.lower().strip() match = re.match(r'^.*echo "([^"]+)" . dmsetup create test(\d)$', line) if match: cs_name = f'CoreStorage_{container_path.name}_{match.group(2)}' detected_volumes[cs_name] = match.group(1) # Create mapper device(s) if necessary for name, cmd in detected_volumes.items(): cmd_file = make_temp_file() cmd_file.write_text(cmd, encoding='utf-8') proc = run_program( cmd=['sudo', 'dmsetup', 'create', name, cmd_file], check=False, ) if proc.returncode == 0: inner_volumes.append({'name': f'/dev/mapper/{name}'}) # Done return inner_volumes def unmount(source_or_mountpoint) -> None: """Unmount source_or_mountpoint. NOTE: If not running_as_root() then udevil will be used. """ cmd = [ 'umount', source_or_mountpoint, ] if not running_as_root(): cmd.insert(0, 'udevil') # Run unmount command proc = run_program(cmd, check=False) if not proc.returncode == 0: raise RuntimeError(f'Failed to unmount: {source_or_mountpoint}') if __name__ == '__main__': print("This file is not meant to be called directly.")