Add finalization menu to ddrescue-tui

Addresses issue #220
This commit is contained in:
2Shirt 2023-08-26 16:54:53 -07:00
parent dbe4a342cc
commit f5681a93d8
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
2 changed files with 99 additions and 31 deletions

View file

@ -17,7 +17,7 @@ from docopt import docopt
from wk import cfg, exe, io, log, std from wk import cfg, exe, io, log, std
from wk.cfg.ddrescue import DDRESCUE_SPECIFIC_PASS_SETTINGS from wk.cfg.ddrescue import DDRESCUE_SPECIFIC_PASS_SETTINGS
from wk.clone import menus from wk.clone import menus
from wk.clone.state import State from wk.clone.state import State, mark_non_recovered_as_non_tried
from wk.hw import disk as hw_disk from wk.hw import disk as hw_disk
from wk.hw.smart import ( from wk.hw.smart import (
check_attributes, check_attributes,
@ -177,10 +177,68 @@ def get_ddrescue_settings(settings_menu) -> list:
def finalize_recovery(state: State, dry_run: bool = True) -> None: def finalize_recovery(state: State, dry_run: bool = True) -> None:
"""Show recovery finalization options.""" """Show recovery finalization options and run selected functions."""
zero_fill_destination(state, dry_run=dry_run) options = (
if state.mode == 'Clone': 'Zero-fill Extra Space',
)
toggles = (
'Relocate Backup GPT',
'Zero-fill Gaps',
)
# Get destination size
dest_size = -1
if hasattr(state.destination, 'size'):
# hw_disk.Disk
dest_size = state.destination.size
if hasattr(state.destination, 'stat'):
# pathlib.Path
dest_size = state.destination.stat().st_size
# Run checks to disable item(s)
whole_disk = bool(
len(state.block_pairs) == 1
and not state.source.parent
)
disable_gpt_toggle = not bool(
## Breakdown of below tests:
## Only offer this option when cloning a whole, non-child device
## where the source is using a GUID_Partition_Table
## and the source is smaller than the destination
whole_disk
and str(state.source.raw_details.get('pttype', 'Unknown')).lower() == 'gpt'
and state.source.size < dest_size
)
# Build menu
menu = cli.Menu(title=ansi.color_string('ddrescue TUI: Finalization', 'GREEN'))
menu.separator = ' '
menu.add_action('Start')
menu.add_action('Quit')
for name in options:
menu.add_option(name, details={'Selected': True})
for name in toggles:
details = {'Selected': True}
if 'GPT' in name and disable_gpt_toggle:
details['Disabled'] = True
details['Selected'] = False
menu.add_toggle(name, details)
# Show menu
selection = menu.advanced_select()
if 'Quit' in selection:
return
# Run functions
if menu.toggles['Relocate Backup GPT']['Selected']:
relocate_backup_gpt(state, dry_run=dry_run) relocate_backup_gpt(state, dry_run=dry_run)
if menu.toggles['Zero-fill Gaps']['Selected']:
zero_fill_gaps(
state,
dest_size=dest_size,
dry_run=dry_run,
extend_to_end=menu.options['Zero-fill Extra Space']['Selected'],
)
def is_missing_source_or_destination(state) -> bool: def is_missing_source_or_destination(state) -> bool:
@ -587,25 +645,28 @@ def source_or_destination_changed(state) -> bool:
return changed return changed
def zero_fill_destination(state: State, dry_run: bool = True) -> None: def zero_fill_gaps(
"""Zero-fill any gaps and space on destination beyond the source size.""" state: State,
dest_size: int,
dry_run: bool = True,
extend_to_end: bool = False,
) -> None:
"""Zero-fill any gaps on the destination."""
#fake_settings_menu = menus.settings(state.mode)
full_disk_clone = False full_disk_clone = False
larger_destination = False larger_destination = state.source.size < dest_size
percent_recovered = state.get_percent_recovered() percent_recovered = state.get_percent_recovered()
if state.mode == 'Clone' and len(state.block_pairs) == 1: if state.mode == 'Clone' and len(state.block_pairs) == 1:
full_disk_clone = True full_disk_clone = True
# Bail early # Bail early
if not ( if percent_recovered == 100 and not (larger_destination and extend_to_end):
(percent_recovered < 100
or (full_disk_clone and state.source.size < state.destination.size))
and cli.ask('Fill gaps with zeros?')):
return return
for block_pair in state.block_pairs: for block_pair in state.block_pairs:
destination_size = block_pair.size domain_size = block_pair.size
if (full_disk_clone and state.source.size < state.destination.size): if (full_disk_clone and state.source.size < state.destination.size):
destination_size = state.destination.size domain_size = dest_size
larger_destination = True larger_destination = True
# Prep zero-fill map file # Prep zero-fill map file
@ -613,11 +674,13 @@ def zero_fill_destination(state: State, dry_run: bool = True) -> None:
f'{block_pair.map_path.stem}_zero-fill', f'{block_pair.map_path.stem}_zero-fill',
) )
io.copy_file(block_pair.map_path, zero_map_path, overwrite=True) io.copy_file(block_pair.map_path, zero_map_path, overwrite=True)
if larger_destination: mark_non_recovered_as_non_tried(zero_map_path)
if full_disk_clone and larger_destination and extend_to_end:
# Extend domain size
with open(zero_map_path, 'a', encoding='utf-8') as f: with open(zero_map_path, 'a', encoding='utf-8') as f:
f.write( f.write(
f'{hex(block_pair.size)} ' f'\n{hex(block_pair.size)} '
f'{hex(destination_size - block_pair.size)} ?' f'{hex(domain_size - block_pair.size)} ?'
) )
# Build cmd # Build cmd
@ -625,7 +688,7 @@ def zero_fill_destination(state: State, dry_run: bool = True) -> None:
'sudo', 'sudo',
'ddrescue', 'ddrescue',
'--force', '--force',
f'--size={destination_size}', f'--size={domain_size}',
'--binary-prefixes', '--binary-prefixes',
'--complete-only', '--complete-only',
'--data-preview=5', '--data-preview=5',

View file

@ -497,29 +497,16 @@ class State():
def retry_all_passes(self) -> None: def retry_all_passes(self) -> None:
"""Prep block_pairs for a retry recovery attempt.""" """Prep block_pairs for a retry recovery attempt."""
bad_statuses = ('*', '/', '-')
LOG.warning('Updating block_pairs for retry') LOG.warning('Updating block_pairs for retry')
# Update all block_pairs # Update all block_pairs
for pair in self.block_pairs: for pair in self.block_pairs:
map_data = [] mark_non_recovered_as_non_tried(pair.map_path)
# Reset status strings # Reset status strings
for name in pair.status.keys(): for name in pair.status.keys():
pair.status[name] = 'Pending' pair.status[name] = 'Pending'
# Mark all non-trimmed, non-scraped, and bad areas as non-tried
with open(pair.map_path, 'r', encoding='utf-8') as _f:
for line in _f.readlines():
line = line.strip()
if line.startswith('0x') and line.endswith(bad_statuses):
line = f'{line[:-1]}?'
map_data.append(line)
# Save updated map
with open(pair.map_path, 'w', encoding='utf-8') as _f:
_f.write('\n'.join(map_data))
# Reinitialize status # Reinitialize status
pair.set_initial_status() pair.set_initial_status()
@ -1009,6 +996,24 @@ def get_working_dir(mode, destination, force_local=False) -> pathlib.Path:
return working_dir return working_dir
def mark_non_recovered_as_non_tried(map_path: pathlib.Path) -> None:
"""Read map file and mark all non-recovered sections as non-tried."""
bad_statuses = ('*', '/', '-')
map_data = []
# Mark all non-trimmed, non-scraped, and bad areas as non-tried
with open(map_path, 'r', encoding='utf-8') as _f:
for line in _f.readlines():
line = line.strip()
if line.startswith('0x') and line.endswith(bad_statuses):
line = f'{line[:-1]}?'
map_data.append(line)
# Save updated map
with open(map_path, 'w', encoding='utf-8') as _f:
_f.write('\n'.join(map_data))
def select_disk_obj(label:str, disk_menu: cli.Menu, disk_path: str) -> hw_disk.Disk: def select_disk_obj(label:str, disk_menu: cli.Menu, disk_path: str) -> hw_disk.Disk:
"""Get disk based on path or menu selection, returns Disk.""" """Get disk based on path or menu selection, returns Disk."""
if not disk_path: if not disk_path: