Use new TUI layout in ddrescue-tui

This commit is contained in:
2Shirt 2023-06-04 17:43:02 -07:00
parent 7ab6ccbd36
commit becc564269
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C

View file

@ -32,8 +32,7 @@ from wk.hw.smart import (
smart_status_ok, smart_status_ok,
update_smart_details, update_smart_details,
) )
from wk.ui import cli as ui from wk.ui import ansi, cli, tmux, tui
from wk.ui import ansi, tmux
# STATIC VARIABLES # STATIC VARIABLES
@ -77,6 +76,7 @@ DDRESCUE_LOG_REGEX = re.compile(
r'.*\(\s*(?P<percent>\d+\.?\d*)%\)$', r'.*\(\s*(?P<percent>\d+\.?\d*)%\)$',
re.IGNORECASE, re.IGNORECASE,
) )
DDRESCUE_OUTPUT_HEIGHT = 14
INITIAL_SKIP_MIN = 64 * 1024 # This is ddrescue's minimum accepted value INITIAL_SKIP_MIN = 64 * 1024 # This is ddrescue's minimum accepted value
REGEX_REMAINING_TIME = re.compile( REGEX_REMAINING_TIME = re.compile(
r'remaining time:' r'remaining time:'
@ -297,14 +297,14 @@ class BlockPair():
# Check destination size if cloning # Check destination size if cloning
if not self.destination.is_file() and dest_size < self.size: if not self.destination.is_file() and dest_size < self.size:
ui.print_error(f'Invalid destination: {self.destination}') cli.print_error(f'Invalid destination: {self.destination}')
raise std.GenericAbort() raise std.GenericAbort()
def set_initial_status(self) -> None: def set_initial_status(self) -> None:
"""Read map data and set initial statuses.""" """Read map data and set initial statuses."""
self.load_map_data() self.load_map_data()
percent = self.get_percent_recovered() percent = self.get_percent_recovered()
for name in self.status.keys(): for name in self.status:
if self.pass_complete(name): if self.pass_complete(name):
self.status[name] = percent self.status[name] = percent
else: else:
@ -337,17 +337,15 @@ class BlockPair():
class State(): class State():
"""Object for tracking hardware diagnostic data.""" """Object for tracking hardware diagnostic data."""
def __init__(self): def __init__(self):
self.block_pairs = [] self.block_pairs: list[BlockPair] = []
self.destination = None self.destination = None
self.log_dir = None self.log_dir = None
self.mode = None self.mode = None
self.panes = {} self.panes = {}
self.source = None self.source = None
self.working_dir = None self.working_dir = None
self.ui: tui.TUI = tui.TUI('Source')
# Start a background process to maintain layout self.ui.add_title_pane('Destination')
self._init_tmux()
exe.start_thread(self._fix_tmux_layout_loop)
def _add_block_pair(self, source, destination) -> None: def _add_block_pair(self, source, destination) -> None:
"""Add BlockPair object and run safety checks.""" """Add BlockPair object and run safety checks."""
@ -365,71 +363,6 @@ class State():
description = self.source.path.name description = self.source.path.name
return pathlib.Path(f'{self.working_dir}/Clone_{description}.json') return pathlib.Path(f'{self.working_dir}/Clone_{description}.json')
def _fix_tmux_layout(self, forced=True) -> None:
"""Fix tmux layout based on cfg.ddrescue.TMUX_LAYOUT."""
layout = cfg.ddrescue.TMUX_LAYOUT
needs_fixed = tmux.layout_needs_fixed(self.panes, layout)
# Main layout fix
try:
tmux.fix_layout(self.panes, layout, forced=forced)
except RuntimeError:
# Assuming self.panes changed while running
pass
# Source/Destination
if forced or needs_fixed:
self.update_top_panes()
# Return if Progress pane not present
if 'Progress' not in self.panes:
return
# SMART/Journal
if forced or needs_fixed:
height = tmux.get_pane_size(self.panes['Progress'])[1] - 2
p_ratios = [int((x/sum(PANE_RATIOS)) * height) for x in PANE_RATIOS]
if 'SMART' in self.panes:
tmux.resize_pane(self.panes['SMART'], height=p_ratios[0])
tmux.resize_pane(height=p_ratios[1])
if 'Journal' in self.panes:
tmux.resize_pane(self.panes['Journal'], height=p_ratios[2])
def _fix_tmux_layout_loop(self) -> None:
"""Fix tmux layout on a loop.
NOTE: This should be called as a thread.
"""
while True:
self._fix_tmux_layout(forced=False)
std.sleep(1)
def _init_tmux(self) -> None:
"""Initialize tmux layout."""
tmux.kill_all_panes()
# Source (placeholder)
self.panes['Source'] = tmux.split_window(
behind=True,
lines=2,
text=' ',
vertical=True,
)
# Started
self.panes['Started'] = tmux.split_window(
lines=cfg.ddrescue.TMUX_SIDE_WIDTH,
target_id=self.panes['Source'],
text=ansi.color_string(
['Started', time.strftime("%Y-%m-%d %H:%M %Z")],
['BLUE', None],
sep='\n',
),
)
# Source / Dest
self.update_top_panes()
def _load_settings(self, discard_unused_settings=False) -> dict: def _load_settings(self, discard_unused_settings=False) -> dict:
"""Load settings from previous run, returns dict.""" """Load settings from previous run, returns dict."""
settings = {} settings = {}
@ -442,7 +375,7 @@ class State():
settings = json.loads(_f.read()) settings = json.loads(_f.read())
except (OSError, json.JSONDecodeError) as err: except (OSError, json.JSONDecodeError) as err:
LOG.error('Failed to load clone settings') LOG.error('Failed to load clone settings')
ui.print_error('Invalid clone settings detected.') cli.print_error('Invalid clone settings detected.')
raise std.GenericAbort() from err raise std.GenericAbort() from err
# Check settings # Check settings
@ -454,10 +387,10 @@ class State():
bail = False bail = False
for key in ('model', 'serial'): for key in ('model', 'serial'):
if settings['Source'][key] != getattr(self.source, key): if settings['Source'][key] != getattr(self.source, key):
ui.print_error(f"Clone settings don't match source {key}") cli.print_error(f"Clone settings don't match source {key}")
bail = True bail = True
if settings['Destination'][key] != getattr(self.destination, key): if settings['Destination'][key] != getattr(self.destination, key):
ui.print_error(f"Clone settings don't match destination {key}") cli.print_error(f"Clone settings don't match destination {key}")
bail = True bail = True
if bail: if bail:
raise std.GenericAbort() raise std.GenericAbort()
@ -488,7 +421,7 @@ class State():
with open(settings_file, 'w', encoding='utf-8') as _f: with open(settings_file, 'w', encoding='utf-8') as _f:
json.dump(settings, _f) json.dump(settings, _f)
except OSError as err: except OSError as err:
ui.print_error('Failed to save clone settings') cli.print_error('Failed to save clone settings')
raise std.GenericAbort() from err raise std.GenericAbort() from err
def add_clone_block_pairs(self) -> None: def add_clone_block_pairs(self) -> None:
@ -522,7 +455,7 @@ class State():
# New run, use new settings file # New run, use new settings file
settings['Needs Format'] = True settings['Needs Format'] = True
offset = 0 offset = 0
user_choice = ui.choice( user_choice = cli.choice(
'Format clone using GPT, MBR, or match Source type?', 'Format clone using GPT, MBR, or match Source type?',
['G', 'M', 'S'], ['G', 'M', 'S'],
) )
@ -533,7 +466,7 @@ class State():
else: else:
# Match source type # Match source type
settings['Table Type'] = get_table_type(self.source.path) settings['Table Type'] = get_table_type(self.source.path)
if ui.ask('Create an empty Windows boot partition on the clone?'): if cli.ask('Create an empty Windows boot partition on the clone?'):
settings['Create Boot Partition'] = True settings['Create Boot Partition'] = True
offset = 2 if settings['Table Type'] == 'GPT' else 1 offset = 2 if settings['Table Type'] == 'GPT' else 1
@ -638,9 +571,9 @@ class State():
report.append(' ') report.append(' ')
# Prompt user # Prompt user
ui.clear_screen() cli.clear_screen()
ui.print_report(report) cli.print_report(report)
if not ui.ask(prompt_msg): if not cli.ask(prompt_msg):
raise std.GenericAbort() raise std.GenericAbort()
def generate_report(self) -> list[str]: def generate_report(self) -> list[str]:
@ -704,7 +637,7 @@ class State():
def init_recovery(self, docopt_args) -> None: def init_recovery(self, docopt_args) -> None:
"""Select source/dest and set env.""" """Select source/dest and set env."""
ui.clear_screen() cli.clear_screen()
source_parts = [] source_parts = []
# Set log # Set log
@ -719,6 +652,8 @@ class State():
keep_history=True, keep_history=True,
timestamp=False, timestamp=False,
) )
self.progress_out = self.log_dir.joinpath('progress.out')
self.ui.set_progress_file(self.progress_out)
# Set mode # Set mode
self.mode = set_mode(docopt_args) self.mode = set_mode(docopt_args)
@ -727,7 +662,7 @@ class State():
self.source = get_object(docopt_args['<source>']) self.source = get_object(docopt_args['<source>'])
if not self.source: if not self.source:
self.source = select_disk('Source') self.source = select_disk('Source')
self.update_top_panes() self.ui.set_title('Source', self.source.name)
# Select destination # Select destination
self.destination = get_object(docopt_args['<destination>']) self.destination = get_object(docopt_args['<destination>'])
@ -736,7 +671,7 @@ class State():
self.destination = select_disk('Destination', self.source) self.destination = select_disk('Destination', self.source)
elif self.mode == 'Image': elif self.mode == 'Image':
self.destination = select_path('Destination') self.destination = select_path('Destination')
self.update_top_panes() self.ui.add_title_pane('Destination', self.destination.name)
# Update details # Update details
self.source.update_details(skip_children=False) self.source.update_details(skip_children=False)
@ -749,10 +684,6 @@ class State():
) )
# Update panes # Update panes
self.panes['Progress'] = tmux.split_window(
lines=cfg.ddrescue.TMUX_SIDE_WIDTH,
watch_file=f'{self.log_dir}/progress.out',
)
self.update_progress_pane('Idle') self.update_progress_pane('Idle')
# Set working dir # Set working dir
@ -795,8 +726,8 @@ class State():
try: try:
exe.run_program(cmd) exe.run_program(cmd)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
ui.print_error('Failed to unmount source and/or destination') cli.print_error('Failed to unmount source and/or destination')
ui.abort() cli.abort()
# Prep destination # Prep destination
if self.mode == 'Clone': if self.mode == 'Clone':
@ -928,7 +859,7 @@ class State():
check=False, check=False,
) )
if proc.returncode != 0: if proc.returncode != 0:
ui.print_error('Error(s) encoundtered while formatting destination') cli.print_error('Error(s) encoundtered while formatting destination')
raise std.GenericAbort() raise std.GenericAbort()
# Update settings # Update settings
@ -969,13 +900,13 @@ class State():
# Check for critical errors # Check for critical errors
if not smart_status_ok(self.destination): if not smart_status_ok(self.destination):
ui.print_error( cli.print_error(
f'Critical error(s) detected for: {self.destination.path}', f'Critical error(s) detected for: {self.destination.path}',
) )
# Check for minor errors # Check for minor errors
if not check_attributes(self.destination, only_blocking=False): if not check_attributes(self.destination, only_blocking=False):
ui.print_warning( cli.print_warning(
f'Attribute error(s) detected for: {self.destination.path}', f'Attribute error(s) detected for: {self.destination.path}',
) )
@ -1026,7 +957,7 @@ class State():
destination_size *= 1.05 destination_size *= 1.05
error_msg = 'Not enough free space on the destination' error_msg = 'Not enough free space on the destination'
if required_size > destination_size: if required_size > destination_size:
ui.print_error(error_msg) cli.print_error(error_msg)
raise std.GenericAbort() raise std.GenericAbort()
def save_debug_reports(self) -> None: def save_debug_reports(self) -> None:
@ -1110,14 +1041,14 @@ class State():
report.append(etoc) report.append(etoc)
# Write to progress file # Write to progress file
out_path = pathlib.Path(f'{self.log_dir}/progress.out') self.progress_out.write_text('\n'.join(report), encoding='utf-8', errors='ignore')
with open(out_path, 'w', encoding='utf-8') as _f:
_f.write('\n'.join(report))
def update_top_panes(self) -> None: def update_top_panes(self) -> None:
"""(Re)create top source/destination panes.""" """(Re)create top source/destination panes."""
source_exists = True source_exists = True
source_str = ''
dest_exists = True dest_exists = True
dest_str = ''
width = tmux.get_pane_size()[0] width = tmux.get_pane_size()[0]
width = int(width / 2) - 1 width = int(width / 2) - 1
@ -1156,36 +1087,28 @@ class State():
else: else:
dest_exists = self.destination.exists() dest_exists = self.destination.exists()
# Kill destination pane
if 'Destination' in self.panes:
tmux.kill_pane(self.panes.pop('Destination'))
# Source # Source
source_str = ' '
if self.source: if self.source:
source_str = _format_string(self.source, width) source_str = _format_string(self.source, width)
tmux.respawn_pane(
self.panes['Source'],
text=ansi.color_string(
['Source', '' if source_exists else ' (Missing)', '\n', source_str],
['BLUE', 'RED', None, None],
sep='',
),
)
# Destination # Destination
dest_str = ''
if self.destination: if self.destination:
dest_str = _format_string(self.destination, width) dest_str = _format_string(self.destination, width)
self.panes['Destination'] = tmux.split_window(
percent=50, # Reset title panes
vertical=False, self.ui.reset_title_pane(
target_id=self.panes['Source'], ansi.color_string(
text=ansi.color_string( ['Source', '' if source_exists else ' (Missing)'],
['Destination', '' if dest_exists else ' (Missing)', '\n', dest_str], ['BLUE', 'RED'],
['BLUE', 'RED', None, None],
sep='',
), ),
source_str,
)
self.ui.add_title_pane(
ansi.color_string(
['Destination', '' if dest_exists else ' (Missing)'],
['BLUE', 'RED'],
),
dest_str,
) )
@ -1396,9 +1319,9 @@ def build_disk_report(dev) -> list[str]:
return report return report
def build_main_menu() -> ui.Menu: def build_main_menu() -> cli.Menu:
"""Build main menu, returns wk.ui.cli.Menu.""" """Build main menu, returns wk.ui.cli.Menu."""
menu = ui.Menu(title=ansi.color_string('ddrescue TUI: Main Menu', 'GREEN')) menu = cli.Menu(title=ansi.color_string('ddrescue TUI: Main Menu', 'GREEN'))
menu.separator = ' ' menu.separator = ' '
# Add actions, options, etc # Add actions, options, etc
@ -1428,7 +1351,7 @@ def build_object_report(obj) -> list[str]:
return report return report
def build_settings_menu(silent=True) -> ui.Menu: def build_settings_menu(silent=True) -> cli.Menu:
"""Build settings menu, returns wk.ui.cli.Menu.""" """Build settings menu, returns wk.ui.cli.Menu."""
title_text = [ title_text = [
ansi.color_string('ddrescue TUI: Expert Settings', 'GREEN'), ansi.color_string('ddrescue TUI: Expert Settings', 'GREEN'),
@ -1439,15 +1362,15 @@ def build_settings_menu(silent=True) -> ui.Menu:
), ),
'Please read the manual before making changes', 'Please read the manual before making changes',
] ]
menu = ui.Menu(title='\n'.join(title_text)) menu = cli.Menu(title='\n'.join(title_text))
menu.separator = ' ' menu.separator = ' '
preset = 'Default' preset = 'Default'
if not silent: if not silent:
# Ask which preset to use # Ask which preset to use
ui.print_standard( cli.print_standard(
f'Available ddrescue presets: {" / ".join(SETTING_PRESETS)}' f'Available ddrescue presets: {" / ".join(SETTING_PRESETS)}'
) )
preset = ui.choice('Please select a preset:', SETTING_PRESETS) preset = cli.choice('Please select a preset:', SETTING_PRESETS)
# Fix selection # Fix selection
for _p in SETTING_PRESETS: for _p in SETTING_PRESETS:
@ -1496,7 +1419,7 @@ def build_sfdisk_partition_line(table_type, dev_path, size, details) -> str:
# Safety Check # Safety Check
if not dest_type: if not dest_type:
ui.print_error(f'Failed to determine partition type for: {dev_path}') cli.print_error(f'Failed to determine partition type for: {dev_path}')
raise std.GenericAbort() raise std.GenericAbort()
# Add extra details # Add extra details
@ -1701,8 +1624,8 @@ def get_object(path) -> hw_disk.Disk | pathlib.Path:
# Child/Parent check # Child/Parent check
if obj.parent: if obj.parent:
ui.print_warning(f'"{obj.path}" is a child device') cli.print_warning(f'"{obj.path}" is a child device')
if ui.ask(f'Use parent device "{obj.parent}" instead?'): if cli.ask(f'Use parent device "{obj.parent}" instead?'):
obj = hw_disk.Disk(obj.parent) obj = hw_disk.Disk(obj.parent)
elif path.is_dir(): elif path.is_dir():
obj = path obj = path
@ -1713,7 +1636,7 @@ def get_object(path) -> hw_disk.Disk | pathlib.Path:
# Abort if obj not set # Abort if obj not set
if not obj: if not obj:
ui.print_error(f'Invalid source/dest path: {path}') cli.print_error(f'Invalid source/dest path: {path}')
raise std.GenericAbort() raise std.GenericAbort()
# Done # Done
@ -1778,7 +1701,7 @@ def get_table_type(disk_path) -> str:
# Check type # Check type
if table_type not in ('GPT', 'MBR'): if table_type not in ('GPT', 'MBR'):
ui.print_error(f'Unsupported partition table type: {table_type}') cli.print_error(f'Unsupported partition table type: {table_type}')
raise std.GenericAbort() raise std.GenericAbort()
# Done # Done
@ -1787,7 +1710,7 @@ def get_table_type(disk_path) -> str:
def get_working_dir(mode, destination, force_local=False) -> pathlib.Path: def get_working_dir(mode, destination, force_local=False) -> pathlib.Path:
"""Get working directory using mode and destination, returns path.""" """Get working directory using mode and destination, returns path."""
ticket_id = ui.get_ticket_id() ticket_id = cli.get_ticket_id()
working_dir = None working_dir = None
# Use preferred path if possible # Use preferred path if possible
@ -1795,12 +1718,12 @@ def get_working_dir(mode, destination, force_local=False) -> pathlib.Path:
try: try:
path = pathlib.Path(destination).resolve() path = pathlib.Path(destination).resolve()
except TypeError as err: except TypeError as err:
ui.print_error(f'Invalid destination: {destination}') cli.print_error(f'Invalid destination: {destination}')
raise std.GenericAbort() from err raise std.GenericAbort() from err
if path.exists() and fstype_is_ok(path, map_dir=False): if path.exists() and fstype_is_ok(path, map_dir=False):
working_dir = path working_dir = path
elif mode == 'Clone' and not force_local: elif mode == 'Clone' and not force_local:
ui.print_info('Mounting backup shares...') cli.print_info('Mounting backup shares...')
net.mount_backup_shares(read_write=True) net.mount_backup_shares(read_write=True)
for server in cfg.net.BACKUP_SERVERS: for server in cfg.net.BACKUP_SERVERS:
path = pathlib.Path( path = pathlib.Path(
@ -1844,11 +1767,11 @@ def is_missing_source_or_destination(state) -> bool:
if hasattr(item, 'path'): if hasattr(item, 'path'):
if not item.path.exists(): if not item.path.exists():
missing = True missing = True
ui.print_error(f'{name} disappeared') cli.print_error(f'{name} disappeared')
elif hasattr(item, 'exists'): elif hasattr(item, 'exists'):
if not item.exists(): if not item.exists():
missing = True missing = True
ui.print_error(f'{name} disappeared') cli.print_error(f'{name} disappeared')
else: else:
LOG.error('Unknown %s type: %s', name, item) LOG.error('Unknown %s type: %s', name, item)
@ -1880,7 +1803,7 @@ def source_or_destination_changed(state) -> bool:
# Done # Done
if changed: if changed:
ui.print_error('Source and/or Destination changed') cli.print_error('Source and/or Destination changed')
return changed return changed
@ -1895,7 +1818,6 @@ def main() -> None:
raise RuntimeError('tmux session not found') raise RuntimeError('tmux session not found')
# Init # Init
atexit.register(tmux.kill_all_panes)
main_menu = build_main_menu() main_menu = build_main_menu()
settings_menu = build_settings_menu() settings_menu = build_settings_menu()
state = State() state = State()
@ -1903,7 +1825,7 @@ def main() -> None:
state.init_recovery(args) state.init_recovery(args)
except (FileNotFoundError, std.GenericAbort): except (FileNotFoundError, std.GenericAbort):
is_missing_source_or_destination(state) is_missing_source_or_destination(state)
ui.abort() cli.abort()
# Show menu # Show menu
while True: while True:
@ -1921,18 +1843,18 @@ def main() -> None:
# Detect drives # Detect drives
if 'Detect drives' in selection[0]: if 'Detect drives' in selection[0]:
ui.clear_screen() cli.clear_screen()
ui.print_warning(DETECT_DRIVES_NOTICE) cli.print_warning(DETECT_DRIVES_NOTICE)
if ui.ask('Are you sure you proceed?'): if cli.ask('Are you sure you proceed?'):
ui.print_standard('Forcing controllers to rescan for devices...') cli.print_standard('Forcing controllers to rescan for devices...')
cmd = 'echo "- - -" | sudo tee /sys/class/scsi_host/host*/scan' cmd = 'echo "- - -" | sudo tee /sys/class/scsi_host/host*/scan'
exe.run_program(cmd, check=False, shell=True) exe.run_program([cmd], check=False, shell=True)
if source_or_destination_changed(state): if source_or_destination_changed(state):
ui.abort() cli.abort()
# Start recovery # Start recovery
if 'Start' in selection: if 'Start' in selection:
ui.clear_screen() cli.clear_screen()
run_recovery(state, main_menu, settings_menu, dry_run=args['--dry-run']) run_recovery(state, main_menu, settings_menu, dry_run=args['--dry-run'])
# Quit # Quit
@ -1942,8 +1864,8 @@ def main() -> None:
break break
# Recovey < 100% # Recovey < 100%
ui.print_warning('Recovery is less than 100%') cli.print_warning('Recovery is less than 100%')
if ui.ask('Are you sure you want to quit?'): if cli.ask('Are you sure you want to quit?'):
break break
# Save results to log # Save results to log
@ -1963,7 +1885,7 @@ def mount_raw_image(path) -> pathlib.Path:
# Check # Check
if not loopback_path: if not loopback_path:
ui.print_error(f'Failed to mount image: {path}') cli.print_error(f'Failed to mount image: {path}')
# Register unmount atexit # Register unmount atexit
atexit.register(unmount_loopback_device, loopback_path) atexit.register(unmount_loopback_device, loopback_path)
@ -2030,7 +1952,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True) -> None:
cmd = build_ddrescue_cmd(block_pair, pass_name, settings) cmd = build_ddrescue_cmd(block_pair, pass_name, settings)
poweroff_source_after_idle = True poweroff_source_after_idle = True
state.update_progress_pane('Active') state.update_progress_pane('Active')
ui.clear_screen() state.ui.clear_current_pane()
warning_message = '' warning_message = ''
def _poweroff_source_drive(idle_minutes) -> None: def _poweroff_source_drive(idle_minutes) -> None:
@ -2049,8 +1971,8 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True) -> None:
return return
if i % 600 == 0 and i > 0: if i % 600 == 0 and i > 0:
if i == 600: if i == 600:
ui.print_standard(' ', flush=True) cli.print_standard(' ', flush=True)
ui.print_warning( cli.print_warning(
f'Powering off source in {int((idle_minutes*60-i)/60)} minutes...', f'Powering off source in {int((idle_minutes*60-i)/60)} minutes...',
) )
std.sleep(5) std.sleep(5)
@ -2060,10 +1982,10 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True) -> None:
cmd = ['sudo', 'hdparm', '-Y', source_dev] cmd = ['sudo', 'hdparm', '-Y', source_dev]
proc = exe.run_program(cmd, check=False) proc = exe.run_program(cmd, check=False)
if proc.returncode: if proc.returncode:
ui.print_error(f'Failed to poweroff source {source_dev}') cli.print_error(f'Failed to poweroff source {source_dev}')
else: else:
ui.print_warning(f'Powered off source {source_dev}') cli.print_warning(f'Powered off source {source_dev}')
ui.print_standard( cli.print_standard(
'Press Enter to return to main menu...', end='', flush=True, 'Press Enter to return to main menu...', end='', flush=True,
) )
@ -2106,12 +2028,11 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True) -> None:
if warning_message: if warning_message:
# Error detected on destination, stop recovery # Error detected on destination, stop recovery
exe.stop_process(proc) exe.stop_process(proc)
ui.print_error(warning_message) cli.print_error(warning_message)
break break
if _i % 60 == 0:
# Clear ddrescue pane # Clear ddrescue pane
tmux.clear_pane() state.ui.clear_current_pane()
_i += 1 _i += 1
# Update progress # Update progress
@ -2152,19 +2073,19 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True) -> None:
warning_message = 'Error(s) encountered, see message above' warning_message = 'Error(s) encountered, see message above'
state.update_top_panes() state.update_top_panes()
if warning_message: if warning_message:
ui.print_standard(' ') cli.print_standard(' ')
ui.print_standard(' ') cli.print_standard(' ')
ui.print_error('DDRESCUE PROCESS HALTED') cli.print_error('DDRESCUE PROCESS HALTED')
ui.print_standard(' ') cli.print_standard(' ')
ui.print_warning(warning_message) cli.print_warning(warning_message)
# Needs attention? # Needs attention?
if str(proc.poll()) != '0': if str(proc.poll()) != '0':
state.update_progress_pane('NEEDS ATTENTION') state.update_progress_pane('NEEDS ATTENTION')
ui.pause('Press Enter to return to main menu...') cli.pause('Press Enter to return to main menu...')
# Stop source poweroff countdown # Stop source poweroff countdown
ui.print_standard('Stopping device poweroff countdown...', flush=True) cli.print_standard('Stopping device poweroff countdown...', flush=True)
poweroff_source_after_idle = False poweroff_source_after_idle = False
poweroff_thread.join() poweroff_thread.join()
@ -2172,7 +2093,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True) -> None:
raise std.GenericAbort() raise std.GenericAbort()
def run_recovery(state, main_menu, settings_menu, dry_run=True) -> None: def run_recovery(state: State, main_menu, settings_menu, dry_run=True) -> None:
"""Run recovery passes.""" """Run recovery passes."""
atexit.register(state.save_debug_reports) atexit.register(state.save_debug_reports)
attempted_recovery = False attempted_recovery = False
@ -2180,12 +2101,12 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True) -> None:
# Bail early # Bail early
if is_missing_source_or_destination(state): if is_missing_source_or_destination(state):
ui.print_standard('') cli.print_standard('')
ui.pause('Press Enter to return to main menu...') cli.pause('Press Enter to return to main menu...')
return return
if source_or_destination_changed(state): if source_or_destination_changed(state):
ui.print_standard('') cli.print_standard('')
ui.abort() cli.abort()
# Get settings # Get settings
for name, details in main_menu.toggles.items(): for name, details in main_menu.toggles.items():
@ -2196,14 +2117,14 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True) -> None:
state.retry_all_passes() state.retry_all_passes()
# Start SMART/Journal # Start SMART/Journal
state.panes['SMART'] = tmux.split_window( state.ui.add_info_pane(
behind=True, lines=12, vertical=True, percent=50,
update_layout=False,
watch_file=f'{state.log_dir}/smart.out', watch_file=f'{state.log_dir}/smart.out',
) )
if PLATFORM != 'Darwin': if PLATFORM == 'Linux':
state.panes['Journal'] = tmux.split_window( state.ui.add_worker_pane(lines=4, cmd='journalctl --dmesg --follow')
lines=4, vertical=True, cmd='journalctl --dmesg --follow', state.ui.set_current_pane_height(DDRESCUE_OUTPUT_HEIGHT)
)
# Run pass(es) # Run pass(es)
for pass_name in ('read-skip', 'read-full', 'trim', 'scrape'): for pass_name in ('read-skip', 'read-full', 'trim', 'scrape'):
@ -2235,15 +2156,19 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True) -> None:
break break
# Stop SMART/Journal # Stop SMART/Journal
state.ui.remove_all_info_panes()
state.ui.remove_all_worker_panes()
state.ui.clear_current_pane_height()
for pane in ('SMART', 'Journal'): for pane in ('SMART', 'Journal'):
if pane in state.panes: if pane in state.panes:
tmux.kill_pane(state.panes.pop(pane)) tmux.kill_pane(state.panes.pop(pane))
# Show warning if nothing was done # Show warning if nothing was done
if not attempted_recovery: if not attempted_recovery:
ui.print_warning('No actions performed') cli.print_warning('No actions performed')
ui.print_standard(' ') cli.print_standard(' ')
ui.pause('Press Enter to return to main menu...') cli.pause('Press Enter to return to main menu...')
# Done # Done
state.save_debug_reports() state.save_debug_reports()
@ -2253,9 +2178,9 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True) -> None:
def select_disk(prompt_msg, skip_disk=None) -> hw_disk.Disk: def select_disk(prompt_msg, skip_disk=None) -> hw_disk.Disk:
"""Select disk from list, returns Disk().""" """Select disk from list, returns Disk()."""
ui.print_info('Scanning disks...') cli.print_info('Scanning disks...')
disks = hw_disk.get_disks() disks = hw_disk.get_disks()
menu = ui.Menu( menu = cli.Menu(
title=ansi.color_string(f'ddrescue TUI: {prompt_msg} Selection', 'GREEN'), title=ansi.color_string(f'ddrescue TUI: {prompt_msg} Selection', 'GREEN'),
) )
menu.disabled_str = 'Already selected' menu.disabled_str = 'Already selected'
@ -2300,7 +2225,7 @@ def select_disk_parts(prompt_msg, disk) -> hw_disk.Disk:
"""Select disk parts from list, returns list of Disk().""" """Select disk parts from list, returns list of Disk()."""
title = ansi.color_string('ddrescue TUI: Partition Selection', 'GREEN') title = ansi.color_string('ddrescue TUI: Partition Selection', 'GREEN')
title += f'\n\nDisk: {disk.path} {disk.description}' title += f'\n\nDisk: {disk.path} {disk.description}'
menu = ui.Menu(title) menu = cli.Menu(title)
menu.separator = ' ' menu.separator = ' '
menu.add_action('All') menu.add_action('All')
menu.add_action('None') menu.add_action('None')
@ -2363,13 +2288,13 @@ def select_disk_parts(prompt_msg, disk) -> hw_disk.Disk:
if len(object_list) == len(disk.children): if len(object_list) == len(disk.children):
# NOTE: This is not true if the disk has no partitions # NOTE: This is not true if the disk has no partitions
msg = f'Preserve partition table and unused space in {prompt_msg.lower()}?' msg = f'Preserve partition table and unused space in {prompt_msg.lower()}?'
if ui.ask(msg): if cli.ask(msg):
# Replace part list with whole disk obj # Replace part list with whole disk obj
object_list = [disk.path] object_list = [disk.path]
# Convert object_list to hw_disk.Disk() objects # Convert object_list to hw_disk.Disk() objects
ui.print_standard(' ') cli.print_standard(' ')
ui.print_info('Getting disk/partition details...') cli.print_info('Getting disk/partition details...')
object_list = [hw_disk.Disk(path) for path in object_list] object_list = [hw_disk.Disk(path) for path in object_list]
# Done # Done
@ -2379,7 +2304,7 @@ def select_disk_parts(prompt_msg, disk) -> hw_disk.Disk:
def select_path(prompt_msg) -> pathlib.Path: def select_path(prompt_msg) -> pathlib.Path:
"""Select path, returns pathlib.Path.""" """Select path, returns pathlib.Path."""
invalid = False invalid = False
menu = ui.Menu( menu = cli.Menu(
title=ansi.color_string(f'ddrescue TUI: {prompt_msg} Path Selection', 'GREEN'), title=ansi.color_string(f'ddrescue TUI: {prompt_msg} Path Selection', 'GREEN'),
) )
menu.separator = ' ' menu.separator = ' '
@ -2393,7 +2318,7 @@ def select_path(prompt_msg) -> pathlib.Path:
if 'Current directory' in selection: if 'Current directory' in selection:
path = os.getcwd() path = os.getcwd()
elif 'Enter manually' in selection: elif 'Enter manually' in selection:
path = ui.input_text('Please enter path: ') path = cli.input_text('Please enter path: ')
elif 'Quit' in selection: elif 'Quit' in selection:
raise std.GenericAbort() raise std.GenericAbort()
@ -2403,7 +2328,7 @@ def select_path(prompt_msg) -> pathlib.Path:
except TypeError: except TypeError:
invalid = True invalid = True
if invalid or not path.is_dir(): if invalid or not path.is_dir():
ui.print_error(f'Invalid path: {path}') cli.print_error(f'Invalid path: {path}')
raise std.GenericAbort() raise std.GenericAbort()
# Done # Done
@ -2422,7 +2347,7 @@ def set_mode(docopt_args) -> str:
# Ask user if necessary # Ask user if necessary
if not mode: if not mode:
answer = ui.choice('Are we cloning or imaging?', ['C', 'I']) answer = cli.choice('Are we cloning or imaging?', ['C', 'I'])
if answer == 'C': if answer == 'C':
mode = 'Clone' mode = 'Clone'
else: else: