"""WizardKit: ddrescue TUI - Block Pairs"""
# vim: sts=2 sw=2 ts=2

import logging
import os
import pathlib
import re
import subprocess

from wk import cfg, exe, std
from wk.hw import disk as hw_disk
from wk.ui import cli


# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Parses `ddrescuelog --show-status` lines such as:
#   "rescued:   123456 MiB ... ( 99.97%)"
# Group names reconstructed to match the _r.group(...) call sites below.
DDRESCUE_LOG_REGEX = re.compile(
  r'^\s*(?P<key>\S+):\s+'
  r'(?P<size>\d+)\s+'
  r'(?P<unit>[PTGMKB]i?B?)'
  r'.*\(\s*(?P<percent>\d+\.?\d*)%\)$',
  re.IGNORECASE,
  )


# Classes
class BlockPair():
  """Object for tracking source to dest recovery data.

  One BlockPair maps a single source device (whole disk or partition)
  to a destination (block device when cloning, image file when imaging)
  and tracks per-pass recovery status via a ddrescue map file.
  """
  def __init__(
      self,
      source_dev: 'hw_disk.Disk',
      destination: pathlib.Path,
      working_dir: pathlib.Path,
      ):
    self.sector_size: int = source_dev.phy_sec
    self.source: pathlib.Path = pathlib.Path(source_dev.path)
    self.destination: pathlib.Path = destination
    self.map_data: dict[str, bool | int] = {}
    self.map_path: pathlib.Path = pathlib.Path()
    self.size: int = source_dev.size
    # Per-pass status: 'Pending', 'Skipped', or a numeric percent recovered
    self.status: dict[str, float | int | str] = {
      'read-skip': 'Pending',
      'read-full': 'Pending',
      'trim': 'Pending',
      'scrape': 'Pending',
      }
    self.test_map: pathlib.Path | None = None
    # Only offer a graphical map view when an X11/Wayland session exists
    self.view_map: bool = 'DISPLAY' in os.environ or 'WAYLAND_DISPLAY' in os.environ
    self.view_proc: subprocess.Popen | None = None

    # Set map path
    # e.g. '(Clone|Image)_Model[_p#]_Size[_Label].map'
    map_name = source_dev.model
    if source_dev.bus == 'Image':
      map_name = 'Image'
    if source_dev.parent:
      # Partition: append its number, taken from the trailing digits
      part_num = re.sub(r"^.*?(\d+)$", r"\1", self.source.name)
      map_name += f'_p{part_num}'
    size_str = std.bytes_to_string(
      size=self.size,
      use_binary=False,
      )
    map_name += f'_{size_str.replace(" ", "")}'
    if source_dev.raw_details.get('label', ''):
      map_name += f'_{source_dev.raw_details["label"]}'
    # Sanitize characters that would break a filesystem path
    map_name = map_name.replace(' ', '_')
    map_name = map_name.replace('/', '_')
    map_name = map_name.replace('\\', '_')
    if destination.is_dir():
      # Imaging
      self.map_path = pathlib.Path(f'{destination}/Image_{map_name}.map')
      self.destination = self.map_path.with_suffix('.dd')
      self.destination.touch()
    else:
      # Cloning
      self.map_path = pathlib.Path(f'{working_dir}/Clone_{map_name}.map')

    # Create map file if needed
    # NOTE: We need to set the domain size for --complete-only to work
    if not self.map_path.exists():
      self.map_path.write_text(
        data=cfg.ddrescue.DDRESCUE_MAP_TEMPLATE.format(
          name=cfg.main.KIT_NAME_FULL,
          size=self.size,
          ),
        encoding='utf-8',
        )

    # Set initial status
    self.set_initial_status()

  def __getstate__(self):
    """Override to allow pickling ddrescue.State() objects.

    view_proc is a subprocess.Popen handle and cannot be pickled.
    """
    bp_state = self.__dict__.copy()
    del bp_state['view_proc']
    return bp_state

  def get_error_size(self) -> int:
    """Get error size in bytes, returns int."""
    return self.size - self.get_rescued_size()

  def get_percent_recovered(self) -> float:
    """Get percent rescued from map_data, returns float."""
    if not self.size:
      # Guard against ZeroDivisionError for zero-sized sources
      return 0.0
    return 100 * self.map_data.get('rescued', 0) / self.size

  def get_rescued_size(self) -> int:
    """Get rescued size using map data.

    NOTE: Returns 0 if no map data is available.
    """
    self.load_map_data()
    return self.map_data.get('rescued', 0)

  def load_map_data(self) -> None:
    """Load map data from file.

    NOTE: If the file is missing it is assumed that recovery hasn't
    started yet so default values will be returned instead.
    """
    data: dict[str, bool | int] = {'full recovery': False, 'pass completed': False}

    # Get output from ddrescuelog
    cmd = [
      'ddrescuelog',
      '--binary-prefixes',
      '--show-status',
      f'--size={self.size}',
      self.map_path,
      ]
    proc = exe.run_program(cmd, check=False)

    # Parse output
    for line in proc.stdout.splitlines():
      _r = DDRESCUE_LOG_REGEX.search(line)
      if _r:
        if _r.group('key') == 'rescued' and _r.group('percent') == '100':
          # Fix rounding errors from ddrescuelog output
          data['rescued'] = self.size
        else:
          data[_r.group('key')] = std.string_to_bytes(
            f'{_r.group("size")} {_r.group("unit")}',
            )
      data['pass completed'] = 'current status: finished' in line.lower()

    # Check if 100% done (only if map is present and non-zero size)
    # NOTE: ddrescuelog returns 0 (i.e. 100% done) for empty files
    if self.map_path.exists() and self.map_path.stat().st_size != 0:
      cmd = [
        'ddrescuelog',
        '--done-status',
        f'--size={self.size}',
        self.map_path,
        ]
      proc = exe.run_program(cmd, check=False)
      data['full recovery'] = proc.returncode == 0

    # Done
    self.map_data.update(data)

  def pass_complete(self, pass_name) -> bool:
    """Check if pass_name is complete based on map data, returns bool."""
    # Full recovery
    if self.map_data.get('full recovery', False):
      return True

    # New recovery
    # NOTE: Checked before any subscripting below; previously the
    #       'non-tried' lookup ran first and raised KeyError in
    #       exactly the no-map-data case this guard exists for.
    if 'non-tried' not in self.map_data:
      return False

    pending_size = self.map_data['non-tried']

    # Initial read skip pass
    if pass_name == 'read-skip':
      pass_threshold = cfg.ddrescue.AUTO_PASS_THRESHOLDS[pass_name]
      if self.get_percent_recovered() >= pass_threshold:
        return True

    # Recovery in progress
    if pass_name in ('trim', 'scrape'):
      pending_size += self.map_data['non-trimmed']
    if pass_name == 'scrape':
      pending_size += self.map_data['non-scraped']
    if pending_size == 0:
      # This is true when the previous and current passes are complete
      return True

    # This should never be reached
    return False

  def safety_check(self) -> None:
    """Run safety check and abort if necessary.

    Raises std.GenericAbort if cloning to a destination smaller than
    the source.
    """
    # TODO: Expand section to support non-Linux systems
    dest_size = -1
    if self.destination.is_block_device():
      cmd = [
        'lsblk', '--bytes', '--json',
        '--nodeps', '--noheadings', '--output=size',
        self.destination,
        ]
      json_data = exe.get_json_from_command(cmd)
      dest_size = json_data['blockdevices'][0]['size']
      del json_data

    # Check destination size if cloning
    if not self.destination.is_file() and dest_size < self.size:
      cli.print_error(f'Invalid destination: {self.destination}')
      raise std.GenericAbort()

  def set_initial_status(self) -> None:
    """Read map data and set initial statuses."""
    self.load_map_data()
    percent = self.get_percent_recovered()
    for name in self.status:
      if self.pass_complete(name):
        self.status[name] = percent
      else:
        # Stop checking
        if percent > 0:
          self.status[name] = percent
        break

  def skip_pass(self, pass_name) -> None:
    """Mark pass as skipped if applicable."""
    if self.status[pass_name] == 'Pending':
      self.status[pass_name] = 'Skipped'

  def update_progress(self, pass_name) -> None:
    """Update progress via map data."""
    self.load_map_data()

    # Update status
    percent = self.get_percent_recovered()
    if percent > 0:
      self.status[pass_name] = percent

    # Mark future passes as skipped if applicable
    if percent == 100:
      status_keys = list(self.status.keys())
      for pass_n in status_keys[status_keys.index(pass_name)+1:]:
        self.status[pass_n] = 'Skipped'


if __name__ == '__main__':
  print("This file is not meant to be called directly.")
