298 lines
7.5 KiB
Python
298 lines
7.5 KiB
Python
"""WizardKit: Linux Functions"""
|
|
# vim: sts=2 sw=2 ts=2
|
|
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import re
|
|
import subprocess
|
|
|
|
from wk.std import bytes_to_string, color_string
|
|
from wk.exe import get_json_from_command, popen_program, run_program
|
|
from wk.log import format_log_path
|
|
|
|
|
|
# STATIC VARIABLES
# Module-level logger, named after this module
LOG = logging.getLogger(__name__)
# Partition type GUID compared against lsblk's 'parttype' column in
# mount_volumes() to detect CoreStorage containers
UUID_CORESTORAGE = '53746f72-6167-11aa-aa11-00306543ecac'
|
|
|
|
|
|
# Functions
|
|
def build_volume_report(device_path=None) -> list:
  """Build volume report using lsblk, returns list.

  If device_path is provided the report is limited to that device.

  Each entry in the returned list is one line built with color_string()
  containing the (indented) device name, its size, the mountpoint and the
  filesystem type. If used/free space is known a "(... used, ... free)"
  section is appended; it is colored YELLOW at >= 70% used and RED at
  >= 85% used.

  An empty list is returned if lsblk reports no block devices.
  """
  byte_columns = ('fsavail', 'fsused', 'size')
  max_mount_width = 40

  def _get_volumes(dev, indent=0) -> list:
    """Convert lsblk JSON tree to a flat list of items, returns list.

    Names are indented by one space per nesting level and any
    '/dev/mapper/' prefix is removed.
    """
    dev['name'] = f'{" "*indent}{dev["name"].replace("/dev/mapper/", "")}'
    volumes = [dev]
    for child in dev.get('children', []):
      volumes.extend(_get_volumes(child, indent=indent+1))
    return volumes

  def _get_size_color(vol):
    """Get color name based on percent used, returns str or None."""
    used = int(vol['fsused'] or 0)
    total = int(vol['size'] or 0)
    # A zero (or missing) size can't be used as a divisor
    percent_used = (used / total) * 100 if total else 0
    if percent_used >= 85:
      return 'RED'
    if percent_used >= 70:
      return 'YELLOW'
    return None

  report = []
  vol_data = []

  # Get details from lsblk
  cmd = [
    'lsblk', '--bytes', '--json',
    '--output=fsavail,fstype,fsused,mountpoint,name,size',
    ]
  if device_path:
    cmd.append(device_path)
  json_data = get_json_from_command(cmd)

  # Build list of volumes
  for dev in json_data.get('blockdevices', []):
    vol_data.extend(_get_volumes(dev))

  # Set mountpoint width based on current data
  ## NOTE: 11 == len('Mounted on '), the prefix added to mountpoints below
  ## NOTE: A missing mountpoint is measured as the string 'None' here
  ## NOTE: default=0 avoids a ValueError if no volumes were found
  longest_mountpoint = max(
    (len(str(vol['mountpoint'])) for vol in vol_data),
    default=0,
    )
  m_width = min(max_mount_width, longest_mountpoint + 11)

  # Build report
  for vol in vol_data:
    # The color must be set before the sizes are converted to strings
    size_color = _get_size_color(vol)

    # Clean data
    for key, value in list(vol.items()):
      if key == 'children':
        # Children are separate entries in vol_data, nothing to clean here
        continue
      value = '' if value is None else str(value)
      if key in byte_columns and value.isnumeric():
        # Only the size columns hold byte counts
        value = bytes_to_string(value, decimals=1)
      vol[key] = value
    if vol['mountpoint']:
      vol['mountpoint'] = f'Mounted on {vol["mountpoint"]}'

    # Name and size
    line = color_string(
      [f'{vol["name"]:<20}', f'{vol["size"]:>9}'],
      [None, 'CYAN'],
      )

    # Mountpoint and type
    line = color_string(
      [line, f'{vol["mountpoint"]:<{m_width}}', f'{vol["fstype"]:<11}'],
      [None, None, 'BLUE'],
      )

    # Used and free
    if vol['fsused'] or vol['fsavail']:
      line = color_string(
        [line, f'({vol["fsused"]:>9} used, {vol["fsavail"]:>9} free)'],
        [None, size_color],
        )

    # Add line to report
    report.append(line)

  # Done
  return report
|
|
|
|
|
|
def get_user_home(user):
  """Get path to user's home dir, returns pathlib.Path obj.

  The path is read from the sixth colon-separated field of the output of
  `getent passwd <user>`. If that field is not present the HOME
  environment variable is used instead.

  Raises RuntimeError if no (non-empty) home path could be determined.
  """
  passwd_fields = run_program(
    ['getent', 'passwd', user], check=False,
    ).stdout.split(':')

  if len(passwd_fields) > 5:
    home = passwd_fields[5]
  else:
    # No usable passwd entry, try using environment variable
    home = os.environ.get('HOME')

  if home:
    return pathlib.Path(home)
  raise RuntimeError(f'Failed to find home for: {user}')
|
|
|
|
|
|
def get_user_name():
  """Get real user name, returns str.

  The first non-empty value of the SUDO_USER and USER environment
  variables (checked in that order) is returned.

  Raises RuntimeError if neither variable holds a value.
  """
  for var_name in ('SUDO_USER', 'USER'):
    name = os.environ.get(var_name)
    if name:
      return name

  # Nothing usable in the environment
  raise RuntimeError('Failed to determine user')
|
|
|
|
|
|
def make_temp_file(suffix=None):
  """Make temporary file, returns pathlib.Path() obj.

  The file is created by running `mktemp`; if suffix is provided it is
  passed along as `--suffix=<suffix>`. The returned path is the stripped
  stdout of that command.
  """
  suffix_args = [f'--suffix={suffix}'] if suffix else []
  result = run_program(['mktemp', *suffix_args])
  new_path = result.stdout.strip()
  return pathlib.Path(new_path)
|
|
|
|
|
|
def mount(source, mount_point=None, read_write=False):
  """Mount source (on mount_point if provided).

  The source is mounted read-only unless read_write is True. If
  mount_point is provided it is resolved to an absolute path and created
  (including parents) before mounting.

  NOTE: If not running_as_root() then udevil will be used.

  Raises RuntimeError if the mount command exits with a non-zero code.
  """
  mode = 'rw' if read_write else 'ro'
  cmd = ['mount', '-o', mode, source]
  if not running_as_root():
    cmd = ['udevil', *cmd]

  # Prep mount point (if requested)
  if mount_point:
    mount_point = pathlib.Path(mount_point).resolve()
    mount_point.mkdir(parents=True, exist_ok=True)
    cmd.append(mount_point)

  # Run mount command
  result = run_program(cmd, check=False)
  if result.returncode != 0:
    raise RuntimeError(f'Failed to mount: {source} on {mount_point}')
|
|
|
|
|
|
def mount_volumes(device_path=None, read_write=False, scan_corestorage=False):
  # pylint: disable=too-many-branches
  """Mount all detected volumes.

  Volumes are detected with lsblk; the top-level devices and their direct
  children are considered. Anything without a current mountpoint is
  passed to mount(), read-only unless read_write is True. Volumes that
  fail to mount are skipped.

  If scan_corestorage is True then children whose partition type matches
  UUID_CORESTORAGE are scanned with scan_corestorage_container() and any
  inner volumes found are mounted as well.

  NOTE: If device_path is specified then only volumes
        under that path will be mounted.
  """
  volumes = []
  containers = []

  # Get list of volumes
  cmd = [
    'lsblk',
    '--output=mountpoint,name,parttype',
    '--paths',
    '--json',
    ]
  if device_path:
    cmd.append(device_path)
  json_data = get_json_from_command(cmd)

  # Build list of volumes
  for dev in json_data.get('blockdevices', []):
    volumes.append(dev)
    for child in dev.get('children', []):
      volumes.append(child)
      if child.get('parttype') == UUID_CORESTORAGE:
        containers.append(child['name'])

  # Scan CoreStorage containers
  if scan_corestorage:
    for container in containers:
      LOG.info(
        'Scanning CoreStorage container for inner volumes (%s)', container,
        )
      volumes.extend(scan_corestorage_container(container))

  # Mount volumes
  for vol in volumes:
    # NOTE: Entries from scan_corestorage_container() only have a 'name'
    #       key so the mountpoint must be read with .get()
    if vol.get('mountpoint'):
      # Already mounted
      continue
    try:
      mount(vol['name'], read_write=read_write)
    except RuntimeError:
      # Couldn't mount the volume, this is expected for some devices so
      # just note it and move on
      LOG.debug('Unable to mount volume: %s', vol['name'])
