diff --git a/scripts/mount-all-volumes b/scripts/mount-all-volumes index 89f3edb8..ff235579 100755 --- a/scripts/mount-all-volumes +++ b/scripts/mount-all-volumes @@ -15,7 +15,8 @@ def main(): # Mount volumes and get report wk.std.print_standard('Mounting volumes...') - report = wk.os.linux.mount_volumes() + wk.os.linux.mount_volumes() + report = wk.os.linux.build_volume_report() # Show results wk.std.print_info('Results') diff --git a/scripts/wk/os/linux.py b/scripts/wk/os/linux.py index df254e3f..e76f37aa 100644 --- a/scripts/wk/os/linux.py +++ b/scripts/wk/os/linux.py @@ -7,9 +7,8 @@ import pathlib import re import subprocess -from wk import std -from wk.exe import popen_program, run_program -from wk.hw.disk import Disk +from wk.std import bytes_to_string, color_string +from wk.exe import get_json_from_command, popen_program, run_program from wk.log import format_log_path @@ -19,6 +18,94 @@ UUID_CORESTORAGE = '53746f72-6167-11aa-aa11-00306543ecac' # Functions +def build_volume_report(device_path=None) -> str: + """Build volume report using lsblk, returns list. + + If device_path is provided the report is limited to that device. + """ + def _get_volumes(dev, indent=0) -> list: + """Convert lsblk JSON tree to a flat list of items, returns list.""" + dev['name'] = f'{" "*indent}{dev["name"].replace("/dev/mapper/", "")}' + volumes = [dev] + for child in dev.get('children', []): + volumes.extend(_get_volumes(child, indent=indent+1)) + return volumes + + json_data = None + report = [] + vol_data = [] + m_width = 40 + + # Get details from lsblk + cmd = [ + 'lsblk', '--bytes', '--json', + '--output=fsavail,fstype,fsused,mountpoint,name,size', + ] + if device_path: + cmd.append(device_path) + json_data = get_json_from_command(cmd) + + # Build list of volumes + for dev in json_data.get('blockdevices', []): + vol_data.extend(_get_volumes(dev)) + + # Set mountpoint width based on current data + ## NOTE: We add 11 for the 'Mounted on ' prefix and a 1-space buffer + m_width = min( + m_width, + max(map(len, [str(v['mountpoint']) for v in vol_data]))+11, + ) + + # Build report + for vol in vol_data: + percent_used = 0 + size_color = None + + # Set size color + if vol['fsused']: + percent_used = (int(vol['fsused']) / int(vol['size'])) * 100 + if percent_used >= 85: + size_color = 'RED' + elif percent_used >= 70: + size_color = 'YELLOW' + + # Clean data + for key, value in vol.items(): + if value is None: + value = "" + value = str(value) + if value.isnumeric(): + value = bytes_to_string(value, decimals=1) + vol[key] = value + if vol['mountpoint']: + vol['mountpoint'] = f'Mounted on {vol["mountpoint"]}' + + # Name and size + line = color_string( + [f'{vol["name"]:<20}', f'{vol["size"]:>9}'], + [None, 'CYAN'], + ) + + # Mountpoint and type + line = color_string( + [line, f'{vol["mountpoint"]:<{m_width}}', f'{vol["fstype"]:<11}'], + [None, None, 'BLUE'], + ) + + # Used and free + if any([vol['fsused'], vol['fsavail']]): + line = color_string( + [line, f'({vol["fsused"]:>9} used, {vol["fsavail"]:>9} free)'], + [None, size_color], + ) + + # Add line to report + report.append(line) + + # Done + return report + + def get_user_home(user): """Get path to user's home dir, returns pathlib.Path obj.""" home = None @@ -91,92 +178,50 @@ def mount(source, mount_point=None, read_write=False): def mount_volumes(device_path=None, read_write=False, scan_corestorage=False): # pylint: disable=too-many-branches - """Mount all detected volumes, returns list. + """Mount all detected volumes. NOTE: If device_path is specified then only volumes under that path will be mounted. """ - report = [] + json_data = {} volumes = [] containers = [] # Get list of volumes cmd = [ 'lsblk', - '--list', - '--noheadings', - '--output=name', + '--output=mountpoint,name,parttype', '--paths', + '--json', ] if device_path: cmd.append(device_path) - proc = run_program(cmd, check=False) - for line in sorted(proc.stdout.splitlines()): - volumes.append(Disk(line.strip())) + json_data = get_json_from_command(cmd) - # Get list of CoreStorage containers - containers = [ - vol for vol in volumes if vol.details.get('parttype', '') == UUID_CORESTORAGE - ] + # Build list of volumes + for dev in json_data.get('blockdevices', []): + volumes.append(dev) + for child in dev.get('children', []): + volumes.append(child) + if child['parttype'] == UUID_CORESTORAGE: + containers.append(child['name']) # Scan CoreStorage containers if scan_corestorage: - if containers: - std.print_warning( - f'Detected CoreStorage container{"s" if len(containers) > 1 else ""}', - ) - std.print_standard('Scanning for inner volume(s)...') for container in containers: + LOG.info( + 'Scanning CoreStorage container for inner volumes (%s)', container, + ) volumes.extend(scan_corestorage_container(container)) # Mount volumes for vol in volumes: - already_mounted = vol.details.get('mountpoint', '') - result = f'{vol.details["name"].replace("/dev/mapper/", ""):<20}' - - # Parent devices - if vol.details.get('children', False): - if vol.details.get('fstype', ''): - result += vol.details['fstype'] - if vol.details.get('label', ''): - result += f' "{vol.details["label"]}"' - report.append(std.color_string(result, 'BLUE')) - continue - - # Attempt to mount volume - if not already_mounted: + if not vol['mountpoint']: try: - mount(vol.path, read_write=read_write) + mount(vol['name'], read_write=read_write) except RuntimeError: - result += 'Failed to mount' - report.append(std.color_string(result, 'RED')) - continue - result += f'{"Mounted on "+str(vol.details.get("mountpoint", "?")):<40}' - - # Add size to result - vol.get_details() - vol.details['fsused'] = vol.details.get('fsused', -1) - vol.details['fsavail'] = vol.details.get('fsavail', -1) - if vol.details['fsused'] is None: - result = ( - f'{result} ({vol.details.get("fstype", "Unknown FS")+",":<5}' - f'{std.bytes_to_string(vol.details["size"], decimals=1):>9})' - ) - else: - result = ( - f'{result} ({vol.details.get("fstype", "Unknown FS")+",":<5} ' - f'{std.bytes_to_string(vol.details["fsused"], decimals=1):>9} used, ' - f'{std.bytes_to_string(vol.details["fsavail"], decimals=1):>9} free)' - ) - report.append( - std.color_string( - result, - 'YELLOW' if already_mounted else None, - ), - ) - - # Done - return report + # Couldn't mount the volume + pass def running_as_root(): @@ -186,9 +231,10 @@ def running_as_root(): def scan_corestorage_container(container, timeout=300): """Scan CoreStorage container for inner volumes, returns list.""" + container_path = pathlib.Path(container) detected_volumes = {} inner_volumes = [] - log_path = format_log_path(log_name=f'{container.path.name}_testdisk') + log_path = format_log_path(log_name=f'{container_path.name}_testdisk') # Run scan via TestDisk cmd = [ @@ -196,7 +242,7 @@ def scan_corestorage_container(container, timeout=300): '/logname', log_path, '/debug', '/log', - '/cmd', container.path, 'partition_none,analyze', + '/cmd', container_path, 'partition_none,analyze', ] proc = popen_program(cmd, pipe=True) try: @@ -212,7 +258,7 @@ def scan_corestorage_container(container, timeout=300): line = line.lower().strip() match = re.match(r'^.*echo "([^"]+)" . dmsetup create test(\d)$', line) if match: - cs_name = f'CoreStorage_{container.path.name}_{match.group(2)}' + cs_name = f'CoreStorage_{container_path.name}_{match.group(2)}' detected_volumes[cs_name] = match.group(1) # Create mapper device(s) if necessary @@ -224,7 +270,7 @@ def scan_corestorage_container(container, timeout=300): check=False, ) if proc.returncode == 0: - inner_volumes.append(Disk(f'/dev/mapper/{name}')) + inner_volumes.append({'name': f'/dev/mapper/{name}'}) # Done return inner_volumes