import os import pathlib -import plistlib -import re -import shutil import subprocess import time from random import randint -from typing import Any -import psutil import pytz from docopt import docopt -from wk import cfg, debug, exe, io, log, net, std -from wk.cfg.ddrescue import ( - DDRESCUE_MAP_TEMPLATE, - DDRESCUE_SPECIFIC_PASS_SETTINGS, - ) +from wk import cfg, exe, io, log, std +from wk.cfg.ddrescue import DDRESCUE_SPECIFIC_PASS_SETTINGS from wk.clone import menus +from wk.clone.state import State from wk.hw import disk as hw_disk from wk.hw.smart import ( check_attributes, - enable_smart, smart_status_ok, update_smart_details, ) -from wk.ui import ansi, cli, tmux, tui +from wk.ui import ansi, cli # STATIC VARIABLES @@ -59,1114 +49,13 @@ The method used is not 100% reliable and may cause issues. If you see any script errors or crashes after running this option then please restart the computer and try again. ''' -CLONE_SETTINGS = { - 'Source': None, - 'Destination': None, - 'Create Boot Partition': False, - 'First Run': True, - 'Needs Format': False, - 'Table Type': None, - 'Partition Mapping': [ - # (5, 1) ## Clone source partition #5 to destination partition #1 - ], - } -DDRESCUE_LOG_REGEX = re.compile( - r'^\s*(?P\S+):\s+' - r'(?P\d+)\s+' - r'(?P[PTGMKB]i?B?)' - r'.*\(\s*(?P\d+\.?\d*)%\)$', - re.IGNORECASE, - ) DDRESCUE_OUTPUT_HEIGHT = 14 INITIAL_SKIP_MIN = 64 * 1024 # This is ddrescue's minimum accepted value -REGEX_REMAINING_TIME = re.compile( - r'remaining time:' - r'\s*((?P\d+)d)?' - r'\s*((?P\d+)h)?' - r'\s*((?P\d+)m)?' - r'\s*((?P\d+)s)?' 
- r'\s*(?Pn/a)?', - re.IGNORECASE - ) PLATFORM = std.PLATFORM -RECOMMENDED_FSTYPES = re.compile(r'^(ext[234]|ntfs|xfs)$') -if PLATFORM == 'Darwin': - RECOMMENDED_FSTYPES = re.compile(r'^(apfs|hfs.?)$') -RECOMMENDED_MAP_FSTYPES = re.compile( - r'^(apfs|cifs|ext[234]|hfs.?|ntfs|smbfs|vfat|xfs)$' - ) -STATUS_COLORS = { - 'Passed': 'GREEN', - 'Aborted': 'YELLOW', - 'Skipped': 'YELLOW', - 'Working': 'YELLOW', - 'ERROR': 'RED', - } TIMEZONE = pytz.timezone(cfg.main.LINUX_TIME_ZONE) -# Classes -class BlockPair(): - """Object for tracking source to dest recovery data.""" - def __init__( - self, - source_dev: hw_disk.Disk, - destination: pathlib.Path, - working_dir: pathlib.Path, - ): - self.sector_size: int = source_dev.phy_sec - self.source: pathlib.Path = pathlib.Path(source_dev.path) - self.destination: pathlib.Path = destination - self.map_data: dict[str, bool | int] = {} - self.map_path: pathlib.Path = pathlib.Path() - self.size: int = source_dev.size - self.status: dict[str, float | int | str] = { - 'read-skip': 'Pending', - 'read-full': 'Pending', - 'trim': 'Pending', - 'scrape': 'Pending', - } - self.test_map: pathlib.Path | None = None - self.view_map: bool = 'DISPLAY' in os.environ or 'WAYLAND_DISPLAY' in os.environ - self.view_proc: subprocess.Popen | None = None - - # Set map path - # e.g. 
'(Clone|Image)_Model[_p#]_Size[_Label].map' - map_name = source_dev.model - if source_dev.bus == 'Image': - map_name = 'Image' - if source_dev.parent: - part_num = re.sub(r"^.*?(\d+)$", r"\1", self.source.name) - map_name += f'_p{part_num}' - size_str = std.bytes_to_string( - size=self.size, - use_binary=False, - ) - map_name += f'_{size_str.replace(" ", "")}' - if source_dev.raw_details.get('label', ''): - map_name += f'_{source_dev.raw_details["label"]}' - map_name = map_name.replace(' ', '_') - map_name = map_name.replace('/', '_') - map_name = map_name.replace('\\', '_') - if destination.is_dir(): - # Imaging - self.map_path = pathlib.Path(f'{destination}/Image_{map_name}.map') - self.destination = self.map_path.with_suffix('.dd') - self.destination.touch() - else: - # Cloning - self.map_path = pathlib.Path(f'{working_dir}/Clone_{map_name}.map') - - # Create map file if needed - # NOTE: We need to set the domain size for --complete-only to work - if not self.map_path.exists(): - self.map_path.write_text( - data=DDRESCUE_MAP_TEMPLATE.format( - name=cfg.main.KIT_NAME_FULL, - size=self.size, - ), - encoding='utf-8', - ) - - # Set initial status - self.set_initial_status() - - def __getstate__(self): - """Override to allow pickling ddrescue.State() objects.""" - bp_state = self.__dict__.copy() - del bp_state['view_proc'] - return bp_state - - def get_error_size(self) -> int: - """Get error size in bytes, returns int.""" - return self.size - self.get_rescued_size() - - def get_percent_recovered(self) -> float: - """Get percent rescued from map_data, returns float.""" - return 100 * self.map_data.get('rescued', 0) / self.size - - def get_rescued_size(self) -> int: - """Get rescued size using map data. - - NOTE: Returns 0 if no map data is available. - """ - self.load_map_data() - return self.map_data.get('rescued', 0) - - def load_map_data(self) -> None: - """Load map data from file. 
- - NOTE: If the file is missing it is assumed that recovery hasn't - started yet so default values will be returned instead. - """ - data: dict[str, bool | int] = {'full recovery': False, 'pass completed': False} - - # Get output from ddrescuelog - cmd = [ - 'ddrescuelog', - '--binary-prefixes', - '--show-status', - f'--size={self.size}', - self.map_path, - ] - proc = exe.run_program(cmd, check=False) - - # Parse output - for line in proc.stdout.splitlines(): - _r = DDRESCUE_LOG_REGEX.search(line) - if _r: - if _r.group('key') == 'rescued' and _r.group('percent') == '100': - # Fix rounding errors from ddrescuelog output - data['rescued'] = self.size - else: - data[_r.group('key')] = std.string_to_bytes( - f'{_r.group("size")} {_r.group("unit")}', - ) - data['pass completed'] = 'current status: finished' in line.lower() - - # Check if 100% done (only if map is present and non-zero size - # NOTE: ddrescuelog returns 0 (i.e. 100% done) for empty files - if self.map_path.exists() and self.map_path.stat().st_size != 0: - cmd = [ - 'ddrescuelog', - '--done-status', - f'--size={self.size}', - self.map_path, - ] - proc = exe.run_program(cmd, check=False) - data['full recovery'] = proc.returncode == 0 - - # Done - self.map_data.update(data) - - def pass_complete(self, pass_name) -> bool: - """Check if pass_name is complete based on map data, returns bool.""" - pending_size = self.map_data['non-tried'] - - # Full recovery - if self.map_data.get('full recovery', False): - return True - - # New recovery - if 'non-tried' not in self.map_data: - return False - - # Initial read skip pass - if pass_name == 'read-skip': - pass_threshold = cfg.ddrescue.AUTO_PASS_THRESHOLDS[pass_name] - if self.get_percent_recovered() >= pass_threshold: - return True - - # Recovery in progress - if pass_name in ('trim', 'scrape'): - pending_size += self.map_data['non-trimmed'] - if pass_name == 'scrape': - pending_size += self.map_data['non-scraped'] - if pending_size == 0: - # This is true when the 
previous and current passes are complete - return True - - # This should never be reached - return False - - def safety_check(self) -> None: - """Run safety check and abort if necessary.""" - # TODO: Expand section to support non-Linux systems - dest_size = -1 - if self.destination.is_block_device(): - cmd = [ - 'lsblk', '--bytes', '--json', - '--nodeps', '--noheadings', '--output=size', - self.destination, - ] - json_data = exe.get_json_from_command(cmd) - dest_size = json_data['blockdevices'][0]['size'] - del json_data - - # Check destination size if cloning - if not self.destination.is_file() and dest_size < self.size: - cli.print_error(f'Invalid destination: {self.destination}') - raise std.GenericAbort() - - def set_initial_status(self) -> None: - """Read map data and set initial statuses.""" - self.load_map_data() - percent = self.get_percent_recovered() - for name in self.status: - if self.pass_complete(name): - self.status[name] = percent - else: - # Stop checking - if percent > 0: - self.status[name] = percent - break - - def skip_pass(self, pass_name) -> None: - """Mark pass as skipped if applicable.""" - if self.status[pass_name] == 'Pending': - self.status[pass_name] = 'Skipped' - - def update_progress(self, pass_name) -> None: - """Update progress via map data.""" - self.load_map_data() - - # Update status - percent = self.get_percent_recovered() - if percent > 0: - self.status[pass_name] = percent - - # Mark future passes as skipped if applicable - if percent == 100: - status_keys = list(self.status.keys()) - for pass_n in status_keys[status_keys.index(pass_name)+1:]: - self.status[pass_n] = 'Skipped' - - -class State(): - """Object for tracking hardware diagnostic data.""" - def __init__(self): - self.block_pairs: list[BlockPair] = [] - self.destination: hw_disk.Disk | pathlib.Path = pathlib.Path('/dev/null') - self.log_dir: pathlib.Path = log.format_log_path() - self.log_dir = self.log_dir.parent.joinpath( - 
f'ddrescue-TUI_{time.strftime("%Y-%m-%d_%H%M%S%z")}/', - ) - self.progress_out: pathlib.Path = self.log_dir.joinpath('progress.out') - self.mode: str = '?' - self.source: hw_disk.Disk | None = None - self.working_dir: pathlib.Path | None = None - self.ui: tui.TUI = tui.TUI('Source') - - def _add_block_pair(self, source: hw_disk.Disk, destination: pathlib.Path) -> None: - """Add BlockPair object and run safety checks.""" - self.block_pairs.append( - BlockPair( - source_dev=source, - destination=destination, - working_dir=self.working_dir, - )) - - def _get_clone_settings_path(self) -> pathlib.Path: - """get Clone settings file path, returns pathlib.Path obj.""" - description = self.source.model - if not description: - description = self.source.path.name - return pathlib.Path(f'{self.working_dir}/Clone_{description}.json') - - def _load_settings(self, discard_unused_settings: bool = False) -> dict[Any, Any]: - """Load settings from previous run, returns dict.""" - settings = {} - settings_file = self._get_clone_settings_path() - - # Try loading JSON data - if settings_file.exists(): - with open(settings_file, 'r', encoding='utf-8') as _f: - try: - settings = json.loads(_f.read()) - except (OSError, json.JSONDecodeError) as err: - LOG.error('Failed to load clone settings') - cli.print_error('Invalid clone settings detected.') - raise std.GenericAbort() from err - - # Check settings - if settings: - if settings['First Run'] and discard_unused_settings: - # Previous run aborted before starting recovery, discard settings - settings = {} - else: - bail = False - for key in ('model', 'serial'): - if settings['Source'][key] != getattr(self.source, key): - cli.print_error(f"Clone settings don't match source {key}") - bail = True - if settings['Destination'][key] != getattr(self.destination, key): - cli.print_error(f"Clone settings don't match destination {key}") - bail = True - if bail: - raise std.GenericAbort() - - # Update settings - if not settings: - settings = 
CLONE_SETTINGS.copy() - if not settings['Source']: - settings['Source'] = { - 'model': self.source.model, - 'serial': self.source.serial, - } - if not settings['Destination']: - settings['Destination'] = { - 'model': self.destination.model, - 'serial': self.destination.serial, - } - - # Done - return settings - - def _save_settings(self, settings: dict[Any, Any]) -> None: - """Save settings for future runs.""" - settings_file = self._get_clone_settings_path() - - # Try saving JSON data - try: - with open(settings_file, 'w', encoding='utf-8') as _f: - json.dump(settings, _f) - except OSError as err: - cli.print_error('Failed to save clone settings') - raise std.GenericAbort() from err - - def add_clone_block_pairs(self) -> list[hw_disk.Disk]: - """Add device to device block pairs and set settings if necessary.""" - source_sep = get_partition_separator(self.source.path.name) - dest_sep = get_partition_separator(self.destination.path.name) - settings = {} - source_parts = [] - - # Clone settings - settings = self._load_settings(discard_unused_settings=True) - - # Add pairs - if settings['Partition Mapping']: - # Resume previous run, load pairs from settings file - for part_map in settings['Partition Mapping']: - bp_source = hw_disk.Disk( - f'{self.source.path}{source_sep}{part_map[0]}', - ) - bp_dest = pathlib.Path( - f'{self.destination.path}{dest_sep}{part_map[1]}', - ) - self._add_block_pair(bp_source, bp_dest) - else: - source_parts = menus.select_disk_parts('Clone', self.source) - if self.source.path.samefile(source_parts[0].path): - # Whole disk (or single partition via args), skip settings - bp_dest = self.destination.path - self._add_block_pair(self.source, bp_dest) - else: - # New run, use new settings file - settings['Needs Format'] = True - offset = 0 - user_choice = cli.choice( - 'Format clone using GPT, MBR, or match Source type?', - ['G', 'M', 'S'], - ) - if user_choice == 'G': - settings['Table Type'] = 'GPT' - elif user_choice == 'M': - settings['Table 
Type'] = 'MBR' - else: - # Match source type - settings['Table Type'] = get_table_type(self.source.path) - if cli.ask('Create an empty Windows boot partition on the clone?'): - settings['Create Boot Partition'] = True - offset = 2 if settings['Table Type'] == 'GPT' else 1 - - # Add pairs - for dest_num, part in enumerate(source_parts): - dest_num += offset + 1 - bp_dest = pathlib.Path( - f'{self.destination.path}{dest_sep}{dest_num}', - ) - self._add_block_pair(part, bp_dest) - - # Add to settings file - source_num = re.sub(r'^.*?(\d+)$', r'\1', part.path.name) - settings['Partition Mapping'].append([source_num, dest_num]) - - # Save settings - self._save_settings(settings) - - # Done - return source_parts - - def add_image_block_pairs(self, source_parts: list[hw_disk.Disk]) -> None: - """Add device to image file block pairs.""" - for part in source_parts: - self._add_block_pair(part, self.destination) - - def confirm_selections( - self, - prompt_msg: str, - source_parts: list[hw_disk.Disk], - ) -> None: - """Show selection details and prompt for confirmation.""" - report = [] - - # Source - report.append(ansi.color_string('Source', 'GREEN')) - report.extend(build_object_report(self.source)) - report.append(' ') - - # Destination - report.append(ansi.color_string('Destination', 'GREEN')) - if self.mode == 'Clone': - report[-1] += ansi.color_string(' (ALL DATA WILL BE DELETED)', 'RED') - report.extend(build_object_report(self.destination)) - report.append(' ') - - # Show deletion warning if necessary - # NOTE: The check for block_pairs is to limit this section - # to the second confirmation - if self.mode == 'Clone' and self.block_pairs: - report.append(ansi.color_string('WARNING', 'YELLOW')) - report.append( - 'All data will be deleted from the destination listed above.', - ) - report.append( - ansi.color_string( - ['This is irreversible and will lead to', 'DATA LOSS.'], - ['YELLOW', 'RED'], - ), - ) - report.append(' ') - - # Block pairs - if self.block_pairs: - 
report.extend( - build_block_pair_report( - self.block_pairs, - self._load_settings() if self.mode == 'Clone' else {}, - ), - ) - report.append(' ') - - # Map dir - if self.working_dir: - report.append(ansi.color_string('Map Save Directory', 'GREEN')) - report.append(f'{self.working_dir}/') - report.append(' ') - if not fstype_is_ok(self.working_dir, map_dir=True): - report.append( - ansi.color_string( - 'Map file(s) are being saved to a non-recommended filesystem.', - 'YELLOW', - ), - ) - report.append( - ansi.color_string( - ['This is strongly discouraged and may lead to', 'DATA LOSS'], - [None, 'RED'], - ), - ) - report.append(' ') - - # Source part(s) selected - if source_parts: - report.append(ansi.color_string('Source Part(s) selected', 'GREEN')) - if self.source.path.samefile(source_parts[0].path): - report.append('Whole Disk') - else: - report.append(ansi.color_string(f'{"NAME":<9} SIZE', 'BLUE')) - for part in source_parts: - report.append( - f'{part.path.name:<9} ' - f'{std.bytes_to_string(part.size, use_binary=False)}' - ) - report.append(' ') - - # Prompt user - cli.clear_screen() - cli.print_report(report) - if not cli.ask(prompt_msg): - raise std.GenericAbort() - - def generate_report(self) -> list[str]: - """Generate report of overall and per block_pair results, returns list.""" - report = [] - - # Header - report.append(f'{self.mode.title()} Results:') - report.append(' ') - report.append(f'Source: {self.source.description}') - if self.mode == 'Clone': - report.append(f'Destination: {self.destination.description}') - else: - report.append(f'Destination: {self.destination}/') - - # Overall - report.append(' ') - error_size = self.get_error_size() - error_size_str = std.bytes_to_string(error_size, decimals=2) - if error_size > 0: - error_size_str = ansi.color_string(error_size_str, 'YELLOW') - percent = self.get_percent_recovered() - percent = format_status_string(percent, width=0) - report.append(f'Overall rescued: {percent}, error size: 
{error_size_str}') - - # Block-Pairs - if len(self.block_pairs) > 1: - report.append(' ') - for pair in self.block_pairs: - error_size = pair.get_error_size() - error_size_str = std.bytes_to_string(error_size, decimals=2) - if error_size > 0: - error_size_str = ansi.color_string(error_size_str, 'YELLOW') - pair_size = std.bytes_to_string(pair.size, decimals=2) - percent = pair.get_percent_recovered() - percent = format_status_string(percent, width=0) - report.append( - f'{pair.source.name} ({pair_size}) ' - f'rescued: {percent}, ' - f'error size: {error_size_str}' - ) - - # Done - return report - - def get_error_size(self) -> int: - """Get total error size from block_pairs in bytes, returns int.""" - return self.get_total_size() - self.get_rescued_size() - - def get_percent_recovered(self) -> float: - """Get total percent rescued from block_pairs, returns float.""" - return 100 * self.get_rescued_size() / self.get_total_size() - - def get_rescued_size(self) -> int: - """Get total rescued size from all block pairs, returns int.""" - return sum(pair.get_rescued_size() for pair in self.block_pairs) - - def get_total_size(self) -> int: - """Get total size of all block_pairs in bytes, returns int.""" - return sum(pair.size for pair in self.block_pairs) - - def init_recovery(self, docopt_args: dict[str, Any]) -> None: - """Select source/dest and set env.""" - cli.clear_screen() - disk_menu = menus.disks() - source_parts = [] - self.ui.set_progress_file(str(self.progress_out)) - - # Set mode - self.mode = set_mode(docopt_args) - - # Select source - self.source = select_disk_obj('source', disk_menu, docopt_args['']) - self.update_top_panes() - if self.source.trim: - cli.print_warning('Source device supports TRIM') - if not cli.ask(' Proceed with recovery?'): - cli.abort() - self.ui.set_title('Source', self.source.name) - - # Select destination - if self.mode == 'Clone': - self.destination = select_disk_obj( - 'destination', - disk_menu, - docopt_args[''], - ) - 
self.ui.add_title_pane('Destination', self.destination.name) - elif self.mode == 'Image': - if docopt_args['']: - self.destination = pathlib.Path(docopt_args['']).resolve() - else: - self.destination = menus.select_path('Destination') - self.ui.add_title_pane('Destination', self.destination) - self.update_top_panes() - - # Update details - self.source.update_details(skip_children=False) - if self.mode == 'Clone': - self.destination.update_details(skip_children=False) - - # Confirmation #1 - self.confirm_selections( - prompt_msg='Are these selections correct?', - source_parts=source_parts, - ) - - # Update panes - self.update_progress_pane('Idle') - - # Set working dir - self.working_dir = get_working_dir( - self.mode, - self.destination, - force_local=docopt_args['--force-local-map'], - ) - - # Start fresh if requested - if docopt_args['--start-fresh']: - clean_working_dir(self.working_dir) - - # Add block pairs - if self.mode == 'Clone': - source_parts = self.add_clone_block_pairs() - else: - source_parts = menus.select_disk_parts(self.mode, self.source) - self.add_image_block_pairs(source_parts) - - # Update SMART data - ## TODO: Verify if needed - for dev in (self.source, self.destination): - if not isinstance(dev, hw_disk.Disk): - continue - enable_smart(dev) - update_smart_details(dev) - - # Safety Checks #1 - if self.mode == 'Clone': - self.safety_check_destination() - self.safety_check_size() - - # Confirmation #2 - self.update_progress_pane('Idle') - self.confirm_selections('Start recovery?', source_parts) - - # Unmount source and/or destination under macOS - if PLATFORM == 'Darwin': - for dev in (self.source, self.destination): - if not isinstance(dev, hw_disk.Disk): - continue - cmd = ['diskutil', 'unmountDisk', dev.path] - try: - exe.run_program(cmd) - except subprocess.CalledProcessError: - cli.print_error('Failed to unmount source and/or destination') - cli.abort() - - # Prep destination - if self.mode == 'Clone': - self.prep_destination(source_parts, 
dry_run=docopt_args['--dry-run']) - - # Safety Checks #2 - if not docopt_args['--dry-run']: - for pair in self.block_pairs: - pair.safety_check() - - def mark_started(self) -> None: - """Edit clone settings, if applicable, to mark recovery as started.""" - # Skip if not cloning - if self.mode != 'Clone': - return - - # Skip if not using settings - # i.e. Cloning whole disk (or single partition via args) - if self.source.path.samefile(self.block_pairs[0].source): - return - - # Update settings - settings = self._load_settings() - if settings.get('First Run', False): - settings['First Run'] = False - self._save_settings(settings) - - def pass_above_threshold(self, pass_name: str) -> bool: - """Check if all block_pairs meet the pass threshold, returns bool.""" - threshold = cfg.ddrescue.AUTO_PASS_THRESHOLDS[pass_name] - return all( - p.get_percent_recovered() >= threshold for p in self.block_pairs - ) - - def pass_complete(self, pass_name: str) -> bool: - """Check if all block_pairs completed pass_name, returns bool.""" - return all(p.pass_complete(pass_name) for p in self.block_pairs) - - def prep_destination( - self, - source_parts: list[hw_disk.Disk], - dry_run: bool = True, - ) -> None: - """Prep destination as necessary.""" - # TODO: Split into Linux and macOS - # logical sector size is not easily found under macOS - # It might be easier to rewrite this section using macOS tools - dest_prefix = str(self.destination.path) - dest_prefix += get_partition_separator(self.destination.path.name) - esp_type = 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B' - msr_type = 'E3C9E316-0B5C-4DB8-817D-F92DF00215AE' - part_num = 0 - sfdisk_script = [] - settings = self._load_settings() - - # Bail early - if not settings['Needs Format']: - return - - # Add partition table settings - if settings['Table Type'] == 'GPT': - sfdisk_script.append('label: gpt') - else: - sfdisk_script.append('label: dos') - sfdisk_script.append('unit: sectors') - sfdisk_script.append('') - - # Add boot partition 
if requested - if settings['Create Boot Partition']: - if settings['Table Type'] == 'GPT': - part_num += 1 - sfdisk_script.append( - build_sfdisk_partition_line( - table_type='GPT', - dev_path=f'{dest_prefix}{part_num}', - size='260MiB', - details={'parttype': esp_type, 'partlabel': 'EFI System'}, - ), - ) - part_num += 1 - sfdisk_script.append( - build_sfdisk_partition_line( - table_type=settings['Table Type'], - dev_path=f'{dest_prefix}{part_num}', - size='16MiB', - details={'parttype': msr_type, 'partlabel': 'Microsoft Reserved'}, - ), - ) - elif settings['Table Type'] == 'MBR': - part_num += 1 - sfdisk_script.append( - build_sfdisk_partition_line( - table_type='MBR', - dev_path=f'{dest_prefix}{part_num}', - size='100MiB', - details={'parttype': '0x7', 'partlabel': 'System Reserved'}, - ), - ) - - # Add selected partition(s) - for part in source_parts: - num_sectors = part.size / self.destination.log_sec - num_sectors = math.ceil(num_sectors) - part_num += 1 - sfdisk_script.append( - build_sfdisk_partition_line( - table_type=settings['Table Type'], - dev_path=f'{dest_prefix}{part_num}', - size=num_sectors, - details=part.raw_details, - ), - ) - - # Save sfdisk script - script_path = ( - f'{self.working_dir}/' - f'sfdisk_{self.destination.path.name}.script' - ) - with open(script_path, 'w', encoding='utf-8') as _f: - _f.write('\n'.join(sfdisk_script)) - - # Skip real format for dry runs - if dry_run: - LOG.info('Dry run, refusing to format destination') - return - - # Format disk - LOG.warning('Formatting destination: %s', self.destination.path) - with open(script_path, 'r', encoding='utf-8') as _f: - proc = exe.run_program( - cmd=['sudo', 'sfdisk', self.destination.path], - stdin=_f, - check=False, - ) - if proc.returncode != 0: - cli.print_error('Error(s) encoundtered while formatting destination') - raise std.GenericAbort() - - # Update settings - settings['Needs Format'] = False - self._save_settings(settings) - - def retry_all_passes(self) -> None: - """Prep 
block_pairs for a retry recovery attempt.""" - bad_statuses = ('*', '/', '-') - LOG.warning('Updating block_pairs for retry') - - # Update all block_pairs - for pair in self.block_pairs: - map_data = [] - - # Reset status strings - for name in pair.status.keys(): - pair.status[name] = 'Pending' - - # Mark all non-trimmed, non-scraped, and bad areas as non-tried - with open(pair.map_path, 'r', encoding='utf-8') as _f: - for line in _f.readlines(): - line = line.strip() - if line.startswith('0x') and line.endswith(bad_statuses): - line = f'{line[:-1]}?' - map_data.append(line) - - # Save updated map - with open(pair.map_path, 'w', encoding='utf-8') as _f: - _f.write('\n'.join(map_data)) - - # Reinitialize status - pair.set_initial_status() - - def safety_check_destination(self) -> None: - """Run safety checks for destination and abort if necessary.""" - errors_detected = False - - # Check for critical errors - if not smart_status_ok(self.destination): - cli.print_error( - f'Critical error(s) detected for: {self.destination.path}', - ) - - # Check for minor errors - if not check_attributes(self.destination, only_blocking=False): - cli.print_warning( - f'Attribute error(s) detected for: {self.destination.path}', - ) - - # Done - if errors_detected: - raise std.GenericAbort() - - def safety_check_size(self) -> None: - """Run size safety check and abort if necessary.""" - required_size = sum(pair.size for pair in self.block_pairs) - settings = self._load_settings() if self.mode == 'Clone' else {} - - # Increase required_size if necessary - if self.mode == 'Clone' and settings.get('Needs Format', False): - if settings['Table Type'] == 'GPT': - # Below is the size calculation for the GPT - # 1 LBA for the protective MBR - # 33 LBAs each for the primary and backup GPT tables - # Source: https://en.wikipedia.org/wiki/GUID_Partition_Table - required_size += (1 + 33 + 33) * self.destination.phy_sec - if settings['Create Boot Partition']: - # 260MiB EFI System Partition and a 
16MiB MS Reserved partition - required_size += (260 + 16) * 1024**2 - else: - # MBR only requires one LBA but adding a full 4096 bytes anyway - required_size += 4096 - if settings['Create Boot Partition']: - # 100MiB System Reserved partition - required_size += 100 * 1024**2 - - # Reduce required_size if necessary - if self.mode == 'Image': - for pair in self.block_pairs: - if pair.destination.exists(): - # NOTE: This uses the "max space" of the destination - # i.e. not the apparent size which is smaller for sparse files - # While this can result in an out-of-space error it's better - # than nothing. - required_size -= pair.destination.stat().st_size - - # Check destination size - if self.mode == 'Clone': - destination_size = self.destination.size - error_msg = 'A larger destination disk is required' - else: - # NOTE: Adding an extra 5% here to better ensure it will fit - destination_size = psutil.disk_usage(self.destination).free - destination_size *= 1.05 - error_msg = 'Not enough free space on the destination' - if required_size > destination_size: - cli.print_error(error_msg) - raise std.GenericAbort() - - def save_debug_reports(self) -> None: - """Save debug reports to disk.""" - LOG.info('Saving debug reports') - debug_dir = pathlib.Path(f'{self.log_dir}/debug') - if not debug_dir.exists(): - debug_dir.mkdir() - - # State (self) - debug.save_pickles({'state': self}, debug_dir) - with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f: - _f.write('[Debug report]\n') - _f.write('\n'.join(debug.generate_object_report(self))) - _f.write('\n') - - # Block pairs - for _bp in self.block_pairs: - with open( - f'{debug_dir}/block_pairs.report', 'a', encoding='utf-8') as _f: - _f.write('[Debug report]\n') - _f.write('\n'.join(debug.generate_object_report(_bp))) - _f.write('\n') - - def skip_pass(self, pass_name: str) -> None: - """Mark block_pairs as skipped if applicable.""" - for pair in self.block_pairs: - if pair.status[pass_name] == 'Pending': - 
pair.status[pass_name] = 'Skipped' - - def update_progress_pane(self, overall_status: str) -> None: - """Update progress pane.""" - report = [] - separator = '─────────────────────' - width = cfg.ddrescue.TMUX_SIDE_WIDTH - - # Status - report.append(ansi.color_string(f'{"Status":^{width}}', 'BLUE')) - if 'NEEDS ATTENTION' in overall_status: - report.append( - ansi.color_string(f'{overall_status:^{width}}', 'YELLOW_BLINK'), - ) - else: - report.append(f'{overall_status:^{width}}') - report.append(separator) - - # Overall progress - if self.block_pairs: - total_rescued = self.get_rescued_size() - percent = self.get_percent_recovered() - report.append(ansi.color_string('Overall Progress', 'BLUE')) - report.append( - f'Rescued: {format_status_string(percent, width=width-9)}', - ) - report.append( - ansi.color_string( - [f'{std.bytes_to_string(total_rescued, decimals=2):>{width}}'], - [get_percent_color(percent)], - ), - ) - report.append(separator) - - # Block pair progress - for pair in self.block_pairs: - report.append(ansi.color_string(pair.source, 'BLUE')) - for name, status in pair.status.items(): - name = name.title() - report.append( - f'{name}{format_status_string(status, width=width-len(name))}', - ) - report.append(' ') - - # EToC - if overall_status in ('Active', 'NEEDS ATTENTION'): - etoc = get_etoc() - report.append(separator) - report.append(ansi.color_string('Estimated Pass Finish', 'BLUE')) - if overall_status == 'NEEDS ATTENTION' or etoc == 'N/A': - report.append(ansi.color_string('N/A', 'YELLOW')) - else: - report.append(etoc) - - # Write to progress file - self.progress_out.write_text('\n'.join(report), encoding='utf-8', errors='ignore') - - def update_top_panes(self) -> None: - """(Re)create top source/destination panes.""" - source_exists = True - source_str = '' - dest_exists = True - dest_str = '' - width = tmux.get_pane_size()[0] - width = int(width / 2) - 1 - - def _format_string(obj, width) -> str: - """Format source/dest string using obj and 
def build_block_pair_report(block_pairs, settings) -> list:
  """Build block pair report, returns list."""
  report = []
  notes = []

  # Bail early when there is nothing to report
  if not block_pairs:
    return report
  report.append(ansi.color_string('Block Pairs', 'GREEN'))

  # Mapping lines for boot partitions created on the destination
  if settings and settings['Create Boot Partition']:
    if settings['Table Type'] == 'GPT':
      report.append(f'{" —— ":<9} --> EFI System Partition')
      report.append(f'{" —— ":<9} --> Microsoft Reserved Partition')
    elif settings['Table Type'] == 'MBR':
      report.append(f'{" —— ":<9} --> System Reserved')

  # Source --> destination mapping per pair
  for pair in block_pairs:
    report.append(f'{pair.source.name:<9} --> {pair.destination.name}')

  # Resume messages as necessary
  if settings:
    if not settings['First Run']:
      notes.append(
        ansi.color_string(
          ['NOTE:', 'Clone settings loaded from previous run.'],
          ['BLUE', None],
          ),
        )
    if settings['Needs Format'] and settings['Table Type']:
      msg = f'Destination will be formatted using {settings["Table Type"]}'
      notes.append(
        ansi.color_string(
          ['NOTE:', msg],
          ['BLUE', None],
          ),
        )
  if any(pair.get_rescued_size() > 0 for pair in block_pairs):
    notes.append(
      ansi.color_string(
        ['NOTE:', 'Resume data loaded from map file(s).'],
        ['BLUE', None],
        ),
      )

  # Append any notes after a spacer line
  if notes:
    report.append(' ')
    report.extend(notes)

  return report
def build_object_report(obj) -> list[str]:
  """Build a report for a directory or disk object, returns list."""
  # Directories get a mount/usage report; everything else is treated
  # as a disk device.
  if hasattr(obj, 'is_dir') and obj.is_dir():
    return build_directory_report(obj)
  return build_disk_report(obj)
def build_sfdisk_partition_line(table_type, dev_path, size, details) -> str:
  """Build one sfdisk script partition line, returns str.

  Raises:
    std.GenericAbort: if no destination partition type can be determined.
  """
  line = f'{dev_path} : size={size}'
  dest_type = ''
  source_table_type = ''
  source_type = details.get('parttype', '')
  source_filesystem = str(details.get('fstype', '')).upper()

  # Carry the source type over directly when the table types match
  if re.match(r'^0x\w+$', source_type):
    source_table_type = 'MBR'
    if table_type == 'MBR':
      dest_type = source_type.replace('0x', '').lower()
  elif re.match(r'^\w{8}-\w{4}-\w{4}-\w{4}-\w{12}$', source_type):
    source_table_type = 'GPT'
    if table_type == 'GPT':
      dest_type = source_type.upper()

  # Table types differ: derive the type from the filesystem instead
  if not dest_type:
    if source_filesystem in cfg.ddrescue.PARTITION_TYPES.get(table_type, {}):
      dest_type = cfg.ddrescue.PARTITION_TYPES[table_type][source_filesystem]
  line += f', type={dest_type}'

  # Safety Check
  if not dest_type:
    cli.print_error(f'Failed to determine partition type for: {dev_path}')
    raise std.GenericAbort()

  # Optional extras: label always, UUID only when table types match
  if details.get('partlabel', ''):
    line += f', name="{details["partlabel"]}"'
  if details.get('partuuid', '') and source_table_type == table_type:
    line += f', uuid={details["partuuid"].upper()}'

  return line
