From 986c8700908e0c0213304b2927d36018b9fa3525 Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Sat, 10 Jun 2023 20:27:50 -0700 Subject: [PATCH] Move ddrescue-tui menus to a separate file --- scripts/wk/clone/__init__.py | 1 + scripts/wk/clone/ddrescue.py | 254 ++------------------------------- scripts/wk/clone/menus.py | 265 +++++++++++++++++++++++++++++++++++ scripts/wk/hw/disk.py | 2 +- 4 files changed, 278 insertions(+), 244 deletions(-) create mode 100644 scripts/wk/clone/menus.py diff --git a/scripts/wk/clone/__init__.py b/scripts/wk/clone/__init__.py index 43ffabbc..f6282d3a 100644 --- a/scripts/wk/clone/__init__.py +++ b/scripts/wk/clone/__init__.py @@ -1,3 +1,4 @@ """WizardKit: ddrescue-tui module init""" from . import ddrescue +from . import menus diff --git a/scripts/wk/clone/ddrescue.py b/scripts/wk/clone/ddrescue.py index de6a94d1..8c955bb4 100644 --- a/scripts/wk/clone/ddrescue.py +++ b/scripts/wk/clone/ddrescue.py @@ -24,9 +24,9 @@ from docopt import docopt from wk import cfg, debug, exe, io, log, net, std from wk.cfg.ddrescue import ( DDRESCUE_MAP_TEMPLATE, - DDRESCUE_SETTINGS, DDRESCUE_SPECIFIC_PASS_SETTINGS, ) +from wk.clone import menus from wk.hw import disk as hw_disk from wk.hw.smart import ( check_attributes, @@ -38,6 +38,7 @@ from wk.ui import ansi, cli, tmux, tui # STATIC VARIABLES +LOG = logging.getLogger(__name__) DOCSTRING = f'''{cfg.main.KIT_NAME_FULL}: ddrescue TUI Usage: @@ -68,9 +69,6 @@ CLONE_SETTINGS = { # (5, 1) ## Clone source partition #5 to destination partition #1 ], } -if std.PLATFORM == 'Darwin': - DDRESCUE_SETTINGS['Default']['--idirect'] = {'Selected': False, 'Hidden': True} - DDRESCUE_SETTINGS['Default']['--odirect'] = {'Selected': False, 'Hidden': True} DDRESCUE_LOG_REGEX = re.compile( r'^\s*(?P\S+):\s+' r'(?P\d+)\s+' @@ -89,16 +87,6 @@ REGEX_REMAINING_TIME = re.compile( r'\s*(?Pn/a)?', re.IGNORECASE ) -LOG = logging.getLogger(__name__) -MENU_ACTIONS = ( - 'Start', - f'Change settings {ansi.color_string("(experts only)", "YELLOW")}', - f'Detect drives {ansi.color_string("(experts only)", "YELLOW")}', - 'Quit') -MENU_TOGGLES = { - 'Auto continue (if recovery % over threshold)': True, - 'Retry (mark non-rescued sectors "non-tried")': False, - } PANE_RATIOS = ( 12, # SMART 22, # ddrescue progress @@ -111,11 +99,6 @@ if PLATFORM == 'Darwin': RECOMMENDED_MAP_FSTYPES = re.compile( r'^(apfs|cifs|ext[234]|hfs.?|ntfs|smbfs|vfat|xfs)$' ) -SETTING_PRESETS = ( - 'Default', - 'Fast', - 'Safe', - ) STATUS_COLORS = { 'Passed': 'GREEN', 'Aborted': 'YELLOW', @@ -450,7 +433,7 @@ class State(): ) self._add_block_pair(bp_source, bp_dest) else: - source_parts = select_disk_parts('Clone', self.source) + source_parts = menus.select_disk_parts('Clone', self.source) if self.source.path.samefile(source_parts[0].path): # Whole disk (or single partition via args), skip settings bp_dest = self.destination.path @@ -645,6 +628,7 @@ class State(): def init_recovery(self, docopt_args: dict[str, Any]) -> None: """Select source/dest and set env.""" cli.clear_screen() + disk_menu = menus.disks() source_parts = [] # Set log @@ -662,16 +646,16 @@ class State(): # Select source self.source = get_object(docopt_args['']) if not self.source: - self.source = select_disk('Source') + self.source = menus.select_disk('Source', disk_menu) self.ui.set_title('Source', self.source.name) # Select destination self.destination = get_object(docopt_args['']) if not self.destination: if self.mode == 'Clone': - self.destination = select_disk('Destination', self.source) + self.destination = menus.select_disk('Destination', disk_menu) elif self.mode == 'Image': - self.destination = select_path('Destination') + self.destination = menus.select_path('Destination') self.ui.add_title_pane('Destination', self.destination.name) # Update details @@ -702,7 +686,7 @@ class State(): if self.mode == 'Clone': source_parts = self.add_clone_block_pairs() else: - source_parts = select_disk_parts(self.mode, self.source) + source_parts = menus.select_disk_parts(self.mode, self.source) self.add_image_block_pairs(source_parts) # Update SMART data @@ -1325,22 +1309,6 @@ def build_disk_report(dev) -> list[str]: return report -def build_main_menu() -> cli.Menu: - """Build main menu, returns wk.ui.cli.Menu.""" - menu = cli.Menu(title=ansi.color_string('ddrescue TUI: Main Menu', 'GREEN')) - menu.separator = ' ' - - # Add actions, options, etc - for action in MENU_ACTIONS: - if not (PLATFORM == 'Darwin' and 'Detect drives' in action): - menu.add_action(action) - for toggle, selected in MENU_TOGGLES.items(): - menu.add_toggle(toggle, {'Selected': selected}) - - # Done - return menu - - def build_object_report(obj) -> list[str]: """Build object report, returns list.""" report = [] @@ -1357,47 +1325,6 @@ def build_object_report(obj) -> list[str]: return report -def build_settings_menu(silent=True) -> cli.Menu: - """Build settings menu, returns wk.ui.cli.Menu.""" - title_text = [ - ansi.color_string('ddrescue TUI: Expert Settings', 'GREEN'), - ' ', - ansi.color_string( - ['These settings can cause', 'MAJOR DAMAGE', 'to drives'], - ['YELLOW', 'RED', 'YELLOW'], - ), - 'Please read the manual before making changes', - ] - menu = cli.Menu(title='\n'.join(title_text)) - menu.separator = ' ' - preset = 'Default' - if not silent: - # Ask which preset to use - cli.print_standard( - f'Available ddrescue presets: {" / ".join(SETTING_PRESETS)}' - ) - preset = cli.choice('Please select a preset:', SETTING_PRESETS) - - # Fix selection - for _p in SETTING_PRESETS: - if _p.startswith(preset): - preset = _p - - # Add default settings - menu.add_action('Load Preset') - menu.add_action('Main Menu') - for name, details in DDRESCUE_SETTINGS['Default'].items(): - menu.add_option(name, details.copy()) - - # Update settings using preset - if preset != 'Default': - for name, details in DDRESCUE_SETTINGS[preset].items(): - menu.options[name].update(details.copy()) - - # Done - return menu - - def build_sfdisk_partition_line(table_type, dev_path, size, details) -> str: """Build sfdisk partition line using passed details, returns str.""" line = f'{dev_path} : size={size}' @@ -1826,8 +1753,8 @@ def main() -> None: raise RuntimeError('tmux session not found') # Init - main_menu = build_main_menu() - settings_menu = build_settings_menu() + main_menu = menus.main() + settings_menu = menus.settings() state = State() try: state.init_recovery(args) @@ -1845,7 +1772,7 @@ def main() -> None: selection = settings_menu.settings_select() if 'Load Preset' in selection: # Rebuild settings menu using preset - settings_menu = build_settings_menu(silent=False) + settings_menu = menus.settings(silent=False) else: break @@ -2179,165 +2106,6 @@ def run_recovery(state: State, main_menu, settings_menu, dry_run=True) -> None: state.update_progress_pane('Idle') -def select_disk(prompt_msg, skip_disk=None) -> hw_disk.Disk: - """Select disk from list, returns Disk().""" - cli.print_info('Scanning disks...') - disks = hw_disk.get_disks() - menu = cli.Menu( - title=ansi.color_string(f'ddrescue TUI: {prompt_msg} Selection', 'GREEN'), - ) - menu.disabled_str = 'Already selected' - menu.separator = ' ' - menu.add_action('Quit') - for disk in disks: - disable_option = False - size = disk.size - - # Check if option should be disabled - if skip_disk: - if (disk.path.samefile(skip_disk.path) - or (skip_disk.parent and disk.path.samefile(skip_disk.parent))): - disable_option = True - - # Add to menu - menu.add_option( - name=( - f'{str(disk.path):<12} ' - f'{disk.bus:<5} ' - f'{std.bytes_to_string(size, decimals=1, use_binary=False):<8} ' - f'{disk.model} ' - f'{disk.serial}' - ), - details={'Disabled': disable_option, 'Object': disk}, - ) - - # Get selection - selection = menu.simple_select() - if 'Quit' in selection: - raise std.GenericAbort() - - # Update details to include child devices - selected_disk = selection[-1]['Object'] - selected_disk.update_details(skip_children=False) - - # Done - return selected_disk - - -def select_disk_parts(prompt_msg, disk) -> list[hw_disk.Disk]: - """Select disk parts from list, returns list of Disk().""" - title = ansi.color_string('ddrescue TUI: Partition Selection', 'GREEN') - title += f'\n\nDisk: {disk.path} {disk.description}' - menu = cli.Menu(title) - menu.separator = ' ' - menu.add_action('All') - menu.add_action('None') - menu.add_action('Proceed', {'Separator': True}) - menu.add_action('Quit') - object_list = [] - - def _select_parts(menu) -> None: - """Loop over selection menu until at least one partition selected.""" - while True: - selection = menu.advanced_select( - f'Please select the parts to {prompt_msg.lower()}: ', - ) - if 'All' in selection: - for option in menu.options.values(): - option['Selected'] = True - elif 'None' in selection: - for option in menu.options.values(): - option['Selected'] = False - elif 'Proceed' in selection: - if any(option['Selected'] for option in menu.options.values()): - # At least one partition/device selected/device selected - break - elif 'Quit' in selection: - raise std.GenericAbort() - - # Bail early if running under macOS - if PLATFORM == 'Darwin': - return [disk] - - # Bail early if child device selected - if disk.parent: - return [disk] - - # Add parts - whole_disk_str = f'{str(disk.path):<14} (Whole device)' - for part in disk.children: - size = part["size"] - name = ( - f'{str(part["path"]):<14} ' - f'({std.bytes_to_string(size, decimals=1, use_binary=False):>6})' - ) - menu.add_option(name, details={'Selected': True, 'Path': part['path']}) - - # Add whole disk if necessary - if not menu.options: - menu.add_option(whole_disk_str, {'Selected': True, 'Path': disk.path}) - menu.title += '\n\n' - menu.title += ansi.color_string(' No partitions detected.', 'YELLOW') - - # Get selection - _select_parts(menu) - - # Build list of Disk() object_list - for option in menu.options.values(): - if option['Selected']: - object_list.append(option['Path']) - - # Check if whole disk selected - if len(object_list) == len(disk.children): - # NOTE: This is not true if the disk has no partitions - msg = f'Preserve partition table and unused space in {prompt_msg.lower()}?' - if cli.ask(msg): - # Replace part list with whole disk obj - object_list = [disk.path] - - # Convert object_list to hw_disk.Disk() objects - cli.print_standard(' ') - cli.print_info('Getting disk/partition details...') - object_list = [hw_disk.Disk(path) for path in object_list] - - # Done - return object_list - - -def select_path(prompt_msg) -> pathlib.Path: - """Select path, returns pathlib.Path.""" - invalid = False - menu = cli.Menu( - title=ansi.color_string(f'ddrescue TUI: {prompt_msg} Path Selection', 'GREEN'), - ) - menu.separator = ' ' - menu.add_action('Quit') - menu.add_option('Current directory') - menu.add_option('Enter manually') - path = None - - # Make selection - selection = menu.simple_select() - if 'Current directory' in selection: - path = os.getcwd() - elif 'Enter manually' in selection: - path = cli.input_text('Please enter path: ') - elif 'Quit' in selection: - raise std.GenericAbort() - - # Check - try: - path = pathlib.Path(path).resolve() - except TypeError: - invalid = True - if invalid or not path.is_dir(): - cli.print_error(f'Invalid path: {path}') - raise std.GenericAbort() - - # Done - return path - - def set_mode(docopt_args) -> str: """Set mode from docopt_args or user selection, returns str.""" mode = '?' diff --git a/scripts/wk/clone/menus.py b/scripts/wk/clone/menus.py new file mode 100644 index 00000000..0f04ebba --- /dev/null +++ b/scripts/wk/clone/menus.py @@ -0,0 +1,265 @@ +"""WizardKit: ddrescue TUI - Menus""" +# vim: sts=2 sw=2 ts=2 + +import logging +import pathlib + +from wk.cfg.ddrescue import DDRESCUE_SETTINGS +from wk.hw.disk import Disk, get_disks +from wk.std import GenericAbort, PLATFORM, bytes_to_string +from wk.ui import ansi, cli + + +# STATIC VARIABLES +LOG = logging.getLogger(__name__) +CLONE_SETTINGS = { + 'Source': None, + 'Destination': None, + 'Create Boot Partition': False, + 'First Run': True, + 'Needs Format': False, + 'Table Type': None, + 'Partition Mapping': [ + # (5, 1) ## Clone source partition #5 to destination partition #1 + ], + } +if PLATFORM == 'Darwin': + # TODO: Direct I/O needs more testing under macOS + DDRESCUE_SETTINGS['Default']['--idirect'] = {'Selected': False, 'Hidden': True} + DDRESCUE_SETTINGS['Default']['--odirect'] = {'Selected': False, 'Hidden': True} +MENU_ACTIONS = ( + 'Start', + f'Change settings {ansi.color_string("(experts only)", "YELLOW")}', + f'Detect drives {ansi.color_string("(experts only)", "YELLOW")}', + 'Quit') +MENU_TOGGLES = { + 'Auto continue (if recovery % over threshold)': True, + 'Retry (mark non-rescued sectors "non-tried")': False, + } +SETTING_PRESETS = ( + 'Default', + 'Fast', + 'Safe', + ) + + +# Functions +def main() -> cli.Menu: + """Main menu, returns wk.ui.cli.Menu.""" + menu = cli.Menu(title=ansi.color_string('ddrescue TUI: Main Menu', 'GREEN')) + menu.separator = ' ' + + # Add actions, options, etc + for action in MENU_ACTIONS: + if not (PLATFORM == 'Darwin' and 'Detect drives' in action): + menu.add_action(action) + for toggle, selected in MENU_TOGGLES.items(): + menu.add_toggle(toggle, {'Selected': selected}) + + # Done + return menu + + +def settings(silent: bool = True) -> cli.Menu: + """Settings menu, returns wk.ui.cli.Menu.""" + title_text = [ + ansi.color_string('ddrescue TUI: Expert Settings', 'GREEN'), + ' ', + ansi.color_string( + ['These settings can cause', 'MAJOR DAMAGE', 'to drives'], + ['YELLOW', 'RED', 'YELLOW'], + ), + 'Please read the manual before making changes', + ] + menu = cli.Menu(title='\n'.join(title_text)) + menu.separator = ' ' + preset = 'Default' + if not silent: + # Ask which preset to use + cli.print_standard( + f'Available ddrescue presets: {" / ".join(SETTING_PRESETS)}' + ) + preset = cli.choice('Please select a preset:', SETTING_PRESETS) + + # Fix selection + for _p in SETTING_PRESETS: + if _p.startswith(preset): + preset = _p + + # Add default settings + menu.add_action('Load Preset') + menu.add_action('Main Menu') + for name, details in DDRESCUE_SETTINGS['Default'].items(): + menu.add_option(name, details.copy()) + + # Update settings using preset + if preset != 'Default': + for name, details in DDRESCUE_SETTINGS[preset].items(): + menu.options[name].update(details.copy()) + + # Done + return menu + + +def disks() -> cli.Menu: + """Disk menu, returns wk.ui.cli.Menu().""" + cli.print_info('Scanning disks...') + available_disks = get_disks() + menu = cli.Menu('ddrescue TUI: Disk selection') + menu.disabled_str = 'Already selected' + menu.separator = ' ' + menu.add_action('Quit') + for disk in available_disks: + menu.add_option( + name=( + f'{str(disk.path):<12} ' + f'{disk.bus:<5} ' + f'{bytes_to_string(disk.size, decimals=1, use_binary=False):<8} ' + f'{disk.model} ' + f'{disk.serial}' + ), + details={'Object': disk}, + ) + + # Done + return menu + + +def select_disk(prompt_msg: str, menu: cli.Menu) -> Disk: + """Select disk from provided Menu, returns Disk().""" + menu.title = ansi.color_string( + f'ddrescue TUI: {prompt_msg} Selection', 'GREEN', + ) + + # Get selection + selection = menu.simple_select() + if 'Quit' in selection: + raise GenericAbort() + + # Disable selected disk's menu entry + menu.options[selection[0]]['Disabled'] = True + + # Update details to include child devices + selected_disk = selection[-1]['Object'] + selected_disk.update_details(skip_children=False) + + # Done + return selected_disk + + +def select_disk_parts(prompt_msg, disk) -> list[Disk]: + """Select disk parts from list, returns list of Disk().""" + title = ansi.color_string('ddrescue TUI: Partition Selection', 'GREEN') + title += f'\n\nDisk: {disk.path} {disk.description}' + menu = cli.Menu(title) + menu.separator = ' ' + menu.add_action('All') + menu.add_action('None') + menu.add_action('Proceed', {'Separator': True}) + menu.add_action('Quit') + object_list = [] + + def _select_parts(menu) -> None: + """Loop over selection menu until at least one partition selected.""" + while True: + selection = menu.advanced_select( + f'Please select the parts to {prompt_msg.lower()}: ', + ) + if 'All' in selection: + for option in menu.options.values(): + option['Selected'] = True + elif 'None' in selection: + for option in menu.options.values(): + option['Selected'] = False + elif 'Proceed' in selection: + if any(option['Selected'] for option in menu.options.values()): + # At least one partition/device selected/device selected + break + elif 'Quit' in selection: + raise GenericAbort() + + # Bail early if running under macOS + if PLATFORM == 'Darwin': + return [disk] + + # Bail early if child device selected + if disk.parent: + return [disk] + + # Add parts + whole_disk_str = f'{str(disk.path):<14} (Whole device)' + for part in disk.children: + size = part["size"] + name = ( + f'{str(part["path"]):<14} ' + f'({bytes_to_string(size, decimals=1, use_binary=False):>6})' + ) + menu.add_option(name, details={'Selected': True, 'pathlib.Path': part['path']}) + + # Add whole disk if necessary + if not menu.options: + menu.add_option(whole_disk_str, {'Selected': True, 'pathlib.Path': disk.path}) + menu.title += '\n\n' + menu.title += ansi.color_string(' No partitions detected.', 'YELLOW') + + # Get selection + _select_parts(menu) + + # Build list of Disk() object_list + for option in menu.options.values(): + if option['Selected']: + object_list.append(option['pathlib.Path']) + + # Check if whole disk selected + if len(object_list) == len(disk.children): + # NOTE: This is not true if the disk has no partitions + msg = f'Preserve partition table and unused space in {prompt_msg.lower()}?' + if cli.ask(msg): + # Replace part list with whole disk obj + object_list = [disk.path] + + # Convert object_list to Disk() objects + cli.print_standard(' ') + cli.print_info('Getting disk/partition details...') + object_list = [Disk(path) for path in object_list] + + # Done + return object_list + + +def select_path(prompt_msg) -> pathlib.Path: + """Select path, returns pathlib.Path.""" + invalid = False + menu = cli.Menu( + title=ansi.color_string(f'ddrescue TUI: {prompt_msg} Path Selection', 'GREEN'), + ) + menu.separator = ' ' + menu.add_action('Quit') + menu.add_option('Current directory') + menu.add_option('Enter manually') + path = pathlib.Path.cwd() + + # Make selection + selection = menu.simple_select() + if 'Current directory' in selection: + pass + elif 'Enter manually' in selection: + path = pathlib.Path(cli.input_text('Please enter path: ')) + elif 'Quit' in selection: + raise GenericAbort() + + # Check + try: + path = path.resolve() + except TypeError: + invalid = True + if invalid or not path.is_dir(): + cli.print_error(f'Invalid path: {path}') + raise GenericAbort() + + # Done + return path + + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/scripts/wk/hw/disk.py b/scripts/wk/hw/disk.py index 8698ee8b..2b8a421e 100644 --- a/scripts/wk/hw/disk.py +++ b/scripts/wk/hw/disk.py @@ -50,7 +50,7 @@ class Disk: parent: str = field(init=False) phy_sec: int = field(init=False) raw_details: dict[str, Any] = field(init=False) - raw_smartctl: dict[str, Any] = field(init=False) + raw_smartctl: dict[str, Any] = field(init=False, default_factory=dict) serial: str = field(init=False) size: int = field(init=False) ssd: bool = field(init=False)