575 lines
17 KiB
Python
575 lines
17 KiB
Python
"""WizardKit: ddrescue TUI - Block Pairs"""
|
|
# vim: sts=2 sw=2 ts=2
|
|
|
|
import logging
|
|
import math
|
|
import os
|
|
import pathlib
|
|
import plistlib
|
|
import re
|
|
import subprocess
|
|
|
|
from wk import cfg, exe, std
|
|
from wk.clone import menus
|
|
from wk.hw import disk as hw_disk
|
|
from wk.ui import ansi, cli
|
|
|
|
|
|
# STATIC VARIABLES
|
|
LOG = logging.getLogger(__name__)
|
|
DDRESCUE_LOG_REGEX = re.compile(
|
|
r'^\s*(?P<key>\S+):\s+'
|
|
r'(?P<size>\d+)\s+'
|
|
r'(?P<unit>[PTGMKB]i?B?)'
|
|
r'.*\(\s*(?P<percent>\d+\.?\d*)%\)$',
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
|
|
# Classes
|
|
class BlockPair():
|
|
"""Object for tracking source to dest recovery data."""
|
|
def __init__(
|
|
self,
|
|
source_dev: hw_disk.Disk,
|
|
destination: pathlib.Path,
|
|
working_dir: pathlib.Path,
|
|
):
|
|
self.sector_size: int = source_dev.phy_sec
|
|
self.source: pathlib.Path = pathlib.Path(source_dev.path)
|
|
self.destination: pathlib.Path = destination
|
|
self.map_data: dict[str, bool | int] = {}
|
|
self.map_path: pathlib.Path = pathlib.Path()
|
|
self.size: int = source_dev.size
|
|
self.status: dict[str, float | int | str] = {
|
|
'read-skip': 'Pending',
|
|
'read-full': 'Pending',
|
|
'trim': 'Pending',
|
|
'scrape': 'Pending',
|
|
}
|
|
self.test_map: pathlib.Path | None = None
|
|
self.view_map: bool = 'DISPLAY' in os.environ or 'WAYLAND_DISPLAY' in os.environ
|
|
self.view_proc: subprocess.Popen | None = None
|
|
|
|
# Set map path
|
|
# e.g. '(Clone|Image)_Model[_p#]_Size[_Label].map'
|
|
map_name = source_dev.model
|
|
if source_dev.bus == 'Image':
|
|
map_name = 'Image'
|
|
if source_dev.parent:
|
|
part_num = re.sub(r"^.*?(\d+)$", r"\1", self.source.name)
|
|
map_name += f'_p{part_num}'
|
|
size_str = std.bytes_to_string(
|
|
size=self.size,
|
|
use_binary=False,
|
|
)
|
|
map_name += f'_{size_str.replace(" ", "")}'
|
|
if source_dev.raw_details.get('label', ''):
|
|
map_name += f'_{source_dev.raw_details["label"]}'
|
|
map_name = map_name.replace(' ', '_')
|
|
map_name = map_name.replace('/', '_')
|
|
map_name = map_name.replace('\\', '_')
|
|
if destination.is_dir():
|
|
# Imaging
|
|
self.map_path = pathlib.Path(f'{destination}/Image_{map_name}.map')
|
|
self.destination = self.map_path.with_suffix('.dd')
|
|
self.destination.touch()
|
|
else:
|
|
# Cloning
|
|
self.map_path = pathlib.Path(f'{working_dir}/Clone_{map_name}.map')
|
|
|
|
# Create map file if needed
|
|
# NOTE: We need to set the domain size for --complete-only to work
|
|
if not self.map_path.exists():
|
|
self.map_path.write_text(
|
|
data=cfg.ddrescue.DDRESCUE_MAP_TEMPLATE.format(
|
|
name=cfg.main.KIT_NAME_FULL,
|
|
size=self.size,
|
|
),
|
|
encoding='utf-8',
|
|
)
|
|
|
|
# Set initial status
|
|
self.set_initial_status()
|
|
|
|
def __getstate__(self):
|
|
"""Override to allow pickling ddrescue.State() objects."""
|
|
bp_state = self.__dict__.copy()
|
|
del bp_state['view_proc']
|
|
return bp_state
|
|
|
|
def get_error_size(self) -> int:
|
|
"""Get error size in bytes, returns int."""
|
|
return self.size - self.get_rescued_size()
|
|
|
|
def get_percent_recovered(self) -> float:
|
|
"""Get percent rescued from map_data, returns float."""
|
|
return 100 * self.map_data.get('rescued', 0) / self.size
|
|
|
|
def get_rescued_size(self) -> int:
|
|
"""Get rescued size using map data.
|
|
|
|
NOTE: Returns 0 if no map data is available.
|
|
"""
|
|
self.load_map_data()
|
|
return self.map_data.get('rescued', 0)
|
|
|
|
def load_map_data(self) -> None:
|
|
"""Load map data from file.
|
|
|
|
NOTE: If the file is missing it is assumed that recovery hasn't
|
|
started yet so default values will be returned instead.
|
|
"""
|
|
data: dict[str, bool | int] = {'full recovery': False, 'pass completed': False}
|
|
|
|
# Get output from ddrescuelog
|
|
cmd = [
|
|
'ddrescuelog',
|
|
'--binary-prefixes',
|
|
'--show-status',
|
|
f'--size={self.size}',
|
|
self.map_path,
|
|
]
|
|
proc = exe.run_program(cmd, check=False)
|
|
|
|
# Parse output
|
|
for line in proc.stdout.splitlines():
|
|
_r = DDRESCUE_LOG_REGEX.search(line)
|
|
if _r:
|
|
if _r.group('key') == 'rescued' and _r.group('percent') == '100':
|
|
# Fix rounding errors from ddrescuelog output
|
|
data['rescued'] = self.size
|
|
else:
|
|
data[_r.group('key')] = std.string_to_bytes(
|
|
f'{_r.group("size")} {_r.group("unit")}',
|
|
)
|
|
data['pass completed'] = 'current status: finished' in line.lower()
|
|
|
|
# Check if 100% done (only if map is present and non-zero size
|
|
# NOTE: ddrescuelog returns 0 (i.e. 100% done) for empty files
|
|
if self.map_path.exists() and self.map_path.stat().st_size != 0:
|
|
cmd = [
|
|
'ddrescuelog',
|
|
'--done-status',
|
|
f'--size={self.size}',
|
|
self.map_path,
|
|
]
|
|
proc = exe.run_program(cmd, check=False)
|
|
data['full recovery'] = proc.returncode == 0
|
|
|
|
# Done
|
|
self.map_data.update(data)
|
|
|
|
def pass_complete(self, pass_name) -> bool:
|
|
"""Check if pass_name is complete based on map data, returns bool."""
|
|
pending_size = self.map_data['non-tried']
|
|
|
|
# Full recovery
|
|
if self.map_data.get('full recovery', False):
|
|
return True
|
|
|
|
# New recovery
|
|
if 'non-tried' not in self.map_data:
|
|
return False
|
|
|
|
# Initial read skip pass
|
|
if pass_name == 'read-skip':
|
|
pass_threshold = cfg.ddrescue.AUTO_PASS_THRESHOLDS[pass_name]
|
|
if self.get_percent_recovered() >= pass_threshold:
|
|
return True
|
|
|
|
# Recovery in progress
|
|
if pass_name in ('trim', 'scrape'):
|
|
pending_size += self.map_data['non-trimmed']
|
|
if pass_name == 'scrape':
|
|
pending_size += self.map_data['non-scraped']
|
|
if pending_size == 0:
|
|
# This is true when the previous and current passes are complete
|
|
return True
|
|
|
|
# This should never be reached
|
|
return False
|
|
|
|
def safety_check(self) -> None:
|
|
"""Run safety check and abort if necessary."""
|
|
# TODO: Expand section to support non-Linux systems
|
|
dest_size = -1
|
|
if self.destination.is_block_device():
|
|
cmd = [
|
|
'lsblk', '--bytes', '--json',
|
|
'--nodeps', '--noheadings', '--output=size',
|
|
self.destination,
|
|
]
|
|
json_data = exe.get_json_from_command(cmd)
|
|
dest_size = json_data['blockdevices'][0]['size']
|
|
del json_data
|
|
|
|
# Check destination size if cloning
|
|
if not self.destination.is_file() and dest_size < self.size:
|
|
cli.print_error(f'Invalid destination: {self.destination}')
|
|
raise std.GenericAbort()
|
|
|
|
def set_initial_status(self) -> None:
|
|
"""Read map data and set initial statuses."""
|
|
self.load_map_data()
|
|
percent = self.get_percent_recovered()
|
|
for name in self.status:
|
|
if self.pass_complete(name):
|
|
self.status[name] = percent
|
|
else:
|
|
# Stop checking
|
|
if percent > 0:
|
|
self.status[name] = percent
|
|
break
|
|
|
|
def skip_pass(self, pass_name) -> None:
|
|
"""Mark pass as skipped if applicable."""
|
|
if self.status[pass_name] == 'Pending':
|
|
self.status[pass_name] = 'Skipped'
|
|
|
|
def update_progress(self, pass_name) -> None:
|
|
"""Update progress via map data."""
|
|
self.load_map_data()
|
|
|
|
# Update status
|
|
percent = self.get_percent_recovered()
|
|
if percent > 0:
|
|
self.status[pass_name] = percent
|
|
|
|
# Mark future passes as skipped if applicable
|
|
if percent == 100:
|
|
status_keys = list(self.status.keys())
|
|
for pass_n in status_keys[status_keys.index(pass_name)+1:]:
|
|
self.status[pass_n] = 'Skipped'
|
|
|
|
|
|
# Functions
|
|
def add_clone_block_pairs(state) -> list[hw_disk.Disk]:
|
|
"""Add device to device block pairs and set settings if necessary."""
|
|
source_sep = get_partition_separator(state.source.path.name)
|
|
dest_sep = get_partition_separator(state.destination.path.name)
|
|
settings = {}
|
|
|
|
# Clone settings
|
|
settings = state.load_settings(discard_unused_settings=True)
|
|
|
|
# Add pairs from previous run
|
|
if settings['Partition Mapping']:
|
|
source_parts = []
|
|
for part_map in settings['Partition Mapping']:
|
|
bp_source = hw_disk.Disk(
|
|
f'{state.source.path}{source_sep}{part_map[0]}',
|
|
)
|
|
bp_dest = pathlib.Path(
|
|
f'{state.destination.path}{dest_sep}{part_map[1]}',
|
|
)
|
|
source_parts.append(bp_source)
|
|
state.add_block_pair(bp_source, bp_dest)
|
|
return source_parts
|
|
|
|
# Add pairs from selection
|
|
source_parts = menus.select_disk_parts('Clone', state.source)
|
|
if state.source.path.samefile(source_parts[0].path):
|
|
# Whole disk (or single partition via args), skip settings
|
|
bp_dest = state.destination.path
|
|
state.add_block_pair(state.source, bp_dest)
|
|
return source_parts
|
|
|
|
# New run, use new settings file
|
|
settings['Needs Format'] = True
|
|
offset = 0
|
|
user_choice = cli.choice(
|
|
'Format clone using GPT, MBR, or match Source type?',
|
|
['G', 'M', 'S'],
|
|
)
|
|
if user_choice == 'G':
|
|
settings['Table Type'] = 'GPT'
|
|
elif user_choice == 'M':
|
|
settings['Table Type'] = 'MBR'
|
|
else:
|
|
# Match source type
|
|
settings['Table Type'] = get_table_type(state.source.path)
|
|
if cli.ask('Create an empty Windows boot partition on the clone?'):
|
|
settings['Create Boot Partition'] = True
|
|
offset = 2 if settings['Table Type'] == 'GPT' else 1
|
|
|
|
# Add pairs
|
|
for dest_num, part in enumerate(source_parts):
|
|
dest_num += offset + 1
|
|
bp_dest = pathlib.Path(
|
|
f'{state.destination.path}{dest_sep}{dest_num}',
|
|
)
|
|
state.add_block_pair(part, bp_dest)
|
|
|
|
# Add to settings file
|
|
source_num = re.sub(r'^.*?(\d+)$', r'\1', part.path.name)
|
|
settings['Partition Mapping'].append([source_num, dest_num])
|
|
|
|
# Save settings
|
|
state.save_settings(settings)
|
|
|
|
# Done
|
|
return source_parts
|
|
|
|
|
|
def add_image_block_pairs(state) -> list[hw_disk.Disk]:
|
|
"""Add device to image file block pairs."""
|
|
source_parts = menus.select_disk_parts(state.mode, state.source)
|
|
for part in source_parts:
|
|
state.add_block_pair(part, state.destination)
|
|
|
|
# Done
|
|
return source_parts
|
|
|
|
|
|
def build_block_pair_report(block_pairs, settings) -> list:
|
|
"""Build block pair report, returns list."""
|
|
report = []
|
|
notes = []
|
|
if block_pairs:
|
|
report.append(ansi.color_string('Block Pairs', 'GREEN'))
|
|
else:
|
|
# Bail early
|
|
return report
|
|
|
|
# Show block pair mapping
|
|
if settings and settings['Create Boot Partition']:
|
|
if settings['Table Type'] == 'GPT':
|
|
report.append(f'{" —— ":<9} --> EFI System Partition')
|
|
report.append(f'{" —— ":<9} --> Microsoft Reserved Partition')
|
|
elif settings['Table Type'] == 'MBR':
|
|
report.append(f'{" —— ":<9} --> System Reserved')
|
|
for pair in block_pairs:
|
|
report.append(f'{pair.source.name:<9} --> {pair.destination.name}')
|
|
|
|
# Show resume messages as necessary
|
|
if settings:
|
|
if not settings['First Run']:
|
|
notes.append(
|
|
ansi.color_string(
|
|
['NOTE:', 'Clone settings loaded from previous run.'],
|
|
['BLUE', None],
|
|
),
|
|
)
|
|
if settings['Needs Format'] and settings['Table Type']:
|
|
msg = f'Destination will be formatted using {settings["Table Type"]}'
|
|
notes.append(
|
|
ansi.color_string(
|
|
['NOTE:', msg],
|
|
['BLUE', None],
|
|
),
|
|
)
|
|
if any(pair.get_rescued_size() > 0 for pair in block_pairs):
|
|
notes.append(
|
|
ansi.color_string(
|
|
['NOTE:', 'Resume data loaded from map file(s).'],
|
|
['BLUE', None],
|
|
),
|
|
)
|
|
|
|
# Add notes to report
|
|
if notes:
|
|
report.append(' ')
|
|
report.extend(notes)
|
|
|
|
# Done
|
|
return report
|
|
|
|
|
|
def build_sfdisk_partition_line(table_type, dev_path, size, details) -> str:
|
|
"""Build sfdisk partition line using passed details, returns str."""
|
|
line = f'{dev_path} : size={size}'
|
|
dest_type = ''
|
|
source_filesystem = str(details.get('fstype', '')).upper()
|
|
source_table_type = ''
|
|
source_type = details.get('parttype', '')
|
|
|
|
# Set dest type
|
|
if re.match(r'^0x\w+$', source_type):
|
|
# Source is a MBR type
|
|
source_table_type = 'MBR'
|
|
if table_type == 'MBR':
|
|
dest_type = source_type.replace('0x', '').lower()
|
|
elif re.match(r'^\w{8}-\w{4}-\w{4}-\w{4}-\w{12}$', source_type):
|
|
# Source is a GPT type
|
|
source_table_type = 'GPT'
|
|
if table_type == 'GPT':
|
|
dest_type = source_type.upper()
|
|
if not dest_type:
|
|
# Assuming changing table types, set based on FS
|
|
if source_filesystem in cfg.ddrescue.PARTITION_TYPES.get(table_type, {}):
|
|
dest_type = cfg.ddrescue.PARTITION_TYPES[table_type][source_filesystem]
|
|
line += f', type={dest_type}'
|
|
|
|
# Safety Check
|
|
if not dest_type:
|
|
cli.print_error(f'Failed to determine partition type for: {dev_path}')
|
|
raise std.GenericAbort()
|
|
|
|
# Add extra details
|
|
if details.get('partlabel', ''):
|
|
line += f', name="{details["partlabel"]}"'
|
|
if details.get('partuuid', '') and source_table_type == table_type:
|
|
# Only add UUID if source/dest table types match
|
|
line += f', uuid={details["partuuid"].upper()}'
|
|
|
|
# Done
|
|
return line
|
|
|
|
|
|
def get_partition_separator(name) -> str:
|
|
"""Get partition separator based on device name, returns str."""
|
|
separator = ''
|
|
if re.search(r'(loop|mmc|nvme)', name, re.IGNORECASE):
|
|
separator = 'p'
|
|
|
|
return separator
|
|
|
|
|
|
def get_table_type(disk_path) -> str:
|
|
"""Get disk partition table type, returns str.
|
|
|
|
NOTE: If resulting table type is not GPT or MBR
|
|
then an exception is raised.
|
|
"""
|
|
disk_path = str(disk_path)
|
|
table_type = None
|
|
|
|
# Linux
|
|
if std.PLATFORM == 'Linux':
|
|
cmd = f'lsblk --json --output=pttype --nodeps {disk_path}'.split()
|
|
json_data = exe.get_json_from_command(cmd)
|
|
table_type = json_data['blockdevices'][0].get('pttype', '').upper()
|
|
table_type = table_type.replace('DOS', 'MBR')
|
|
|
|
# macOS
|
|
if std.PLATFORM == 'Darwin':
|
|
cmd = ['diskutil', 'list', '-plist', disk_path]
|
|
proc = exe.run_program(cmd, check=False, encoding=None, errors=None)
|
|
try:
|
|
plist_data = plistlib.loads(proc.stdout)
|
|
except (TypeError, ValueError):
|
|
# Invalid / corrupt plist data? return empty dict to avoid crash
|
|
pass
|
|
else:
|
|
disk_details = plist_data.get('AllDisksAndPartitions', [{}])[0]
|
|
table_type = disk_details['Content']
|
|
table_type = table_type.replace('FDisk_partition_scheme', 'MBR')
|
|
table_type = table_type.replace('GUID_partition_scheme', 'GPT')
|
|
|
|
# Check type
|
|
if table_type not in ('GPT', 'MBR'):
|
|
cli.print_error(f'Unsupported partition table type: {table_type}')
|
|
raise std.GenericAbort()
|
|
|
|
# Done
|
|
return table_type
|
|
|
|
|
|
def prep_destination(
|
|
state,
|
|
source_parts: list[hw_disk.Disk],
|
|
dry_run: bool = True,
|
|
) -> None:
|
|
"""Prep destination as necessary."""
|
|
# TODO: Split into Linux and macOS
|
|
# logical sector size is not easily found under macOS
|
|
# It might be easier to rewrite this section using macOS tools
|
|
dest_prefix = str(state.destination.path)
|
|
dest_prefix += get_partition_separator(state.destination.path.name)
|
|
esp_type = 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
|
|
msr_type = 'E3C9E316-0B5C-4DB8-817D-F92DF00215AE'
|
|
part_num = 0
|
|
sfdisk_script = []
|
|
settings = state.load_settings()
|
|
|
|
# Bail early
|
|
if not settings['Needs Format']:
|
|
return
|
|
|
|
# Add partition table settings
|
|
if settings['Table Type'] == 'GPT':
|
|
sfdisk_script.append('label: gpt')
|
|
else:
|
|
sfdisk_script.append('label: dos')
|
|
sfdisk_script.append('unit: sectors')
|
|
sfdisk_script.append('')
|
|
|
|
# Add boot partition if requested
|
|
if settings['Create Boot Partition']:
|
|
if settings['Table Type'] == 'GPT':
|
|
part_num += 1
|
|
sfdisk_script.append(
|
|
build_sfdisk_partition_line(
|
|
table_type='GPT',
|
|
dev_path=f'{dest_prefix}{part_num}',
|
|
size='260MiB',
|
|
details={'parttype': esp_type, 'partlabel': 'EFI System'},
|
|
),
|
|
)
|
|
part_num += 1
|
|
sfdisk_script.append(
|
|
build_sfdisk_partition_line(
|
|
table_type=settings['Table Type'],
|
|
dev_path=f'{dest_prefix}{part_num}',
|
|
size='16MiB',
|
|
details={'parttype': msr_type, 'partlabel': 'Microsoft Reserved'},
|
|
),
|
|
)
|
|
elif settings['Table Type'] == 'MBR':
|
|
part_num += 1
|
|
sfdisk_script.append(
|
|
build_sfdisk_partition_line(
|
|
table_type='MBR',
|
|
dev_path=f'{dest_prefix}{part_num}',
|
|
size='100MiB',
|
|
details={'parttype': '0x7', 'partlabel': 'System Reserved'},
|
|
),
|
|
)
|
|
|
|
# Add selected partition(s)
|
|
for part in source_parts:
|
|
num_sectors = part.size / state.destination.log_sec
|
|
num_sectors = math.ceil(num_sectors)
|
|
part_num += 1
|
|
sfdisk_script.append(
|
|
build_sfdisk_partition_line(
|
|
table_type=settings['Table Type'],
|
|
dev_path=f'{dest_prefix}{part_num}',
|
|
size=num_sectors,
|
|
details=part.raw_details,
|
|
),
|
|
)
|
|
|
|
# Save sfdisk script
|
|
script_path = (
|
|
f'{state.working_dir}/'
|
|
f'sfdisk_{state.destination.path.name}.script'
|
|
)
|
|
with open(script_path, 'w', encoding='utf-8') as _f:
|
|
_f.write('\n'.join(sfdisk_script))
|
|
|
|
# Skip real format for dry runs
|
|
if dry_run:
|
|
LOG.info('Dry run, refusing to format destination')
|
|
return
|
|
|
|
# Format disk
|
|
LOG.warning('Formatting destination: %s', state.destination.path)
|
|
with open(script_path, 'r', encoding='utf-8') as _f:
|
|
proc = exe.run_program(
|
|
cmd=['sudo', 'sfdisk', state.destination.path],
|
|
stdin=_f,
|
|
check=False,
|
|
)
|
|
if proc.returncode != 0:
|
|
cli.print_error('Error(s) encoundtered while formatting destination')
|
|
raise std.GenericAbort()
|
|
|
|
# Update settings
|
|
settings['Needs Format'] = False
|
|
state.save_settings(settings)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
print("This file is not meant to be called directly.")
|