"""WizardKit: ddrescue TUI - State"""
# vim: sts=2 sw=2 ts=2

import datetime
import json
import logging
import os
import pathlib
import re
import shutil
import subprocess

from typing import Any

import psutil
import pytz

from wk import cfg, debug, exe, io, net, std
from wk.clone import menus
from wk.clone.block_pair import (
  BlockPair,
  add_clone_block_pairs,
  add_image_block_pairs,
  prep_destination,
  )
from wk.clone.image import mount_raw_image
from wk.hw import disk as hw_disk
from wk.hw.smart import (
  check_attributes,
  enable_smart,
  smart_status_ok,
  update_smart_details,
  )
from wk.ui import ansi, cli, tmux, tui


# STATIC VARIABLES
LOG = logging.getLogger(__name__)
CLONE_SETTINGS = {
  'Source': None,
  'Destination': None,
  'Create Boot Partition': False,
  'First Run': True,
  'Needs Format': False,
  'Table Type': None,
  'Partition Mapping': [
    # (5, 1) ## Clone source partition #5 to destination partition #1
    ],
  }
# Matches the "remaining time:" field captured from the ddrescue pane.
# The group names are the ones read by get_etoc().
REGEX_REMAINING_TIME = re.compile(
  r'remaining time:'
  r'\s*((?P<days>\d+)d)?'
  r'\s*((?P<hours>\d+)h)?'
  r'\s*((?P<minutes>\d+)m)?'
  r'\s*((?P<seconds>\d+)s)?'
  r'\s*(?P<na>n/a)?',
  re.IGNORECASE
  )
PLATFORM = std.PLATFORM
RECOMMENDED_FSTYPES = re.compile(r'^(ext[234]|ntfs|xfs)$')
if PLATFORM == 'Darwin':
  RECOMMENDED_FSTYPES = re.compile(r'^(apfs|hfs.?)$')
RECOMMENDED_MAP_FSTYPES = re.compile(
  r'^(apfs|cifs|ext[234]|hfs.?|ntfs|smbfs|vfat|xfs)$'
  )
STATUS_COLORS = {
  'Passed': 'GREEN',
  'Aborted': 'YELLOW',
  'Skipped': 'YELLOW',
  'Working': 'YELLOW',
  'ERROR': 'RED',
  }
TIMEZONE = pytz.timezone(cfg.main.LINUX_TIME_ZONE)


# Classes
class State():
  """Object for tracking ddrescue recovery state.

  Holds the selected source/destination, the block pairs being recovered,
  the working (map) directory and the TUI used to show progress.
  """
  def __init__(self, log_dir: pathlib.Path):
    self.block_pairs: list[BlockPair] = []
    self.destination: hw_disk.Disk | pathlib.Path = pathlib.Path('/dev/null')
    self.log_dir: pathlib.Path = log_dir
    self.progress_out: pathlib.Path = self.log_dir.joinpath('progress.out')
    # '?' is the placeholder used until set_mode() picks 'Clone' or 'Image'
    self.mode: str = '?'
    self.source: hw_disk.Disk | None = None
    self.working_dir: pathlib.Path | None = None
    self.ui: tui.TUI = tui.TUI('Source')

  def _check_dest_size(self) -> None:
    """Run size safety check and abort if necessary.

    Raises std.GenericAbort if the destination is too small (Clone) or
    does not have enough free space (Image).
    """
    required_size = sum(pair.size for pair in self.block_pairs)
    settings = self.load_settings() if self.mode == 'Clone' else {}

    # Increase required_size if necessary
    if self.mode == 'Clone' and settings.get('Needs Format', False):
      if settings['Table Type'] == 'GPT':
        # Below is the size calculation for the GPT
        # 1 LBA for the protective MBR
        # 33 LBAs each for the primary and backup GPT tables
        # Source: https://en.wikipedia.org/wiki/GUID_Partition_Table
        required_size += (1 + 33 + 33) * self.destination.phy_sec
        if settings['Create Boot Partition']:
          # 260MiB EFI System Partition and a 16MiB MS Reserved partition
          required_size += (260 + 16) * 1024**2
      else:
        # MBR only requires one LBA but adding a full 4096 bytes anyway
        required_size += 4096
        if settings['Create Boot Partition']:
          # 100MiB System Reserved partition
          required_size += 100 * 1024**2

    # Reduce required_size if necessary
    if self.mode == 'Image':
      for pair in self.block_pairs:
        if pair.destination.exists():
          # NOTE: This uses the "max space" of the destination
          #       i.e. not the apparent size which is smaller for sparse files
          #       While this can result in an out-of-space error it's better
          #       than nothing.
          required_size -= pair.destination.stat().st_size

    # Check destination size
    if self.mode == 'Clone':
      destination_size = self.destination.size
      error_msg = 'A larger destination disk is required'
    else:
      # NOTE: Adding an extra 5% here to better ensure it will fit.
      #       The margin is applied to the requirement; inflating the free
      #       space instead would make the check more lenient, not safer.
      destination_size = psutil.disk_usage(self.destination).free
      required_size *= 1.05
      error_msg = 'Not enough free space on the destination'
    if required_size > destination_size:
      cli.print_error(error_msg)
      raise std.GenericAbort()

  def _check_dest_smart(self) -> None:
    """Check SMART for destination.

    Raises std.GenericAbort if either the overall SMART status or the
    attribute check reports a problem.
    """
    errors_detected = False

    # Check for critical errors
    if not smart_status_ok(self.destination):
      cli.print_error(
        f'Critical error(s) detected for: {self.destination.path}',
        )
      errors_detected = True

    # Check for minor errors
    if not check_attributes(self.destination, only_blocking=False):
      cli.print_warning(
        f'Attribute error(s) detected for: {self.destination.path}',
        )
      errors_detected = True

    # Done
    if errors_detected:
      raise std.GenericAbort()

  def add_block_pair(
      self, source: hw_disk.Disk, destination: pathlib.Path) -> None:
    """Add BlockPair object and run safety checks."""
    self.block_pairs.append(
      BlockPair(
        source_dev=source,
        destination=destination,
        working_dir=self.working_dir,
        ))

  def confirm_selections(
      self,
      prompt_msg: str,
      source_parts: list[hw_disk.Disk],
      ) -> None:
    """Show selection details and prompt for confirmation.

    Raises std.GenericAbort if the user does not confirm.
    """
    report = []

    # Source
    report.append(ansi.color_string('Source', 'GREEN'))
    report.extend(build_object_report(self.source))
    report.append(' ')

    # Destination
    report.append(ansi.color_string('Destination', 'GREEN'))
    if self.mode == 'Clone':
      report[-1] += ansi.color_string(' (ALL DATA WILL BE DELETED)', 'RED')
    report.extend(build_object_report(self.destination))
    report.append(' ')

    # Show deletion warning if necessary
    # NOTE: The check for block_pairs is to limit this section
    #       to the second confirmation
    if self.mode == 'Clone' and self.block_pairs:
      report.append(ansi.color_string('WARNING', 'YELLOW'))
      report.append(
        'All data will be deleted from the destination listed above.',
        )
      report.append(
        ansi.color_string(
          ['This is irreversible and will lead to', 'DATA LOSS.'],
          ['YELLOW', 'RED'],
          ),
        )
      report.append(' ')

    # Block pairs
    if self.block_pairs:
      report.extend(
        build_block_pair_report(
          self.block_pairs,
          self.load_settings() if self.mode == 'Clone' else {},
          ),
        )
      report.append(' ')

    # Map dir
    if self.working_dir:
      report.append(ansi.color_string('Map Save Directory', 'GREEN'))
      report.append(f'{self.working_dir}/')
      report.append(' ')
      if not fstype_is_ok(self.working_dir, map_dir=True):
        report.append(
          ansi.color_string(
            'Map file(s) are being saved to a non-recommended filesystem.',
            'YELLOW',
            ),
          )
        report.append(
          ansi.color_string(
            ['This is strongly discouraged and may lead to', 'DATA LOSS'],
            [None, 'RED'],
            ),
          )
        report.append(' ')

    # Source part(s) selected
    if source_parts:
      report.append(ansi.color_string('Source Part(s) selected', 'GREEN'))
      if self.source.path.samefile(source_parts[0].path):
        report.append('Whole Disk')
      else:
        report.append(ansi.color_string(f'{"NAME":<9} SIZE', 'BLUE'))
        for part in source_parts:
          report.append(
            f'{part.path.name:<9} '
            f'{std.bytes_to_string(part.size, use_binary=False)}'
            )
      report.append(' ')

    # Prompt user
    cli.clear_screen()
    cli.print_report(report)
    if not cli.ask(prompt_msg):
      raise std.GenericAbort()

  def generate_report(self) -> list[str]:
    """Generate report of overall and per block_pair results, returns list."""
    report = []

    # Header
    report.append(f'{self.mode.title()} Results:')
    report.append(' ')
    report.append(f'Source: {self.source.description}')
    if self.mode == 'Clone':
      report.append(f'Destination: {self.destination.description}')
    else:
      report.append(f'Destination: {self.destination}/')

    # Overall
    report.append(' ')
    error_size = self.get_error_size()
    error_size_str = std.bytes_to_string(error_size, decimals=2)
    if error_size > 0:
      error_size_str = ansi.color_string(error_size_str, 'YELLOW')
    percent = self.get_percent_recovered()
    percent = format_status_string(percent, width=0)
    report.append(f'Overall rescued: {percent}, error size: {error_size_str}')

    # Block-Pairs
    if len(self.block_pairs) > 1:
      report.append(' ')
      for pair in self.block_pairs:
        error_size = pair.get_error_size()
        error_size_str = std.bytes_to_string(error_size, decimals=2)
        if error_size > 0:
          error_size_str = ansi.color_string(error_size_str, 'YELLOW')
        pair_size = std.bytes_to_string(pair.size, decimals=2)
        percent = pair.get_percent_recovered()
        percent = format_status_string(percent, width=0)
        report.append(
          f'{pair.source.name} ({pair_size}) '
          f'rescued: {percent}, '
          f'error size: {error_size_str}'
          )

    # Done
    return report

  def get_clone_settings_path(self) -> pathlib.Path:
    """Get Clone settings file path, returns pathlib.Path obj.

    The file name uses the source model, falling back to the source
    device name when no model is set.
    """
    description = self.source.model
    if not description:
      description = self.source.path.name
    return pathlib.Path(f'{self.working_dir}/Clone_{description}.json')

  def get_error_size(self) -> int:
    """Get total error size from block_pairs in bytes, returns int."""
    return self.get_total_size() - self.get_rescued_size()

  def get_percent_recovered(self) -> float:
    """Get total percent rescued from block_pairs, returns float.

    Returns 0.0 when the total size is zero (e.g. no block pairs yet)
    instead of dividing by zero.
    """
    total_size = self.get_total_size()
    if not total_size:
      return 0.0
    return 100 * self.get_rescued_size() / total_size

  def get_rescued_size(self) -> int:
    """Get total rescued size from all block pairs, returns int."""
    return sum(pair.get_rescued_size() for pair in self.block_pairs)

  def get_total_size(self) -> int:
    """Get total size of all block_pairs in bytes, returns int."""
    return sum(pair.size for pair in self.block_pairs)

  def init_recovery(self, docopt_args: dict[str, Any]) -> None:
    """Select source/dest and set env.

    Walks through mode/source/destination selection, two confirmation
    prompts, working dir setup, block pair creation and safety checks.
    """
    cli.clear_screen()
    disk_menu = menus.disks()
    source_parts = []
    self.ui.set_progress_file(str(self.progress_out))

    # Set mode
    self.mode = set_mode(docopt_args)

    # Select source
    # NOTE(review): The positional keys below are assumed to be the docopt
    #               names '<source>' and '<destination>' -- confirm against
    #               the usage string of the calling script.
    self.source = select_disk_obj('source', disk_menu, docopt_args['<source>'])
    self.update_top_panes()
    if self.source.trim:
      cli.print_warning('Source device supports TRIM')
      if not cli.ask(' Proceed with recovery?'):
        cli.abort()
    self.ui.set_title('Source', self.source.name)

    # Select destination
    if self.mode == 'Clone':
      self.destination = select_disk_obj(
        'destination',
        disk_menu,
        docopt_args['<destination>'],
        )
      self.ui.add_title_pane('Destination', self.destination.name)
    elif self.mode == 'Image':
      if docopt_args['<destination>']:
        self.destination = pathlib.Path(docopt_args['<destination>']).resolve()
      else:
        self.destination = menus.select_path('Destination')
      self.ui.add_title_pane('Destination', self.destination)
    self.update_top_panes()

    # Update details
    self.source.update_details(skip_children=False)
    if self.mode == 'Clone':
      self.destination.update_details(skip_children=False)

    # Confirmation #1
    self.confirm_selections(
      prompt_msg='Are these selections correct?',
      source_parts=source_parts,
      )

    # Update panes
    self.update_progress_pane('Idle')

    # Set working dir
    self.working_dir = get_working_dir(
      self.mode,
      self.destination,
      force_local=docopt_args['--force-local-map'],
      )

    # Start fresh if requested
    if docopt_args['--start-fresh']:
      clean_working_dir(self.working_dir)

    # Add block pairs
    if self.mode == 'Clone':
      add_clone_block_pairs(self)
    else:
      add_image_block_pairs(self)

    # Update SMART data
    ## TODO: Verify if needed
    for dev in (self.source, self.destination):
      if not isinstance(dev, hw_disk.Disk):
        continue
      enable_smart(dev)
      update_smart_details(dev)

    # Safety Checks #1
    self.safety_check_destination()

    # Confirmation #2
    self.update_progress_pane('Idle')
    self.confirm_selections('Start recovery?', source_parts)

    # Unmount source and/or destination under macOS
    if PLATFORM == 'Darwin':
      for dev in (self.source, self.destination):
        if not isinstance(dev, hw_disk.Disk):
          continue
        cmd = ['diskutil', 'unmountDisk', dev.path]
        try:
          exe.run_program(cmd)
        except subprocess.CalledProcessError:
          cli.print_error('Failed to unmount source and/or destination')
          cli.abort()

    # Prep destination
    if self.mode == 'Clone':
      prep_destination(self, source_parts, dry_run=docopt_args['--dry-run'])

    # Safety Checks #2
    if not docopt_args['--dry-run']:
      for pair in self.block_pairs:
        pair.safety_check()

  def load_settings(
      self, discard_unused_settings: bool = False) -> dict[Any, Any]:
    """Load settings from previous run, returns dict.

    Raises std.GenericAbort if the settings file can't be read/parsed or
    if the saved model/serial don't match the current source/destination.
    """
    settings = {}
    settings_file = self.get_clone_settings_path()

    # Try loading JSON data
    # NOTE: open() is inside the try so read errors are reported as well
    if settings_file.exists():
      try:
        with open(settings_file, 'r', encoding='utf-8') as _f:
          settings = json.load(_f)
      except (OSError, json.JSONDecodeError) as err:
        LOG.error('Failed to load clone settings')
        cli.print_error('Invalid clone settings detected.')
        raise std.GenericAbort() from err

    # Check settings
    if settings:
      if settings['First Run'] and discard_unused_settings:
        # Previous run aborted before starting recovery, discard settings
        settings = {}
      else:
        bail = False
        for key in ('model', 'serial'):
          if settings['Source'][key] != getattr(self.source, key):
            cli.print_error(f"Clone settings don't match source {key}")
            bail = True
          if settings['Destination'][key] != getattr(self.destination, key):
            cli.print_error(f"Clone settings don't match destination {key}")
            bail = True
        if bail:
          raise std.GenericAbort()

    # Update settings
    if not settings:
      settings = CLONE_SETTINGS.copy()
      # dict.copy() is shallow; give each settings dict its own mapping list
      # so edits never leak into the CLONE_SETTINGS template
      settings['Partition Mapping'] = list(CLONE_SETTINGS['Partition Mapping'])
    if not settings['Source']:
      settings['Source'] = {
        'model': self.source.model,
        'serial': self.source.serial,
        }
    if not settings['Destination']:
      settings['Destination'] = {
        'model': self.destination.model,
        'serial': self.destination.serial,
        }

    # Done
    return settings

  def mark_started(self) -> None:
    """Edit clone settings, if applicable, to mark recovery as started."""
    # Skip if not cloning
    if self.mode != 'Clone':
      return

    # Skip if not using settings
    # i.e. Cloning whole disk (or single partition via args)
    if self.source.path.samefile(self.block_pairs[0].source):
      return

    # Update settings
    settings = self.load_settings()
    if settings.get('First Run', False):
      settings['First Run'] = False
      self.save_settings(settings)

  def pass_above_threshold(self, pass_name: str) -> bool:
    """Check if all block_pairs meet the pass threshold, returns bool."""
    threshold = cfg.ddrescue.AUTO_PASS_THRESHOLDS[pass_name]
    return all(
      p.get_percent_recovered() >= threshold for p in self.block_pairs
      )

  def pass_complete(self, pass_name: str) -> bool:
    """Check if all block_pairs completed pass_name, returns bool."""
    return all(p.pass_complete(pass_name) for p in self.block_pairs)

  def retry_all_passes(self) -> None:
    """Prep block_pairs for a retry recovery attempt.

    Rewrites every map file so areas with a bad status are marked
    non-tried ('?') and resets the per-pass status strings.
    """
    bad_statuses = ('*', '/', '-')
    LOG.warning('Updating block_pairs for retry')

    # Update all block_pairs
    for pair in self.block_pairs:
      map_data = []

      # Reset status strings
      for name in pair.status:
        pair.status[name] = 'Pending'

      # Mark all non-trimmed, non-scraped, and bad areas as non-tried
      with open(pair.map_path, 'r', encoding='utf-8') as _f:
        for line in _f:
          line = line.strip()
          if line.startswith('0x') and line.endswith(bad_statuses):
            line = f'{line[:-1]}?'
          map_data.append(line)

      # Save updated map
      with open(pair.map_path, 'w', encoding='utf-8') as _f:
        _f.write('\n'.join(map_data))

      # Reinitialize status
      pair.set_initial_status()

  def safety_check_destination(self) -> None:
    """Run safety checks for destination and abort if necessary."""
    if self.mode == 'Clone':
      self._check_dest_smart()
    self._check_dest_size()

  def save_debug_reports(self) -> None:
    """Save debug reports to disk."""
    LOG.info('Saving debug reports')
    debug_dir = pathlib.Path(f'{self.log_dir}/debug')
    if not debug_dir.exists():
      debug_dir.mkdir()

    # State (self)
    debug.save_pickles({'state': self}, debug_dir)
    with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f:
      _f.write('[Debug report]\n')
      _f.write('\n'.join(debug.generate_object_report(self)))
      _f.write('\n')

    # Block pairs
    for _bp in self.block_pairs:
      with open(
          f'{debug_dir}/block_pairs.report', 'a', encoding='utf-8') as _f:
        _f.write('[Debug report]\n')
        _f.write('\n'.join(debug.generate_object_report(_bp)))
        _f.write('\n')

  def save_settings(self, settings: dict[Any, Any]) -> None:
    """Save settings for future runs.

    Raises std.GenericAbort if the settings file can't be written.
    """
    settings_file = self.get_clone_settings_path()

    # Try saving JSON data
    try:
      with open(settings_file, 'w', encoding='utf-8') as _f:
        json.dump(settings, _f)
    except OSError as err:
      cli.print_error('Failed to save clone settings')
      raise std.GenericAbort() from err

  def skip_pass(self, pass_name: str) -> None:
    """Mark block_pairs as skipped if applicable."""
    for pair in self.block_pairs:
      if pair.status[pass_name] == 'Pending':
        pair.status[pass_name] = 'Skipped'

  def update_progress_pane(self, overall_status: str) -> None:
    """Update progress pane.

    Builds the status/progress report and writes it to self.progress_out.
    """
    report = []
    separator = '─────────────────────'
    width = cfg.ddrescue.TMUX_SIDE_WIDTH

    # Status
    report.append(ansi.color_string(f'{"Status":^{width}}', 'BLUE'))
    if 'NEEDS ATTENTION' in overall_status:
      report.append(
        ansi.color_string(f'{overall_status:^{width}}', 'YELLOW_BLINK'),
        )
    else:
      report.append(f'{overall_status:^{width}}')
    report.append(separator)

    # Overall progress
    if self.block_pairs:
      total_rescued = self.get_rescued_size()
      percent = self.get_percent_recovered()
      report.append(ansi.color_string('Overall Progress', 'BLUE'))
      report.append(
        f'Rescued: {format_status_string(percent, width=width-9)}',
        )
      report.append(
        ansi.color_string(
          [f'{std.bytes_to_string(total_rescued, decimals=2):>{width}}'],
          [get_percent_color(percent)],
          ),
        )
      report.append(separator)

    # Block pair progress
    for pair in self.block_pairs:
      report.append(ansi.color_string(pair.source, 'BLUE'))
      for name, status in pair.status.items():
        name = name.title()
        report.append(
          f'{name}{format_status_string(status, width=width-len(name))}',
          )
      report.append(' ')

    # EToC
    if overall_status in ('Active', 'NEEDS ATTENTION'):
      etoc = get_etoc()
      report.append(separator)
      report.append(ansi.color_string('Estimated Pass Finish', 'BLUE'))
      if overall_status == 'NEEDS ATTENTION' or etoc == 'N/A':
        report.append(ansi.color_string('N/A', 'YELLOW'))
      else:
        report.append(etoc)

    # Write to progress file
    self.progress_out.write_text(
      '\n'.join(report), encoding='utf-8', errors='ignore')

  def update_top_panes(self) -> None:
    """(Re)create top source/destination panes."""
    source_exists = True
    source_str = ''
    dest_exists = True
    dest_str = ''
    width = tmux.get_pane_size()[0]
    width = int(width / 2) - 1

    def _format_string(obj, width) -> str:
      """Format source/dest string using obj and width, returns str."""
      string = ''

      # Build base string
      if isinstance(obj, hw_disk.Disk):
        string = f'{obj.path} {obj.description}'
      elif obj.is_dir():
        string = f'{obj}/'
      elif obj.is_file():
        size_str = std.bytes_to_string(
          obj.stat().st_size,
          decimals=0,
          use_binary=False)
        string = f'{obj.name} {size_str}'

      # Adjust for width
      # Directories keep the tail of the path, everything else keeps the head
      if len(string) > width:
        if hasattr(obj, 'is_dir') and obj.is_dir():
          string = f'...{string[-width+3:]}'
        else:
          string = f'{string[:width-3]}...'

      # Done
      return string

    # Check source/dest existence
    if self.source:
      source_exists = self.source.path.exists()
    if self.destination:
      if isinstance(self.destination, hw_disk.Disk):
        dest_exists = self.destination.path.exists()
      else:
        dest_exists = self.destination.exists()

    # Source
    if self.source:
      source_str = _format_string(self.source, width)

    # Destination
    if self.destination:
      dest_str = _format_string(self.destination, width)

    # Reset title panes
    self.ui.reset_title_pane(
      ansi.color_string(
        ['Source', '' if source_exists else ' (Missing)'],
        ['BLUE', 'RED'],
        ),
      source_str,
      )
    if dest_str:
      self.ui.add_title_pane(
        ansi.color_string(
          ['Destination', '' if dest_exists else ' (Missing)'],
          ['BLUE', 'RED'],
          ),
        dest_str,
        )