def format_status_string(status, width) -> str:
  """Format colored status string, returns str."""
  text = str(status)

  # Numeric statuses are treated as percentages
  try:
    percent = float(text)
  except ValueError:
    percent = -1

  if percent >= 0:
    # Percentage: color by value
    color = get_percent_color(percent)
    text = f'{percent:{width-2}.2f} %'
    if '100.00' in text and percent < 100:
      # Never display 100% unless the value really is 100
      LOG.warning('Rounding down to 99.99 from %s', percent)
      text = f'{"99.99 %":>{width}}'
  else:
    # Text status: color from the status table (if listed)
    color = STATUS_COLORS.get(text, None)
    text = f'{text:>{width}}'

  if color:
    text = ansi.color_string(text, color)

  return text
def get_fstype_macos(path) -> str:
  """Look up the filesystem type for path via `mount` under macOS, returns str."""
  proc = exe.run_program(['mount'], check=False)

  # mount itself failed; report unknown
  if proc.returncode:
    return 'UNKNOWN'

  # mount output lines look like: "/dev/... on <path> (<fstype>, ...)"
  found = re.search(rf'{path} \((\w+)', proc.stdout)
  return found.group(1) if found else 'UNKNOWN'
def get_percent_color(percent) -> str:
  """Map a recovery percentage to a display color name.

  Returns None for zero or negative percentages.
  """
  if percent <= 0:
    return None
  if percent > 100:
    return 'PURPLE'
  if percent >= 99:
    return 'GREEN'
  if percent >= 90:
    return 'YELLOW'
  return 'RED'
