Update Linux mount volume sections

This commit is contained in:
2Shirt 2022-05-18 14:59:14 -07:00
parent 4d34e868b2
commit 068f1773aa
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
2 changed files with 116 additions and 69 deletions

View file

@ -15,7 +15,8 @@ def main():
# Mount volumes and get report
wk.std.print_standard('Mounting volumes...')
report = wk.os.linux.mount_volumes()
wk.os.linux.mount_volumes()
report = wk.os.linux.build_volume_report()
# Show results
wk.std.print_info('Results')

View file

@ -7,9 +7,8 @@ import pathlib
import re
import subprocess
from wk import std
from wk.exe import popen_program, run_program
from wk.hw.disk import Disk
from wk.std import bytes_to_string, color_string
from wk.exe import get_json_from_command, popen_program, run_program
from wk.log import format_log_path
@ -19,6 +18,94 @@ UUID_CORESTORAGE = '53746f72-6167-11aa-aa11-00306543ecac'
# Functions
def build_volume_report(device_path=None) -> str:
"""Build volume report using lsblk, returns list.
If device_path is provided the report is limited to that device.
"""
def _get_volumes(dev, indent=0) -> list:
"""Convert lsblk JSON tree to a flat list of items, returns list."""
dev['name'] = f'{" "*indent}{dev["name"].replace("/dev/mapper/", "")}'
volumes = [dev]
for child in dev.get('children', []):
volumes.extend(_get_volumes(child, indent=indent+1))
return volumes
json_data = None
report = []
vol_data = []
m_width = 40
# Get details from lsblk
cmd = [
'lsblk', '--bytes', '--json',
'--output=fsavail,fstype,fsused,mountpoint,name,size',
]
if device_path:
cmd.append(device_path)
json_data = get_json_from_command(cmd)
# Build list of volumes
for dev in json_data.get('blockdevices', []):
vol_data.extend(_get_volumes(dev))
# Set mountpoint width based on current data
## NOTE: We add 11 for the 'Mounted on ' prefix and a 1-space buffer
m_width = min(
m_width,
max(map(len, [str(v['mountpoint']) for v in vol_data]))+11,
)
# Build report
for vol in vol_data:
percent_used = 0
size_color = None
# Set size color
if vol['fsused']:
percent_used = (int(vol['fsused']) / int(vol['size'])) * 100
if percent_used >= 85:
size_color = 'RED'
elif percent_used >= 70:
size_color = 'YELLOW'
# Clean data
for key, value in vol.items():
if value is None:
value = ""
value = str(value)
if value.isnumeric():
value = bytes_to_string(value, decimals=1)
vol[key] = value
if vol['mountpoint']:
vol['mountpoint'] = f'Mounted on {vol["mountpoint"]}'
# Name and size
line = color_string(
[f'{vol["name"]:<20}', f'{vol["size"]:>9}'],
[None, 'CYAN'],
)
# Mountpoint and type
line = color_string(
[line, f'{vol["mountpoint"]:<{m_width}}', f'{vol["fstype"]:<11}'],
[None, None, 'BLUE'],
)
# Used and free
if any([vol['fsused'], vol['fsavail']]):
line = color_string(
[line, f'({vol["fsused"]:>9} used, {vol["fsavail"]:>9} free)'],
[None, size_color],
)
# Add line to report
report.append(line)
# Done
return report
def get_user_home(user):
"""Get path to user's home dir, returns pathlib.Path obj."""
home = None
@ -91,92 +178,50 @@ def mount(source, mount_point=None, read_write=False):
def mount_volumes(device_path=None, read_write=False, scan_corestorage=False):
# pylint: disable=too-many-branches
"""Mount all detected volumes, returns list.
"""Mount all detected volumes.
NOTE: If device_path is specified then only volumes
under that path will be mounted.
"""
report = []
json_data = {}
volumes = []
containers = []
# Get list of volumes
cmd = [
'lsblk',
'--list',
'--noheadings',
'--output=name',
'--output=mountpoint,name,parttype',
'--paths',
'--json',
]
if device_path:
cmd.append(device_path)
proc = run_program(cmd, check=False)
for line in sorted(proc.stdout.splitlines()):
volumes.append(Disk(line.strip()))
json_data = get_json_from_command(cmd)
# Get list of CoreStorage containers
containers = [
vol for vol in volumes if vol.details.get('parttype', '') == UUID_CORESTORAGE
]
# Build list of volumes
for dev in json_data.get('blockdevices', []):
volumes.append(dev)
for child in dev.get('children', []):
volumes.append(child)
if child['parttype'] == UUID_CORESTORAGE:
containers.append(child['name'])
# Scan CoreStorage containers
if scan_corestorage:
if containers:
std.print_warning(
f'Detected CoreStorage container{"s" if len(containers) > 1 else ""}',
)
std.print_standard('Scanning for inner volume(s)...')
for container in containers:
LOG.info(
'Scanning CoreStorage container for inner volumes (%s)', container,
)
volumes.extend(scan_corestorage_container(container))
# Mount volumes
for vol in volumes:
already_mounted = vol.details.get('mountpoint', '')
result = f'{vol.details["name"].replace("/dev/mapper/", ""):<20}'
# Parent devices
if vol.details.get('children', False):
if vol.details.get('fstype', ''):
result += vol.details['fstype']
if vol.details.get('label', ''):
result += f' "{vol.details["label"]}"'
report.append(std.color_string(result, 'BLUE'))
continue
# Attempt to mount volume
if not already_mounted:
if not vol['mountpoint']:
try:
mount(vol.path, read_write=read_write)
mount(vol['name'], read_write=read_write)
except RuntimeError:
result += 'Failed to mount'
report.append(std.color_string(result, 'RED'))
continue
result += f'{"Mounted on "+str(vol.details.get("mountpoint", "?")):<40}'
# Add size to result
vol.get_details()
vol.details['fsused'] = vol.details.get('fsused', -1)
vol.details['fsavail'] = vol.details.get('fsavail', -1)
if vol.details['fsused'] is None:
result = (
f'{result} ({vol.details.get("fstype", "Unknown FS")+",":<5}'
f'{std.bytes_to_string(vol.details["size"], decimals=1):>9})'
)
else:
result = (
f'{result} ({vol.details.get("fstype", "Unknown FS")+",":<5} '
f'{std.bytes_to_string(vol.details["fsused"], decimals=1):>9} used, '
f'{std.bytes_to_string(vol.details["fsavail"], decimals=1):>9} free)'
)
report.append(
std.color_string(
result,
'YELLOW' if already_mounted else None,
),
)
# Done
return report
# Couldn't mount the volume
pass
def running_as_root():
@ -186,9 +231,10 @@ def running_as_root():
def scan_corestorage_container(container, timeout=300):
"""Scan CoreStorage container for inner volumes, returns list."""
container_path = pathlib.Path(container)
detected_volumes = {}
inner_volumes = []
log_path = format_log_path(log_name=f'{container.path.name}_testdisk')
log_path = format_log_path(log_name=f'{container_path.name}_testdisk')
# Run scan via TestDisk
cmd = [
@ -196,7 +242,7 @@ def scan_corestorage_container(container, timeout=300):
'/logname', log_path,
'/debug',
'/log',
'/cmd', container.path, 'partition_none,analyze',
'/cmd', container_path, 'partition_none,analyze',
]
proc = popen_program(cmd, pipe=True)
try:
@ -212,7 +258,7 @@ def scan_corestorage_container(container, timeout=300):
line = line.lower().strip()
match = re.match(r'^.*echo "([^"]+)" . dmsetup create test(\d)$', line)
if match:
cs_name = f'CoreStorage_{container.path.name}_{match.group(2)}'
cs_name = f'CoreStorage_{container_path.name}_{match.group(2)}'
detected_volumes[cs_name] = match.group(1)
# Create mapper device(s) if necessary
@ -224,7 +270,7 @@ def scan_corestorage_container(container, timeout=300):
check=False,
)
if proc.returncode == 0:
inner_volumes.append(Disk(f'/dev/mapper/{name}'))
inner_volumes.append({'name': f'/dev/mapper/{name}'})
# Done
return inner_volumes