|
|
|
|
|
|
def running_as_root():
  """Check if running with effective UID of 0, returns bool."""
  effective_uid = os.geteuid()
  return effective_uid == 0
|
|
|
|
|
|
def scan_corestorage_container(container, timeout=300):
  """Scan CoreStorage container for inner volumes, returns list.

  TestDisk is run (via sudo) against the container and, if it exits
  cleanly, its log is searched for lines ending in
  `dmsetup create test<N>`. For each match a device-mapper device named
  CoreStorage_<container name>_<N> is created using the quoted table
  from that line.

  The scan is abandoned if TestDisk has not exited after timeout seconds.

  Returns a list of dicts ({'name': '/dev/mapper/<name>'}), one for each
  mapper device that was created successfully.
  """
  container_path = pathlib.Path(container)
  detected_volumes = {}
  inner_volumes = []
  log_path = format_log_path(log_name=f'{container_path.name}_testdisk')
  regex = re.compile(r'^.*echo "([^"]+)" . dmsetup create test(\d)$')
  timed_out = False

  # Run scan via TestDisk
  cmd = [
    'sudo', 'testdisk',
    '/logname', log_path,
    '/debug',
    '/log',
    '/cmd', container_path, 'partition_none,analyze',
    ]
  proc = popen_program(cmd, pipe=True)
  # NOTE(review): assumes popen_program() returns a subprocess.Popen obj
  #               with its output attached to pipes - confirm in wk.exe
  try:
    # communicate() is used instead of wait() so the pipes are drained,
    # otherwise a full pipe could block the child until the timeout
    proc.communicate(timeout=timeout)
  except subprocess.TimeoutExpired:
    # Failed to find any volumes, stop scan
    timed_out = True
    run_program(['sudo', 'kill', str(proc.pid)], check=False)
    try:
      # Reap the process, but don't hang if it refuses to exit
      proc.communicate(timeout=5)
    except subprocess.TimeoutExpired:
      pass

  # Check results
  if not timed_out and proc.returncode == 0 and log_path.exists():
    results = log_path.read_text(encoding='utf-8', errors='ignore')
    for line in results.splitlines():
      match = regex.match(line.lower().strip())
      if match:
        cs_name = f'CoreStorage_{container_path.name}_{match.group(2)}'
        detected_volumes[cs_name] = match.group(1)

  # Create mapper device(s) if necessary
  for name, table in detected_volumes.items():
    table_file = make_temp_file()
    try:
      table_file.write_text(table, encoding='utf-8')
      result = run_program(
        cmd=['sudo', 'dmsetup', 'create', name, table_file],
        check=False,
        )
    finally:
      # The table file is only needed while dmsetup runs
      try:
        table_file.unlink()
      except OSError:
        # Best-effort cleanup, a leftover temp file is harmless
        pass
    if result.returncode == 0:
      inner_volumes.append({'name': f'/dev/mapper/{name}'})

  # Done
  return inner_volumes
|
|
|
|
|
|
def unmount(source_or_mountpoint):
  """Unmount source_or_mountpoint.

  NOTE: If not running_as_root() then udevil will be used.

  Raises RuntimeError if the umount command exits with a non-zero code.
  """
  cmd = ['umount', source_or_mountpoint]
  if not running_as_root():
    cmd = ['udevil', *cmd]

  # Run unmount command
  result = run_program(cmd, check=False)
  if result.returncode != 0:
    raise RuntimeError(f'Failed to unmount: {source_or_mountpoint}')
|
|
|
|
|
|
# This module only provides functions; print a notice if run as a script
if __name__ == '__main__':
  print("This file is not meant to be called directly.")
|