# Functions
def build_block_pair_report(block_pairs, settings) -> list:
  """Build block pair report, returns list.

  settings is the clone settings dict, or an empty dict when not cloning.
  """
  report = []
  notes = []
  if block_pairs:
    report.append(ansi.color_string('Block Pairs', 'GREEN'))
  else:
    # Bail early
    return report

  # Show block pair mapping
  if settings and settings['Create Boot Partition']:
    if settings['Table Type'] == 'GPT':
      report.append(f'{" —— ":<9} --> EFI System Partition')
      report.append(f'{" —— ":<9} --> Microsoft Reserved Partition')
    elif settings['Table Type'] == 'MBR':
      report.append(f'{" —— ":<9} --> System Reserved')
  for pair in block_pairs:
    report.append(f'{pair.source.name:<9} --> {pair.destination.name}')

  # Show resume messages as necessary
  if settings:
    if not settings['First Run']:
      notes.append(
        ansi.color_string(
          ['NOTE:', 'Clone settings loaded from previous run.'],
          ['BLUE', None],
          ),
        )
    if settings['Needs Format'] and settings['Table Type']:
      msg = f'Destination will be formatted using {settings["Table Type"]}'
      notes.append(
        ansi.color_string(
          ['NOTE:', msg],
          ['BLUE', None],
          ),
        )
  if any(pair.get_rescued_size() > 0 for pair in block_pairs):
    notes.append(
      ansi.color_string(
        ['NOTE:', 'Resume data loaded from map file(s).'],
        ['BLUE', None],
        ),
      )

  # Add notes to report
  if notes:
    report.append(' ')
    report.extend(notes)

  # Done
  return report


def build_directory_report(path: pathlib.Path) -> list[str]:
  """Build directory report, returns list.

  Under Linux the report is the findmnt output for path, prefixed with a
  PATH column; on other platforms only the path itself is listed.
  """
  path_str = f'{path}/'
  report = []

  # Get details
  if PLATFORM == 'Linux':
    cmd = [
      'findmnt',
      '--output', 'SIZE,AVAIL,USED,FSTYPE,OPTIONS',
      '--target', path_str,
      ]
    proc = exe.run_program(cmd)
    width = len(path_str) + 1
    for line in proc.stdout.splitlines():
      if 'FSTYPE' in line:
        # Header row: label the prepended column
        line = ansi.color_string(f'{"PATH":<{width}}{line}', 'BLUE')
      else:
        line = f'{path_str:<{width}}{line}'
      report.append(line)
  else:
    report.append(ansi.color_string('PATH', 'BLUE'))
    report.append(str(path_str))

  # Done
  return report


def build_disk_report(dev: hw_disk.Disk) -> list[str]:
  """Build device report, returns list.

  The report has a description line, a header row, one row for the disk
  and one row per child (partition) prefixed with tree glyphs.
  """
  report = []

  # Get widths
  widths = {
    'fstype': max(6, len(str(dev.filesystem))),
    'label': max(5, len(str(dev.raw_details.get('label', '')))),
    'name': max(4, len(dev.path.name)),
    }
  for child in dev.children:
    widths['fstype'] = max(widths['fstype'], len(str(child['fstype'])))
    widths['label'] = max(widths['label'], len(str(child['label'])))
    widths['name'] = max(
      widths['name'],
      len(child['name'].replace('/dev/', '')),
      )
  widths = {k: v+1 for k, v in widths.items()}

  # Disk details
  report.append(f'{dev.path.name} {dev.description}')
  report.append(' ')
  dev_fstype = dev.filesystem
  dev_label = dev.raw_details.get('label', '')
  dev_name = dev.path.name
  dev_size = std.bytes_to_string(dev.size, use_binary=False)

  # Partition details
  report.append(
    ansi.color_string(
      (
        f'{"NAME":<{widths["name"]}}'
        f'{" " if dev.children else ""}'
        f'{"SIZE":<7}'
        f'{"FSTYPE":<{widths["fstype"]}}'
        f'{"LABEL":<{widths["label"]}}'
      ),
      'BLUE',
      ),
    )
  report.append(
    f'{dev_name if dev_name else "":<{widths["name"]}}'
    f'{" " if dev.children else ""}'
    f'{dev_size:>6} '
    f'{dev_fstype if dev_fstype else "":<{widths["fstype"]}}'
    f'{dev_label if dev_label else "":<{widths["label"]}}'
    )
  for child in dev.children:
    fstype = child['fstype']
    label = child['label']
    name = child['name'].replace('/dev/', '')
    size = std.bytes_to_string(child["size"], use_binary=False)
    report.append(
      f'{name if name else "":<{widths["name"]}}'
      f'{size:>6} '
      f'{fstype if fstype else "":<{widths["fstype"]}}'
      f'{label if label else "":<{widths["label"]}}'
      )

  # Indent children
  # NOTE: report[:4] is the description, spacer, header, and disk rows
  if len(dev.children) > 1:
    report = [
      *report[:4],
      *[f'├─{line}' for line in report[4:-1]],
      f'└─{report[-1]}',
      ]
  elif len(dev.children) == 1:
    report[-1] = f'└─{report[-1]}'

  # Done
  return report


