Added CoreStorage scanning logic

* Still needs tested
This commit is contained in:
2Shirt 2020-01-04 21:35:42 -07:00
parent b75326aeee
commit 7bf03749ec
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
2 changed files with 49 additions and 6 deletions

View file

@ -14,11 +14,10 @@ def main():
# Mount volumes and get report
wk.std.print_standard('Mounting volumes...')
report = wk.os.linux.mount_volumes()
report = [f' {line}' for line in report]
# Show results
wk.std.print_info('Results')
wk.std.print_report(report)
wk.std.print_report(report, indent=2)
if __name__ == '__main__':

View file

@ -2,9 +2,12 @@
# vim: sts=2 sw=2 ts=2
import logging
import pathlib
import re
import subprocess
from wk import std
from wk.exe import run_program
from wk.exe import popen_program, run_program
from wk.hw.obj import Disk
@ -14,6 +17,12 @@ UUID_CORESTORAGE = '53746f72-6167-11aa-aa11-00306543ecac'
# Functions
def make_temp_file():
"""Make temporary file, returns pathlib.Path() obj."""
proc = run_program(['mktemp'], check=False)
return pathlib.Path(proc.stdout.strip())
def mount_volumes(device_path=None, read_write=False, scan_corestorage=False):
"""Mount all detected volumes, returns list.
@ -104,11 +113,46 @@ def mount_volumes(device_path=None, read_write=False, scan_corestorage=False):
def scan_corestorage_container(container, timeout=300):
"""Scan CoreStorage container for inner volumes, returns list."""
# TODO: Test Scanning CoreStorage containers
detected_volumes = {}
inner_volumes = []
log_path = make_temp_file()
#TODO: Add testdisk logic to scan CoreStorage
if container or timeout:
pass
# Run scan via TestDisk
cmd = [
'sudo', 'testdisk',
'/logname', log_path,
'/debug',
'/log',
'/cmd', container.path, 'partition_none,analyze',
]
proc = popen_program(cmd)
try:
proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
# Failed to find any volumes, stop scan
run_program(['sudo', 'kill', proc.pid], check=False)
# Check results
if proc.returncode == 0 and log_path.exists():
results = log_path.read_text(encoding='utf-8', errors='ignore')
for line in results.splitlines():
line = line.lower().strip()
match = re.match(r'^.*echo "([^"]+)" . dmsetup create test(\d)$', line)
if match:
cs_name = f'CoreStorage_{container.path.name}_{match.group(2)}'
detected_volumes[cs_name] = match.group(1)
# Create mapper device(s) if necessary
for name, cmd in detected_volumes.items():
cmd_file = make_temp_file()
cmd_file.write_text(cmd)
proc = run_program(
cmd=['sudo', 'dmsetup', 'create', name, cmd_file],
check=False,
)
if proc.returncode == 0:
inner_volumes.append(Disk(f'/dev/mapper/{name}'))
# Done
return inner_volumes