diff --git a/scripts/wk/hw/ddrescue.py b/scripts/wk/hw/ddrescue.py index fe90c061..21dd52bd 100644 --- a/scripts/wk/hw/ddrescue.py +++ b/scripts/wk/hw/ddrescue.py @@ -3,6 +3,7 @@ # vim: sts=2 sw=2 ts=2 import atexit +import json import logging import os import pathlib @@ -30,6 +31,15 @@ Usage: Options: -h --help Show this page ''' +CLONE_SETTINGS = { + 'Source': None, + 'Destination': None, + 'Needs Format': False, + 'Table Type': None, + 'Partition Mapping': [ + # (5, 1) ## Clone source partition #5 to destination partition #1 + ], + } LOG = logging.getLogger(__name__) MENU_ACTIONS = ( 'Start', @@ -78,6 +88,101 @@ class State(): self.init_tmux() exe.start_thread(self.fix_tmux_layout_loop) + def add_clone_block_pairs(self, source_parts, working_dir): + """Add device to device block pairs and set settings if necessary.""" + part_prefix = '' + if re.search(r'(loop|mmc|nvme)', self.destination.path.name): + part_prefix = 'p' + settings = {} + + def _check_settings(settings): + """Check settings for issues and update as necessary.""" + if settings: + bail = False + for key in ('model', 'serial'): + if settings['Source'][key] != self.source.details[key]: + std.print_error(f"Clone settings don't match source {key}") + bail = True + if settings['Destination'][key] != self.destination.details[key]: + std.print_error(f"Clone settings don't match destination {key}") + bail = True + if bail: + raise std.GenericAbort() + + # Update settings + if not settings: + settings = CLONE_SETTINGS.copy() + if not settings['Source']: + settings['Source'] = { + 'model': self.source.details['model'], + 'serial': self.source.details['serial'], + } + if not settings['Destination']: + settings['Destination'] = { + 'model': self.destination.details['model'], + 'serial': self.destination.details['serial'], + } + + # Done + return settings + + # Clone settings + settings = self.load_settings(working_dir) + settings = _check_settings(settings) + + # Add pairs + if not self.source.path.samefile(source_parts[0].path): + # One or more partitions selected for cloning + if settings['Partition Mapping']: + for part_map in settings['Partition Mapping']: + bp_source = pathlib.Path( + f'{self.source.path}{part_prefix}{part_map[0]}', + ) + bp_dest = pathlib.Path( + f'{self.destination.path}{part_prefix}{part_map[1]}', + ) + # TODO: add bp(bp_source, bp_dest, map_dir=working_dir) + else: + # New run and new settings + offset = 0 + if (std.ask('Does the source disk contain an OS?') + and std.ask('Create an empty boot partition on the clone?')): + offset = 2 + settings['Needs Format'] = True + settings['Table Type'] = 'GPT' + if std.choice(['G', 'M'], 'GPT or MBR partition table?') == 'M': + offset = 1 + settings['Table Type'] = 'MBR' + + # Add pairs + for dest_num, part in enumerate(source_parts): + dest_num += offset + 1 + bp_source = part.path + bp_dest = pathlib.Path( + f'{self.destination.path}{part_prefix}{dest_num}', + ) + # TODO: add bp(bp_source, bp_dest, map_dir=working_dir) + + # Add to settings file + source_num = re.sub(r'^.*?(\d+)$', r'\1', part.path.name) + settings['Partition Mapping'].append([source_num, dest_num]) + + # Save settings + self.save_settings(settings, working_dir) + + else: + # Whole device or forced single partition selected, skip settings + bp_source = self.source.path + bp_dest = self.destination.path + # TODO: add bp(bp_source, bp_dest, map_dir=working_dir) + + def add_image_block_pairs(self, source_parts, working_dir): + """Add device to image file block pairs.""" + for part in source_parts: + bp_source = part.path + bp_dest = pathlib.Path(f'{self.destination.path}/{part_TODO}.dd') + # TODO: add bp(bp_source, bp_dest, map_dir=working_dir) + def confirm_selections(self, mode, prompt, map_dir=None): """Show selection details and prompt for confirmation.""" report = [] @@ -232,11 +337,10 @@ class State(): working_dir = pathlib.Path(os.getcwd()) # Add block pairs - # NOTE: Destination is not updated - # Load settings/maps - # Ask about boot partition - # Create pairs using paths - # TODO + if mode == 'Clone': + self.add_clone_block_pairs(source_parts, working_dir) + else: + self.add_image_block_pairs(source_parts, working_dir) # Confirmation #2 self.confirm_selections(mode, 'Start recovery?', map_dir=working_dir) @@ -273,6 +377,25 @@ class State(): # Source / Dest self.update_top_panes() + def load_settings(self, working_dir): + # pylint: disable=no-self-use + """Load settings from previous run, returns dict.""" + settings = {} + settings_file = pathlib.Path(f'{working_dir}/clone.json') + + # Try loading JSON data + if settings_file.exists(): + with open(settings_file, 'r') as _f: + try: + settings = json.loads(_f.read()) + except (OSError, json.JSONDecodeError): + LOG.error('Failed to load clone settings') + std.print_error('Invalid clone settings detected.') + raise std.GenericAbort() + + # Done + return settings + def save_debug_reports(self): """Save debug reports to disk.""" LOG.info('Saving debug reports') @@ -289,6 +412,19 @@ class State(): with open(f'{debug_dir}/bp_part#.report', 'a') as _f: _f.write('\n'.join(debug.generate_object_report(_bp))) + def save_settings(self, settings, working_dir): + # pylint: disable=no-self-use + """Save settings for future runs.""" + settings_file = pathlib.Path(f'{working_dir}/clone.json') + + # Try saving JSON data + try: + with open(settings_file, 'w') as _f: + json.dump(settings, _f) + except OSError: + std.print_error('Failed to save clone settings') + raise std.GenericAbort() + def update_progress_pane(self): """Update progress pane.""" report = [] @@ -609,6 +745,7 @@ def get_working_dir(mode, destination): """Get working directory using mode and destination, returns path.""" working_dir = None if mode == 'Clone': + std.print_info('Mounting backup shares...') net.mount_backup_shares(read_write=True) for server in cfg.net.BACKUP_SERVERS: path = pathlib.Path(f'/Backups/{server}')