def build_object_report(obj) -> list[str]:
  """Build object report, returns list."""
  report = []

  # Get details based on object given
  if hasattr(obj, 'is_dir') and obj.is_dir():
    # Directory report
    report = build_directory_report(obj)
  else:
    # Device report
    report = build_disk_report(obj)

  # Done
  return report


def clean_working_dir(working_dir) -> None:
  """Clean working directory to ensure a fresh recovery session.

  NOTE: Data from previous sessions will be preserved in a backup directory.
  """
  backup_dir = pathlib.Path(f'{working_dir}/prev')
  backup_dir = io.non_clobber_path(backup_dir)
  backup_dir.mkdir()

  # Collect entries first so the directory isn't modified mid-scan and the
  # scandir handle is closed before moving anything
  with os.scandir(working_dir) as entries:
    to_move = [
      entry for entry in entries
      if entry.name.endswith(('.dd', '.json', '.map'))
      ]

  # Move settings, maps, etc to backup_dir
  for entry in to_move:
    new_path = f'{backup_dir}/{entry.name}'
    new_path = io.non_clobber_path(new_path)
    shutil.move(entry.path, new_path)


def format_status_string(status, width) -> str:
  """Format colored status string, returns str.

  status may be a number (formatted as a percentage) or any other text
  (right-aligned and colored per STATUS_COLORS).
  """
  color = None
  percent = -1
  status_str = str(status)

  # Check if status is percentage
  try:
    percent = float(status_str)
  except ValueError:
    # Assuming status is text
    pass

  # Format status
  if percent >= 0:
    # Percentage
    color = get_percent_color(percent)
    status_str = f'{percent:{width-2}.2f} %'
    if '100.00' in status_str and percent < 100:
      # Always round down to 99.99%
      LOG.warning('Rounding down to 99.99 from %s', percent)
      status_str = f'{"99.99 %":>{width}}'
  else:
    # Text
    color = STATUS_COLORS.get(status_str, None)
    status_str = f'{status_str:>{width}}'

  # Add color if necessary
  if color:
    status_str = ansi.color_string(status_str, color)

  # Done
  return status_str


def fstype_is_ok(path, map_dir=False) -> bool:
  """Check if filesystem type is acceptable, returns bool.

  map_dir selects the (broader) list of filesystems allowed for map files.
  An undetermined filesystem type is treated as not OK.
  """
  fstype = ''

  # Get fstype
  if PLATFORM == 'Darwin':
    # Check all parent dirs until a mountpoint is found
    test_path = pathlib.Path(path)
    while True:
      found = get_fstype_macos(test_path)
      if found != 'UNKNOWN':
        fstype = found
        break
      if test_path.parent == test_path:
        # Reached the top without a match; a Path is always truthy and its
        # top-level parent is itself, so stop explicitly
        break
      test_path = test_path.parent
  elif PLATFORM == 'Linux':
    cmd = [
      'findmnt',
      '--noheadings',
      '--output', 'FSTYPE',
      '--target', path,
      ]
    proc = exe.run_program(cmd, check=False)
    fstype = proc.stdout or ''
  fstype = fstype.strip().lower()

  # Check fstype
  regex = RECOMMENDED_MAP_FSTYPES if map_dir else RECOMMENDED_FSTYPES

  # Done
  return bool(regex.match(fstype))


def get_etoc() -> str:
  """Get EToC from ddrescue output, returns str.

  Returns 'N/A' if ddrescue reports n/a, 'Unknown' if no (non-zero)
  remaining time was found, otherwise a formatted timestamp.
  """
  delta = None
  delta_dict = {}
  etoc = 'Unknown'
  now = datetime.datetime.now(tz=TIMEZONE)
  output = tmux.capture_pane()

  # Search for EToC delta
  matches = re.findall(r'remaining time:.*$', output, re.MULTILINE)
  if matches:
    # Only the most recent line is relevant
    match = REGEX_REMAINING_TIME.search(matches[-1])
    if match.group('na'):
      etoc = 'N/A'
    else:
      for key in ('days', 'hours', 'minutes', 'seconds'):
        delta_dict[key] = match.group(key)
      delta_dict = {k: int(v) if v else 0 for k, v in delta_dict.items()}
      delta = datetime.timedelta(**delta_dict)

  # Calc EToC if delta found
  if delta:
    etoc_datetime = now + delta
    etoc = etoc_datetime.strftime('%Y-%m-%d %H:%M %Z')

  # Done
  return etoc