def get_table_type(disk_path) -> str:
  """Return the partition table type for disk_path ('GPT' or 'MBR').

  Raises:
    std.GenericAbort: if the detected type is anything else (or unknown).
  """
  disk_path = str(disk_path)
  detected = None

  # Linux: ask lsblk for the table type
  if std.PLATFORM == 'Linux':
    cmd = f'lsblk --json --output=pttype --nodeps {disk_path}'.split()
    lsblk_data = exe.get_json_from_command(cmd)
    detected = lsblk_data['blockdevices'][0].get('pttype', '').upper()
    detected = detected.replace('DOS', 'MBR')

  # macOS: parse diskutil's plist output
  if std.PLATFORM == 'Darwin':
    cmd = ['diskutil', 'list', '-plist', disk_path]
    proc = exe.run_program(cmd, check=False, encoding=None, errors=None)
    try:
      parsed = plistlib.loads(proc.stdout)
    except (TypeError, ValueError):
      # Invalid / corrupt plist data: leave detected unset so the check
      # below aborts instead of crashing here
      pass
    else:
      first_disk = parsed.get('AllDisksAndPartitions', [{}])[0]
      detected = first_disk['Content']
      detected = detected.replace('FDisk_partition_scheme', 'MBR')
      detected = detected.replace('GUID_partition_scheme', 'GPT')

  # Only GPT and MBR are supported
  if detected not in ('GPT', 'MBR'):
    cli.print_error(f'Unsupported partition table type: {detected}')
    raise std.GenericAbort()

  return detected
def mount_raw_image(path) -> pathlib.Path:
  """Mount raw image using OS specific methods, returns pathlib.Path.

  Returns None (after printing an error) if the image could not be
  mounted on this platform.
  """
  loopback_path = None

  if PLATFORM == 'Darwin':
    loopback_path = mount_raw_image_macos(path)
  elif PLATFORM == 'Linux':
    loopback_path = mount_raw_image_linux(path)

  # Check
  if not loopback_path:
    cli.print_error(f'Failed to mount image: {path}')
    # BUG FIX: previously an atexit unmount was registered even on
    # failure, scheduling a bogus detach of None at shutdown.
    return loopback_path

  # Register unmount atexit (only for a successfully attached device)
  atexit.register(unmount_loopback_device, loopback_path)

  # Done
  return loopback_path
def set_mode(docopt_args) -> str:
  """Set mode from docopt_args or user selection, returns str."""
  mode = '?'

  # Check docopt_args
  if docopt_args['clone']:
    mode = 'Clone'
  elif docopt_args['image']:
    mode = 'Image'

  # Ask user if necessary
  # BUG FIX: mode is initialized to '?' (truthy), so the original
  # `if not mode:` fallback prompt was unreachable.
  if mode == '?':
    answer = cli.choice('Are we cloning or imaging?', ['C', 'I'])
    mode = 'Clone' if answer == 'C' else 'Image'

  # Done
  return mode
def unmount_loopback_device(path) -> None:
  """Unmount loopback device using OS specific methods."""
  cmd = []

  # Build OS specific cmd
  if PLATFORM == 'Darwin':
    cmd = ['hdiutil', 'detach', path]
  elif PLATFORM == 'Linux':
    cmd = ['sudo', 'losetup', '--detach', path]

  # BUG FIX: bail out instead of invoking run_program with an empty
  # command list on unsupported platforms.
  if not cmd:
    LOG.error('Cannot unmount loopback device on platform: %s', PLATFORM)
    return

  # Unmount loopback device
  exe.run_program(cmd, check=False)
