"""WizardKit: ddrescue TUI - Block Pairs""" # vim: sts=2 sw=2 ts=2 import logging import os import pathlib import re import subprocess from wk import cfg, exe, std from wk.hw import disk as hw_disk from wk.ui import cli # STATIC VARIABLES LOG = logging.getLogger(__name__) DDRESCUE_LOG_REGEX = re.compile( r'^\s*(?P\S+):\s+' r'(?P\d+)\s+' r'(?P[PTGMKB]i?B?)' r'.*\(\s*(?P\d+\.?\d*)%\)$', re.IGNORECASE, ) # Classes class BlockPair(): """Object for tracking source to dest recovery data.""" def __init__( self, source_dev: hw_disk.Disk, destination: pathlib.Path, working_dir: pathlib.Path, ): self.sector_size: int = source_dev.phy_sec self.source: pathlib.Path = pathlib.Path(source_dev.path) self.destination: pathlib.Path = destination self.map_data: dict[str, bool | int] = {} self.map_path: pathlib.Path = pathlib.Path() self.size: int = source_dev.size self.status: dict[str, float | int | str] = { 'read-skip': 'Pending', 'read-full': 'Pending', 'trim': 'Pending', 'scrape': 'Pending', } self.test_map: pathlib.Path | None = None self.view_map: bool = 'DISPLAY' in os.environ or 'WAYLAND_DISPLAY' in os.environ self.view_proc: subprocess.Popen | None = None # Set map path # e.g. '(Clone|Image)_Model[_p#]_Size[_Label].map' map_name = source_dev.model if source_dev.bus == 'Image': map_name = 'Image' if source_dev.parent: part_num = re.sub(r"^.*?(\d+)$", r"\1", self.source.name) map_name += f'_p{part_num}' size_str = std.bytes_to_string( size=self.size, use_binary=False, ) map_name += f'_{size_str.replace(" ", "")}' if source_dev.raw_details.get('label', ''): map_name += f'_{source_dev.raw_details["label"]}' map_name = map_name.replace(' ', '_') map_name = map_name.replace('/', '_') map_name = map_name.replace('\\', '_') if destination.is_dir(): # Imaging self.map_path = pathlib.Path(f'{destination}/Image_{map_name}.map') self.destination = self.map_path.with_suffix('.dd') self.destination.touch() else: # Cloning self.map_path = pathlib.Path(f'{working_dir}/Clone_{map_name}.map') # Create map file if needed # NOTE: We need to set the domain size for --complete-only to work if not self.map_path.exists(): self.map_path.write_text( data=cfg.ddrescue.DDRESCUE_MAP_TEMPLATE.format( name=cfg.main.KIT_NAME_FULL, size=self.size, ), encoding='utf-8', ) # Set initial status self.set_initial_status() def __getstate__(self): """Override to allow pickling ddrescue.State() objects.""" bp_state = self.__dict__.copy() del bp_state['view_proc'] return bp_state def get_error_size(self) -> int: """Get error size in bytes, returns int.""" return self.size - self.get_rescued_size() def get_percent_recovered(self) -> float: """Get percent rescued from map_data, returns float.""" return 100 * self.map_data.get('rescued', 0) / self.size def get_rescued_size(self) -> int: """Get rescued size using map data. NOTE: Returns 0 if no map data is available. """ self.load_map_data() return self.map_data.get('rescued', 0) def load_map_data(self) -> None: """Load map data from file. NOTE: If the file is missing it is assumed that recovery hasn't started yet so default values will be returned instead. """ data: dict[str, bool | int] = {'full recovery': False, 'pass completed': False} # Get output from ddrescuelog cmd = [ 'ddrescuelog', '--binary-prefixes', '--show-status', f'--size={self.size}', self.map_path, ] proc = exe.run_program(cmd, check=False) # Parse output for line in proc.stdout.splitlines(): _r = DDRESCUE_LOG_REGEX.search(line) if _r: if _r.group('key') == 'rescued' and _r.group('percent') == '100': # Fix rounding errors from ddrescuelog output data['rescued'] = self.size else: data[_r.group('key')] = std.string_to_bytes( f'{_r.group("size")} {_r.group("unit")}', ) data['pass completed'] = 'current status: finished' in line.lower() # Check if 100% done (only if map is present and non-zero size # NOTE: ddrescuelog returns 0 (i.e. 100% done) for empty files if self.map_path.exists() and self.map_path.stat().st_size != 0: cmd = [ 'ddrescuelog', '--done-status', f'--size={self.size}', self.map_path, ] proc = exe.run_program(cmd, check=False) data['full recovery'] = proc.returncode == 0 # Done self.map_data.update(data) def pass_complete(self, pass_name) -> bool: """Check if pass_name is complete based on map data, returns bool.""" pending_size = self.map_data['non-tried'] # Full recovery if self.map_data.get('full recovery', False): return True # New recovery if 'non-tried' not in self.map_data: return False # Initial read skip pass if pass_name == 'read-skip': pass_threshold = cfg.ddrescue.AUTO_PASS_THRESHOLDS[pass_name] if self.get_percent_recovered() >= pass_threshold: return True # Recovery in progress if pass_name in ('trim', 'scrape'): pending_size += self.map_data['non-trimmed'] if pass_name == 'scrape': pending_size += self.map_data['non-scraped'] if pending_size == 0: # This is true when the previous and current passes are complete return True # This should never be reached return False def safety_check(self) -> None: """Run safety check and abort if necessary.""" # TODO: Expand section to support non-Linux systems dest_size = -1 if self.destination.is_block_device(): cmd = [ 'lsblk', '--bytes', '--json', '--nodeps', '--noheadings', '--output=size', self.destination, ] json_data = exe.get_json_from_command(cmd) dest_size = json_data['blockdevices'][0]['size'] del json_data # Check destination size if cloning if not self.destination.is_file() and dest_size < self.size: cli.print_error(f'Invalid destination: {self.destination}') raise std.GenericAbort() def set_initial_status(self) -> None: """Read map data and set initial statuses.""" self.load_map_data() percent = self.get_percent_recovered() for name in self.status: if self.pass_complete(name): self.status[name] = percent else: # Stop checking if percent > 0: self.status[name] = percent break def skip_pass(self, pass_name) -> None: """Mark pass as skipped if applicable.""" if self.status[pass_name] == 'Pending': self.status[pass_name] = 'Skipped' def update_progress(self, pass_name) -> None: """Update progress via map data.""" self.load_map_data() # Update status percent = self.get_percent_recovered() if percent > 0: self.status[pass_name] = percent # Mark future passes as skipped if applicable if percent == 100: status_keys = list(self.status.keys()) for pass_n in status_keys[status_keys.index(pass_name)+1:]: self.status[pass_n] = 'Skipped' if __name__ == '__main__': print("This file is not meant to be called directly.")