def get_fstype_macos(path) -> str:
  """Get fstype for path under macOS, returns str.

  Returns 'UNKNOWN' if mount fails or path isn't listed in its output.
  """
  fstype = 'UNKNOWN'
  proc = exe.run_program(['mount'], check=False)

  # Bail early
  if proc.returncode:
    return fstype

  # Parse output
  # NOTE: path is escaped so characters like '.' or '(' are matched literally
  match = re.search(rf'{re.escape(str(path))} \((\w+)', proc.stdout)
  if match:
    fstype = match.group(1)

  # Done
  return fstype


def get_percent_color(percent) -> str | None:
  """Get color based on percentage, returns str (None if percent <= 0)."""
  color = None
  if percent > 100:
    color = 'PURPLE'
  elif percent >= 99:
    color = 'GREEN'
  elif percent >= 90:
    color = 'YELLOW'
  elif percent > 0:
    color = 'RED'

  # Done
  return color


def get_working_dir(mode, destination, force_local=False) -> pathlib.Path:
  """Get working directory using mode and destination, returns path.

  NOTE: This creates the directory and changes the current working
        directory to it.
  """
  ticket_id = cli.get_ticket_id()
  working_dir = None

  # Use preferred path if possible
  if mode == 'Image':
    try:
      path = pathlib.Path(destination).resolve()
    except TypeError as err:
      cli.print_error(f'Invalid destination: {destination}')
      raise std.GenericAbort() from err
    if path.exists() and fstype_is_ok(path, map_dir=False):
      working_dir = path
  elif mode == 'Clone' and not force_local:
    cli.print_info('Mounting backup shares...')
    net.mount_backup_shares(read_write=True)
    for server in cfg.net.BACKUP_SERVERS:
      path = pathlib.Path(
        f'/{"Volumes" if PLATFORM == "Darwin" else "Backups"}/{server}',
        )
      if path.exists() and fstype_is_ok(path, map_dir=True):
        # Acceptable path found
        working_dir = path
        break

  # Default to current dir if necessary
  if not working_dir:
    LOG.error('Failed to set preferred working directory')
    working_dir = pathlib.Path(os.getcwd())

  # Set subdir using ticket ID
  if mode == 'Clone':
    working_dir = working_dir.joinpath(ticket_id)

  # Create directory
  working_dir.mkdir(parents=True, exist_ok=True)
  os.chdir(working_dir)

  # Done
  LOG.info('Set working directory to: %s', working_dir)
  return working_dir


def select_disk_obj(
    label: str, disk_menu: cli.Menu, disk_path: str) -> hw_disk.Disk:
  """Get disk based on path or menu selection, returns Disk.

  Raises FileNotFoundError if disk_path doesn't exist and
  std.GenericAbort if it is neither a device nor a regular file.
  """
  if not disk_path:
    return menus.select_disk(label.capitalize(), disk_menu)

  # Source was provided, parse and run safety checks
  path = pathlib.Path(disk_path).resolve()

  # Bail early
  if not path.exists():
    raise FileNotFoundError(f'Path provided does not exist: {path}')

  # Disk objects
  if path.is_block_device() or path.is_char_device():
    obj = hw_disk.Disk(path)

    # Child/Parent check
    if obj.parent:
      cli.print_warning(f'"{obj.path}" is a child device')
      if cli.ask(f'Use parent device "{obj.parent}" instead?'):
        obj = hw_disk.Disk(obj.parent)

    # Done
    return obj

  # Raw image objects
  if path.is_file():
    loop_path = mount_raw_image(path)
    return hw_disk.Disk(loop_path)

  # Abort if object type couldn't be determined
  # NOTE: This shouldn't ever be reached?
  cli.print_error(f'Invalid {label} path: {disk_path}')
  raise std.GenericAbort()


def set_mode(docopt_args) -> str:
  """Set mode from docopt_args or user selection, returns str."""
  mode = '?'

  # Check docopt_args
  if docopt_args['clone']:
    mode = 'Clone'
  elif docopt_args['image']:
    mode = 'Image'

  # Ask user if necessary
  # NOTE: Compare against the placeholder; '?' is truthy so a plain
  #       "if not mode" would never prompt
  if mode == '?':
    answer = cli.choice('Are we cloning or imaging?', ['C', 'I'])
    if answer == 'C':
      mode = 'Clone'
    else:
      mode = 'Image'

  # Done
  return mode


if __name__ == '__main__':
  print("This file is not meant to be called directly.")