From 6bef46bb4d54d66760a83be4811ee2d7d46ee16b Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Sun, 13 Aug 2023 20:39:23 -0700 Subject: [PATCH] Move more logic into wk/clone/block_pair.py --- scripts/wk/clone/block_pair.py | 270 ++++++++++++++++ scripts/wk/clone/ddrescue.py | 2 +- scripts/wk/clone/state.py | 569 +++++++++------------------------ 3 files changed, 426 insertions(+), 415 deletions(-) diff --git a/scripts/wk/clone/block_pair.py b/scripts/wk/clone/block_pair.py index c6b602bf..8691a393 100644 --- a/scripts/wk/clone/block_pair.py +++ b/scripts/wk/clone/block_pair.py @@ -2,12 +2,15 @@ # vim: sts=2 sw=2 ts=2 import logging +import math import os import pathlib +import plistlib import re import subprocess from wk import cfg, exe, std +from wk.clone import menus from wk.hw import disk as hw_disk from wk.ui import cli @@ -240,5 +243,272 @@ class BlockPair(): self.status[pass_n] = 'Skipped' +# Functions +def add_clone_block_pairs(state) -> None: + """Add device to device block pairs and set settings if necessary.""" + source_sep = get_partition_separator(state.source.path.name) + dest_sep = get_partition_separator(state.destination.path.name) + settings = {} + + # Clone settings + settings = state.load_settings(discard_unused_settings=True) + + # Add pairs from previous run + if settings['Partition Mapping']: + # Resume previous run, load pairs from settings file + for part_map in settings['Partition Mapping']: + bp_source = hw_disk.Disk( + f'{state.source.path}{source_sep}{part_map[0]}', + ) + bp_dest = pathlib.Path( + f'{state.destination.path}{dest_sep}{part_map[1]}', + ) + state.add_block_pair(bp_source, bp_dest) + return + + # Add pairs from selection + source_parts = menus.select_disk_parts('Clone', state.source) + if state.source.path.samefile(source_parts[0].path): + # Whole disk (or single partition via args), skip settings + bp_dest = state.destination.path + state.add_block_pair(state.source, bp_dest) + return + + # New run, use new settings file + settings['Needs Format'] = True + offset = 0 + user_choice = cli.choice( + 'Format clone using GPT, MBR, or match Source type?', + ['G', 'M', 'S'], + ) + if user_choice == 'G': + settings['Table Type'] = 'GPT' + elif user_choice == 'M': + settings['Table Type'] = 'MBR' + else: + # Match source type + settings['Table Type'] = get_table_type(state.source.path) + if cli.ask('Create an empty Windows boot partition on the clone?'): + settings['Create Boot Partition'] = True + offset = 2 if settings['Table Type'] == 'GPT' else 1 + + # Add pairs + for dest_num, part in enumerate(source_parts): + dest_num += offset + 1 + bp_dest = pathlib.Path( + f'{state.destination.path}{dest_sep}{dest_num}', + ) + state.add_block_pair(part, bp_dest) + + # Add to settings file + source_num = re.sub(r'^.*?(\d+)$', r'\1', part.path.name) + settings['Partition Mapping'].append([source_num, dest_num]) + + # Save settings + state.save_settings(settings) + + +def add_image_block_pairs(state) -> None: + """Add device to image file block pairs.""" + source_parts = menus.select_disk_parts(state.mode, state.source) + for part in source_parts: + state.add_block_pair(part, state.destination) + + +def build_sfdisk_partition_line(table_type, dev_path, size, details) -> str: + """Build sfdisk partition line using passed details, returns str.""" + line = f'{dev_path} : size={size}' + dest_type = '' + source_filesystem = str(details.get('fstype', '')).upper() + source_table_type = '' + source_type = details.get('parttype', '') + + # Set dest type + if re.match(r'^0x\w+$', source_type): + # Both source and dest are MBR + source_table_type = 'MBR' + if table_type == 'MBR': + dest_type = source_type.replace('0x', '').lower() + elif re.match(r'^\w{8}-\w{4}-\w{4}-\w{4}-\w{12}$', source_type): + # Source is a GPT type + source_table_type = 'GPT' + if table_type == 'GPT': + dest_type = source_type.upper() + if not dest_type: + # Assuming changing table types, set based on FS + if source_filesystem in cfg.ddrescue.PARTITION_TYPES.get(table_type, {}): + dest_type = cfg.ddrescue.PARTITION_TYPES[table_type][source_filesystem] + line += f', type={dest_type}' + + # Safety Check + if not dest_type: + cli.print_error(f'Failed to determine partition type for: {dev_path}') + raise std.GenericAbort() + + # Add extra details + if details.get('partlabel', ''): + line += f', name="{details["partlabel"]}"' + if details.get('partuuid', '') and source_table_type == table_type: + # Only add UUID if source/dest table types match + line += f', uuid={details["partuuid"].upper()}' + + # Done + return line + + +def get_partition_separator(name) -> str: + """Get partition separator based on device name, returns str.""" + separator = '' + if re.search(r'(loop|mmc|nvme)', name, re.IGNORECASE): + separator = 'p' + + return separator + + +def get_table_type(disk_path) -> str: + """Get disk partition table type, returns str. + + NOTE: If resulting table type is not GPT or MBR + then an exception is raised. + """ + disk_path = str(disk_path) + table_type = None + + # Linux + if std.PLATFORM == 'Linux': + cmd = f'lsblk --json --output=pttype --nodeps {disk_path}'.split() + json_data = exe.get_json_from_command(cmd) + table_type = json_data['blockdevices'][0].get('pttype', '').upper() + table_type = table_type.replace('DOS', 'MBR') + + # macOS + if std.PLATFORM == 'Darwin': + cmd = ['diskutil', 'list', '-plist', disk_path] + proc = exe.run_program(cmd, check=False, encoding=None, errors=None) + try: + plist_data = plistlib.loads(proc.stdout) + except (TypeError, ValueError): + # Invalid / corrupt plist data? return empty dict to avoid crash + pass + else: + disk_details = plist_data.get('AllDisksAndPartitions', [{}])[0] + table_type = disk_details['Content'] + table_type = table_type.replace('FDisk_partition_scheme', 'MBR') + table_type = table_type.replace('GUID_partition_scheme', 'GPT') + + # Check type + if table_type not in ('GPT', 'MBR'): + cli.print_error(f'Unsupported partition table type: {table_type}') + raise std.GenericAbort() + + # Done + return table_type + + +def prep_destination( + state, + source_parts: list[hw_disk.Disk], + dry_run: bool = True, + ) -> None: + """Prep destination as necessary.""" + # TODO: Split into Linux and macOS + # logical sector size is not easily found under macOS + # It might be easier to rewrite this section using macOS tools + dest_prefix = str(state.destination.path) + dest_prefix += get_partition_separator(state.destination.path.name) + esp_type = 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B' + msr_type = 'E3C9E316-0B5C-4DB8-817D-F92DF00215AE' + part_num = 0 + sfdisk_script = [] + settings = state.load_settings() + + # Bail early + if not settings['Needs Format']: + return + + # Add partition table settings + if settings['Table Type'] == 'GPT': + sfdisk_script.append('label: gpt') + else: + sfdisk_script.append('label: dos') + sfdisk_script.append('unit: sectors') + sfdisk_script.append('') + + # Add boot partition if requested + if settings['Create Boot Partition']: + if settings['Table Type'] == 'GPT': + part_num += 1 + sfdisk_script.append( + build_sfdisk_partition_line( + table_type='GPT', + dev_path=f'{dest_prefix}{part_num}', + size='260MiB', + details={'parttype': esp_type, 'partlabel': 'EFI System'}, + ), + ) + part_num += 1 + sfdisk_script.append( + build_sfdisk_partition_line( + table_type=settings['Table Type'], + dev_path=f'{dest_prefix}{part_num}', + size='16MiB', + details={'parttype': msr_type, 'partlabel': 'Microsoft Reserved'}, + ), + ) + elif settings['Table Type'] == 'MBR': + part_num += 1 + sfdisk_script.append( + build_sfdisk_partition_line( + table_type='MBR', + dev_path=f'{dest_prefix}{part_num}', + size='100MiB', + details={'parttype': '0x7', 'partlabel': 'System Reserved'}, + ), + ) + + # Add selected partition(s) + for part in source_parts: + num_sectors = part.size / state.destination.log_sec + num_sectors = math.ceil(num_sectors) + part_num += 1 + sfdisk_script.append( + build_sfdisk_partition_line( + table_type=settings['Table Type'], + dev_path=f'{dest_prefix}{part_num}', + size=num_sectors, + details=part.raw_details, + ), + ) + + # Save sfdisk script + script_path = ( + f'{state.working_dir}/' + f'sfdisk_{state.destination.path.name}.script' + ) + with open(script_path, 'w', encoding='utf-8') as _f: + _f.write('\n'.join(sfdisk_script)) + + # Skip real format for dry runs + if dry_run: + LOG.info('Dry run, refusing to format destination') + return + + # Format disk + LOG.warning('Formatting destination: %s', state.destination.path) + with open(script_path, 'r', encoding='utf-8') as _f: + proc = exe.run_program( + cmd=['sudo', 'sfdisk', state.destination.path], + stdin=_f, + check=False, + ) + if proc.returncode != 0: + cli.print_error('Error(s) encoundtered while formatting destination') + raise std.GenericAbort() + + # Update settings + settings['Needs Format'] = False + state.save_settings(settings) + + if __name__ == '__main__': print("This file is not meant to be called directly.") diff --git a/scripts/wk/clone/ddrescue.py b/scripts/wk/clone/ddrescue.py index a52edba9..f019e0de 100644 --- a/scripts/wk/clone/ddrescue.py +++ b/scripts/wk/clone/ddrescue.py @@ -233,7 +233,7 @@ def main() -> None: raise RuntimeError('tmux session not found') # Init - state = State() + state = State(log_dir=log_path.parent) try: state.init_recovery(args) except (FileNotFoundError, std.GenericAbort): diff --git a/scripts/wk/clone/state.py b/scripts/wk/clone/state.py index f5706935..43653139 100644 --- a/scripts/wk/clone/state.py +++ b/scripts/wk/clone/state.py @@ -4,23 +4,25 @@ import datetime import json import logging -import math import os import pathlib -import plistlib import re import shutil import subprocess -import time from typing import Any import psutil import pytz -from wk import cfg, debug, exe, io, log, net, std +from wk import cfg, debug, exe, io, net, std from wk.clone import menus -from wk.clone.block_pair import BlockPair +from wk.clone.block_pair import ( + BlockPair, + add_clone_block_pairs, + add_image_block_pairs, + prep_destination, + ) from wk.clone.image import mount_raw_image from wk.hw import disk as hw_disk from wk.hw.smart import ( @@ -74,20 +76,85 @@ TIMEZONE = pytz.timezone(cfg.main.LINUX_TIME_ZONE) # Classes class State(): """Object for tracking hardware diagnostic data.""" - def __init__(self): + def __init__(self, log_dir: pathlib.Path): self.block_pairs: list[BlockPair] = [] self.destination: hw_disk.Disk | pathlib.Path = pathlib.Path('/dev/null') - self.log_dir: pathlib.Path = log.format_log_path() - self.log_dir = self.log_dir.parent.joinpath( - f'ddrescue-TUI_{time.strftime("%Y-%m-%d_%H%M%S%z")}/', - ) + self.log_dir: pathlib.Path = log_dir self.progress_out: pathlib.Path = self.log_dir.joinpath('progress.out') self.mode: str = '?' self.source: hw_disk.Disk | None = None self.working_dir: pathlib.Path | None = None self.ui: tui.TUI = tui.TUI('Source') - def _add_block_pair(self, source: hw_disk.Disk, destination: pathlib.Path) -> None: + def _check_dest_size(self) -> None: + """Run size safety check and abort if necessary.""" + required_size = sum(pair.size for pair in self.block_pairs) + settings = self.load_settings() if self.mode == 'Clone' else {} + + # Increase required_size if necessary + if self.mode == 'Clone' and settings.get('Needs Format', False): + if settings['Table Type'] == 'GPT': + # Below is the size calculation for the GPT + # 1 LBA for the protective MBR + # 33 LBAs each for the primary and backup GPT tables + # Source: https://en.wikipedia.org/wiki/GUID_Partition_Table + required_size += (1 + 33 + 33) * self.destination.phy_sec + if settings['Create Boot Partition']: + # 260MiB EFI System Partition and a 16MiB MS Reserved partition + required_size += (260 + 16) * 1024**2 + else: + # MBR only requires one LBA but adding a full 4096 bytes anyway + required_size += 4096 + if settings['Create Boot Partition']: + # 100MiB System Reserved partition + required_size += 100 * 1024**2 + + # Reduce required_size if necessary + if self.mode == 'Image': + for pair in self.block_pairs: + if pair.destination.exists(): + # NOTE: This uses the "max space" of the destination + # i.e. not the apparent size which is smaller for sparse files + # While this can result in an out-of-space error it's better + # than nothing. + required_size -= pair.destination.stat().st_size + + # Check destination size + if self.mode == 'Clone': + destination_size = self.destination.size + error_msg = 'A larger destination disk is required' + else: + # NOTE: Adding an extra 5% here to better ensure it will fit + destination_size = psutil.disk_usage(self.destination).free + destination_size *= 1.05 + error_msg = 'Not enough free space on the destination' + if required_size > destination_size: + cli.print_error(error_msg) + raise std.GenericAbort() + + def _check_dest_smart(self) -> None: + """Check SMART for destination.""" + errors_detected = False + + # Check for critical errors + if not smart_status_ok(self.destination): + cli.print_error( + f'Critical error(s) detected for: {self.destination.path}', + ) + errors_detected = True + + # Check for minor errors + if not check_attributes(self.destination, only_blocking=False): + cli.print_warning( + f'Attribute error(s) detected for: {self.destination.path}', + ) + errors_detected = True + + # Done + if errors_detected: + raise std.GenericAbort() + + def add_block_pair(self, source: hw_disk.Disk, destination: pathlib.Path) -> None: """Add BlockPair object and run safety checks.""" self.block_pairs.append( BlockPair( @@ -96,143 +163,6 @@ class State(): working_dir=self.working_dir, )) - def _get_clone_settings_path(self) -> pathlib.Path: - """get Clone settings file path, returns pathlib.Path obj.""" - description = self.source.model - if not description: - description = self.source.path.name - return pathlib.Path(f'{self.working_dir}/Clone_{description}.json') - - def _load_settings(self, discard_unused_settings: bool = False) -> dict[Any, Any]: - """Load settings from previous run, returns dict.""" - settings = {} - settings_file = self._get_clone_settings_path() - - # Try loading JSON data - if settings_file.exists(): - with open(settings_file, 'r', encoding='utf-8') as _f: - try: - settings = json.loads(_f.read()) - except (OSError, json.JSONDecodeError) as err: - LOG.error('Failed to load clone settings') - cli.print_error('Invalid clone settings detected.') - raise std.GenericAbort() from err - - # Check settings - if settings: - if settings['First Run'] and discard_unused_settings: - # Previous run aborted before starting recovery, discard settings - settings = {} - else: - bail = False - for key in ('model', 'serial'): - if settings['Source'][key] != getattr(self.source, key): - cli.print_error(f"Clone settings don't match source {key}") - bail = True - if settings['Destination'][key] != getattr(self.destination, key): - cli.print_error(f"Clone settings don't match destination {key}") - bail = True - if bail: - raise std.GenericAbort() - - # Update settings - if not settings: - settings = CLONE_SETTINGS.copy() - if not settings['Source']: - settings['Source'] = { - 'model': self.source.model, - 'serial': self.source.serial, - } - if not settings['Destination']: - settings['Destination'] = { - 'model': self.destination.model, - 'serial': self.destination.serial, - } - - # Done - return settings - - def _save_settings(self, settings: dict[Any, Any]) -> None: - """Save settings for future runs.""" - settings_file = self._get_clone_settings_path() - - # Try saving JSON data - try: - with open(settings_file, 'w', encoding='utf-8') as _f: - json.dump(settings, _f) - except OSError as err: - cli.print_error('Failed to save clone settings') - raise std.GenericAbort() from err - - def add_clone_block_pairs(self) -> list[hw_disk.Disk]: - """Add device to device block pairs and set settings if necessary.""" - source_sep = get_partition_separator(self.source.path.name) - dest_sep = get_partition_separator(self.destination.path.name) - settings = {} - source_parts = [] - - # Clone settings - settings = self._load_settings(discard_unused_settings=True) - - # Add pairs - if settings['Partition Mapping']: - # Resume previous run, load pairs from settings file - for part_map in settings['Partition Mapping']: - bp_source = hw_disk.Disk( - f'{self.source.path}{source_sep}{part_map[0]}', - ) - bp_dest = pathlib.Path( - f'{self.destination.path}{dest_sep}{part_map[1]}', - ) - self._add_block_pair(bp_source, bp_dest) - else: - source_parts = menus.select_disk_parts('Clone', self.source) - if self.source.path.samefile(source_parts[0].path): - # Whole disk (or single partition via args), skip settings - bp_dest = self.destination.path - self._add_block_pair(self.source, bp_dest) - else: - # New run, use new settings file - settings['Needs Format'] = True - offset = 0 - user_choice = cli.choice( - 'Format clone using GPT, MBR, or match Source type?', - ['G', 'M', 'S'], - ) - if user_choice == 'G': - settings['Table Type'] = 'GPT' - elif user_choice == 'M': - settings['Table Type'] = 'MBR' - else: - # Match source type - settings['Table Type'] = get_table_type(self.source.path) - if cli.ask('Create an empty Windows boot partition on the clone?'): - settings['Create Boot Partition'] = True - offset = 2 if settings['Table Type'] == 'GPT' else 1 - - # Add pairs - for dest_num, part in enumerate(source_parts): - dest_num += offset + 1 - bp_dest = pathlib.Path( - f'{self.destination.path}{dest_sep}{dest_num}', - ) - self._add_block_pair(part, bp_dest) - - # Add to settings file - source_num = re.sub(r'^.*?(\d+)$', r'\1', part.path.name) - settings['Partition Mapping'].append([source_num, dest_num]) - - # Save settings - self._save_settings(settings) - - # Done - return source_parts - - def add_image_block_pairs(self, source_parts: list[hw_disk.Disk]) -> None: - """Add device to image file block pairs.""" - for part in source_parts: - self._add_block_pair(part, self.destination) - def confirm_selections( self, prompt_msg: str, @@ -274,7 +204,7 @@ class State(): report.extend( build_block_pair_report( self.block_pairs, - self._load_settings() if self.mode == 'Clone' else {}, + self.load_settings() if self.mode == 'Clone' else {}, ), ) report.append(' ') @@ -362,6 +292,13 @@ class State(): # Done return report + def get_clone_settings_path(self) -> pathlib.Path: + """get Clone settings file path, returns pathlib.Path obj.""" + description = self.source.model + if not description: + description = self.source.path.name + return pathlib.Path(f'{self.working_dir}/Clone_{description}.json') + def get_error_size(self) -> int: """Get total error size from block_pairs in bytes, returns int.""" return self.get_total_size() - self.get_rescued_size() @@ -440,10 +377,9 @@ class State(): # Add block pairs if self.mode == 'Clone': - source_parts = self.add_clone_block_pairs() + add_clone_block_pairs(self) else: - source_parts = menus.select_disk_parts(self.mode, self.source) - self.add_image_block_pairs(source_parts) + add_image_block_pairs(self) # Update SMART data ## TODO: Verify if needed @@ -454,9 +390,7 @@ class State(): update_smart_details(dev) # Safety Checks #1 - if self.mode == 'Clone': - self.safety_check_destination() - self.safety_check_size() + self.safety_check_destination() # Confirmation #2 self.update_progress_pane('Idle') @@ -476,13 +410,62 @@ class State(): # Prep destination if self.mode == 'Clone': - self.prep_destination(source_parts, dry_run=docopt_args['--dry-run']) + prep_destination(self, source_parts, dry_run=docopt_args['--dry-run']) # Safety Checks #2 if not docopt_args['--dry-run']: for pair in self.block_pairs: pair.safety_check() + def load_settings(self, discard_unused_settings: bool = False) -> dict[Any, Any]: + """Load settings from previous run, returns dict.""" + settings = {} + settings_file = self.get_clone_settings_path() + + # Try loading JSON data + if settings_file.exists(): + with open(settings_file, 'r', encoding='utf-8') as _f: + try: + settings = json.loads(_f.read()) + except (OSError, json.JSONDecodeError) as err: + LOG.error('Failed to load clone settings') + cli.print_error('Invalid clone settings detected.') + raise std.GenericAbort() from err + + # Check settings + if settings: + if settings['First Run'] and discard_unused_settings: + # Previous run aborted before starting recovery, discard settings + settings = {} + else: + bail = False + for key in ('model', 'serial'): + if settings['Source'][key] != getattr(self.source, key): + cli.print_error(f"Clone settings don't match source {key}") + bail = True + if settings['Destination'][key] != getattr(self.destination, key): + cli.print_error(f"Clone settings don't match destination {key}") + bail = True + if bail: + raise std.GenericAbort() + + # Update settings + if not settings: + settings = CLONE_SETTINGS.copy() + if not settings['Source']: + settings['Source'] = { + 'model': self.source.model, + 'serial': self.source.serial, + } + if not settings['Destination']: + settings['Destination'] = { + 'model': self.destination.model, + 'serial': self.destination.serial, + } + + # Done + return settings + def mark_started(self) -> None: """Edit clone settings, if applicable, to mark recovery as started.""" # Skip if not cloning @@ -495,10 +478,10 @@ class State(): return # Update settings - settings = self._load_settings() + settings = self.load_settings() if settings.get('First Run', False): settings['First Run'] = False - self._save_settings(settings) + self.save_settings(settings) def pass_above_threshold(self, pass_name: str) -> bool: """Check if all block_pairs meet the pass threshold, returns bool.""" @@ -511,110 +494,6 @@ class State(): """Check if all block_pairs completed pass_name, returns bool.""" return all(p.pass_complete(pass_name) for p in self.block_pairs) - def prep_destination( - self, - source_parts: list[hw_disk.Disk], - dry_run: bool = True, - ) -> None: - """Prep destination as necessary.""" - # TODO: Split into Linux and macOS - # logical sector size is not easily found under macOS - # It might be easier to rewrite this section using macOS tools - dest_prefix = str(self.destination.path) - dest_prefix += get_partition_separator(self.destination.path.name) - esp_type = 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B' - msr_type = 'E3C9E316-0B5C-4DB8-817D-F92DF00215AE' - part_num = 0 - sfdisk_script = [] - settings = self._load_settings() - - # Bail early - if not settings['Needs Format']: - return - - # Add partition table settings - if settings['Table Type'] == 'GPT': - sfdisk_script.append('label: gpt') - else: - sfdisk_script.append('label: dos') - sfdisk_script.append('unit: sectors') - sfdisk_script.append('') - - # Add boot partition if requested - if settings['Create Boot Partition']: - if settings['Table Type'] == 'GPT': - part_num += 1 - sfdisk_script.append( - build_sfdisk_partition_line( - table_type='GPT', - dev_path=f'{dest_prefix}{part_num}', - size='260MiB', - details={'parttype': esp_type, 'partlabel': 'EFI System'}, - ), - ) - part_num += 1 - sfdisk_script.append( - build_sfdisk_partition_line( - table_type=settings['Table Type'], - dev_path=f'{dest_prefix}{part_num}', - size='16MiB', - details={'parttype': msr_type, 'partlabel': 'Microsoft Reserved'}, - ), - ) - elif settings['Table Type'] == 'MBR': - part_num += 1 - sfdisk_script.append( - build_sfdisk_partition_line( - table_type='MBR', - dev_path=f'{dest_prefix}{part_num}', - size='100MiB', - details={'parttype': '0x7', 'partlabel': 'System Reserved'}, - ), - ) - - # Add selected partition(s) - for part in source_parts: - num_sectors = part.size / self.destination.log_sec - num_sectors = math.ceil(num_sectors) - part_num += 1 - sfdisk_script.append( - build_sfdisk_partition_line( - table_type=settings['Table Type'], - dev_path=f'{dest_prefix}{part_num}', - size=num_sectors, - details=part.raw_details, - ), - ) - - # Save sfdisk script - script_path = ( - f'{self.working_dir}/' - f'sfdisk_{self.destination.path.name}.script' - ) - with open(script_path, 'w', encoding='utf-8') as _f: - _f.write('\n'.join(sfdisk_script)) - - # Skip real format for dry runs - if dry_run: - LOG.info('Dry run, refusing to format destination') - return - - # Format disk - LOG.warning('Formatting destination: %s', self.destination.path) - with open(script_path, 'r', encoding='utf-8') as _f: - proc = exe.run_program( - cmd=['sudo', 'sfdisk', self.destination.path], - stdin=_f, - check=False, - ) - if proc.returncode != 0: - cli.print_error('Error(s) encoundtered while formatting destination') - raise std.GenericAbort() - - # Update settings - settings['Needs Format'] = False - self._save_settings(settings) - def retry_all_passes(self) -> None: """Prep block_pairs for a retry recovery attempt.""" bad_statuses = ('*', '/', '-') @@ -645,69 +524,9 @@ class State(): def safety_check_destination(self) -> None: """Run safety checks for destination and abort if necessary.""" - errors_detected = False - - # Check for critical errors - if not smart_status_ok(self.destination): - cli.print_error( - f'Critical error(s) detected for: {self.destination.path}', - ) - - # Check for minor errors - if not check_attributes(self.destination, only_blocking=False): - cli.print_warning( - f'Attribute error(s) detected for: {self.destination.path}', - ) - - # Done - if errors_detected: - raise std.GenericAbort() - - def safety_check_size(self) -> None: - """Run size safety check and abort if necessary.""" - required_size = sum(pair.size for pair in self.block_pairs) - settings = self._load_settings() if self.mode == 'Clone' else {} - - # Increase required_size if necessary - if self.mode == 'Clone' and settings.get('Needs Format', False): - if settings['Table Type'] == 'GPT': - # Below is the size calculation for the GPT - # 1 LBA for the protective MBR - # 33 LBAs each for the primary and backup GPT tables - # Source: https://en.wikipedia.org/wiki/GUID_Partition_Table - required_size += (1 + 33 + 33) * self.destination.phy_sec - if settings['Create Boot Partition']: - # 260MiB EFI System Partition and a 16MiB MS Reserved partition - required_size += (260 + 16) * 1024**2 - else: - # MBR only requires one LBA but adding a full 4096 bytes anyway - required_size += 4096 - if settings['Create Boot Partition']: - # 100MiB System Reserved partition - required_size += 100 * 1024**2 - - # Reduce required_size if necessary - if self.mode == 'Image': - for pair in self.block_pairs: - if pair.destination.exists(): - # NOTE: This uses the "max space" of the destination - # i.e. not the apparent size which is smaller for sparse files - # While this can result in an out-of-space error it's better - # than nothing. - required_size -= pair.destination.stat().st_size - - # Check destination size if self.mode == 'Clone': - destination_size = self.destination.size - error_msg = 'A larger destination disk is required' - else: - # NOTE: Adding an extra 5% here to better ensure it will fit - destination_size = psutil.disk_usage(self.destination).free - destination_size *= 1.05 - error_msg = 'Not enough free space on the destination' - if required_size > destination_size: - cli.print_error(error_msg) - raise std.GenericAbort() + self._check_dest_smart() + self._check_dest_size() def save_debug_reports(self) -> None: """Save debug reports to disk.""" @@ -731,6 +550,18 @@ class State(): _f.write('\n'.join(debug.generate_object_report(_bp))) _f.write('\n') + def save_settings(self, settings: dict[Any, Any]) -> None: + """Save settings for future runs.""" + settings_file = self.get_clone_settings_path() + + # Try saving JSON data + try: + with open(settings_file, 'w', encoding='utf-8') as _f: + json.dump(settings, _f) + except OSError as err: + cli.print_error('Failed to save clone settings') + raise std.GenericAbort() from err + def skip_pass(self, pass_name: str) -> None: """Mark block_pairs as skipped if applicable.""" for pair in self.block_pairs: @@ -1035,47 +866,6 @@ def build_object_report(obj) -> list[str]: return report -def build_sfdisk_partition_line(table_type, dev_path, size, details) -> str: - """Build sfdisk partition line using passed details, returns str.""" - line = f'{dev_path} : size={size}' - dest_type = '' - source_filesystem = str(details.get('fstype', '')).upper() - source_table_type = '' - source_type = details.get('parttype', '') - - # Set dest type - if re.match(r'^0x\w+$', source_type): - # Both source and dest are MBR - source_table_type = 'MBR' - if table_type == 'MBR': - dest_type = source_type.replace('0x', '').lower() - elif re.match(r'^\w{8}-\w{4}-\w{4}-\w{4}-\w{12}$', source_type): - # Source is a GPT type - source_table_type = 'GPT' - if table_type == 'GPT': - dest_type = source_type.upper() - if not dest_type: - # Assuming changing table types, set based on FS - if source_filesystem in cfg.ddrescue.PARTITION_TYPES.get(table_type, {}): - dest_type = cfg.ddrescue.PARTITION_TYPES[table_type][source_filesystem] - line += f', type={dest_type}' - - # Safety Check - if not dest_type: - cli.print_error(f'Failed to determine partition type for: {dev_path}') - raise std.GenericAbort() - - # Add extra details - if details.get('partlabel', ''): - line += f', name="{details["partlabel"]}"' - if details.get('partuuid', '') and source_table_type == table_type: - # Only add UUID if source/dest table types match - line += f', uuid={details["partuuid"].upper()}' - - # Done - return line - - def clean_working_dir(working_dir) -> None: """Clean working directory to ensure a fresh recovery session. @@ -1212,15 +1002,6 @@ def get_fstype_macos(path) -> str: return fstype -def get_partition_separator(name) -> str: - """Get partition separator based on device name, returns str.""" - separator = '' - if re.search(r'(loop|mmc|nvme)', name, re.IGNORECASE): - separator = 'p' - - return separator - - def get_percent_color(percent) -> str: """Get color based on percentage, returns str.""" color = None @@ -1237,46 +1018,6 @@ def get_percent_color(percent) -> str: return color -def get_table_type(disk_path) -> str: - """Get disk partition table type, returns str. - - NOTE: If resulting table type is not GPT or MBR - then an exception is raised. - """ - disk_path = str(disk_path) - table_type = None - - # Linux - if std.PLATFORM == 'Linux': - cmd = f'lsblk --json --output=pttype --nodeps {disk_path}'.split() - json_data = exe.get_json_from_command(cmd) - table_type = json_data['blockdevices'][0].get('pttype', '').upper() - table_type = table_type.replace('DOS', 'MBR') - - # macOS - if std.PLATFORM == 'Darwin': - cmd = ['diskutil', 'list', '-plist', disk_path] - proc = exe.run_program(cmd, check=False, encoding=None, errors=None) - try: - plist_data = plistlib.loads(proc.stdout) - except (TypeError, ValueError): - # Invalid / corrupt plist data? return empty dict to avoid crash - pass - else: - disk_details = plist_data.get('AllDisksAndPartitions', [{}])[0] - table_type = disk_details['Content'] - table_type = table_type.replace('FDisk_partition_scheme', 'MBR') - table_type = table_type.replace('GUID_partition_scheme', 'GPT') - - # Check type - if table_type not in ('GPT', 'MBR'): - cli.print_error(f'Unsupported partition table type: {table_type}') - raise std.GenericAbort() - - # Done - return table_type - - def get_working_dir(mode, destination, force_local=False) -> pathlib.Path: """Get working directory using mode and destination, returns path.""" ticket_id = cli.get_ticket_id()