From 203ad715e0cb78ea0538b4945f69987d1cf74ebc Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Sun, 11 Jun 2023 15:48:58 -0700 Subject: [PATCH] Refactor ddrescue-tui source/dest selection - Re-enables taking images instead of direct cloning! - Removed some safety checks for clearer code - We avoid a second scan by reusing the disk_menu object --- scripts/wk/clone/ddrescue.py | 104 +++++++++++++++++++---------------- scripts/wk/clone/menus.py | 7 ++- scripts/wk/hw/disk.py | 5 +- 3 files changed, 65 insertions(+), 51 deletions(-) diff --git a/scripts/wk/clone/ddrescue.py b/scripts/wk/clone/ddrescue.py index 8c955bb4..b8fe5f36 100644 --- a/scripts/wk/clone/ddrescue.py +++ b/scripts/wk/clone/ddrescue.py @@ -323,7 +323,7 @@ class State(): """Object for tracking hardware diagnostic data.""" def __init__(self): self.block_pairs: list[BlockPair] = [] - self.destination: pathlib.Path | None = None + self.destination: hw_disk.Disk | pathlib.Path = pathlib.Path('/dev/null') self.log_dir: pathlib.Path = log.format_log_path() self.log_dir = self.log_dir.parent.joinpath( f'ddrescue-TUI_{time.strftime("%Y-%m-%d_%H%M%S%z")}/', @@ -483,7 +483,7 @@ class State(): def confirm_selections( self, prompt_msg: str, - source_parts: list[hw_disk.Disk] | None = None, + source_parts: list[hw_disk.Disk], ) -> None: """Show selection details and prompt for confirmation.""" report = [] @@ -644,23 +644,28 @@ class State(): self.mode = set_mode(docopt_args) # Select source - self.source = get_object(docopt_args['']) - if not self.source: - self.source = menus.select_disk('Source', disk_menu) + self.source = select_disk_obj('source', disk_menu, docopt_args['']) self.ui.set_title('Source', self.source.name) # Select destination - self.destination = get_object(docopt_args['']) - if not self.destination: - if self.mode == 'Clone': - self.destination = menus.select_disk('Destination', disk_menu) - elif self.mode == 'Image': + if self.mode == 'Clone': + self.destination = select_disk_obj( + 'destination', + disk_menu, + docopt_args[''], + ) + self.ui.add_title_pane('Destination', self.destination.name) + elif self.mode == 'Image': + if docopt_args['']: + self.destination = pathlib.Path(docopt_args['']).resolve() + else: self.destination = menus.select_path('Destination') - self.ui.add_title_pane('Destination', self.destination.name) + self.ui.add_title_pane('Destination', self.destination) # Update details self.source.update_details(skip_children=False) - self.destination.update_details(skip_children=False) + if self.mode == 'Clone': + self.destination.update_details(skip_children=False) # Confirmation #1 self.confirm_selections( @@ -692,6 +697,8 @@ class State(): # Update SMART data ## TODO: Verify if needed for dev in (self.source, self.destination): + if not isinstance(dev, hw_disk.Disk): + continue enable_smart(dev) update_smart_details(dev) @@ -702,12 +709,14 @@ class State(): # Confirmation #2 self.update_progress_pane('Idle') - self.confirm_selections('Start recovery?') + self.confirm_selections('Start recovery?', source_parts) # Unmount source and/or destination under macOS if PLATFORM == 'Darwin': - for disk in (self.source, self.destination): - cmd = ['diskutil', 'unmountDisk', disk.path] + for dev in (self.source, self.destination): + if not isinstance(dev, hw_disk.Disk): + continue + cmd = ['diskutil', 'unmountDisk', dev.path] try: exe.run_program(cmd) except subprocess.CalledProcessError: @@ -1207,9 +1216,9 @@ def build_ddrescue_cmd(block_pair, pass_name, settings_menu) -> list[str]: return cmd -def build_directory_report(path) -> list[str]: +def build_directory_report(path: pathlib.Path) -> list[str]: """Build directory report, returns list.""" - path = f'{path}/' + path_str = f'{path}/' report = [] # Get details @@ -1217,26 +1226,26 @@ def build_directory_report(path) -> list[str]: cmd = [ 'findmnt', '--output', 'SIZE,AVAIL,USED,FSTYPE,OPTIONS', - '--target', path, + '--target', path_str, ] proc = exe.run_program(cmd) - width = len(path) + 1 + width = len(path_str) + 1 for line in proc.stdout.splitlines(): line = line.replace('\n', '') if 'FSTYPE' in line: - line = ansi.color_string(f'{"PATH":<{width}}{line}', 'BLUE') + line = ansi.color_string(f'{"path_str":<{width}}{line}', 'BLUE') else: - line = f'{path:<{width}}{line}' + line = f'{path_str:<{width}}{line}' report.append(line) else: - report.append(ansi.color_string('PATH', 'BLUE')) - report.append(str(path)) + report.append(ansi.color_string('path_str', 'BLUE')) + report.append(str(path_str)) # Done return report -def build_disk_report(dev) -> list[str]: +def build_disk_report(dev: hw_disk.Disk) -> list[str]: """Build device report, returns list.""" report = [] @@ -1541,19 +1550,19 @@ def get_fstype_macos(path) -> str: return fstype -def get_object(path) -> hw_disk.Disk | pathlib.Path: - """Get object based on path, returns obj.""" - # TODO: Refactor to avoid returning None - obj = None +def select_disk_obj(label:str, disk_menu: cli.Menu, disk_path: str) -> hw_disk.Disk: + """Get disk based on path or menu selection, returns Disk.""" + if not disk_path: + return menus.select_disk(label.capitalize(), disk_menu) + + # Source was provided, parse and run safety checks + path = pathlib.Path(disk_path).resolve() # Bail early - if path is None: - return obj - if not path: + if not path.exists(): raise FileNotFoundError(f'Path provided does not exist: {path}') - # Check path - path = pathlib.Path(path).resolve() + # Disk objects if path.is_block_device() or path.is_char_device(): obj = hw_disk.Disk(path) @@ -1562,20 +1571,19 @@ def get_object(path) -> hw_disk.Disk | pathlib.Path: cli.print_warning(f'"{obj.path}" is a child device') if cli.ask(f'Use parent device "{obj.parent}" instead?'): obj = hw_disk.Disk(obj.parent) - elif path.is_dir(): - obj = path - elif path.is_file(): - # Assuming file is a raw image, mounting + + # Done + return obj + + # Raw image objects + if path.is_file(): loop_path = mount_raw_image(path) - obj = hw_disk.Disk(loop_path) + return hw_disk.Disk(loop_path) - # Abort if obj not set - if not obj: - cli.print_error(f'Invalid source/dest path: {path}') - raise std.GenericAbort() - - # Done - return obj + # Abort if object type couldn't be determined + # NOTE: This shouldn't every be reached? + cli.print_error(f'Invalid {label} path: {disk_path}') + raise std.GenericAbort() def get_partition_separator(name) -> str: @@ -1753,8 +1761,6 @@ def main() -> None: raise RuntimeError('tmux session not found') # Init - main_menu = menus.main() - settings_menu = menus.settings() state = State() try: state.init_recovery(args) @@ -1763,6 +1769,8 @@ def main() -> None: cli.abort() # Show menu + main_menu = menus.main() + settings_menu = menus.settings(state.mode) while True: selection = main_menu.advanced_select() @@ -1772,7 +1780,7 @@ def main() -> None: selection = settings_menu.settings_select() if 'Load Preset' in selection: # Rebuild settings menu using preset - settings_menu = menus.settings(silent=False) + settings_menu = menus.settings(state.mode, silent=False) else: break diff --git a/scripts/wk/clone/menus.py b/scripts/wk/clone/menus.py index 0f04ebba..7d13a429 100644 --- a/scripts/wk/clone/menus.py +++ b/scripts/wk/clone/menus.py @@ -60,7 +60,7 @@ def main() -> cli.Menu: return menu -def settings(silent: bool = True) -> cli.Menu: +def settings(mode: str, silent: bool = True) -> cli.Menu: """Settings menu, returns wk.ui.cli.Menu.""" title_text = [ ansi.color_string('ddrescue TUI: Expert Settings', 'GREEN'), @@ -97,6 +97,11 @@ def settings(silent: bool = True) -> cli.Menu: for name, details in DDRESCUE_SETTINGS[preset].items(): menu.options[name].update(details.copy()) + # Disable direct output when saving to an image + if mode == 'Image': + menu.options['--odirect']['Disabled'] = True + menu.options['--odirect']['Selected'] = False + # Done return menu diff --git a/scripts/wk/hw/disk.py b/scripts/wk/hw/disk.py index 2b8a421e..7b06bbac 100644 --- a/scripts/wk/hw/disk.py +++ b/scripts/wk/hw/disk.py @@ -46,7 +46,8 @@ class Disk: model: str = field(init=False) name: str = field(init=False) notes: list[str] = field(init=False, default_factory=list) - path: pathlib.Path | str + path: pathlib.Path = field(init=False) + path_str: pathlib.Path | str parent: str = field(init=False) phy_sec: int = field(init=False) raw_details: dict[str, Any] = field(init=False) @@ -58,7 +59,7 @@ class Disk: use_sat: bool = field(init=False, default=False) def __post_init__(self): - self.path = pathlib.Path(self.path).resolve() + self.path = pathlib.Path(self.path_str).resolve() self.update_details() self.set_description() self.known_attributes = get_known_disk_attributes(self.model)