Refactor ddrescue-tui source/dest selection

- Re-enables taking images instead of direct cloning!
- Removed some safety checks for clearer code
- We avoid a second scan by reusing the disk_menu object
This commit is contained in:
2Shirt 2023-06-11 15:48:58 -07:00
parent 986c870090
commit 203ad715e0
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
3 changed files with 65 additions and 51 deletions

View file

@ -323,7 +323,7 @@ class State():
"""Object for tracking hardware diagnostic data."""
def __init__(self):
self.block_pairs: list[BlockPair] = []
self.destination: pathlib.Path | None = None
self.destination: hw_disk.Disk | pathlib.Path = pathlib.Path('/dev/null')
self.log_dir: pathlib.Path = log.format_log_path()
self.log_dir = self.log_dir.parent.joinpath(
f'ddrescue-TUI_{time.strftime("%Y-%m-%d_%H%M%S%z")}/',
@ -483,7 +483,7 @@ class State():
def confirm_selections(
self,
prompt_msg: str,
source_parts: list[hw_disk.Disk] | None = None,
source_parts: list[hw_disk.Disk],
) -> None:
"""Show selection details and prompt for confirmation."""
report = []
@ -644,23 +644,28 @@ class State():
self.mode = set_mode(docopt_args)
# Select source
self.source = get_object(docopt_args['<source>'])
if not self.source:
self.source = menus.select_disk('Source', disk_menu)
self.source = select_disk_obj('source', disk_menu, docopt_args['<source>'])
self.ui.set_title('Source', self.source.name)
# Select destination
self.destination = get_object(docopt_args['<destination>'])
if not self.destination:
if self.mode == 'Clone':
self.destination = menus.select_disk('Destination', disk_menu)
elif self.mode == 'Image':
if self.mode == 'Clone':
self.destination = select_disk_obj(
'destination',
disk_menu,
docopt_args['<destination>'],
)
self.ui.add_title_pane('Destination', self.destination.name)
elif self.mode == 'Image':
if docopt_args['<destination>']:
self.destination = pathlib.Path(docopt_args['<destination>']).resolve()
else:
self.destination = menus.select_path('Destination')
self.ui.add_title_pane('Destination', self.destination.name)
self.ui.add_title_pane('Destination', self.destination)
# Update details
self.source.update_details(skip_children=False)
self.destination.update_details(skip_children=False)
if self.mode == 'Clone':
self.destination.update_details(skip_children=False)
# Confirmation #1
self.confirm_selections(
@ -692,6 +697,8 @@ class State():
# Update SMART data
## TODO: Verify if needed
for dev in (self.source, self.destination):
if not isinstance(dev, hw_disk.Disk):
continue
enable_smart(dev)
update_smart_details(dev)
@ -702,12 +709,14 @@ class State():
# Confirmation #2
self.update_progress_pane('Idle')
self.confirm_selections('Start recovery?')
self.confirm_selections('Start recovery?', source_parts)
# Unmount source and/or destination under macOS
if PLATFORM == 'Darwin':
for disk in (self.source, self.destination):
cmd = ['diskutil', 'unmountDisk', disk.path]
for dev in (self.source, self.destination):
if not isinstance(dev, hw_disk.Disk):
continue
cmd = ['diskutil', 'unmountDisk', dev.path]
try:
exe.run_program(cmd)
except subprocess.CalledProcessError:
@ -1207,9 +1216,9 @@ def build_ddrescue_cmd(block_pair, pass_name, settings_menu) -> list[str]:
return cmd
def build_directory_report(path) -> list[str]:
def build_directory_report(path: pathlib.Path) -> list[str]:
"""Build directory report, returns list."""
path = f'{path}/'
path_str = f'{path}/'
report = []
# Get details
@ -1217,26 +1226,26 @@ def build_directory_report(path) -> list[str]:
cmd = [
'findmnt',
'--output', 'SIZE,AVAIL,USED,FSTYPE,OPTIONS',
'--target', path,
'--target', path_str,
]
proc = exe.run_program(cmd)
width = len(path) + 1
width = len(path_str) + 1
for line in proc.stdout.splitlines():
line = line.replace('\n', '')
if 'FSTYPE' in line:
line = ansi.color_string(f'{"PATH":<{width}}{line}', 'BLUE')
line = ansi.color_string(f'{"path_str":<{width}}{line}', 'BLUE')
else:
line = f'{path:<{width}}{line}'
line = f'{path_str:<{width}}{line}'
report.append(line)
else:
report.append(ansi.color_string('PATH', 'BLUE'))
report.append(str(path))
report.append(ansi.color_string('path_str', 'BLUE'))
report.append(str(path_str))
# Done
return report
def build_disk_report(dev) -> list[str]:
def build_disk_report(dev: hw_disk.Disk) -> list[str]:
"""Build device report, returns list."""
report = []
@ -1541,19 +1550,19 @@ def get_fstype_macos(path) -> str:
return fstype
def get_object(path) -> hw_disk.Disk | pathlib.Path:
"""Get object based on path, returns obj."""
# TODO: Refactor to avoid returning None
obj = None
def select_disk_obj(label:str, disk_menu: cli.Menu, disk_path: str) -> hw_disk.Disk:
"""Get disk based on path or menu selection, returns Disk."""
if not disk_path:
return menus.select_disk(label.capitalize(), disk_menu)
# Source was provided, parse and run safety checks
path = pathlib.Path(disk_path).resolve()
# Bail early
if path is None:
return obj
if not path:
if not path.exists():
raise FileNotFoundError(f'Path provided does not exist: {path}')
# Check path
path = pathlib.Path(path).resolve()
# Disk objects
if path.is_block_device() or path.is_char_device():
obj = hw_disk.Disk(path)
@ -1562,20 +1571,19 @@ def get_object(path) -> hw_disk.Disk | pathlib.Path:
cli.print_warning(f'"{obj.path}" is a child device')
if cli.ask(f'Use parent device "{obj.parent}" instead?'):
obj = hw_disk.Disk(obj.parent)
elif path.is_dir():
obj = path
elif path.is_file():
# Assuming file is a raw image, mounting
# Done
return obj
# Raw image objects
if path.is_file():
loop_path = mount_raw_image(path)
obj = hw_disk.Disk(loop_path)
return hw_disk.Disk(loop_path)
# Abort if obj not set
if not obj:
cli.print_error(f'Invalid source/dest path: {path}')
raise std.GenericAbort()
# Done
return obj
# Abort if object type couldn't be determined
# NOTE: This shouldn't every be reached?
cli.print_error(f'Invalid {label} path: {disk_path}')
raise std.GenericAbort()
def get_partition_separator(name) -> str:
@ -1753,8 +1761,6 @@ def main() -> None:
raise RuntimeError('tmux session not found')
# Init
main_menu = menus.main()
settings_menu = menus.settings()
state = State()
try:
state.init_recovery(args)
@ -1763,6 +1769,8 @@ def main() -> None:
cli.abort()
# Show menu
main_menu = menus.main()
settings_menu = menus.settings(state.mode)
while True:
selection = main_menu.advanced_select()
@ -1772,7 +1780,7 @@ def main() -> None:
selection = settings_menu.settings_select()
if 'Load Preset' in selection:
# Rebuild settings menu using preset
settings_menu = menus.settings(silent=False)
settings_menu = menus.settings(state.mode, silent=False)
else:
break

View file

@ -60,7 +60,7 @@ def main() -> cli.Menu:
return menu
def settings(silent: bool = True) -> cli.Menu:
def settings(mode: str, silent: bool = True) -> cli.Menu:
"""Settings menu, returns wk.ui.cli.Menu."""
title_text = [
ansi.color_string('ddrescue TUI: Expert Settings', 'GREEN'),
@ -97,6 +97,11 @@ def settings(silent: bool = True) -> cli.Menu:
for name, details in DDRESCUE_SETTINGS[preset].items():
menu.options[name].update(details.copy())
# Disable direct output when saving to an image
if mode == 'Image':
menu.options['--odirect']['Disabled'] = True
menu.options['--odirect']['Selected'] = False
# Done
return menu

View file

@ -46,7 +46,8 @@ class Disk:
model: str = field(init=False)
name: str = field(init=False)
notes: list[str] = field(init=False, default_factory=list)
path: pathlib.Path | str
path: pathlib.Path = field(init=False)
path_str: pathlib.Path | str
parent: str = field(init=False)
phy_sec: int = field(init=False)
raw_details: dict[str, Any] = field(init=False)
@ -58,7 +59,7 @@ class Disk:
use_sat: bool = field(init=False, default=False)
def __post_init__(self):
self.path = pathlib.Path(self.path).resolve()
self.path = pathlib.Path(self.path_str).resolve()
self.update_details()
self.set_description()
self.known_attributes = get_known_disk_attributes(self.model)