Split wk/clone/ddrescue.py to match upstream
This commit is contained in:
parent
945338ca01
commit
f85940955a
5 changed files with 1913 additions and 1823 deletions
|
|
@ -1,4 +1,7 @@
|
|||
"""WizardKit: ddrescue-tui module init"""
|
||||
|
||||
from . import block_pair
|
||||
from . import ddrescue
|
||||
from . import image
|
||||
from . import menus
|
||||
from . import state
|
||||
|
|
|
|||
587
scripts/wk/clone/block_pair.py
Normal file
587
scripts/wk/clone/block_pair.py
Normal file
|
|
@ -0,0 +1,587 @@
|
|||
"""WizardKit: ddrescue TUI - Block Pairs"""
|
||||
# vim: sts=2 sw=2 ts=2
|
||||
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
import pathlib
|
||||
import plistlib
|
||||
import re
|
||||
import subprocess
|
||||
|
||||
from wk import cfg, exe, std
|
||||
from wk.clone import menus
|
||||
from wk.hw import disk as hw_disk
|
||||
from wk.ui import ansi, cli
|
||||
|
||||
|
||||
# STATIC VARIABLES
|
||||
LOG = logging.getLogger(__name__)
|
||||
DDRESCUE_LOG_REGEX = re.compile(
|
||||
r'^\s*(?P<key>\S+):\s+'
|
||||
r'(?P<size>\d+)\s+'
|
||||
r'(?P<unit>[PTGMKB]i?B?)'
|
||||
r'.*\(\s*(?P<percent>\d+\.?\d*)%\)$',
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
# Classes
|
||||
class BlockPair():
|
||||
"""Object for tracking source to dest recovery data."""
|
||||
def __init__(
|
||||
self,
|
||||
source_dev: hw_disk.Disk,
|
||||
destination: pathlib.Path,
|
||||
working_dir: pathlib.Path,
|
||||
):
|
||||
self.sector_size: int = source_dev.phy_sec
|
||||
self.source: pathlib.Path = pathlib.Path(source_dev.path)
|
||||
self.destination: pathlib.Path = destination
|
||||
self.map_data: dict[str, bool | int] = {}
|
||||
self.map_path: pathlib.Path = pathlib.Path()
|
||||
self.size: int = source_dev.size
|
||||
self.stats = {}
|
||||
self.status: dict[str, float | int | str] = {
|
||||
'read-skip': 'Pending',
|
||||
'read-full': 'Pending',
|
||||
'trim': 'Pending',
|
||||
'scrape': 'Pending',
|
||||
}
|
||||
self.test_map: pathlib.Path | None = None
|
||||
self.view_map: bool = 'DISPLAY' in os.environ or 'WAYLAND_DISPLAY' in os.environ
|
||||
self.view_proc: subprocess.Popen | None = None
|
||||
|
||||
# Set map path
|
||||
# e.g. '(Clone|Image)_Model[_p#]_Size[_Label].map'
|
||||
map_name = source_dev.model
|
||||
if source_dev.bus == 'Image':
|
||||
map_name = 'Image'
|
||||
if source_dev.parent:
|
||||
part_num = re.sub(r"^.*?(\d+)$", r"\1", self.source.name)
|
||||
map_name += f'_p{part_num}'
|
||||
size_str = std.bytes_to_string(
|
||||
size=self.size,
|
||||
use_binary=False,
|
||||
)
|
||||
map_name += f'_{size_str.replace(" ", "")}'
|
||||
if source_dev.raw_details.get('label', ''):
|
||||
map_name += f'_{source_dev.raw_details["label"]}'
|
||||
map_name = map_name.replace(' ', '_')
|
||||
map_name = map_name.replace('/', '_')
|
||||
map_name = map_name.replace('\\', '_')
|
||||
if destination.is_dir():
|
||||
# Imaging
|
||||
self.map_path = pathlib.Path(f'{destination}/Image_{map_name}.map')
|
||||
self.destination = self.map_path.with_suffix('.dd')
|
||||
self.destination.touch()
|
||||
else:
|
||||
# Cloning
|
||||
self.map_path = pathlib.Path(f'{working_dir}/Clone_{map_name}.map')
|
||||
|
||||
# Create map file if needed
|
||||
# NOTE: We need to set the domain size for --complete-only to work
|
||||
if not self.map_path.exists():
|
||||
self.map_path.write_text(
|
||||
data=cfg.ddrescue.DDRESCUE_MAP_TEMPLATE.format(
|
||||
name=cfg.main.KIT_NAME_FULL,
|
||||
size=self.size,
|
||||
),
|
||||
encoding='utf-8',
|
||||
)
|
||||
|
||||
# Set initial status
|
||||
self.set_initial_status()
|
||||
|
||||
def __getstate__(self):
|
||||
"""Override to allow pickling ddrescue.State() objects."""
|
||||
bp_state = self.__dict__.copy()
|
||||
del bp_state['view_proc']
|
||||
return bp_state
|
||||
|
||||
def get_error_size(self) -> int:
|
||||
"""Get error size in bytes, returns int."""
|
||||
return self.size - self.get_rescued_size()
|
||||
|
||||
def get_percent_recovered(self) -> float:
|
||||
"""Get percent rescued from map_data, returns float."""
|
||||
return 100 * self.map_data.get('rescued', 0) / self.size
|
||||
|
||||
def get_rescued_size(self) -> int:
|
||||
"""Get rescued size using map data.
|
||||
|
||||
NOTE: Returns 0 if no map data is available.
|
||||
"""
|
||||
self.load_map_data()
|
||||
return self.map_data.get('rescued', 0)
|
||||
|
||||
def load_map_data(self) -> None:
|
||||
"""Load map data from file.
|
||||
|
||||
NOTE: If the file is missing it is assumed that recovery hasn't
|
||||
started yet so default values will be returned instead.
|
||||
"""
|
||||
data: dict[str, bool | int] = {'full recovery': False, 'pass completed': False}
|
||||
|
||||
# Get output from ddrescuelog
|
||||
cmd = [
|
||||
'ddrescuelog',
|
||||
'--binary-prefixes',
|
||||
'--show-status',
|
||||
f'--size={self.size}',
|
||||
self.map_path,
|
||||
]
|
||||
proc = exe.run_program(cmd, check=False)
|
||||
|
||||
# Parse output
|
||||
for line in proc.stdout.splitlines():
|
||||
_r = DDRESCUE_LOG_REGEX.search(line)
|
||||
if _r:
|
||||
if _r.group('key') == 'rescued' and _r.group('percent') == '100':
|
||||
# Fix rounding errors from ddrescuelog output
|
||||
data['rescued'] = self.size
|
||||
else:
|
||||
data[_r.group('key')] = std.string_to_bytes(
|
||||
f'{_r.group("size")} {_r.group("unit")}',
|
||||
)
|
||||
data['pass completed'] = 'current status: finished' in line.lower()
|
||||
|
||||
# Check if 100% done (only if map is present and non-zero size
|
||||
# NOTE: ddrescuelog returns 0 (i.e. 100% done) for empty files
|
||||
if self.map_path.exists() and self.map_path.stat().st_size != 0:
|
||||
cmd = [
|
||||
'ddrescuelog',
|
||||
'--done-status',
|
||||
f'--size={self.size}',
|
||||
self.map_path,
|
||||
]
|
||||
proc = exe.run_program(cmd, check=False)
|
||||
data['full recovery'] = proc.returncode == 0
|
||||
|
||||
# Done
|
||||
self.map_data.update(data)
|
||||
|
||||
def pass_complete(self, pass_name) -> bool:
|
||||
"""Check if pass_name is complete based on map data, returns bool."""
|
||||
pending_size = self.map_data['non-tried']
|
||||
|
||||
# Full recovery
|
||||
if self.map_data.get('full recovery', False):
|
||||
return True
|
||||
|
||||
# New recovery
|
||||
if 'non-tried' not in self.map_data:
|
||||
return False
|
||||
|
||||
# Initial read skip pass
|
||||
if pass_name == 'read-skip':
|
||||
pass_threshold = cfg.ddrescue.AUTO_PASS_THRESHOLDS[pass_name]
|
||||
if self.get_percent_recovered() >= pass_threshold:
|
||||
return True
|
||||
|
||||
# Recovery in progress
|
||||
if pass_name in ('trim', 'scrape'):
|
||||
pending_size += self.map_data['non-trimmed']
|
||||
if pass_name == 'scrape':
|
||||
pending_size += self.map_data['non-scraped']
|
||||
if pending_size == 0:
|
||||
# This is true when the previous and current passes are complete
|
||||
return True
|
||||
|
||||
# This should never be reached
|
||||
return False
|
||||
|
||||
def reset_progress(self):
|
||||
"""Reset progress to start fresh recovery."""
|
||||
self.map_data = {}
|
||||
self.status = {
|
||||
'read': 'Pending',
|
||||
'trim': 'Pending',
|
||||
'scrape': 'Pending',
|
||||
}
|
||||
self.map_path.write_text(
|
||||
data=cfg.ddrescue.DDRESCUE_MAP_TEMPLATE.format(
|
||||
name=cfg.main.KIT_NAME_FULL,
|
||||
size=self.size,
|
||||
),
|
||||
encoding='utf-8',
|
||||
)
|
||||
|
||||
# Set initial status
|
||||
self.set_initial_status()
|
||||
|
||||
def safety_check(self) -> None:
|
||||
"""Run safety check and abort if necessary."""
|
||||
# TODO: Expand section to support non-Linux systems
|
||||
dest_size = -1
|
||||
if self.destination.is_block_device():
|
||||
cmd = [
|
||||
'lsblk', '--bytes', '--json',
|
||||
'--nodeps', '--noheadings', '--output=size',
|
||||
self.destination,
|
||||
]
|
||||
json_data = exe.get_json_from_command(cmd)
|
||||
dest_size = json_data['blockdevices'][0]['size']
|
||||
del json_data
|
||||
|
||||
# Check destination size if cloning
|
||||
if not self.destination.is_file() and dest_size < self.size:
|
||||
cli.print_error(f'Invalid destination: {self.destination}')
|
||||
raise std.GenericAbort()
|
||||
|
||||
def set_initial_status(self) -> None:
|
||||
"""Read map data and set initial statuses."""
|
||||
self.load_map_data()
|
||||
percent = self.get_percent_recovered()
|
||||
for name in self.status:
|
||||
if self.pass_complete(name):
|
||||
self.status[name] = percent
|
||||
else:
|
||||
# Stop checking
|
||||
if percent > 0:
|
||||
self.status[name] = percent
|
||||
break
|
||||
|
||||
def skip_pass(self, pass_name) -> None:
|
||||
"""Mark pass as skipped if applicable."""
|
||||
if self.status[pass_name] == 'Pending':
|
||||
self.status[pass_name] = 'Skipped'
|
||||
|
||||
def update_progress(self, pass_name) -> None:
|
||||
"""Update progress via map data."""
|
||||
self.load_map_data()
|
||||
|
||||
# Update status
|
||||
percent = self.get_percent_recovered()
|
||||
if percent > 0:
|
||||
self.status[pass_name] = percent
|
||||
|
||||
# Mark future passes as skipped if applicable
|
||||
if percent == 100:
|
||||
status_keys = list(self.status.keys())
|
||||
for pass_n in status_keys[status_keys.index(pass_name)+1:]:
|
||||
self.status[pass_n] = 'Skipped'
|
||||
|
||||
|
||||
# Functions
|
||||
def add_clone_block_pairs(state) -> None:
|
||||
"""Add device to device block pairs and set settings if necessary."""
|
||||
source_sep = get_partition_separator(state.source.path.name)
|
||||
dest_sep = get_partition_separator(state.destination.path.name)
|
||||
settings = {}
|
||||
source_parts = []
|
||||
|
||||
# Clone settings
|
||||
settings = state.load_settings(discard_unused_settings=True)
|
||||
|
||||
# Add pairs from previous run
|
||||
if settings['Partition Mapping']:
|
||||
for part_map in settings['Partition Mapping']:
|
||||
bp_source = hw_disk.Disk(
|
||||
f'{state.source.path}{source_sep}{part_map[0]}',
|
||||
)
|
||||
bp_dest = pathlib.Path(
|
||||
f'{state.destination.path}{dest_sep}{part_map[1]}',
|
||||
)
|
||||
state.add_block_pair(bp_source, bp_dest)
|
||||
return
|
||||
|
||||
# Add pairs from selection
|
||||
source_parts = menus.select_disk_parts('Clone', state.source)
|
||||
if state.source.path.samefile(source_parts[0].path):
|
||||
# Whole disk (or single partition via args), skip settings
|
||||
bp_dest = state.destination.path
|
||||
state.add_block_pair(state.source, bp_dest)
|
||||
return
|
||||
|
||||
# New run, use new settings file
|
||||
settings['Needs Format'] = True
|
||||
offset = 0
|
||||
user_choice = cli.choice(
|
||||
'Format clone using GPT, MBR, or match Source type?',
|
||||
['G', 'M', 'S'],
|
||||
)
|
||||
if user_choice == 'G':
|
||||
settings['Table Type'] = 'GPT'
|
||||
elif user_choice == 'M':
|
||||
settings['Table Type'] = 'MBR'
|
||||
else:
|
||||
# Match source type
|
||||
settings['Table Type'] = get_table_type(state.source.path)
|
||||
if cli.ask('Create an empty Windows boot partition on the clone?'):
|
||||
settings['Create Boot Partition'] = True
|
||||
offset = 2 if settings['Table Type'] == 'GPT' else 1
|
||||
|
||||
# Add pairs
|
||||
for dest_num, part in enumerate(source_parts):
|
||||
dest_num += offset + 1
|
||||
bp_dest = pathlib.Path(
|
||||
f'{state.destination.path}{dest_sep}{dest_num}',
|
||||
)
|
||||
state.add_block_pair(part, bp_dest)
|
||||
|
||||
# Add to settings file
|
||||
source_num = re.sub(r'^.*?(\d+)$', r'\1', part.path.name)
|
||||
settings['Partition Mapping'].append([source_num, dest_num])
|
||||
|
||||
# Save settings
|
||||
state.save_settings(settings)
|
||||
|
||||
|
||||
def add_image_block_pairs(state) -> None:
|
||||
"""Add device to image file block pairs."""
|
||||
source_parts = menus.select_disk_parts(state.mode, state.source)
|
||||
for part in source_parts:
|
||||
state.add_block_pair(part, state.destination)
|
||||
|
||||
|
||||
def build_block_pair_report(block_pairs, settings) -> list:
|
||||
"""Build block pair report, returns list."""
|
||||
report = []
|
||||
notes = []
|
||||
if block_pairs:
|
||||
report.append(ansi.color_string('Block Pairs', 'GREEN'))
|
||||
else:
|
||||
# Bail early
|
||||
return report
|
||||
|
||||
# Show block pair mapping
|
||||
if settings and settings['Create Boot Partition']:
|
||||
if settings['Table Type'] == 'GPT':
|
||||
report.append(f'{" —— ":<9} --> EFI System Partition')
|
||||
report.append(f'{" —— ":<9} --> Microsoft Reserved Partition')
|
||||
elif settings['Table Type'] == 'MBR':
|
||||
report.append(f'{" —— ":<9} --> System Reserved')
|
||||
for pair in block_pairs:
|
||||
report.append(f'{pair.source.name:<9} --> {pair.destination.name}')
|
||||
|
||||
# Show resume messages as necessary
|
||||
if settings:
|
||||
if not settings['First Run']:
|
||||
notes.append(
|
||||
ansi.color_string(
|
||||
['NOTE:', 'Clone settings loaded from previous run.'],
|
||||
['BLUE', None],
|
||||
),
|
||||
)
|
||||
if settings['Needs Format'] and settings['Table Type']:
|
||||
msg = f'Destination will be formatted using {settings["Table Type"]}'
|
||||
notes.append(
|
||||
ansi.color_string(
|
||||
['NOTE:', msg],
|
||||
['BLUE', None],
|
||||
),
|
||||
)
|
||||
if any(pair.get_rescued_size() > 0 for pair in block_pairs):
|
||||
notes.append(
|
||||
ansi.color_string(
|
||||
['NOTE:', 'Resume data loaded from map file(s).'],
|
||||
['BLUE', None],
|
||||
),
|
||||
)
|
||||
|
||||
# Add notes to report
|
||||
if notes:
|
||||
report.append(' ')
|
||||
report.extend(notes)
|
||||
|
||||
# Done
|
||||
return report
|
||||
|
||||
|
||||
def build_sfdisk_partition_line(table_type, dev_path, size, details) -> str:
|
||||
"""Build sfdisk partition line using passed details, returns str."""
|
||||
line = f'{dev_path} : size={size}'
|
||||
dest_type = ''
|
||||
source_filesystem = str(details.get('fstype', '')).upper()
|
||||
source_table_type = ''
|
||||
source_type = details.get('parttype', '')
|
||||
|
||||
# Set dest type
|
||||
if re.match(r'^0x\w+$', source_type):
|
||||
# Both source and dest are MBR
|
||||
source_table_type = 'MBR'
|
||||
if table_type == 'MBR':
|
||||
dest_type = source_type.replace('0x', '').lower()
|
||||
elif re.match(r'^\w{8}-\w{4}-\w{4}-\w{4}-\w{12}$', source_type):
|
||||
# Source is a GPT type
|
||||
source_table_type = 'GPT'
|
||||
if table_type == 'GPT':
|
||||
dest_type = source_type.upper()
|
||||
if not dest_type:
|
||||
# Assuming changing table types, set based on FS
|
||||
if source_filesystem in cfg.ddrescue.PARTITION_TYPES.get(table_type, {}):
|
||||
dest_type = cfg.ddrescue.PARTITION_TYPES[table_type][source_filesystem]
|
||||
line += f', type={dest_type}'
|
||||
|
||||
# Safety Check
|
||||
if not dest_type:
|
||||
cli.print_error(f'Failed to determine partition type for: {dev_path}')
|
||||
raise std.GenericAbort()
|
||||
|
||||
# Add extra details
|
||||
if details.get('partlabel', ''):
|
||||
line += f', name="{details["partlabel"]}"'
|
||||
if details.get('partuuid', '') and source_table_type == table_type:
|
||||
# Only add UUID if source/dest table types match
|
||||
line += f', uuid={details["partuuid"].upper()}'
|
||||
|
||||
# Done
|
||||
return line
|
||||
|
||||
|
||||
def get_partition_separator(name) -> str:
|
||||
"""Get partition separator based on device name, returns str."""
|
||||
separator = ''
|
||||
if re.search(r'(loop|mmc|nvme)', name, re.IGNORECASE):
|
||||
separator = 'p'
|
||||
|
||||
return separator
|
||||
|
||||
|
||||
def get_table_type(disk_path) -> str:
|
||||
"""Get disk partition table type, returns str.
|
||||
|
||||
NOTE: If resulting table type is not GPT or MBR
|
||||
then an exception is raised.
|
||||
"""
|
||||
disk_path = str(disk_path)
|
||||
table_type = None
|
||||
|
||||
# Linux
|
||||
if std.PLATFORM == 'Linux':
|
||||
cmd = f'lsblk --json --output=pttype --nodeps {disk_path}'.split()
|
||||
json_data = exe.get_json_from_command(cmd)
|
||||
table_type = json_data['blockdevices'][0].get('pttype', '').upper()
|
||||
table_type = table_type.replace('DOS', 'MBR')
|
||||
|
||||
# macOS
|
||||
if std.PLATFORM == 'Darwin':
|
||||
cmd = ['diskutil', 'list', '-plist', disk_path]
|
||||
proc = exe.run_program(cmd, check=False, encoding=None, errors=None)
|
||||
try:
|
||||
plist_data = plistlib.loads(proc.stdout)
|
||||
except (TypeError, ValueError):
|
||||
# Invalid / corrupt plist data? return empty dict to avoid crash
|
||||
pass
|
||||
else:
|
||||
disk_details = plist_data.get('AllDisksAndPartitions', [{}])[0]
|
||||
table_type = disk_details['Content']
|
||||
table_type = table_type.replace('FDisk_partition_scheme', 'MBR')
|
||||
table_type = table_type.replace('GUID_partition_scheme', 'GPT')
|
||||
|
||||
# Check type
|
||||
if table_type not in ('GPT', 'MBR'):
|
||||
cli.print_error(f'Unsupported partition table type: {table_type}')
|
||||
raise std.GenericAbort()
|
||||
|
||||
# Done
|
||||
return table_type
|
||||
|
||||
|
||||
def prep_destination(
|
||||
state,
|
||||
source_parts: list[hw_disk.Disk],
|
||||
dry_run: bool = True,
|
||||
) -> None:
|
||||
"""Prep destination as necessary."""
|
||||
# TODO: Split into Linux and macOS
|
||||
# logical sector size is not easily found under macOS
|
||||
# It might be easier to rewrite this section using macOS tools
|
||||
dest_prefix = str(state.destination.path)
|
||||
dest_prefix += get_partition_separator(state.destination.path.name)
|
||||
esp_type = 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
|
||||
msr_type = 'E3C9E316-0B5C-4DB8-817D-F92DF00215AE'
|
||||
part_num = 0
|
||||
sfdisk_script = []
|
||||
settings = state.load_settings()
|
||||
|
||||
# Bail early
|
||||
if not settings['Needs Format']:
|
||||
return
|
||||
|
||||
# Add partition table settings
|
||||
if settings['Table Type'] == 'GPT':
|
||||
sfdisk_script.append('label: gpt')
|
||||
else:
|
||||
sfdisk_script.append('label: dos')
|
||||
sfdisk_script.append('unit: sectors')
|
||||
sfdisk_script.append('')
|
||||
|
||||
# Add boot partition if requested
|
||||
if settings['Create Boot Partition']:
|
||||
if settings['Table Type'] == 'GPT':
|
||||
part_num += 1
|
||||
sfdisk_script.append(
|
||||
build_sfdisk_partition_line(
|
||||
table_type='GPT',
|
||||
dev_path=f'{dest_prefix}{part_num}',
|
||||
size='260MiB',
|
||||
details={'parttype': esp_type, 'partlabel': 'EFI System'},
|
||||
),
|
||||
)
|
||||
part_num += 1
|
||||
sfdisk_script.append(
|
||||
build_sfdisk_partition_line(
|
||||
table_type=settings['Table Type'],
|
||||
dev_path=f'{dest_prefix}{part_num}',
|
||||
size='16MiB',
|
||||
details={'parttype': msr_type, 'partlabel': 'Microsoft Reserved'},
|
||||
),
|
||||
)
|
||||
elif settings['Table Type'] == 'MBR':
|
||||
part_num += 1
|
||||
sfdisk_script.append(
|
||||
build_sfdisk_partition_line(
|
||||
table_type='MBR',
|
||||
dev_path=f'{dest_prefix}{part_num}',
|
||||
size='100MiB',
|
||||
details={'parttype': '0x7', 'partlabel': 'System Reserved'},
|
||||
),
|
||||
)
|
||||
|
||||
# Add selected partition(s)
|
||||
for part in source_parts:
|
||||
num_sectors = part.size / state.destination.log_sec
|
||||
num_sectors = math.ceil(num_sectors)
|
||||
part_num += 1
|
||||
sfdisk_script.append(
|
||||
build_sfdisk_partition_line(
|
||||
table_type=settings['Table Type'],
|
||||
dev_path=f'{dest_prefix}{part_num}',
|
||||
size=num_sectors,
|
||||
details=part.raw_details,
|
||||
),
|
||||
)
|
||||
|
||||
# Save sfdisk script
|
||||
script_path = (
|
||||
f'{state.working_dir}/'
|
||||
f'sfdisk_{state.destination.path.name}.script'
|
||||
)
|
||||
with open(script_path, 'w', encoding='utf-8') as _f:
|
||||
_f.write('\n'.join(sfdisk_script))
|
||||
|
||||
# Skip real format for dry runs
|
||||
if dry_run:
|
||||
LOG.info('Dry run, refusing to format destination')
|
||||
return
|
||||
|
||||
# Format disk
|
||||
LOG.warning('Formatting destination: %s', state.destination.path)
|
||||
with open(script_path, 'r', encoding='utf-8') as _f:
|
||||
proc = exe.run_program(
|
||||
cmd=['sudo', 'sfdisk', state.destination.path],
|
||||
stdin=_f,
|
||||
check=False,
|
||||
)
|
||||
if proc.returncode != 0:
|
||||
cli.print_error('Error(s) encoundtered while formatting destination')
|
||||
raise std.GenericAbort()
|
||||
|
||||
# Update settings
|
||||
settings['Needs Format'] = False
|
||||
state.save_settings(settings)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("This file is not meant to be called directly.")
|
||||
File diff suppressed because it is too large
Load diff
109
scripts/wk/clone/image.py
Normal file
109
scripts/wk/clone/image.py
Normal file
|
|
@ -0,0 +1,109 @@
|
|||
"""WizardKit: ddrescue TUI - State"""
|
||||
# vim: sts=2 sw=2 ts=2
|
||||
|
||||
import atexit
|
||||
import logging
|
||||
import pathlib
|
||||
import plistlib
|
||||
import re
|
||||
|
||||
from wk import exe
|
||||
from wk.std import PLATFORM
|
||||
from wk.ui import cli
|
||||
|
||||
|
||||
# STATIC VARIABLES
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Functions
|
||||
def mount_raw_image(path) -> pathlib.Path:
|
||||
"""Mount raw image using OS specific methods, returns pathlib.Path."""
|
||||
loopback_path = None
|
||||
|
||||
if PLATFORM == 'Darwin':
|
||||
loopback_path = mount_raw_image_macos(path)
|
||||
elif PLATFORM == 'Linux':
|
||||
loopback_path = mount_raw_image_linux(path)
|
||||
|
||||
# Check
|
||||
if not loopback_path:
|
||||
cli.print_error(f'Failed to mount image: {path}')
|
||||
|
||||
# Register unmount atexit
|
||||
atexit.register(unmount_loopback_device, loopback_path)
|
||||
|
||||
# Done
|
||||
return loopback_path
|
||||
|
||||
|
||||
def mount_raw_image_linux(path) -> pathlib.Path:
|
||||
"""Mount raw image using losetup, returns pathlib.Path."""
|
||||
loopback_path = None
|
||||
|
||||
# Mount using losetup
|
||||
cmd = [
|
||||
'sudo',
|
||||
'losetup',
|
||||
'--find',
|
||||
'--partscan',
|
||||
'--show',
|
||||
path,
|
||||
]
|
||||
proc = exe.run_program(cmd, check=False)
|
||||
|
||||
# Check result
|
||||
if proc.returncode == 0:
|
||||
loopback_path = proc.stdout.strip()
|
||||
|
||||
# Done
|
||||
return loopback_path
|
||||
|
||||
|
||||
def mount_raw_image_macos(path) -> pathlib.Path:
|
||||
"""Mount raw image using hdiutil, returns pathlib.Path."""
|
||||
loopback_path = None
|
||||
plist_data = {}
|
||||
|
||||
# Mount using hdiutil
|
||||
# plistdata['system-entities'][{}...]
|
||||
cmd = [
|
||||
'hdiutil', 'attach',
|
||||
'-imagekey', 'diskimage-class=CRawDiskImage',
|
||||
'-nomount',
|
||||
'-plist',
|
||||
'-readonly',
|
||||
path,
|
||||
]
|
||||
proc = exe.run_program(cmd, check=False, encoding=None, errors=None)
|
||||
|
||||
# Check result
|
||||
try:
|
||||
plist_data = plistlib.loads(proc.stdout)
|
||||
except plistlib.InvalidFileException:
|
||||
return None
|
||||
for dev in plist_data.get('system-entities', []):
|
||||
dev_path = dev.get('dev-entry', '')
|
||||
if re.match(r'^/dev/disk\d+$', dev_path):
|
||||
loopback_path = dev_path
|
||||
|
||||
# Done
|
||||
return loopback_path
|
||||
|
||||
|
||||
def unmount_loopback_device(path) -> None:
|
||||
"""Unmount loopback device using OS specific methods."""
|
||||
cmd = []
|
||||
|
||||
# Build OS specific cmd
|
||||
if PLATFORM == 'Darwin':
|
||||
cmd = ['hdiutil', 'detach', path]
|
||||
elif PLATFORM == 'Linux':
|
||||
cmd = ['sudo', 'losetup', '--detach', path]
|
||||
|
||||
# Unmount loopback device
|
||||
exe.run_program(cmd, check=False)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("This file is not meant to be called directly.")
|
||||
1209
scripts/wk/clone/state.py
Normal file
1209
scripts/wk/clone/state.py
Normal file
File diff suppressed because it is too large
Load diff
Loading…
Reference in a new issue