WizardKit/scripts/wk/os/linux.py

302 lines
7.7 KiB
Python

"""WizardKit: Linux Functions"""
# vim: sts=2 sw=2 ts=2
import logging
import os
import pathlib
import re
import subprocess
from wk.std import bytes_to_string, color_string
from wk.exe import get_json_from_command, popen_program, run_program
from wk.log import format_log_path
# STATIC VARIABLES
# Module-level logger named after this module
LOG = logging.getLogger(__name__)
# Partition type UUID compared against lsblk's parttype field
# (see mount_volumes) to identify CoreStorage containers
UUID_CORESTORAGE = '53746f72-6167-11aa-aa11-00306543ecac'
# Functions
def build_volume_report(device_path=None) -> list:
  """Build volume report using lsblk, returns list.

  If device_path is provided the report is limited to that device.

  Each item in the returned list is one formatted (colored) line for a
  block device or volume; nested devices are indented one space per
  level.  An empty list is returned if lsblk reports no block devices.
  """
  def _get_volumes(dev, indent=0) -> list:
    """Convert lsblk JSON tree to a flat list of items, returns list."""
    dev['name'] = f'{" "*indent}{dev["name"].replace("/dev/mapper/", "")}'
    volumes = [dev]
    for child in dev.get('children', []):
      volumes.extend(_get_volumes(child, indent=indent+1))
    return volumes

  report = []
  vol_data = []
  m_width = 40

  # Get details from lsblk
  cmd = [
    'lsblk', '--bytes', '--json',
    '--output=fsavail,fstype,fsused,mountpoint,name,size',
  ]
  if device_path:
    cmd.append(device_path)
  json_data = get_json_from_command(cmd)

  # Build list of volumes
  for dev in json_data.get('blockdevices', []):
    vol_data.extend(_get_volumes(dev))

  # Set mountpoint width based on current data
  ## NOTE: We add 11 for the 'Mounted on ' prefix and a 1-space buffer
  ## NOTE: default=0 avoids a ValueError when no volumes were found
  m_width = min(
    m_width,
    max((len(str(v['mountpoint'])) for v in vol_data), default=0) + 11,
  )

  # Build report
  for vol in vol_data:
    percent_used = 0
    size_color = None

    # Set size color
    ## NOTE: Skip the calculation if either value is missing or zero
    ##       to avoid dividing by zero
    fs_used = int(vol['fsused'] or 0)
    fs_size = int(vol['size'] or 0)
    if fs_used and fs_size:
      percent_used = (fs_used / fs_size) * 100
    if percent_used >= 85:
      size_color = 'RED'
    elif percent_used >= 70:
      size_color = 'YELLOW'

    # Clean data
    ## NOTE: Only existing keys are reassigned so the dict does not
    ##       change size while it is being iterated
    for key, value in vol.items():
      if value is None:
        value = ""
      value = str(value)
      if value.isnumeric():
        value = bytes_to_string(value, decimals=1)
      vol[key] = value
    if vol['mountpoint']:
      vol['mountpoint'] = f'Mounted on {vol["mountpoint"]}'

    # Name and size
    line = color_string(
      [f'{vol["name"]:<20}', f'{vol["size"]:>9}'],
      [None, 'CYAN'],
    )

    # Mountpoint and type
    line = color_string(
      [line, f'{vol["mountpoint"]:<{m_width}}', f'{vol["fstype"]:<11}'],
      [None, None, 'BLUE'],
    )

    # Used and free
    if any([vol['fsused'], vol['fsavail']]):
      line = color_string(
        [line, f'({vol["fsused"]:>9} used, {vol["fsavail"]:>9} free)'],
        [None, size_color],
      )

    # Add line to report
    report.append(line)

  # Done
  return report
def get_user_home(user):
  """Get path to user's home dir, returns pathlib.Path obj.

  The home dir is read from the sixth colon-separated field of the
  `getent passwd` output for user.  If that field is not present the
  HOME environment variable is used instead.

  Raises RuntimeError if no home dir could be determined.
  """
  # Query the passwd database
  result = run_program(['getent', 'passwd', user], check=False)
  fields = result.stdout.split(':')

  # Use the passwd entry if complete, otherwise the environment
  if len(fields) > 5:
    home_dir = fields[5]
  else:
    home_dir = os.environ.get('HOME')

  # Nothing usable was found
  if not home_dir:
    raise RuntimeError(f'Failed to find home for: {user}')

  return pathlib.Path(home_dir)
def get_user_name():
  """Get real user name, returns str.

  SUDO_USER is preferred over USER so the invoking user is returned
  when running under sudo.

  Raises RuntimeError if neither variable holds a non-empty value.
  """
  name = os.environ.get('SUDO_USER') or os.environ.get('USER')
  if name:
    return name

  # Neither variable was usable
  raise RuntimeError('Failed to determine user')
def make_temp_file(suffix=None):
  """Make temporary file, returns pathlib.Path() obj.

  The file is created by the `mktemp` command; if suffix is provided it
  is passed along via --suffix.
  """
  args = ['mktemp', f'--suffix={suffix}'] if suffix else ['mktemp']
  result = run_program(args)

  # mktemp prints the new path followed by a newline
  return pathlib.Path(result.stdout.strip())
def mount(source, mount_point=None, read_write=False):
  """Mount source (on mount_point if provided).

  The source is mounted read-only unless read_write is set.  If
  mount_point is provided it is resolved and created (including
  parents) before mounting.

  NOTE: If not running_as_root() then udevil will be used.

  Raises RuntimeError if the mount command exits non-zero.
  """
  mode = 'rw' if read_write else 'ro'
  cmd = ['mount', '-o', mode, source]

  # Non-root users go through udevil
  if not running_as_root():
    cmd = ['udevil', *cmd]

  # Prepare mount point
  if mount_point:
    mount_point = pathlib.Path(mount_point).resolve()
    mount_point.mkdir(parents=True, exist_ok=True)
    cmd.append(mount_point)

  # Run mount command
  result = run_program(cmd, check=False)
  if result.returncode != 0:
    raise RuntimeError(f'Failed to mount: {source} on {mount_point}')
def mount_volumes(device_path=None, read_write=False, scan_corestorage=False):
  # pylint: disable=too-many-branches
  """Mount all detected volumes.

  Volumes are listed with lsblk and every one that does not already
  have a mountpoint is passed to mount().  Volumes are mounted
  read-only unless read_write is set.  Mount failures are logged and
  otherwise ignored so the remaining volumes are still attempted.

  If scan_corestorage is set then each partition whose type matches
  UUID_CORESTORAGE is scanned with scan_corestorage_container() and any
  inner volumes found are mounted as well.

  NOTE: If device_path is specified then only volumes
        under that path will be mounted.
  """
  def _get_volumes(dev) -> list:
    """Convert lsblk JSON tree to a flat list of items, returns list."""
    volumes = [dev]
    for child in dev.get('children', []):
      volumes.extend(_get_volumes(child))
    return volumes

  volumes = []
  containers = []

  # Get list of volumes
  cmd = [
    'lsblk',
    '--output=mountpoint,name,parttype',
    '--paths',
    '--json',
  ]
  if device_path:
    cmd.append(device_path)
  json_data = get_json_from_command(cmd)

  # Build list of volumes
  ## NOTE: 'blockdevices' is a list of device trees so each top-level
  ##       device is flattened separately
  for top_dev in json_data.get('blockdevices', []):
    for dev in _get_volumes(top_dev):
      volumes.append(dev)
      if dev.get('parttype', '') == UUID_CORESTORAGE:
        containers.append(dev['name'])

  # Scan CoreStorage containers
  if scan_corestorage:
    for container in containers:
      LOG.info(
        'Scanning CoreStorage container for inner volumes (%s)', container,
      )
      volumes.extend(scan_corestorage_container(container))

  # Mount volumes
  for vol in volumes:
    # Inner CoreStorage volumes only have a 'name' key so use .get()
    if vol.get('mountpoint'):
      continue
    try:
      mount(vol['name'], read_write=read_write)
    except RuntimeError:
      # Couldn't mount the volume, move on to the next one
      LOG.debug('Failed to mount volume: %s', vol['name'])
def running_as_root():
  """Check if running with effective UID of 0, returns bool."""
  effective_uid = os.geteuid()
  return effective_uid == 0
def scan_corestorage_container(container, timeout=300):
  """Scan CoreStorage container for inner volumes, returns list.

  TestDisk is run (via sudo) against container and its log is searched
  for `echo "<table>" | dmsetup create test<N>` style lines.  A mapper
  device named CoreStorage_<container name>_<N> is then created from
  each table found.

  The returned list holds one dict per mapper device that was created
  successfully, each with a single 'name' key set to its /dev/mapper/
  path.  If the scan does not finish within timeout seconds it is
  killed and an empty list is returned.
  """
  container_path = pathlib.Path(container)
  detected_volumes = {}
  inner_volumes = []
  log_path = format_log_path(log_name=f'{container_path.name}_testdisk')
  table_regex = re.compile(
    r'^.*echo "([^"]+)" . dmsetup create test(\d)$',
  )

  # Run scan via TestDisk
  ## NOTE(review): wait() on a process started with pipes can block if
  ##               the child fills a pipe buffer; confirm what pipe=True
  ##               does in popen_program
  cmd = [
    'sudo', 'testdisk',
    '/logname', log_path,
    '/debug',
    '/log',
    '/cmd', container_path, 'partition_none,analyze',
  ]
  proc = popen_program(cmd, pipe=True)
  try:
    proc.wait(timeout=timeout)
  except subprocess.TimeoutExpired:
    # Failed to find any volumes, stop scan
    run_program(['sudo', 'kill', str(proc.pid)], check=False)

  # Check results
  if proc.returncode == 0 and log_path.exists():
    results = log_path.read_text(encoding='utf-8', errors='ignore')
    for line in results.splitlines():
      match = table_regex.match(line.lower().strip())
      if match:
        cs_name = f'CoreStorage_{container_path.name}_{match.group(2)}'
        detected_volumes[cs_name] = match.group(1)

  # Create mapper device(s) if necessary
  for name, table in detected_volumes.items():
    table_file = make_temp_file()
    try:
      table_file.write_text(table, encoding='utf-8')
      result = run_program(
        cmd=['sudo', 'dmsetup', 'create', name, table_file],
        check=False,
      )
    finally:
      # The table file is only needed while dmsetup runs; remove it so
      # temp files don't pile up
      try:
        table_file.unlink()
      except OSError:
        LOG.debug('Failed to remove temp file: %s', table_file)
    if result.returncode == 0:
      inner_volumes.append({'name': f'/dev/mapper/{name}'})

  # Done
  return inner_volumes
def unmount(source_or_mountpoint):
  """Unmount source_or_mountpoint.

  NOTE: If not running_as_root() then udevil will be used.

  Raises RuntimeError if the umount command exits non-zero.
  """
  cmd = ['umount', source_or_mountpoint]

  # Non-root users go through udevil
  if not running_as_root():
    cmd = ['udevil', *cmd]

  # Run unmount command
  result = run_program(cmd, check=False)
  if result.returncode != 0:
    raise RuntimeError(f'Failed to unmount: {source_or_mountpoint}')
# This module only provides helper functions, so just tell the user
# if it is executed as a script
if __name__ == '__main__':
  print("This file is not meant to be called directly.")