+ r'\s*(?Pn/a)?', + re.IGNORECASE + ) +PLATFORM = std.PLATFORM +RECOMMENDED_FSTYPES = re.compile(r'^(ext[234]|ntfs|xfs)$') +if PLATFORM == 'Darwin': + RECOMMENDED_FSTYPES = re.compile(r'^(apfs|hfs.?)$') +RECOMMENDED_MAP_FSTYPES = re.compile( + r'^(apfs|cifs|ext[234]|hfs.?|ntfs|smbfs|vfat|xfs)$' + ) +STATUS_COLORS = { + 'Passed': 'GREEN', + 'Aborted': 'YELLOW', + 'Skipped': 'YELLOW', + 'Working': 'YELLOW', + 'ERROR': 'RED', + } +TIMEZONE = pytz.timezone(cfg.main.LINUX_TIME_ZONE) + + +# Classes +class State(): + """Object for tracking hardware diagnostic data.""" + def __init__(self): + self.block_pairs: list[BlockPair] = [] + self.destination: hw_disk.Disk | pathlib.Path = pathlib.Path('/dev/null') + self.log_dir: pathlib.Path = log.format_log_path() + self.log_dir = self.log_dir.parent.joinpath( + f'ddrescue-TUI_{time.strftime("%Y-%m-%d_%H%M%S%z")}/', + ) + self.progress_out: pathlib.Path = self.log_dir.joinpath('progress.out') + self.mode: str = '?' + self.source: hw_disk.Disk | None = None + self.working_dir: pathlib.Path | None = None + self.ui: tui.TUI = tui.TUI('Source') + + def _add_block_pair(self, source: hw_disk.Disk, destination: pathlib.Path) -> None: + """Add BlockPair object and run safety checks.""" + self.block_pairs.append( + BlockPair( + source_dev=source, + destination=destination, + working_dir=self.working_dir, + )) + + def _get_clone_settings_path(self) -> pathlib.Path: + """get Clone settings file path, returns pathlib.Path obj.""" + description = self.source.model + if not description: + description = self.source.path.name + return pathlib.Path(f'{self.working_dir}/Clone_{description}.json') + + def _load_settings(self, discard_unused_settings: bool = False) -> dict[Any, Any]: + """Load settings from previous run, returns dict.""" + settings = {} + settings_file = self._get_clone_settings_path() + + # Try loading JSON data + if settings_file.exists(): + with open(settings_file, 'r', encoding='utf-8') as _f: + try: + settings = 
json.loads(_f.read()) + except (OSError, json.JSONDecodeError) as err: + LOG.error('Failed to load clone settings') + cli.print_error('Invalid clone settings detected.') + raise std.GenericAbort() from err + + # Check settings + if settings: + if settings['First Run'] and discard_unused_settings: + # Previous run aborted before starting recovery, discard settings + settings = {} + else: + bail = False + for key in ('model', 'serial'): + if settings['Source'][key] != getattr(self.source, key): + cli.print_error(f"Clone settings don't match source {key}") + bail = True + if settings['Destination'][key] != getattr(self.destination, key): + cli.print_error(f"Clone settings don't match destination {key}") + bail = True + if bail: + raise std.GenericAbort() + + # Update settings + if not settings: + settings = CLONE_SETTINGS.copy() + if not settings['Source']: + settings['Source'] = { + 'model': self.source.model, + 'serial': self.source.serial, + } + if not settings['Destination']: + settings['Destination'] = { + 'model': self.destination.model, + 'serial': self.destination.serial, + } + + # Done + return settings + + def _save_settings(self, settings: dict[Any, Any]) -> None: + """Save settings for future runs.""" + settings_file = self._get_clone_settings_path() + + # Try saving JSON data + try: + with open(settings_file, 'w', encoding='utf-8') as _f: + json.dump(settings, _f) + except OSError as err: + cli.print_error('Failed to save clone settings') + raise std.GenericAbort() from err + + def add_clone_block_pairs(self) -> list[hw_disk.Disk]: + """Add device to device block pairs and set settings if necessary.""" + source_sep = get_partition_separator(self.source.path.name) + dest_sep = get_partition_separator(self.destination.path.name) + settings = {} + source_parts = [] + + # Clone settings + settings = self._load_settings(discard_unused_settings=True) + + # Add pairs + if settings['Partition Mapping']: + # Resume previous run, load pairs from settings file + for 
part_map in settings['Partition Mapping']: + bp_source = hw_disk.Disk( + f'{self.source.path}{source_sep}{part_map[0]}', + ) + bp_dest = pathlib.Path( + f'{self.destination.path}{dest_sep}{part_map[1]}', + ) + self._add_block_pair(bp_source, bp_dest) + else: + source_parts = menus.select_disk_parts('Clone', self.source) + if self.source.path.samefile(source_parts[0].path): + # Whole disk (or single partition via args), skip settings + bp_dest = self.destination.path + self._add_block_pair(self.source, bp_dest) + else: + # New run, use new settings file + settings['Needs Format'] = True + offset = 0 + user_choice = cli.choice( + 'Format clone using GPT, MBR, or match Source type?', + ['G', 'M', 'S'], + ) + if user_choice == 'G': + settings['Table Type'] = 'GPT' + elif user_choice == 'M': + settings['Table Type'] = 'MBR' + else: + # Match source type + settings['Table Type'] = get_table_type(self.source.path) + if cli.ask('Create an empty Windows boot partition on the clone?'): + settings['Create Boot Partition'] = True + offset = 2 if settings['Table Type'] == 'GPT' else 1 + + # Add pairs + for dest_num, part in enumerate(source_parts): + dest_num += offset + 1 + bp_dest = pathlib.Path( + f'{self.destination.path}{dest_sep}{dest_num}', + ) + self._add_block_pair(part, bp_dest) + + # Add to settings file + source_num = re.sub(r'^.*?(\d+)$', r'\1', part.path.name) + settings['Partition Mapping'].append([source_num, dest_num]) + + # Save settings + self._save_settings(settings) + + # Done + return source_parts + + def add_image_block_pairs(self, source_parts: list[hw_disk.Disk]) -> None: + """Add device to image file block pairs.""" + for part in source_parts: + self._add_block_pair(part, self.destination) + + def confirm_selections( + self, + prompt_msg: str, + source_parts: list[hw_disk.Disk], + ) -> None: + """Show selection details and prompt for confirmation.""" + report = [] + + # Source + report.append(ansi.color_string('Source', 'GREEN')) + 
report.extend(build_object_report(self.source)) + report.append(' ') + + # Destination + report.append(ansi.color_string('Destination', 'GREEN')) + if self.mode == 'Clone': + report[-1] += ansi.color_string(' (ALL DATA WILL BE DELETED)', 'RED') + report.extend(build_object_report(self.destination)) + report.append(' ') + + # Show deletion warning if necessary + # NOTE: The check for block_pairs is to limit this section + # to the second confirmation + if self.mode == 'Clone' and self.block_pairs: + report.append(ansi.color_string('WARNING', 'YELLOW')) + report.append( + 'All data will be deleted from the destination listed above.', + ) + report.append( + ansi.color_string( + ['This is irreversible and will lead to', 'DATA LOSS.'], + ['YELLOW', 'RED'], + ), + ) + report.append(' ') + + # Block pairs + if self.block_pairs: + report.extend( + build_block_pair_report( + self.block_pairs, + self._load_settings() if self.mode == 'Clone' else {}, + ), + ) + report.append(' ') + + # Map dir + if self.working_dir: + report.append(ansi.color_string('Map Save Directory', 'GREEN')) + report.append(f'{self.working_dir}/') + report.append(' ') + if not fstype_is_ok(self.working_dir, map_dir=True): + report.append( + ansi.color_string( + 'Map file(s) are being saved to a non-recommended filesystem.', + 'YELLOW', + ), + ) + report.append( + ansi.color_string( + ['This is strongly discouraged and may lead to', 'DATA LOSS'], + [None, 'RED'], + ), + ) + report.append(' ') + + # Source part(s) selected + if source_parts: + report.append(ansi.color_string('Source Part(s) selected', 'GREEN')) + if self.source.path.samefile(source_parts[0].path): + report.append('Whole Disk') + else: + report.append(ansi.color_string(f'{"NAME":<9} SIZE', 'BLUE')) + for part in source_parts: + report.append( + f'{part.path.name:<9} ' + f'{std.bytes_to_string(part.size, use_binary=False)}' + ) + report.append(' ') + + # Prompt user + cli.clear_screen() + cli.print_report(report) + if not 
cli.ask(prompt_msg): + raise std.GenericAbort() + + def generate_report(self) -> list[str]: + """Generate report of overall and per block_pair results, returns list.""" + report = [] + + # Header + report.append(f'{self.mode.title()} Results:') + report.append(' ') + report.append(f'Source: {self.source.description}') + if self.mode == 'Clone': + report.append(f'Destination: {self.destination.description}') + else: + report.append(f'Destination: {self.destination}/') + + # Overall + report.append(' ') + error_size = self.get_error_size() + error_size_str = std.bytes_to_string(error_size, decimals=2) + if error_size > 0: + error_size_str = ansi.color_string(error_size_str, 'YELLOW') + percent = self.get_percent_recovered() + percent = format_status_string(percent, width=0) + report.append(f'Overall rescued: {percent}, error size: {error_size_str}') + + # Block-Pairs + if len(self.block_pairs) > 1: + report.append(' ') + for pair in self.block_pairs: + error_size = pair.get_error_size() + error_size_str = std.bytes_to_string(error_size, decimals=2) + if error_size > 0: + error_size_str = ansi.color_string(error_size_str, 'YELLOW') + pair_size = std.bytes_to_string(pair.size, decimals=2) + percent = pair.get_percent_recovered() + percent = format_status_string(percent, width=0) + report.append( + f'{pair.source.name} ({pair_size}) ' + f'rescued: {percent}, ' + f'error size: {error_size_str}' + ) + + # Done + return report + + def get_error_size(self) -> int: + """Get total error size from block_pairs in bytes, returns int.""" + return self.get_total_size() - self.get_rescued_size() + + def get_percent_recovered(self) -> float: + """Get total percent rescued from block_pairs, returns float.""" + return 100 * self.get_rescued_size() / self.get_total_size() + + def get_rescued_size(self) -> int: + """Get total rescued size from all block pairs, returns int.""" + return sum(pair.get_rescued_size() for pair in self.block_pairs) + + def get_total_size(self) -> int: + """Get 
total size of all block_pairs in bytes, returns int.""" + return sum(pair.size for pair in self.block_pairs) + + def init_recovery(self, docopt_args: dict[str, Any]) -> None: + """Select source/dest and set env.""" + cli.clear_screen() + disk_menu = menus.disks() + source_parts = [] + self.ui.set_progress_file(str(self.progress_out)) + + # Set mode + self.mode = set_mode(docopt_args) + + # Select source + self.source = select_disk_obj('source', disk_menu, docopt_args['']) + self.update_top_panes() + if self.source.trim: + cli.print_warning('Source device supports TRIM') + if not cli.ask(' Proceed with recovery?'): + cli.abort() + self.ui.set_title('Source', self.source.name) + + # Select destination + if self.mode == 'Clone': + self.destination = select_disk_obj( + 'destination', + disk_menu, + docopt_args[''], + ) + self.ui.add_title_pane('Destination', self.destination.name) + elif self.mode == 'Image': + if docopt_args['']: + self.destination = pathlib.Path(docopt_args['']).resolve() + else: + self.destination = menus.select_path('Destination') + self.ui.add_title_pane('Destination', self.destination) + self.update_top_panes() + + # Update details + self.source.update_details(skip_children=False) + if self.mode == 'Clone': + self.destination.update_details(skip_children=False) + + # Confirmation #1 + self.confirm_selections( + prompt_msg='Are these selections correct?', + source_parts=source_parts, + ) + + # Update panes + self.update_progress_pane('Idle') + + # Set working dir + self.working_dir = get_working_dir( + self.mode, + self.destination, + force_local=docopt_args['--force-local-map'], + ) + + # Start fresh if requested + if docopt_args['--start-fresh']: + clean_working_dir(self.working_dir) + + # Add block pairs + if self.mode == 'Clone': + source_parts = self.add_clone_block_pairs() + else: + source_parts = menus.select_disk_parts(self.mode, self.source) + self.add_image_block_pairs(source_parts) + + # Update SMART data + ## TODO: Verify if needed + 
for dev in (self.source, self.destination): + if not isinstance(dev, hw_disk.Disk): + continue + enable_smart(dev) + update_smart_details(dev) + + # Safety Checks #1 + if self.mode == 'Clone': + self.safety_check_destination() + self.safety_check_size() + + # Confirmation #2 + self.update_progress_pane('Idle') + self.confirm_selections('Start recovery?', source_parts) + + # Unmount source and/or destination under macOS + if PLATFORM == 'Darwin': + for dev in (self.source, self.destination): + if not isinstance(dev, hw_disk.Disk): + continue + cmd = ['diskutil', 'unmountDisk', dev.path] + try: + exe.run_program(cmd) + except subprocess.CalledProcessError: + cli.print_error('Failed to unmount source and/or destination') + cli.abort() + + # Prep destination + if self.mode == 'Clone': + self.prep_destination(source_parts, dry_run=docopt_args['--dry-run']) + + # Safety Checks #2 + if not docopt_args['--dry-run']: + for pair in self.block_pairs: + pair.safety_check() + + def mark_started(self) -> None: + """Edit clone settings, if applicable, to mark recovery as started.""" + # Skip if not cloning + if self.mode != 'Clone': + return + + # Skip if not using settings + # i.e. 
Cloning whole disk (or single partition via args) + if self.source.path.samefile(self.block_pairs[0].source): + return + + # Update settings + settings = self._load_settings() + if settings.get('First Run', False): + settings['First Run'] = False + self._save_settings(settings) + + def pass_above_threshold(self, pass_name: str) -> bool: + """Check if all block_pairs meet the pass threshold, returns bool.""" + threshold = cfg.ddrescue.AUTO_PASS_THRESHOLDS[pass_name] + return all( + p.get_percent_recovered() >= threshold for p in self.block_pairs + ) + + def pass_complete(self, pass_name: str) -> bool: + """Check if all block_pairs completed pass_name, returns bool.""" + return all(p.pass_complete(pass_name) for p in self.block_pairs) + + def prep_destination( + self, + source_parts: list[hw_disk.Disk], + dry_run: bool = True, + ) -> None: + """Prep destination as necessary.""" + # TODO: Split into Linux and macOS + # logical sector size is not easily found under macOS + # It might be easier to rewrite this section using macOS tools + dest_prefix = str(self.destination.path) + dest_prefix += get_partition_separator(self.destination.path.name) + esp_type = 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B' + msr_type = 'E3C9E316-0B5C-4DB8-817D-F92DF00215AE' + part_num = 0 + sfdisk_script = [] + settings = self._load_settings() + + # Bail early + if not settings['Needs Format']: + return + + # Add partition table settings + if settings['Table Type'] == 'GPT': + sfdisk_script.append('label: gpt') + else: + sfdisk_script.append('label: dos') + sfdisk_script.append('unit: sectors') + sfdisk_script.append('') + + # Add boot partition if requested + if settings['Create Boot Partition']: + if settings['Table Type'] == 'GPT': + part_num += 1 + sfdisk_script.append( + build_sfdisk_partition_line( + table_type='GPT', + dev_path=f'{dest_prefix}{part_num}', + size='260MiB', + details={'parttype': esp_type, 'partlabel': 'EFI System'}, + ), + ) + part_num += 1 + sfdisk_script.append( + 
build_sfdisk_partition_line( + table_type=settings['Table Type'], + dev_path=f'{dest_prefix}{part_num}', + size='16MiB', + details={'parttype': msr_type, 'partlabel': 'Microsoft Reserved'}, + ), + ) + elif settings['Table Type'] == 'MBR': + part_num += 1 + sfdisk_script.append( + build_sfdisk_partition_line( + table_type='MBR', + dev_path=f'{dest_prefix}{part_num}', + size='100MiB', + details={'parttype': '0x7', 'partlabel': 'System Reserved'}, + ), + ) + + # Add selected partition(s) + for part in source_parts: + num_sectors = part.size / self.destination.log_sec + num_sectors = math.ceil(num_sectors) + part_num += 1 + sfdisk_script.append( + build_sfdisk_partition_line( + table_type=settings['Table Type'], + dev_path=f'{dest_prefix}{part_num}', + size=num_sectors, + details=part.raw_details, + ), + ) + + # Save sfdisk script + script_path = ( + f'{self.working_dir}/' + f'sfdisk_{self.destination.path.name}.script' + ) + with open(script_path, 'w', encoding='utf-8') as _f: + _f.write('\n'.join(sfdisk_script)) + + # Skip real format for dry runs + if dry_run: + LOG.info('Dry run, refusing to format destination') + return + + # Format disk + LOG.warning('Formatting destination: %s', self.destination.path) + with open(script_path, 'r', encoding='utf-8') as _f: + proc = exe.run_program( + cmd=['sudo', 'sfdisk', self.destination.path], + stdin=_f, + check=False, + ) + if proc.returncode != 0: + cli.print_error('Error(s) encoundtered while formatting destination') + raise std.GenericAbort() + + # Update settings + settings['Needs Format'] = False + self._save_settings(settings) + + def retry_all_passes(self) -> None: + """Prep block_pairs for a retry recovery attempt.""" + bad_statuses = ('*', '/', '-') + LOG.warning('Updating block_pairs for retry') + + # Update all block_pairs + for pair in self.block_pairs: + map_data = [] + + # Reset status strings + for name in pair.status.keys(): + pair.status[name] = 'Pending' + + # Mark all non-trimmed, non-scraped, and bad 
areas as non-tried + with open(pair.map_path, 'r', encoding='utf-8') as _f: + for line in _f.readlines(): + line = line.strip() + if line.startswith('0x') and line.endswith(bad_statuses): + line = f'{line[:-1]}?' + map_data.append(line) + + # Save updated map + with open(pair.map_path, 'w', encoding='utf-8') as _f: + _f.write('\n'.join(map_data)) + + # Reinitialize status + pair.set_initial_status() + + def safety_check_destination(self) -> None: + """Run safety checks for destination and abort if necessary.""" + errors_detected = False + + # Check for critical errors + if not smart_status_ok(self.destination): + cli.print_error( + f'Critical error(s) detected for: {self.destination.path}', + ) + + # Check for minor errors + if not check_attributes(self.destination, only_blocking=False): + cli.print_warning( + f'Attribute error(s) detected for: {self.destination.path}', + ) + + # Done + if errors_detected: + raise std.GenericAbort() + + def safety_check_size(self) -> None: + """Run size safety check and abort if necessary.""" + required_size = sum(pair.size for pair in self.block_pairs) + settings = self._load_settings() if self.mode == 'Clone' else {} + + # Increase required_size if necessary + if self.mode == 'Clone' and settings.get('Needs Format', False): + if settings['Table Type'] == 'GPT': + # Below is the size calculation for the GPT + # 1 LBA for the protective MBR + # 33 LBAs each for the primary and backup GPT tables + # Source: https://en.wikipedia.org/wiki/GUID_Partition_Table + required_size += (1 + 33 + 33) * self.destination.phy_sec + if settings['Create Boot Partition']: + # 260MiB EFI System Partition and a 16MiB MS Reserved partition + required_size += (260 + 16) * 1024**2 + else: + # MBR only requires one LBA but adding a full 4096 bytes anyway + required_size += 4096 + if settings['Create Boot Partition']: + # 100MiB System Reserved partition + required_size += 100 * 1024**2 + + # Reduce required_size if necessary + if self.mode == 'Image': + 
for pair in self.block_pairs: + if pair.destination.exists(): + # NOTE: This uses the "max space" of the destination + # i.e. not the apparent size which is smaller for sparse files + # While this can result in an out-of-space error it's better + # than nothing. + required_size -= pair.destination.stat().st_size + + # Check destination size + if self.mode == 'Clone': + destination_size = self.destination.size + error_msg = 'A larger destination disk is required' + else: + # NOTE: Adding an extra 5% here to better ensure it will fit + destination_size = psutil.disk_usage(self.destination).free + destination_size *= 1.05 + error_msg = 'Not enough free space on the destination' + if required_size > destination_size: + cli.print_error(error_msg) + raise std.GenericAbort() + + def save_debug_reports(self) -> None: + """Save debug reports to disk.""" + LOG.info('Saving debug reports') + debug_dir = pathlib.Path(f'{self.log_dir}/debug') + if not debug_dir.exists(): + debug_dir.mkdir() + + # State (self) + debug.save_pickles({'state': self}, debug_dir) + with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f: + _f.write('[Debug report]\n') + _f.write('\n'.join(debug.generate_object_report(self))) + _f.write('\n') + + # Block pairs + for _bp in self.block_pairs: + with open( + f'{debug_dir}/block_pairs.report', 'a', encoding='utf-8') as _f: + _f.write('[Debug report]\n') + _f.write('\n'.join(debug.generate_object_report(_bp))) + _f.write('\n') + + def skip_pass(self, pass_name: str) -> None: + """Mark block_pairs as skipped if applicable.""" + for pair in self.block_pairs: + if pair.status[pass_name] == 'Pending': + pair.status[pass_name] = 'Skipped' + + def update_progress_pane(self, overall_status: str) -> None: + """Update progress pane.""" + report = [] + separator = '─────────────────────' + width = cfg.ddrescue.TMUX_SIDE_WIDTH + + # Status + report.append(ansi.color_string(f'{"Status":^{width}}', 'BLUE')) + if 'NEEDS ATTENTION' in overall_status: + 
report.append( + ansi.color_string(f'{overall_status:^{width}}', 'YELLOW_BLINK'), + ) + else: + report.append(f'{overall_status:^{width}}') + report.append(separator) + + # Overall progress + if self.block_pairs: + total_rescued = self.get_rescued_size() + percent = self.get_percent_recovered() + report.append(ansi.color_string('Overall Progress', 'BLUE')) + report.append( + f'Rescued: {format_status_string(percent, width=width-9)}', + ) + report.append( + ansi.color_string( + [f'{std.bytes_to_string(total_rescued, decimals=2):>{width}}'], + [get_percent_color(percent)], + ), + ) + report.append(separator) + + # Block pair progress + for pair in self.block_pairs: + report.append(ansi.color_string(pair.source, 'BLUE')) + for name, status in pair.status.items(): + name = name.title() + report.append( + f'{name}{format_status_string(status, width=width-len(name))}', + ) + report.append(' ') + + # EToC + if overall_status in ('Active', 'NEEDS ATTENTION'): + etoc = get_etoc() + report.append(separator) + report.append(ansi.color_string('Estimated Pass Finish', 'BLUE')) + if overall_status == 'NEEDS ATTENTION' or etoc == 'N/A': + report.append(ansi.color_string('N/A', 'YELLOW')) + else: + report.append(etoc) + + # Write to progress file + self.progress_out.write_text('\n'.join(report), encoding='utf-8', errors='ignore') + + def update_top_panes(self) -> None: + """(Re)create top source/destination panes.""" + source_exists = True + source_str = '' + dest_exists = True + dest_str = '' + width = tmux.get_pane_size()[0] + width = int(width / 2) - 1 + + def _format_string(obj, width) -> str: + """Format source/dest string using obj and width, returns str.""" + string = '' + + # Build base string + if isinstance(obj, hw_disk.Disk): + string = f'{obj.path} {obj.description}' + elif obj.is_dir(): + string = f'{obj}/' + elif obj.is_file(): + size_str = std.bytes_to_string( + obj.stat().st_size, + decimals=0, + use_binary=False) + string = f'{obj.name} {size_str}' + + # Adjust 
for width + if len(string) > width: + if hasattr(obj, 'is_dir') and obj.is_dir(): + string = f'...{string[-width+3:]}' + else: + string = f'{string[:width-3]}...' + + # Done + return string + + # Check source/dest existance + if self.source: + source_exists = self.source.path.exists() + if self.destination: + if isinstance(self.destination, hw_disk.Disk): + dest_exists = self.destination.path.exists() + else: + dest_exists = self.destination.exists() + + # Source + if self.source: + source_str = _format_string(self.source, width) + + # Destination + if self.destination: + dest_str = _format_string(self.destination, width) + + # Reset title panes + self.ui.reset_title_pane( + ansi.color_string( + ['Source', '' if source_exists else ' (Missing)'], + ['BLUE', 'RED'], + ), + source_str, + ) + if dest_str: + self.ui.add_title_pane( + ansi.color_string( + ['Destination', '' if dest_exists else ' (Missing)'], + ['BLUE', 'RED'], + ), + dest_str, + ) + + +# Functions +def build_block_pair_report(block_pairs, settings) -> list: + """Build block pair report, returns list.""" + report = [] + notes = [] + if block_pairs: + report.append(ansi.color_string('Block Pairs', 'GREEN')) + else: + # Bail early + return report + + # Show block pair mapping + if settings and settings['Create Boot Partition']: + if settings['Table Type'] == 'GPT': + report.append(f'{" —— ":<9} --> EFI System Partition') + report.append(f'{" —— ":<9} --> Microsoft Reserved Partition') + elif settings['Table Type'] == 'MBR': + report.append(f'{" —— ":<9} --> System Reserved') + for pair in block_pairs: + report.append(f'{pair.source.name:<9} --> {pair.destination.name}') + + # Show resume messages as necessary + if settings: + if not settings['First Run']: + notes.append( + ansi.color_string( + ['NOTE:', 'Clone settings loaded from previous run.'], + ['BLUE', None], + ), + ) + if settings['Needs Format'] and settings['Table Type']: + msg = f'Destination will be formatted using {settings["Table Type"]}' + 
notes.append( + ansi.color_string( + ['NOTE:', msg], + ['BLUE', None], + ), + ) + if any(pair.get_rescued_size() > 0 for pair in block_pairs): + notes.append( + ansi.color_string( + ['NOTE:', 'Resume data loaded from map file(s).'], + ['BLUE', None], + ), + ) + + # Add notes to report + if notes: + report.append(' ') + report.extend(notes) + + # Done + return report + + +def build_directory_report(path: pathlib.Path) -> list[str]: + """Build directory report, returns list.""" + path_str = f'{path}/' + report = [] + + # Get details + if PLATFORM == 'Linux': + cmd = [ + 'findmnt', + '--output', 'SIZE,AVAIL,USED,FSTYPE,OPTIONS', + '--target', path_str, + ] + proc = exe.run_program(cmd) + width = len(path_str) + 1 + for line in proc.stdout.splitlines(): + line = line.replace('\n', '') + if 'FSTYPE' in line: + line = ansi.color_string(f'{"path_str":<{width}}{line}', 'BLUE') + else: + line = f'{path_str:<{width}}{line}' + report.append(line) + else: + report.append(ansi.color_string('path_str', 'BLUE')) + report.append(str(path_str)) + + # Done + return report + + +def build_disk_report(dev: hw_disk.Disk) -> list[str]: + """Build device report, returns list.""" + report = [] + + # Get widths + widths = { + 'fstype': max(6, len(str(dev.filesystem))), + 'label': max(5, len(str(dev.raw_details.get('label', '')))), + 'name': max(4, len(dev.path.name)), + } + for child in dev.children: + widths['fstype'] = max(widths['fstype'], len(str(child['fstype']))) + widths['label'] = max(widths['label'], len(str(child['label']))) + widths['name'] = max( + widths['name'], + len(child['name'].replace('/dev/', '')), + ) + widths = {k: v+1 for k, v in widths.items()} + + # Disk details + report.append(f'{dev.path.name} {dev.description}') + report.append(' ') + dev_fstype = dev.filesystem + dev_label = dev.raw_details.get('label', '') + dev_name = dev.path.name + dev_size = std.bytes_to_string(dev.size, use_binary=False) + + # Partition details + report.append( + ansi.color_string( + ( + 
f'{"NAME":<{widths["name"]}}' + f'{" " if dev.children else ""}' + f'{"SIZE":<7}' + f'{"FSTYPE":<{widths["fstype"]}}' + f'{"LABEL":<{widths["label"]}}' + ), + 'BLUE', + ), + ) + report.append( + f'{dev_name if dev_name else "":<{widths["name"]}}' + f'{" " if dev.children else ""}' + f'{dev_size:>6} ' + f'{dev_fstype if dev_fstype else "":<{widths["fstype"]}}' + f'{dev_label if dev_label else "":<{widths["label"]}}' + ) + for child in dev.children: + fstype = child['fstype'] + label = child['label'] + name = child['name'].replace('/dev/', '') + size = std.bytes_to_string(child["size"], use_binary=False) + report.append( + f'{name if name else "":<{widths["name"]}}' + f'{size:>6} ' + f'{fstype if fstype else "":<{widths["fstype"]}}' + f'{label if label else "":<{widths["label"]}}' + ) + + # Indent children + if len(dev.children) > 1: + report = [ + *report[:4], + *[f'├─{line}' for line in report[4:-1]], + f'└─{report[-1]}', + ] + elif len(dev.children) == 1: + report[-1] = f'└─{report[-1]}' + + # Done + return report + + +def build_object_report(obj) -> list[str]: + """Build object report, returns list.""" + report = [] + + # Get details based on object given + if hasattr(obj, 'is_dir') and obj.is_dir(): + # Directory report + report = build_directory_report(obj) + else: + # Device report + report = build_disk_report(obj) + + # Done + return report + + +def build_sfdisk_partition_line(table_type, dev_path, size, details) -> str: + """Build sfdisk partition line using passed details, returns str.""" + line = f'{dev_path} : size={size}' + dest_type = '' + source_filesystem = str(details.get('fstype', '')).upper() + source_table_type = '' + source_type = details.get('parttype', '') + + # Set dest type + if re.match(r'^0x\w+$', source_type): + # Both source and dest are MBR + source_table_type = 'MBR' + if table_type == 'MBR': + dest_type = source_type.replace('0x', '').lower() + elif re.match(r'^\w{8}-\w{4}-\w{4}-\w{4}-\w{12}$', source_type): + # Source is a GPT type + 
source_table_type = 'GPT' + if table_type == 'GPT': + dest_type = source_type.upper() + if not dest_type: + # Assuming changing table types, set based on FS + if source_filesystem in cfg.ddrescue.PARTITION_TYPES.get(table_type, {}): + dest_type = cfg.ddrescue.PARTITION_TYPES[table_type][source_filesystem] + line += f', type={dest_type}' + + # Safety Check + if not dest_type: + cli.print_error(f'Failed to determine partition type for: {dev_path}') + raise std.GenericAbort() + + # Add extra details + if details.get('partlabel', ''): + line += f', name="{details["partlabel"]}"' + if details.get('partuuid', '') and source_table_type == table_type: + # Only add UUID if source/dest table types match + line += f', uuid={details["partuuid"].upper()}' + + # Done + return line + + +def clean_working_dir(working_dir) -> None: + """Clean working directory to ensure a fresh recovery session. + + NOTE: Data from previous sessions will be preserved + in a backup directory. + """ + backup_dir = pathlib.Path(f'{working_dir}/prev') + backup_dir = io.non_clobber_path(backup_dir) + backup_dir.mkdir() + + # Move settings, maps, etc to backup_dir + for entry in os.scandir(working_dir): + if entry.name.endswith(('.dd', '.json', '.map')): + new_path = f'{backup_dir}/{entry.name}' + new_path = io.non_clobber_path(new_path) + shutil.move(entry.path, new_path) + + +def format_status_string(status, width) -> str: + """Format colored status string, returns str.""" + color = None + percent = -1 + status_str = str(status) + + # Check if status is percentage + try: + percent = float(status_str) + except ValueError: + # Assuming status is text + pass + + # Format status + if percent >= 0: + # Percentage + color = get_percent_color(percent) + status_str = f'{percent:{width-2}.2f} %' + if '100.00' in status_str and percent < 100: + # Always round down to 99.99% + LOG.warning('Rounding down to 99.99 from %s', percent) + status_str = f'{"99.99 %":>{width}}' + else: + # Text + color = 
STATUS_COLORS.get(status_str, None) + status_str = f'{status_str:>{width}}' + + # Add color if necessary + if color: + status_str = ansi.color_string(status_str, color) + + # Done + return status_str + + +def fstype_is_ok(path, map_dir=False) -> bool: + """Check if filesystem type is acceptable, returns bool.""" + is_ok = False + fstype = None + + # Get fstype + if PLATFORM == 'Darwin': + # Check all parent dirs until a mountpoint is found + test_path = pathlib.Path(path) + while test_path: + fstype = get_fstype_macos(test_path) + if fstype != 'UNKNOWN': + break + fstype = None + test_path = test_path.parent + elif PLATFORM == 'Linux': + cmd = [ + 'findmnt', + '--noheadings', + '--output', 'FSTYPE', + '--target', path, + ] + proc = exe.run_program(cmd, check=False) + fstype = proc.stdout + fstype = fstype.strip().lower() + + # Check fstype + if map_dir: + is_ok = RECOMMENDED_MAP_FSTYPES.match(fstype) + else: + is_ok = RECOMMENDED_FSTYPES.match(fstype) + + # Done + return is_ok + + +def get_etoc() -> str: + """Get EToC from ddrescue output, returns str.""" + delta = None + delta_dict = {} + etoc = 'Unknown' + now = datetime.datetime.now(tz=TIMEZONE) + output = tmux.capture_pane() + + # Search for EToC delta + matches = re.findall(r'remaining time:.*$', output, re.MULTILINE) + if matches: + match = REGEX_REMAINING_TIME.search(matches[-1]) + if match.group('na'): + etoc = 'N/A' + else: + for key in ('days', 'hours', 'minutes', 'seconds'): + delta_dict[key] = match.group(key) + delta_dict = {k: int(v) if v else 0 for k, v in delta_dict.items()} + delta = datetime.timedelta(**delta_dict) + + # Calc EToC if delta found + if delta: + etoc_datetime = now + delta + etoc = etoc_datetime.strftime('%Y-%m-%d %H:%M %Z') + + # Done + return etoc + + +def get_fstype_macos(path) -> str: + """Get fstype for path under macOS, returns str.""" + fstype = 'UNKNOWN' + proc = exe.run_program(['mount'], check=False) + + # Bail early + if proc.returncode: + return fstype + + # Parse output 
+ match = re.search(rf'{path} \((\w+)', proc.stdout) + if match: + fstype = match.group(1) + + # Done + return fstype + + +def get_partition_separator(name) -> str: + """Get partition separator based on device name, returns str.""" + separator = '' + if re.search(r'(loop|mmc|nvme)', name, re.IGNORECASE): + separator = 'p' + + return separator + + +def get_percent_color(percent) -> str: + """Get color based on percentage, returns str.""" + color = None + if percent > 100: + color = 'PURPLE' + elif percent >= 99: + color = 'GREEN' + elif percent >= 90: + color = 'YELLOW' + elif percent > 0: + color = 'RED' + + # Done + return color + + +def get_table_type(disk_path) -> str: + """Get disk partition table type, returns str. + + NOTE: If resulting table type is not GPT or MBR + then an exception is raised. + """ + disk_path = str(disk_path) + table_type = None + + # Linux + if std.PLATFORM == 'Linux': + cmd = f'lsblk --json --output=pttype --nodeps {disk_path}'.split() + json_data = exe.get_json_from_command(cmd) + table_type = json_data['blockdevices'][0].get('pttype', '').upper() + table_type = table_type.replace('DOS', 'MBR') + + # macOS + if std.PLATFORM == 'Darwin': + cmd = ['diskutil', 'list', '-plist', disk_path] + proc = exe.run_program(cmd, check=False, encoding=None, errors=None) + try: + plist_data = plistlib.loads(proc.stdout) + except (TypeError, ValueError): + # Invalid / corrupt plist data? 
return empty dict to avoid crash + pass + else: + disk_details = plist_data.get('AllDisksAndPartitions', [{}])[0] + table_type = disk_details['Content'] + table_type = table_type.replace('FDisk_partition_scheme', 'MBR') + table_type = table_type.replace('GUID_partition_scheme', 'GPT') + + # Check type + if table_type not in ('GPT', 'MBR'): + cli.print_error(f'Unsupported partition table type: {table_type}') + raise std.GenericAbort() + + # Done + return table_type + + +def get_working_dir(mode, destination, force_local=False) -> pathlib.Path: + """Get working directory using mode and destination, returns path.""" + ticket_id = cli.get_ticket_id() + working_dir = None + + # Use preferred path if possible + if mode == 'Image': + try: + path = pathlib.Path(destination).resolve() + except TypeError as err: + cli.print_error(f'Invalid destination: {destination}') + raise std.GenericAbort() from err + if path.exists() and fstype_is_ok(path, map_dir=False): + working_dir = path + elif mode == 'Clone' and not force_local: + cli.print_info('Mounting backup shares...') + net.mount_backup_shares(read_write=True) + for server in cfg.net.BACKUP_SERVERS: + path = pathlib.Path( + f'/{"Volumes" if PLATFORM == "Darwin" else "Backups"}/{server}', + ) + if path.exists() and fstype_is_ok(path, map_dir=True): + # Acceptable path found + working_dir = path + break + + # Default to current dir if necessary + if not working_dir: + LOG.error('Failed to set preferred working directory') + working_dir = pathlib.Path(os.getcwd()) + + # Set subdir using ticket ID + if mode == 'Clone': + working_dir = working_dir.joinpath(ticket_id) + + # Create directory + working_dir.mkdir(parents=True, exist_ok=True) + os.chdir(working_dir) + + # Done + LOG.info('Set working directory to: %s', working_dir) + return working_dir + + +def mount_raw_image(path) -> pathlib.Path: + """Mount raw image using OS specific methods, returns pathlib.Path.""" + loopback_path = None + + if PLATFORM == 'Darwin': + 
loopback_path = mount_raw_image_macos(path) + elif PLATFORM == 'Linux': + loopback_path = mount_raw_image_linux(path) + + # Check + if not loopback_path: + cli.print_error(f'Failed to mount image: {path}') + + # Register unmount atexit + atexit.register(unmount_loopback_device, loopback_path) + + # Done + return loopback_path + + +def mount_raw_image_linux(path) -> pathlib.Path: + """Mount raw image using losetup, returns pathlib.Path.""" + loopback_path = None + + # Mount using losetup + cmd = [ + 'sudo', + 'losetup', + '--find', + '--partscan', + '--show', + path, + ] + proc = exe.run_program(cmd, check=False) + + # Check result + if proc.returncode == 0: + loopback_path = proc.stdout.strip() + + # Done + return loopback_path + + +def mount_raw_image_macos(path) -> pathlib.Path: + """Mount raw image using hdiutil, returns pathlib.Path.""" + loopback_path = None + plist_data = {} + + # Mount using hdiutil + # plistdata['system-entities'][{}...] + cmd = [ + 'hdiutil', 'attach', + '-imagekey', 'diskimage-class=CRawDiskImage', + '-nomount', + '-plist', + '-readonly', + path, + ] + proc = exe.run_program(cmd, check=False, encoding=None, errors=None) + + # Check result + try: + plist_data = plistlib.loads(proc.stdout) + except plistlib.InvalidFileException: + return None + for dev in plist_data.get('system-entities', []): + dev_path = dev.get('dev-entry', '') + if re.match(r'^/dev/disk\d+$', dev_path): + loopback_path = dev_path + + # Done + return loopback_path + + +def select_disk_obj(label:str, disk_menu: cli.Menu, disk_path: str) -> hw_disk.Disk: + """Get disk based on path or menu selection, returns Disk.""" + if not disk_path: + return menus.select_disk(label.capitalize(), disk_menu) + + # Source was provided, parse and run safety checks + path = pathlib.Path(disk_path).resolve() + + # Bail early + if not path.exists(): + raise FileNotFoundError(f'Path provided does not exist: {path}') + + # Disk objects + if path.is_block_device() or path.is_char_device(): + obj 
= hw_disk.Disk(path) + + # Child/Parent check + if obj.parent: + cli.print_warning(f'"{obj.path}" is a child device') + if cli.ask(f'Use parent device "{obj.parent}" instead?'): + obj = hw_disk.Disk(obj.parent) + + # Done + return obj + + # Raw image objects + if path.is_file(): + loop_path = mount_raw_image(path) + return hw_disk.Disk(loop_path) + + # Abort if object type couldn't be determined + # NOTE: This shouldn't every be reached? + cli.print_error(f'Invalid {label} path: {disk_path}') + raise std.GenericAbort() + + +def set_mode(docopt_args) -> str: + """Set mode from docopt_args or user selection, returns str.""" + mode = '?' + + # Check docopt_args + if docopt_args['clone']: + mode = 'Clone' + elif docopt_args['image']: + mode = 'Image' + + # Ask user if necessary + if not mode: + answer = cli.choice('Are we cloning or imaging?', ['C', 'I']) + if answer == 'C': + mode = 'Clone' + else: + mode = 'Image' + + # Done + return mode + + +def unmount_loopback_device(path) -> None: + """Unmount loopback device using OS specific methods.""" + cmd = [] + + # Build OS specific cmd + if PLATFORM == 'Darwin': + cmd = ['hdiutil', 'detach', path] + elif PLATFORM == 'Linux': + cmd = ['sudo', 'losetup', '--detach', path] + + # Unmount loopback device + exe.run_program(cmd, check=False) + + +if __name__ == '__main__': + print("This file is not meant to be called directly.")