Add more type hints to ddrescue-tui

This commit is contained in:
2Shirt 2023-06-04 18:54:16 -07:00
parent 13e14e6734
commit 7e6cfa1896
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C

View file

@ -14,6 +14,8 @@ import shutil
import subprocess import subprocess
import time import time
from typing import Any
import psutil import psutil
import pytz import pytz
@ -338,16 +340,19 @@ class State():
"""Object for tracking hardware diagnostic data.""" """Object for tracking hardware diagnostic data."""
def __init__(self): def __init__(self):
self.block_pairs: list[BlockPair] = [] self.block_pairs: list[BlockPair] = []
self.destination = None self.destination: pathlib.Path | None = None
self.log_dir = None self.log_dir: pathlib.Path = log.format_log_path()
self.mode = None self.log_dir = self.log_dir.parent.joinpath(
self.panes = {} f'ddrescue-TUI_{time.strftime("%Y-%m-%d_%H%M%S%z")}/',
self.source = None )
self.working_dir = None self.progress_out: pathlib.Path = self.log_dir.joinpath('progress.out')
self.mode: str = '?'
self.source: hw_disk.Disk | None = None
self.working_dir: pathlib.Path = pathlib.Path()
self.ui: tui.TUI = tui.TUI('Source') self.ui: tui.TUI = tui.TUI('Source')
self.ui.add_title_pane('Destination') self.ui.add_title_pane('Destination')
def _add_block_pair(self, source, destination) -> None: def _add_block_pair(self, source: hw_disk.Disk, destination: pathlib.Path) -> None:
"""Add BlockPair object and run safety checks.""" """Add BlockPair object and run safety checks."""
self.block_pairs.append( self.block_pairs.append(
BlockPair( BlockPair(
@ -363,7 +368,7 @@ class State():
description = self.source.path.name description = self.source.path.name
return pathlib.Path(f'{self.working_dir}/Clone_{description}.json') return pathlib.Path(f'{self.working_dir}/Clone_{description}.json')
def _load_settings(self, discard_unused_settings=False) -> dict: def _load_settings(self, discard_unused_settings: bool = False) -> dict[Any, Any]:
"""Load settings from previous run, returns dict.""" """Load settings from previous run, returns dict."""
settings = {} settings = {}
settings_file = self._get_clone_settings_path() settings_file = self._get_clone_settings_path()
@ -412,7 +417,7 @@ class State():
# Done # Done
return settings return settings
def _save_settings(self, settings) -> None: def _save_settings(self, settings: dict[Any, Any]) -> None:
"""Save settings for future runs.""" """Save settings for future runs."""
settings_file = self._get_clone_settings_path() settings_file = self._get_clone_settings_path()
@ -424,7 +429,7 @@ class State():
cli.print_error('Failed to save clone settings') cli.print_error('Failed to save clone settings')
raise std.GenericAbort() from err raise std.GenericAbort() from err
def add_clone_block_pairs(self) -> None: def add_clone_block_pairs(self) -> list[hw_disk.Disk]:
"""Add device to device block pairs and set settings if necessary.""" """Add device to device block pairs and set settings if necessary."""
source_sep = get_partition_separator(self.source.path.name) source_sep = get_partition_separator(self.source.path.name)
dest_sep = get_partition_separator(self.destination.path.name) dest_sep = get_partition_separator(self.destination.path.name)
@ -488,13 +493,16 @@ class State():
# Done # Done
return source_parts return source_parts
def add_image_block_pairs(self, source_parts) -> None: def add_image_block_pairs(self, source_parts: list[hw_disk.Disk]) -> None:
"""Add device to image file block pairs.""" """Add device to image file block pairs."""
for part in source_parts: for part in source_parts:
bp_dest = self.destination self._add_block_pair(part, self.destination)
self._add_block_pair(part, bp_dest)
def confirm_selections(self, prompt_msg, source_parts=None) -> None: def confirm_selections(
self,
prompt_msg: str,
source_parts: list[hw_disk.Disk] | None = None,
) -> None:
"""Show selection details and prompt for confirmation.""" """Show selection details and prompt for confirmation."""
report = [] report = []
@ -635,24 +643,18 @@ class State():
"""Get total size of all block_pairs in bytes, returns int.""" """Get total size of all block_pairs in bytes, returns int."""
return sum(pair.size for pair in self.block_pairs) return sum(pair.size for pair in self.block_pairs)
def init_recovery(self, docopt_args) -> None: def init_recovery(self, docopt_args: dict[str, Any]) -> None:
"""Select source/dest and set env.""" """Select source/dest and set env."""
cli.clear_screen() cli.clear_screen()
source_parts = [] source_parts = []
# Set log # Set log
self.log_dir = log.format_log_path()
self.log_dir = pathlib.Path(
f'{self.log_dir.parent}/'
f'ddrescue-TUI_{time.strftime("%Y-%m-%d_%H%M%S%z")}/'
)
log.update_log_path( log.update_log_path(
dest_dir=self.log_dir, dest_dir=self.log_dir,
dest_name='main', dest_name='main',
keep_history=True, keep_history=True,
timestamp=False, timestamp=False,
) )
self.progress_out = self.log_dir.joinpath('progress.out')
self.ui.set_progress_file(self.progress_out) self.ui.set_progress_file(self.progress_out)
# Set mode # Set mode
@ -755,18 +757,22 @@ class State():
settings['First Run'] = False settings['First Run'] = False
self._save_settings(settings) self._save_settings(settings)
def pass_above_threshold(self, pass_name) -> bool: def pass_above_threshold(self, pass_name: str) -> bool:
"""Check if all block_pairs meet the pass threshold, returns bool.""" """Check if all block_pairs meet the pass threshold, returns bool."""
threshold = cfg.ddrescue.AUTO_PASS_THRESHOLDS[pass_name] threshold = cfg.ddrescue.AUTO_PASS_THRESHOLDS[pass_name]
return all( return all(
p.get_percent_recovered() >= threshold for p in self.block_pairs p.get_percent_recovered() >= threshold for p in self.block_pairs
) )
def pass_complete(self, pass_name) -> bool: def pass_complete(self, pass_name: str) -> bool:
"""Check if all block_pairs completed pass_name, returns bool.""" """Check if all block_pairs completed pass_name, returns bool."""
return all(p.pass_complete(pass_name) for p in self.block_pairs) return all(p.pass_complete(pass_name) for p in self.block_pairs)
def prep_destination(self, source_parts, dry_run=True) -> None: def prep_destination(
self,
source_parts: list[hw_disk.Disk],
dry_run: bool = True,
) -> None:
"""Prep destination as necessary.""" """Prep destination as necessary."""
# TODO: Split into Linux and macOS # TODO: Split into Linux and macOS
# logical sector size is not easily found under macOS # logical sector size is not easily found under macOS
@ -982,13 +988,13 @@ class State():
_f.write('\n'.join(debug.generate_object_report(_bp))) _f.write('\n'.join(debug.generate_object_report(_bp)))
_f.write('\n') _f.write('\n')
def skip_pass(self, pass_name) -> None: def skip_pass(self, pass_name: str) -> None:
"""Mark block_pairs as skipped if applicable.""" """Mark block_pairs as skipped if applicable."""
for pair in self.block_pairs: for pair in self.block_pairs:
if pair.status[pass_name] == 'Pending': if pair.status[pass_name] == 'Pending':
pair.status[pass_name] = 'Skipped' pair.status[pass_name] = 'Skipped'
def update_progress_pane(self, overall_status) -> None: def update_progress_pane(self, overall_status: str) -> None:
"""Update progress pane.""" """Update progress pane."""
report = [] report = []
separator = '─────────────────────' separator = '─────────────────────'
@ -2159,10 +2165,6 @@ def run_recovery(state: State, main_menu, settings_menu, dry_run=True) -> None:
state.ui.remove_all_worker_panes() state.ui.remove_all_worker_panes()
state.ui.clear_current_pane_height() state.ui.clear_current_pane_height()
for pane in ('SMART', 'Journal'):
if pane in state.panes:
tmux.kill_pane(state.panes.pop(pane))
# Show warning if nothing was done # Show warning if nothing was done
if not attempted_recovery: if not attempted_recovery:
cli.print_warning('No actions performed') cli.print_warning('No actions performed')
@ -2220,7 +2222,7 @@ def select_disk(prompt_msg, skip_disk=None) -> hw_disk.Disk:
return selected_disk return selected_disk
def select_disk_parts(prompt_msg, disk) -> hw_disk.Disk: def select_disk_parts(prompt_msg, disk) -> list[hw_disk.Disk]:
"""Select disk parts from list, returns list of Disk().""" """Select disk parts from list, returns list of Disk()."""
title = ansi.color_string('ddrescue TUI: Partition Selection', 'GREEN') title = ansi.color_string('ddrescue TUI: Partition Selection', 'GREEN')
title += f'\n\nDisk: {disk.path} {disk.description}' title += f'\n\nDisk: {disk.path} {disk.description}'
@ -2336,7 +2338,7 @@ def select_path(prompt_msg) -> pathlib.Path:
def set_mode(docopt_args) -> str: def set_mode(docopt_args) -> str:
"""Set mode from docopt_args or user selection, returns str.""" """Set mode from docopt_args or user selection, returns str."""
mode = None mode = '?'
# Check docopt_args # Check docopt_args
if docopt_args['clone']: if docopt_args['clone']: