Addresses issue #184 The first read phase will skip a lot more to try to recover more data from the whole source. Then the second read phase will fill in like the previous configuration.
2410 lines
70 KiB
Python
2410 lines
70 KiB
Python
"""WizardKit: ddrescue TUI"""
|
|
# pylint: disable=too-many-lines
|
|
# vim: sts=2 sw=2 ts=2
|
|
|
|
import atexit
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import math
|
|
import os
|
|
import pathlib
|
|
import plistlib
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import time
|
|
|
|
from collections import OrderedDict
|
|
from docopt import docopt
|
|
|
|
import psutil
|
|
import pytz
|
|
|
|
from wk import cfg, debug, exe, io, log, net, std, tmux
|
|
from wk.cfg.ddrescue import (
|
|
DDRESCUE_MAP_TEMPLATE,
|
|
DDRESCUE_SETTINGS,
|
|
DDRESCUE_SPECIFIC_PASS_SETTINGS,
|
|
)
|
|
from wk.hw import obj as hw_obj
|
|
|
|
|
|
# STATIC VARIABLES
|
|
DOCSTRING = f'''{cfg.main.KIT_NAME_FULL}: ddrescue TUI
|
|
|
|
Usage:
|
|
ddrescue-tui
|
|
ddrescue-tui [options] (clone|image) [<source> [<destination>]]
|
|
ddrescue-tui (-h | --help)
|
|
|
|
Options:
|
|
-h --help Show this page
|
|
-s --dry-run Print commands to be used instead of running them
|
|
--force-local-map Skip mounting shares and save map to local drive
|
|
--start-fresh Ignore previous runs and start new recovery
|
|
'''
|
|
DETECT_DRIVES_NOTICE = '''
|
|
This option will force the drive controllers to rescan for devices.
|
|
The method used is not 100% reliable and may cause issues. If you see
|
|
any script errors or crashes after running this option then please
|
|
restart the computer and try again.
|
|
'''
|
|
CLONE_SETTINGS = {
|
|
'Source': None,
|
|
'Destination': None,
|
|
'Create Boot Partition': False,
|
|
'First Run': True,
|
|
'Needs Format': False,
|
|
'Table Type': None,
|
|
'Partition Mapping': [
|
|
# (5, 1) ## Clone source partition #5 to destination partition #1
|
|
],
|
|
}
|
|
if std.PLATFORM == 'Darwin':
|
|
DDRESCUE_SETTINGS['Default']['--idirect'] = {'Selected': False, 'Hidden': True}
|
|
DDRESCUE_SETTINGS['Default']['--odirect'] = {'Selected': False, 'Hidden': True}
|
|
DDRESCUE_LOG_REGEX = re.compile(
|
|
r'^\s*(?P<key>\S+):\s+'
|
|
r'(?P<size>\d+)\s+'
|
|
r'(?P<unit>[PTGMKB]i?B?)'
|
|
r'.*\(\s*(?P<percent>\d+\.?\d*)%\)$',
|
|
re.IGNORECASE,
|
|
)
|
|
INITIAL_SKIP_MIN = 64 * 1024 # This is ddrescue's minimum accepted value
|
|
REGEX_REMAINING_TIME = re.compile(
|
|
r'remaining time:'
|
|
r'\s*((?P<days>\d+)d)?'
|
|
r'\s*((?P<hours>\d+)h)?'
|
|
r'\s*((?P<minutes>\d+)m)?'
|
|
r'\s*((?P<seconds>\d+)s)?'
|
|
r'\s*(?P<na>n/a)?',
|
|
re.IGNORECASE
|
|
)
|
|
LOG = logging.getLogger(__name__)
|
|
MENU_ACTIONS = (
|
|
'Start',
|
|
f'Change settings {std.color_string("(experts only)", "YELLOW")}',
|
|
f'Detect drives {std.color_string("(experts only)", "YELLOW")}',
|
|
'Quit')
|
|
MENU_TOGGLES = {
|
|
'Auto continue (if recovery % over threshold)': True,
|
|
'Retry (mark non-rescued sectors "non-tried")': False,
|
|
}
|
|
PANE_RATIOS = (
|
|
12, # SMART
|
|
22, # ddrescue progress
|
|
4, # Journal (kernel messages)
|
|
)
|
|
PLATFORM = std.PLATFORM
|
|
RECOMMENDED_FSTYPES = re.compile(r'^(ext[234]|ntfs|xfs)$')
|
|
if PLATFORM == 'Darwin':
|
|
RECOMMENDED_FSTYPES = re.compile(r'^(apfs|hfs.?)$')
|
|
RECOMMENDED_MAP_FSTYPES = re.compile(
|
|
r'^(apfs|cifs|ext[234]|hfs.?|ntfs|smbfs|vfat|xfs)$'
|
|
)
|
|
SETTING_PRESETS = (
|
|
'Default',
|
|
'Fast',
|
|
'Safe',
|
|
)
|
|
STATUS_COLORS = {
|
|
'Passed': 'GREEN',
|
|
'Aborted': 'YELLOW',
|
|
'Skipped': 'YELLOW',
|
|
'Working': 'YELLOW',
|
|
'ERROR': 'RED',
|
|
}
|
|
TIMEZONE = pytz.timezone(cfg.main.LINUX_TIME_ZONE)
|
|
|
|
|
|
# Classes
|
|
class BlockPair():
|
|
"""Object for tracking source to dest recovery data."""
|
|
# pylint: disable=too-many-instance-attributes
|
|
def __init__(self, source, destination, model, working_dir):
|
|
"""Initialize BlockPair()
|
|
|
|
NOTE: source should be a wk.hw.obj.Disk() object
|
|
and destination should be a pathlib.Path() object.
|
|
"""
|
|
self.sector_size = source.details.get('phy-sec', 512)
|
|
self.source = source.path
|
|
self.destination = destination
|
|
self.map_data = {}
|
|
self.map_path = None
|
|
self.size = source.details['size']
|
|
self.status = OrderedDict({
|
|
'read-skip': 'Pending',
|
|
'read-full': 'Pending',
|
|
'trim': 'Pending',
|
|
'scrape': 'Pending',
|
|
})
|
|
self.view_map = 'DISPLAY' in os.environ or 'WAYLAND_DISPLAY' in os.environ
|
|
|
|
# Set map path
|
|
# e.g. '(Clone|Image)_Model[_p#]_Size[_Label].map'
|
|
map_name = model if model else 'None'
|
|
if source.details['bus'] == 'Image':
|
|
map_name = 'Image'
|
|
if source.details['parent']:
|
|
part_num = re.sub(r"^.*?(\d+)$", r"\1", source.path.name)
|
|
map_name += f'_p{part_num}'
|
|
size_str = std.bytes_to_string(
|
|
size=self.size,
|
|
use_binary=False,
|
|
)
|
|
map_name += f'_{size_str.replace(" ", "")}'
|
|
if source.details.get('label', ''):
|
|
map_name += f'_{source.details["label"]}'
|
|
map_name = map_name.replace(' ', '_')
|
|
map_name = map_name.replace('/', '_')
|
|
if destination.is_dir():
|
|
# Imaging
|
|
self.map_path = pathlib.Path(f'{destination}/Image_{map_name}.map')
|
|
self.destination = self.map_path.with_suffix('.dd')
|
|
self.destination.touch()
|
|
else:
|
|
# Cloning
|
|
self.map_path = pathlib.Path(f'{working_dir}/Clone_{map_name}.map')
|
|
|
|
# Create map file if needed
|
|
# NOTE: We need to set the domain size for --complete-only to work
|
|
if not self.map_path.exists():
|
|
self.map_path.write_text(
|
|
data=DDRESCUE_MAP_TEMPLATE.format(
|
|
name=cfg.main.KIT_NAME_FULL,
|
|
size=self.size,
|
|
),
|
|
encoding='utf-8',
|
|
)
|
|
|
|
# Set initial status
|
|
self.set_initial_status()
|
|
|
|
def get_error_size(self):
|
|
"""Get error size in bytes, returns int."""
|
|
return self.size - self.get_rescued_size()
|
|
|
|
def get_percent_recovered(self):
|
|
"""Get percent rescued from map_data, returns float."""
|
|
return 100 * self.map_data.get('rescued', 0) / self.size
|
|
|
|
def get_rescued_size(self):
|
|
"""Get rescued size using map data.
|
|
|
|
NOTE: Returns 0 if no map data is available.
|
|
"""
|
|
self.load_map_data()
|
|
return self.map_data.get('rescued', 0)
|
|
|
|
def load_map_data(self):
|
|
"""Load map data from file.
|
|
|
|
NOTE: If the file is missing it is assumed that recovery hasn't
|
|
started yet so default values will be returned instead.
|
|
"""
|
|
data = {'full recovery': False, 'pass completed': False}
|
|
|
|
# Get output from ddrescuelog
|
|
cmd = [
|
|
'ddrescuelog',
|
|
'--binary-prefixes',
|
|
'--show-status',
|
|
f'--size={self.size}',
|
|
self.map_path,
|
|
]
|
|
proc = exe.run_program(cmd, check=False)
|
|
|
|
# Parse output
|
|
for line in proc.stdout.splitlines():
|
|
_r = DDRESCUE_LOG_REGEX.search(line)
|
|
if _r:
|
|
if _r.group('key') == 'rescued' and _r.group('percent') == '100':
|
|
# Fix rounding errors from ddrescuelog output
|
|
data['rescued'] = self.size
|
|
else:
|
|
data[_r.group('key')] = std.string_to_bytes(
|
|
f'{_r.group("size")} {_r.group("unit")}',
|
|
)
|
|
data['pass completed'] = 'current status: finished' in line.lower()
|
|
|
|
# Check if 100% done (only if map is present and non-zero size
|
|
# NOTE: ddrescuelog returns 0 (i.e. 100% done) for empty files
|
|
if self.map_path.exists() and self.map_path.stat().st_size != 0:
|
|
cmd = [
|
|
'ddrescuelog',
|
|
'--done-status',
|
|
f'--size={self.size}',
|
|
self.map_path,
|
|
]
|
|
proc = exe.run_program(cmd, check=False)
|
|
data['full recovery'] = proc.returncode == 0
|
|
|
|
# Done
|
|
self.map_data.update(data)
|
|
|
|
def pass_complete(self, pass_name):
|
|
"""Check if pass_num is complete based on map data, returns bool."""
|
|
complete = False
|
|
pending_size = 0
|
|
|
|
# Check map data
|
|
if self.map_data.get('full recovery', False):
|
|
complete = True
|
|
elif 'non-tried' not in self.map_data:
|
|
# Assuming recovery has not been attempted yet
|
|
complete = False
|
|
else:
|
|
# Check that current and previous passes are complete
|
|
pending_size = self.map_data['non-tried']
|
|
if pass_name in ('trim', 'scrape'):
|
|
pending_size += self.map_data['non-trimmed']
|
|
if pass_name == 'scrape':
|
|
pending_size += self.map_data['non-scraped']
|
|
if pending_size == 0:
|
|
complete = True
|
|
|
|
# Done
|
|
return complete
|
|
|
|
def safety_check(self):
|
|
"""Run safety check and abort if necessary."""
|
|
dest_size = -1
|
|
if self.destination.exists():
|
|
dest_obj = hw_obj.Disk(self.destination)
|
|
dest_size = dest_obj.details['size']
|
|
del dest_obj
|
|
|
|
# Check destination size if cloning
|
|
if not self.destination.is_file() and dest_size < self.size:
|
|
std.print_error(f'Invalid destination: {self.destination}')
|
|
raise std.GenericAbort()
|
|
|
|
def set_initial_status(self):
|
|
"""Read map data and set initial statuses."""
|
|
self.load_map_data()
|
|
percent = self.get_percent_recovered()
|
|
for name in self.status.keys():
|
|
if self.pass_complete(name):
|
|
self.status[name] = percent
|
|
else:
|
|
# Stop checking
|
|
if percent > 0:
|
|
self.status[name] = percent
|
|
break
|
|
|
|
def skip_pass(self, pass_name):
|
|
"""Mark pass as skipped if applicable."""
|
|
if self.status[pass_name] == 'Pending':
|
|
self.status[pass_name] = 'Skipped'
|
|
|
|
def update_progress(self, pass_name):
|
|
"""Update progress via map data."""
|
|
self.load_map_data()
|
|
|
|
# Update status
|
|
percent = self.get_percent_recovered()
|
|
if percent > 0:
|
|
self.status[pass_name] = percent
|
|
|
|
# Mark future passes as skipped if applicable
|
|
if percent == 100:
|
|
status_keys = list(self.status.keys())
|
|
for i in status_keys[status_keys.index(pass_name)+1:]:
|
|
self.status[status_keys[i]] = 'Skipped'
|
|
|
|
|
|
class State():
|
|
# pylint: disable=too-many-public-methods
|
|
"""Object for tracking hardware diagnostic data."""
|
|
def __init__(self):
|
|
self.block_pairs = []
|
|
self.destination = None
|
|
self.log_dir = None
|
|
self.mode = None
|
|
self.panes = {}
|
|
self.source = None
|
|
self.working_dir = None
|
|
|
|
# Start a background process to maintain layout
|
|
self._init_tmux()
|
|
exe.start_thread(self._fix_tmux_layout_loop)
|
|
|
|
def _add_block_pair(self, source, destination):
|
|
"""Add BlockPair object and run safety checks."""
|
|
self.block_pairs.append(
|
|
BlockPair(
|
|
source=source,
|
|
destination=destination,
|
|
model=self.source.details['model'],
|
|
working_dir=self.working_dir,
|
|
))
|
|
|
|
def _get_clone_settings_path(self):
|
|
"""get Clone settings file path, returns pathlib.Path obj."""
|
|
description = self.source.details['model']
|
|
if not description:
|
|
description = self.source.path.name
|
|
return pathlib.Path(f'{self.working_dir}/Clone_{description}.json')
|
|
|
|
def _fix_tmux_layout(self, forced=True):
|
|
"""Fix tmux layout based on cfg.ddrescue.TMUX_LAYOUT."""
|
|
layout = cfg.ddrescue.TMUX_LAYOUT
|
|
needs_fixed = tmux.layout_needs_fixed(self.panes, layout)
|
|
|
|
# Main layout fix
|
|
try:
|
|
tmux.fix_layout(self.panes, layout, forced=forced)
|
|
except RuntimeError:
|
|
# Assuming self.panes changed while running
|
|
pass
|
|
|
|
# Source/Destination
|
|
if forced or needs_fixed:
|
|
self.update_top_panes()
|
|
|
|
# Return if Progress pane not present
|
|
if 'Progress' not in self.panes:
|
|
return
|
|
|
|
# SMART/Journal
|
|
if forced or needs_fixed:
|
|
height = tmux.get_pane_size(self.panes['Progress'])[1] - 2
|
|
p_ratios = [int((x/sum(PANE_RATIOS)) * height) for x in PANE_RATIOS]
|
|
if 'SMART' in self.panes:
|
|
tmux.resize_pane(self.panes['SMART'], height=p_ratios[0])
|
|
tmux.resize_pane(height=p_ratios[1])
|
|
if 'Journal' in self.panes:
|
|
tmux.resize_pane(self.panes['Journal'], height=p_ratios[2])
|
|
|
|
def _fix_tmux_layout_loop(self):
|
|
"""Fix tmux layout on a loop.
|
|
|
|
NOTE: This should be called as a thread.
|
|
"""
|
|
while True:
|
|
self._fix_tmux_layout(forced=False)
|
|
std.sleep(1)
|
|
|
|
def _init_tmux(self):
|
|
"""Initialize tmux layout."""
|
|
tmux.kill_all_panes()
|
|
|
|
# Source (placeholder)
|
|
self.panes['Source'] = tmux.split_window(
|
|
behind=True,
|
|
lines=2,
|
|
text=' ',
|
|
vertical=True,
|
|
)
|
|
|
|
# Started
|
|
self.panes['Started'] = tmux.split_window(
|
|
lines=cfg.ddrescue.TMUX_SIDE_WIDTH,
|
|
target_id=self.panes['Source'],
|
|
text=std.color_string(
|
|
['Started', time.strftime("%Y-%m-%d %H:%M %Z")],
|
|
['BLUE', None],
|
|
sep='\n',
|
|
),
|
|
)
|
|
|
|
# Source / Dest
|
|
self.update_top_panes()
|
|
|
|
def _load_settings(self, discard_unused_settings=False):
|
|
"""Load settings from previous run, returns dict."""
|
|
settings = {}
|
|
settings_file = self._get_clone_settings_path()
|
|
|
|
# Try loading JSON data
|
|
if settings_file.exists():
|
|
with open(settings_file, 'r', encoding='utf-8') as _f:
|
|
try:
|
|
settings = json.loads(_f.read())
|
|
except (OSError, json.JSONDecodeError) as err:
|
|
LOG.error('Failed to load clone settings')
|
|
std.print_error('Invalid clone settings detected.')
|
|
raise std.GenericAbort() from err
|
|
|
|
# Check settings
|
|
if settings:
|
|
if settings['First Run'] and discard_unused_settings:
|
|
# Previous run aborted before starting recovery, discard settings
|
|
settings = {}
|
|
else:
|
|
bail = False
|
|
for key in ('model', 'serial'):
|
|
if settings['Source'][key] != self.source.details[key]:
|
|
std.print_error(f"Clone settings don't match source {key}")
|
|
bail = True
|
|
if settings['Destination'][key] != self.destination.details[key]:
|
|
std.print_error(f"Clone settings don't match destination {key}")
|
|
bail = True
|
|
if bail:
|
|
raise std.GenericAbort()
|
|
|
|
# Update settings
|
|
if not settings:
|
|
settings = CLONE_SETTINGS.copy()
|
|
if not settings['Source']:
|
|
settings['Source'] = {
|
|
'model': self.source.details['model'],
|
|
'serial': self.source.details['serial'],
|
|
}
|
|
if not settings['Destination']:
|
|
settings['Destination'] = {
|
|
'model': self.destination.details['model'],
|
|
'serial': self.destination.details['serial'],
|
|
}
|
|
|
|
# Done
|
|
return settings
|
|
|
|
def _save_settings(self, settings):
|
|
"""Save settings for future runs."""
|
|
settings_file = self._get_clone_settings_path()
|
|
|
|
# Try saving JSON data
|
|
try:
|
|
with open(settings_file, 'w', encoding='utf-8') as _f:
|
|
json.dump(settings, _f)
|
|
except OSError as err:
|
|
std.print_error('Failed to save clone settings')
|
|
raise std.GenericAbort() from err
|
|
|
|
def add_clone_block_pairs(self):
|
|
"""Add device to device block pairs and set settings if necessary."""
|
|
source_sep = get_partition_separator(self.source.path.name)
|
|
dest_sep = get_partition_separator(self.destination.path.name)
|
|
settings = {}
|
|
source_parts = []
|
|
|
|
# Clone settings
|
|
settings = self._load_settings(discard_unused_settings=True)
|
|
|
|
# Add pairs
|
|
if settings['Partition Mapping']:
|
|
# Resume previous run, load pairs from settings file
|
|
for part_map in settings['Partition Mapping']:
|
|
bp_source = hw_obj.Disk(
|
|
f'{self.source.path}{source_sep}{part_map[0]}',
|
|
)
|
|
bp_dest = pathlib.Path(
|
|
f'{self.destination.path}{dest_sep}{part_map[1]}',
|
|
)
|
|
self._add_block_pair(bp_source, bp_dest)
|
|
else:
|
|
source_parts = select_disk_parts('Clone', self.source)
|
|
if self.source.path.samefile(source_parts[0].path):
|
|
# Whole disk (or single partition via args), skip settings
|
|
bp_dest = self.destination.path
|
|
self._add_block_pair(self.source, bp_dest)
|
|
else:
|
|
# New run, use new settings file
|
|
settings['Needs Format'] = True
|
|
offset = 0
|
|
user_choice = std.choice(
|
|
['G', 'M', 'S'],
|
|
'Format clone using GPT, MBR, or match Source type?',
|
|
)
|
|
if user_choice == 'G':
|
|
settings['Table Type'] = 'GPT'
|
|
elif user_choice == 'M':
|
|
settings['Table Type'] = 'MBR'
|
|
else:
|
|
# Match source type
|
|
settings['Table Type'] = get_table_type(self.source)
|
|
if std.ask('Create an empty Windows boot partition on the clone?'):
|
|
settings['Create Boot Partition'] = True
|
|
offset = 2 if settings['Table Type'] == 'GPT' else 1
|
|
|
|
# Add pairs
|
|
for dest_num, part in enumerate(source_parts):
|
|
dest_num += offset + 1
|
|
bp_dest = pathlib.Path(
|
|
f'{self.destination.path}{dest_sep}{dest_num}',
|
|
)
|
|
self._add_block_pair(part, bp_dest)
|
|
|
|
# Add to settings file
|
|
source_num = re.sub(r'^.*?(\d+)$', r'\1', part.path.name)
|
|
settings['Partition Mapping'].append([source_num, dest_num])
|
|
|
|
# Save settings
|
|
self._save_settings(settings)
|
|
|
|
# Done
|
|
return source_parts
|
|
|
|
def add_image_block_pairs(self, source_parts):
|
|
"""Add device to image file block pairs."""
|
|
for part in source_parts:
|
|
bp_dest = self.destination
|
|
self._add_block_pair(part, bp_dest)
|
|
|
|
def confirm_selections(self, prompt, source_parts=None):
|
|
"""Show selection details and prompt for confirmation."""
|
|
report = []
|
|
|
|
# Source
|
|
report.append(std.color_string('Source', 'GREEN'))
|
|
report.extend(build_object_report(self.source))
|
|
report.append(' ')
|
|
|
|
# Destination
|
|
report.append(std.color_string('Destination', 'GREEN'))
|
|
if self.mode == 'Clone':
|
|
report[-1] += std.color_string(' (ALL DATA WILL BE DELETED)', 'RED')
|
|
report.extend(build_object_report(self.destination))
|
|
report.append(' ')
|
|
|
|
# Show deletion warning if necessary
|
|
# NOTE: The check for block_pairs is to limit this section
|
|
# to the second confirmation
|
|
if self.mode == 'Clone' and self.block_pairs:
|
|
report.append(std.color_string('WARNING', 'YELLOW'))
|
|
report.append(
|
|
'All data will be deleted from the destination listed above.',
|
|
)
|
|
report.append(
|
|
std.color_string(
|
|
['This is irreversible and will lead to', 'DATA LOSS.'],
|
|
['YELLOW', 'RED'],
|
|
),
|
|
)
|
|
report.append(' ')
|
|
|
|
# Block pairs
|
|
if self.block_pairs:
|
|
report.extend(
|
|
build_block_pair_report(
|
|
self.block_pairs,
|
|
self._load_settings() if self.mode == 'Clone' else {},
|
|
),
|
|
)
|
|
report.append(' ')
|
|
|
|
# Map dir
|
|
if self.working_dir:
|
|
report.append(std.color_string('Map Save Directory', 'GREEN'))
|
|
report.append(f'{self.working_dir}/')
|
|
report.append(' ')
|
|
if not fstype_is_ok(self.working_dir, map_dir=True):
|
|
report.append(
|
|
std.color_string(
|
|
'Map file(s) are being saved to a non-recommended filesystem.',
|
|
'YELLOW',
|
|
),
|
|
)
|
|
report.append(
|
|
std.color_string(
|
|
['This is strongly discouraged and may lead to', 'DATA LOSS'],
|
|
[None, 'RED'],
|
|
),
|
|
)
|
|
report.append(' ')
|
|
|
|
# Source part(s) selected
|
|
if source_parts:
|
|
report.append(std.color_string('Source Part(s) selected', 'GREEN'))
|
|
if self.source.path.samefile(source_parts[0].path):
|
|
report.append('Whole Disk')
|
|
else:
|
|
report.append(std.color_string(f'{"NAME":<9} SIZE', 'BLUE'))
|
|
for part in source_parts:
|
|
report.append(
|
|
f'{part.path.name:<9} '
|
|
f'{std.bytes_to_string(part.details["size"], use_binary=False)}'
|
|
)
|
|
report.append(' ')
|
|
|
|
# Prompt user
|
|
std.clear_screen()
|
|
std.print_report(report)
|
|
if not std.ask(prompt):
|
|
raise std.GenericAbort()
|
|
|
|
def generate_report(self):
|
|
"""Generate report of overall and per block_pair results, returns list."""
|
|
report = []
|
|
|
|
# Header
|
|
report.append(f'{self.mode.title()} Results:')
|
|
report.append(' ')
|
|
report.append(f'Source: {self.source.description}')
|
|
if self.mode == 'Clone':
|
|
report.append(f'Destination: {self.destination.description}')
|
|
else:
|
|
report.append(f'Destination: {self.destination}/')
|
|
|
|
# Overall
|
|
report.append(' ')
|
|
error_size = self.get_error_size()
|
|
error_size_str = std.bytes_to_string(error_size, decimals=2)
|
|
if error_size > 0:
|
|
error_size_str = std.color_string(error_size_str, 'YELLOW')
|
|
percent = self.get_percent_recovered()
|
|
percent = format_status_string(percent, width=0)
|
|
report.append(f'Overall rescued: {percent}, error size: {error_size_str}')
|
|
|
|
# Block-Pairs
|
|
if len(self.block_pairs) > 1:
|
|
report.append(' ')
|
|
for pair in self.block_pairs:
|
|
error_size = pair.get_error_size()
|
|
error_size_str = std.bytes_to_string(error_size, decimals=2)
|
|
if error_size > 0:
|
|
error_size_str = std.color_string(error_size_str, 'YELLOW')
|
|
pair_size = std.bytes_to_string(pair.size, decimals=2)
|
|
percent = pair.get_percent_recovered()
|
|
percent = format_status_string(percent, width=0)
|
|
report.append(
|
|
f'{pair.source.name} ({pair_size}) '
|
|
f'rescued: {percent}, '
|
|
f'error size: {error_size_str}'
|
|
)
|
|
|
|
# Done
|
|
return report
|
|
|
|
def get_error_size(self):
|
|
"""Get total error size from block_pairs in bytes, returns int."""
|
|
return self.get_total_size() - self.get_rescued_size()
|
|
|
|
def get_percent_recovered(self):
|
|
"""Get total percent rescued from block_pairs, returns float."""
|
|
return 100 * self.get_rescued_size() / self.get_total_size()
|
|
|
|
def get_rescued_size(self):
|
|
"""Get total rescued size from all block pairs, returns int."""
|
|
return sum(pair.get_rescued_size() for pair in self.block_pairs)
|
|
|
|
def get_total_size(self):
|
|
"""Get total size of all block_pairs in bytes, returns int."""
|
|
return sum(pair.size for pair in self.block_pairs)
|
|
|
|
def init_recovery(self, docopt_args):
|
|
# pylint: disable=too-many-branches
|
|
"""Select source/dest and set env."""
|
|
std.clear_screen()
|
|
source_parts = []
|
|
|
|
# Set log
|
|
self.log_dir = log.format_log_path()
|
|
self.log_dir = pathlib.Path(
|
|
f'{self.log_dir.parent}/'
|
|
f'ddrescue-TUI_{time.strftime("%Y-%m-%d_%H%M%S%z")}/'
|
|
)
|
|
log.update_log_path(
|
|
dest_dir=self.log_dir,
|
|
dest_name='main',
|
|
keep_history=True,
|
|
timestamp=False,
|
|
)
|
|
|
|
# Set mode
|
|
self.mode = set_mode(docopt_args)
|
|
|
|
# Image mode is broken..
|
|
# TODO: Fix image mode
|
|
# Definitely for Linux, maybe for macOS
|
|
if self.mode == 'Image':
|
|
std.print_error("I'm sorry but image mode is currently broken...")
|
|
std.abort()
|
|
|
|
# Select source
|
|
self.source = get_object(docopt_args['<source>'])
|
|
if not self.source:
|
|
self.source = select_disk('Source')
|
|
self.update_top_panes()
|
|
|
|
# Select destination
|
|
self.destination = get_object(docopt_args['<destination>'])
|
|
if not self.destination:
|
|
if self.mode == 'Clone':
|
|
self.destination = select_disk('Destination', self.source)
|
|
elif self.mode == 'Image':
|
|
self.destination = select_path('Destination')
|
|
self.update_top_panes()
|
|
|
|
# Confirmation #1
|
|
self.confirm_selections(
|
|
prompt='Are these selections correct?',
|
|
source_parts=source_parts,
|
|
)
|
|
|
|
# Update panes
|
|
self.panes['Progress'] = tmux.split_window(
|
|
lines=cfg.ddrescue.TMUX_SIDE_WIDTH,
|
|
watch_file=f'{self.log_dir}/progress.out',
|
|
)
|
|
self.update_progress_pane('Idle')
|
|
|
|
# Set working dir
|
|
self.working_dir = get_working_dir(
|
|
self.mode,
|
|
self.destination,
|
|
force_local=docopt_args['--force-local-map'],
|
|
)
|
|
|
|
# Start fresh if requested
|
|
if docopt_args['--start-fresh']:
|
|
clean_working_dir(self.working_dir)
|
|
|
|
# Add block pairs
|
|
if self.mode == 'Clone':
|
|
source_parts = self.add_clone_block_pairs()
|
|
else:
|
|
source_parts = select_disk_parts(self.mode, self.source)
|
|
self.add_image_block_pairs(source_parts)
|
|
|
|
# Safety Checks #1
|
|
if self.mode == 'Clone':
|
|
self.safety_check_destination()
|
|
self.safety_check_size()
|
|
|
|
# Confirmation #2
|
|
self.update_progress_pane('Idle')
|
|
self.confirm_selections('Start recovery?')
|
|
|
|
# Unmount source and/or destination under macOS
|
|
if PLATFORM == 'Darwin':
|
|
for disk in (self.source, self.destination):
|
|
cmd = ['diskutil', 'unmountDisk', disk.path]
|
|
try:
|
|
exe.run_program(cmd)
|
|
except subprocess.CalledProcessError:
|
|
std.print_error('Failed to unmount source and/or destination')
|
|
std.abort()
|
|
|
|
# Prep destination
|
|
if self.mode == 'Clone':
|
|
self.prep_destination(source_parts, dry_run=docopt_args['--dry-run'])
|
|
|
|
# Safety Checks #2
|
|
if not docopt_args['--dry-run']:
|
|
for pair in self.block_pairs:
|
|
pair.safety_check()
|
|
|
|
def mark_started(self):
|
|
"""Edit clone settings, if applicable, to mark recovery as started."""
|
|
# Skip if not cloning
|
|
if self.mode != 'Clone':
|
|
return
|
|
|
|
# Skip if not using settings
|
|
# i.e. Cloning whole disk (or single partition via args)
|
|
if self.source.path.samefile(self.block_pairs[0].source):
|
|
return
|
|
|
|
# Update settings
|
|
settings = self._load_settings()
|
|
if settings.get('First Run', False):
|
|
settings['First Run'] = False
|
|
self._save_settings(settings)
|
|
|
|
def pass_above_threshold(self, pass_name):
|
|
"""Check if all block_pairs meet the pass threshold, returns bool."""
|
|
threshold = cfg.ddrescue.AUTO_PASS_THRESHOLDS[pass_name]
|
|
return all(
|
|
p.get_percent_recovered() >= threshold for p in self.block_pairs
|
|
)
|
|
|
|
def pass_complete(self, pass_name):
|
|
"""Check if all block_pairs completed pass_name, returns bool."""
|
|
return all(p.pass_complete(pass_name) for p in self.block_pairs)
|
|
|
|
def prep_destination(self, source_parts, dry_run=True):
|
|
"""Prep destination as necessary."""
|
|
# TODO: Split into Linux and macOS
|
|
# logical sector size is not easily found under macOS
|
|
# It might be easier to rewrite this section using macOS tools
|
|
dest_prefix = str(self.destination.path)
|
|
dest_prefix += get_partition_separator(self.destination.path.name)
|
|
esp_type = 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
|
|
msr_type = 'E3C9E316-0B5C-4DB8-817D-F92DF00215AE'
|
|
part_num = 0
|
|
sfdisk_script = []
|
|
settings = self._load_settings()
|
|
|
|
# Bail early
|
|
if not settings['Needs Format']:
|
|
return
|
|
|
|
# Add partition table settings
|
|
if settings['Table Type'] == 'GPT':
|
|
sfdisk_script.append('label: gpt')
|
|
else:
|
|
sfdisk_script.append('label: dos')
|
|
sfdisk_script.append('unit: sectors')
|
|
sfdisk_script.append('')
|
|
|
|
# Add boot partition if requested
|
|
if settings['Create Boot Partition']:
|
|
if settings['Table Type'] == 'GPT':
|
|
part_num += 1
|
|
sfdisk_script.append(
|
|
build_sfdisk_partition_line(
|
|
table_type='GPT',
|
|
dev_path=f'{dest_prefix}{part_num}',
|
|
size='384MiB',
|
|
details={'parttype': esp_type, 'partlabel': 'EFI System'},
|
|
),
|
|
)
|
|
part_num += 1
|
|
sfdisk_script.append(
|
|
build_sfdisk_partition_line(
|
|
table_type=settings['Table Type'],
|
|
dev_path=f'{dest_prefix}{part_num}',
|
|
size='16MiB',
|
|
details={'parttype': msr_type, 'partlabel': 'Microsoft Reserved'},
|
|
),
|
|
)
|
|
elif settings['Table Type'] == 'MBR':
|
|
part_num += 1
|
|
sfdisk_script.append(
|
|
build_sfdisk_partition_line(
|
|
table_type='MBR',
|
|
dev_path=f'{dest_prefix}{part_num}',
|
|
size='100MiB',
|
|
details={'parttype': '0x7', 'partlabel': 'System Reserved'},
|
|
),
|
|
)
|
|
|
|
# Add selected partition(s)
|
|
for part in source_parts:
|
|
num_sectors = part.details['size'] / self.destination.details['log-sec']
|
|
num_sectors = math.ceil(num_sectors)
|
|
part_num += 1
|
|
sfdisk_script.append(
|
|
build_sfdisk_partition_line(
|
|
table_type=settings['Table Type'],
|
|
dev_path=f'{dest_prefix}{part_num}',
|
|
size=num_sectors,
|
|
details=part.details,
|
|
),
|
|
)
|
|
|
|
# Save sfdisk script
|
|
script_path = (
|
|
f'{self.working_dir}/'
|
|
f'sfdisk_{self.destination.path.name}.script'
|
|
)
|
|
with open(script_path, 'w', encoding='utf-8') as _f:
|
|
_f.write('\n'.join(sfdisk_script))
|
|
|
|
# Skip real format for dry runs
|
|
if dry_run:
|
|
LOG.info('Dry run, refusing to format destination')
|
|
return
|
|
|
|
# Format disk
|
|
LOG.warning('Formatting destination: %s', self.destination.path)
|
|
with open(script_path, 'r', encoding='utf-8') as _f:
|
|
proc = exe.run_program(
|
|
cmd=['sudo', 'sfdisk', self.destination.path],
|
|
stdin=_f,
|
|
check=False,
|
|
)
|
|
if proc.returncode != 0:
|
|
std.print_error('Error(s) encoundtered while formatting destination')
|
|
raise std.GenericAbort()
|
|
|
|
# Update settings
|
|
settings['Needs Format'] = False
|
|
self._save_settings(settings)
|
|
|
|
def retry_all_passes(self):
|
|
"""Prep block_pairs for a retry recovery attempt."""
|
|
bad_statuses = ('*', '/', '-')
|
|
LOG.warning('Updating block_pairs for retry')
|
|
|
|
# Update all block_pairs
|
|
for pair in self.block_pairs:
|
|
map_data = []
|
|
|
|
# Reset status strings
|
|
for name in pair.status.keys():
|
|
pair.status[name] = 'Pending'
|
|
|
|
# Mark all non-trimmed, non-scraped, and bad areas as non-tried
|
|
with open(pair.map_path, 'r', encoding='utf-8') as _f:
|
|
for line in _f.readlines():
|
|
line = line.strip()
|
|
if line.startswith('0x') and line.endswith(bad_statuses):
|
|
line = f'{line[:-1]}?'
|
|
map_data.append(line)
|
|
|
|
# Save updated map
|
|
with open(pair.map_path, 'w', encoding='utf-8') as _f:
|
|
_f.write('\n'.join(map_data))
|
|
|
|
# Reinitialize status
|
|
pair.set_initial_status()
|
|
|
|
def safety_check_destination(self):
|
|
"""Run safety checks for destination and abort if necessary."""
|
|
try:
|
|
self.destination.safety_checks()
|
|
except hw_obj.CriticalHardwareError as err:
|
|
std.print_error(
|
|
f'Critical error(s) detected for: {self.destination.path}',
|
|
)
|
|
raise std.GenericAbort() from err
|
|
|
|
def safety_check_size(self):
|
|
"""Run size safety check and abort if necessary."""
|
|
required_size = sum(pair.size for pair in self.block_pairs)
|
|
settings = self._load_settings() if self.mode == 'Clone' else {}
|
|
|
|
# Increase required_size if necessary
|
|
if self.mode == 'Clone' and settings.get('Needs Format', False):
|
|
if settings['Table Type'] == 'GPT':
|
|
# Below is the size calculation for the GPT
|
|
# 1 LBA for the protective MBR
|
|
# 33 LBAs each for the primary and backup GPT tables
|
|
# Source: https://en.wikipedia.org/wiki/GUID_Partition_Table
|
|
required_size += (1 + 33 + 33) * self.destination.details['phy-sec']
|
|
if settings['Create Boot Partition']:
|
|
# 384MiB EFI System Partition and a 16MiB MS Reserved partition
|
|
required_size += (384 + 16) * 1024**2
|
|
else:
|
|
# MBR only requires one LBA but adding a full 4096 bytes anyway
|
|
required_size += 4096
|
|
if settings['Create Boot Partition']:
|
|
# 100MiB System Reserved partition
|
|
required_size += 100 * 1024**2
|
|
|
|
# Reduce required_size if necessary
|
|
if self.mode == 'Image':
|
|
for pair in self.block_pairs:
|
|
if pair.destination.exists():
|
|
# NOTE: This uses the "max space" of the destination
|
|
# i.e. not the apparent size which is smaller for sparse files
|
|
# While this can result in an out-of-space error it's better
|
|
# than nothing.
|
|
required_size -= pair.destination.stat().st_size
|
|
|
|
# Check destination size
|
|
if self.mode == 'Clone':
|
|
destination_size = self.destination.details['size']
|
|
error_msg = 'A larger destination disk is required'
|
|
else:
|
|
# NOTE: Adding an extra 5% here to better ensure it will fit
|
|
destination_size = psutil.disk_usage(self.destination).free
|
|
destination_size *= 1.05
|
|
error_msg = 'Not enough free space on the destination'
|
|
if required_size > destination_size:
|
|
std.print_error(error_msg)
|
|
raise std.GenericAbort()
|
|
|
|
def save_debug_reports(self):
|
|
"""Save debug reports to disk."""
|
|
LOG.info('Saving debug reports')
|
|
debug_dir = pathlib.Path(f'{self.log_dir}/debug')
|
|
if not debug_dir.exists():
|
|
debug_dir.mkdir()
|
|
|
|
# State (self)
|
|
std.save_pickles({'state': self}, debug_dir)
|
|
with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f:
|
|
_f.write('[Debug report]\n')
|
|
_f.write('\n'.join(debug.generate_object_report(self)))
|
|
_f.write('\n')
|
|
|
|
# Block pairs
|
|
for _bp in self.block_pairs:
|
|
with open(
|
|
f'{debug_dir}/block_pairs.report', 'a', encoding='utf-8') as _f:
|
|
_f.write('[Debug report]\n')
|
|
_f.write('\n'.join(debug.generate_object_report(_bp)))
|
|
_f.write('\n')
|
|
|
|
def skip_pass(self, pass_name):
|
|
"""Mark block_pairs as skipped if applicable."""
|
|
for pair in self.block_pairs:
|
|
if pair.status[pass_name] == 'Pending':
|
|
pair.status[pass_name] = 'Skipped'
|
|
|
|
def update_progress_pane(self, overall_status):
|
|
"""Update progress pane."""
|
|
report = []
|
|
separator = '─────────────────────'
|
|
width = cfg.ddrescue.TMUX_SIDE_WIDTH
|
|
|
|
# Status
|
|
report.append(std.color_string(f'{"Status":^{width}}', 'BLUE'))
|
|
if 'NEEDS ATTENTION' in overall_status:
|
|
report.append(
|
|
std.color_string(f'{overall_status:^{width}}', 'YELLOW_BLINK'),
|
|
)
|
|
else:
|
|
report.append(f'{overall_status:^{width}}')
|
|
report.append(separator)
|
|
|
|
# Overall progress
|
|
if self.block_pairs:
|
|
total_rescued = self.get_rescued_size()
|
|
percent = self.get_percent_recovered()
|
|
report.append(std.color_string('Overall Progress', 'BLUE'))
|
|
report.append(
|
|
f'Rescued: {format_status_string(percent, width=width-9)}',
|
|
)
|
|
report.append(
|
|
std.color_string(
|
|
[f'{std.bytes_to_string(total_rescued, decimals=2):>{width}}'],
|
|
[get_percent_color(percent)],
|
|
),
|
|
)
|
|
report.append(separator)
|
|
|
|
# Block pair progress
|
|
for pair in self.block_pairs:
|
|
report.append(std.color_string(pair.source, 'BLUE'))
|
|
for name, status in pair.status.items():
|
|
name = name.title()
|
|
report.append(
|
|
f'{name}{format_status_string(status, width=width-len(name))}',
|
|
)
|
|
report.append(' ')
|
|
|
|
# EToC
|
|
if overall_status in ('Active', 'NEEDS ATTENTION'):
|
|
etoc = get_etoc()
|
|
report.append(separator)
|
|
report.append(std.color_string('Estimated Pass Finish', 'BLUE'))
|
|
if overall_status == 'NEEDS ATTENTION' or etoc == 'N/A':
|
|
report.append(std.color_string('N/A', 'YELLOW'))
|
|
else:
|
|
report.append(etoc)
|
|
|
|
# Write to progress file
|
|
out_path = pathlib.Path(f'{self.log_dir}/progress.out')
|
|
with open(out_path, 'w', encoding='utf-8') as _f:
|
|
_f.write('\n'.join(report))
|
|
|
|
def update_top_panes(self):
|
|
"""(Re)create top source/destination panes."""
|
|
source_exists = True
|
|
dest_exists = True
|
|
width = tmux.get_pane_size()[0]
|
|
width = int(width / 2) - 1
|
|
|
|
def _format_string(obj, width):
|
|
"""Format source/dest string using obj and width, returns str."""
|
|
string = ''
|
|
|
|
# Build base string
|
|
if isinstance(obj, hw_obj.Disk):
|
|
string = f'{obj.path} {obj.description}'
|
|
elif obj.is_dir():
|
|
string = f'{obj}/'
|
|
elif obj.is_file():
|
|
size_str = std.bytes_to_string(
|
|
obj.stat().st_size,
|
|
decimals=0,
|
|
use_binary=False)
|
|
string = f'{obj.name} {size_str}'
|
|
|
|
# Adjust for width
|
|
if len(string) > width:
|
|
if hasattr(obj, 'is_dir') and obj.is_dir():
|
|
string = f'...{string[-width+3:]}'
|
|
else:
|
|
string = f'{string[:width-3]}...'
|
|
|
|
# Done
|
|
return string
|
|
|
|
# Check source/dest existance
|
|
if self.source:
|
|
source_exists = self.source.path.exists()
|
|
if self.destination:
|
|
if isinstance(self.destination, hw_obj.Disk):
|
|
dest_exists = self.destination.path.exists()
|
|
else:
|
|
dest_exists = self.destination.exists()
|
|
|
|
# Kill destination pane
|
|
if 'Destination' in self.panes:
|
|
tmux.kill_pane(self.panes.pop('Destination'))
|
|
|
|
# Source
|
|
source_str = ' '
|
|
if self.source:
|
|
source_str = _format_string(self.source, width)
|
|
tmux.respawn_pane(
|
|
self.panes['Source'],
|
|
text=std.color_string(
|
|
['Source', '' if source_exists else ' (Missing)', '\n', source_str],
|
|
['BLUE', 'RED', None, None],
|
|
sep='',
|
|
),
|
|
)
|
|
|
|
# Destination
|
|
dest_str = ''
|
|
if self.destination:
|
|
dest_str = _format_string(self.destination, width)
|
|
self.panes['Destination'] = tmux.split_window(
|
|
percent=50,
|
|
vertical=False,
|
|
target_id=self.panes['Source'],
|
|
text=std.color_string(
|
|
['Destination', '' if dest_exists else ' (Missing)', '\n', dest_str],
|
|
['BLUE', 'RED', None, None],
|
|
sep='',
|
|
),
|
|
)
|
|
|
|
|
|
# Functions
|
|
def build_block_pair_report(block_pairs, settings):
|
|
"""Build block pair report, returns list."""
|
|
report = []
|
|
notes = []
|
|
if block_pairs:
|
|
report.append(std.color_string('Block Pairs', 'GREEN'))
|
|
else:
|
|
# Bail early
|
|
return report
|
|
|
|
# Show block pair mapping
|
|
if settings and settings['Create Boot Partition']:
|
|
if settings['Table Type'] == 'GPT':
|
|
report.append(f'{" —— ":<9} --> EFI System Partition')
|
|
report.append(f'{" —— ":<9} --> Microsoft Reserved Partition')
|
|
elif settings['Table Type'] == 'MBR':
|
|
report.append(f'{" —— ":<9} --> System Reserved')
|
|
for pair in block_pairs:
|
|
report.append(f'{pair.source.name:<9} --> {pair.destination.name}')
|
|
|
|
# Show resume messages as necessary
|
|
if settings:
|
|
if not settings['First Run']:
|
|
notes.append(
|
|
std.color_string(
|
|
['NOTE:', 'Clone settings loaded from previous run.'],
|
|
['BLUE', None],
|
|
),
|
|
)
|
|
if settings['Needs Format'] and settings['Table Type']:
|
|
msg = f'Destination will be formatted using {settings["Table Type"]}'
|
|
notes.append(
|
|
std.color_string(
|
|
['NOTE:', msg],
|
|
['BLUE', None],
|
|
),
|
|
)
|
|
if any(pair.get_rescued_size() > 0 for pair in block_pairs):
|
|
notes.append(
|
|
std.color_string(
|
|
['NOTE:', 'Resume data loaded from map file(s).'],
|
|
['BLUE', None],
|
|
),
|
|
)
|
|
|
|
# Add notes to report
|
|
if notes:
|
|
report.append(' ')
|
|
report.extend(notes)
|
|
|
|
# Done
|
|
return report
|
|
|
|
|
|
def build_ddrescue_cmd(block_pair, pass_name, settings_menu):
|
|
"""Build ddrescue cmd using passed details, returns list."""
|
|
cmd = ['sudo', 'ddrescue']
|
|
if (block_pair.destination.is_block_device()
|
|
or block_pair.destination.is_char_device()):
|
|
cmd.append('--force')
|
|
cmd.extend(DDRESCUE_SPECIFIC_PASS_SETTINGS.get(pass_name, []))
|
|
|
|
# Fix domain size based on starting position
|
|
domain_size = block_pair.size
|
|
if settings_menu.options['--input-position']['Selected']:
|
|
settings_menu.options['--reverse']['Selected'] = False
|
|
input_position = std.string_to_bytes(
|
|
settings_menu.options['--input-position']['Value'],
|
|
)
|
|
domain_size -= input_position
|
|
cmd.append(f'--size={domain_size}')
|
|
|
|
# Determine skip sizes
|
|
if settings_menu.options['--skip-size']['Selected']:
|
|
skip_sizes = settings_menu.options['--skip-size']['Value'].split(',')
|
|
skip_sizes = [float(s) for s in skip_sizes]
|
|
initial_skip = max(INITIAL_SKIP_MIN, int(block_pair.size * skip_sizes[0]))
|
|
max_skip = min(int(block_pair.size * skip_sizes[1]), domain_size)
|
|
max_skip = max(INITIAL_SKIP_MIN, max_skip)
|
|
cmd.append(f'--skip-size={initial_skip},{max_skip}')
|
|
cmd.extend(get_ddrescue_settings(settings_menu))
|
|
|
|
# Add source physical sector size (if possible)
|
|
cmd.append(f'--sector-size={block_pair.sector_size}')
|
|
|
|
# Add block pair and map file
|
|
if PLATFORM == 'Darwin':
|
|
# Use Raw disks if possible
|
|
for dev in (block_pair.source, block_pair.destination):
|
|
raw_dev = pathlib.Path(dev.with_name(f'r{dev.name}'))
|
|
if raw_dev.exists():
|
|
cmd.append(raw_dev)
|
|
else:
|
|
cmd.append(dev)
|
|
else:
|
|
cmd.append(block_pair.source)
|
|
cmd.append(block_pair.destination)
|
|
cmd.append(block_pair.map_path)
|
|
|
|
# Done
|
|
LOG.debug('ddrescue cmd: %s', cmd)
|
|
return cmd
|
|
|
|
|
|
def build_directory_report(path):
|
|
"""Build directory report, returns list."""
|
|
path = f'{path}/'
|
|
report = []
|
|
|
|
# Get details
|
|
if PLATFORM == 'Linux':
|
|
cmd = [
|
|
'findmnt',
|
|
'--output', 'SIZE,AVAIL,USED,FSTYPE,OPTIONS',
|
|
'--target', path,
|
|
]
|
|
proc = exe.run_program(cmd)
|
|
width = len(path) + 1
|
|
for line in proc.stdout.splitlines():
|
|
line = line.replace('\n', '')
|
|
if 'FSTYPE' in line:
|
|
line = std.color_string(f'{"PATH":<{width}}{line}', 'BLUE')
|
|
else:
|
|
line = f'{path:<{width}}{line}'
|
|
report.append(line)
|
|
else:
|
|
report.append(std.color_string('PATH', 'BLUE'))
|
|
report.append(str(path))
|
|
|
|
# Done
|
|
return report
|
|
|
|
|
|
def build_disk_report(dev):
|
|
"""Build device report, returns list."""
|
|
children = dev.details.get('children', [])
|
|
report = []
|
|
|
|
# Get widths
|
|
widths = {
|
|
'fstype': max(6, len(str(dev.details.get('fstype', '')))),
|
|
'label': max(5, len(str(dev.details.get('label', '')))),
|
|
'name': max(4, len(dev.path.name)),
|
|
}
|
|
for child in children:
|
|
widths['fstype'] = max(widths['fstype'], len(str(child['fstype'])))
|
|
widths['label'] = max(widths['label'], len(str(child['label'])))
|
|
widths['name'] = max(
|
|
widths['name'],
|
|
len(child['name'].replace('/dev/', '')),
|
|
)
|
|
widths = {k: v+1 for k, v in widths.items()}
|
|
|
|
# Disk details
|
|
report.append(f'{dev.path.name} {dev.description}')
|
|
report.append(' ')
|
|
dev_fstype = dev.details.get('fstype', '')
|
|
dev_label = dev.details.get('label', '')
|
|
dev_name = dev.path.name
|
|
dev_size = std.bytes_to_string(dev.details["size"], use_binary=False)
|
|
|
|
# Partition details
|
|
report.append(
|
|
std.color_string(
|
|
(
|
|
f'{"NAME":<{widths["name"]}}'
|
|
f'{" " if children else ""}'
|
|
f'{"SIZE":<7}'
|
|
f'{"FSTYPE":<{widths["fstype"]}}'
|
|
f'{"LABEL":<{widths["label"]}}'
|
|
),
|
|
'BLUE',
|
|
),
|
|
)
|
|
report.append(
|
|
f'{dev_name if dev_name else "":<{widths["name"]}}'
|
|
f'{" " if children else ""}'
|
|
f'{dev_size:>6} '
|
|
f'{dev_fstype if dev_fstype else "":<{widths["fstype"]}}'
|
|
f'{dev_label if dev_label else "":<{widths["label"]}}'
|
|
)
|
|
for child in children:
|
|
fstype = child['fstype']
|
|
label = child['label']
|
|
name = child['name'].replace('/dev/', '')
|
|
size = std.bytes_to_string(child["size"], use_binary=False)
|
|
report.append(
|
|
f'{name if name else "":<{widths["name"]}}'
|
|
f'{size:>6} '
|
|
f'{fstype if fstype else "":<{widths["fstype"]}}'
|
|
f'{label if label else "":<{widths["label"]}}'
|
|
)
|
|
|
|
# Indent children
|
|
if len(children) > 1:
|
|
report = [
|
|
*report[:4],
|
|
*[f'├─{line}' for line in report[4:-1]],
|
|
f'└─{report[-1]}',
|
|
]
|
|
elif len(children) == 1:
|
|
report[-1] = f'└─{report[-1]}'
|
|
|
|
# Done
|
|
return report
|
|
|
|
|
|
def build_main_menu():
|
|
"""Build main menu, returns wk.std.Menu."""
|
|
menu = std.Menu(title=std.color_string('ddrescue TUI: Main Menu', 'GREEN'))
|
|
menu.separator = ' '
|
|
|
|
# Add actions, options, etc
|
|
for action in MENU_ACTIONS:
|
|
if not (PLATFORM == 'Darwin' and 'Detect drives' in action):
|
|
menu.add_action(action)
|
|
for toggle, selected in MENU_TOGGLES.items():
|
|
menu.add_toggle(toggle, {'Selected': selected})
|
|
|
|
# Done
|
|
return menu
|
|
|
|
|
|
def build_object_report(obj):
|
|
"""Build object report, returns list."""
|
|
report = []
|
|
|
|
# Get details based on object given
|
|
if hasattr(obj, 'is_dir') and obj.is_dir():
|
|
# Directory report
|
|
report = build_directory_report(obj)
|
|
else:
|
|
# Device report
|
|
report = build_disk_report(obj)
|
|
|
|
# Done
|
|
return report
|
|
|
|
|
|
def build_settings_menu(silent=True):
|
|
"""Build settings menu, returns wk.std.Menu."""
|
|
title_text = [
|
|
std.color_string('ddrescue TUI: Expert Settings', 'GREEN'),
|
|
' ',
|
|
std.color_string(
|
|
['These settings can cause', 'MAJOR DAMAGE', 'to drives'],
|
|
['YELLOW', 'RED', 'YELLOW'],
|
|
),
|
|
'Please read the manual before making changes',
|
|
]
|
|
menu = std.Menu(title='\n'.join(title_text))
|
|
menu.separator = ' '
|
|
preset = 'Default'
|
|
if not silent:
|
|
# Ask which preset to use
|
|
print(f'Available ddrescue presets: {" / ".join(SETTING_PRESETS)}')
|
|
preset = std.choice(SETTING_PRESETS, 'Please select a preset:')
|
|
|
|
# Fix selection
|
|
for _p in SETTING_PRESETS:
|
|
if _p.startswith(preset):
|
|
preset = _p
|
|
|
|
# Add default settings
|
|
menu.add_action('Load Preset')
|
|
menu.add_action('Main Menu')
|
|
for name, details in DDRESCUE_SETTINGS['Default'].items():
|
|
menu.add_option(name, details.copy())
|
|
|
|
# Update settings using preset
|
|
if preset != 'Default':
|
|
for name, details in DDRESCUE_SETTINGS[preset].items():
|
|
menu.options[name].update(details.copy())
|
|
|
|
# Done
|
|
return menu
|
|
|
|
|
|
def build_sfdisk_partition_line(table_type, dev_path, size, details):
|
|
"""Build sfdisk partition line using passed details, returns str."""
|
|
line = f'{dev_path} : size={size}'
|
|
dest_type = ''
|
|
source_filesystem = str(details.get('fstype', '')).upper()
|
|
source_table_type = ''
|
|
source_type = details.get('parttype', '')
|
|
|
|
# Set dest type
|
|
if re.match(r'^0x\w+$', source_type):
|
|
# Both source and dest are MBR
|
|
source_table_type = 'MBR'
|
|
if table_type == 'MBR':
|
|
dest_type = source_type.replace('0x', '').lower()
|
|
elif re.match(r'^\w{8}-\w{4}-\w{4}-\w{4}-\w{12}$', source_type):
|
|
# Source is a GPT type
|
|
source_table_type = 'GPT'
|
|
if table_type == 'GPT':
|
|
dest_type = source_type.upper()
|
|
if not dest_type:
|
|
# Assuming changing table types, set based on FS
|
|
if source_filesystem in cfg.ddrescue.PARTITION_TYPES.get(table_type, {}):
|
|
dest_type = cfg.ddrescue.PARTITION_TYPES[table_type][source_filesystem]
|
|
line += f', type={dest_type}'
|
|
|
|
# Safety Check
|
|
if not dest_type:
|
|
std.print_error(f'Failed to determine partition type for: {dev_path}')
|
|
raise std.GenericAbort()
|
|
|
|
# Add extra details
|
|
if details.get('partlabel', ''):
|
|
line += f', name="{details["partlabel"]}"'
|
|
if details.get('partuuid', '') and source_table_type == table_type:
|
|
# Only add UUID if source/dest table types match
|
|
line += f', uuid={details["partuuid"].upper()}'
|
|
|
|
# Done
|
|
return line
|
|
|
|
|
|
def check_destination_health(destination):
|
|
"""Check destination health, returns str."""
|
|
result = ''
|
|
|
|
# Bail early
|
|
if not isinstance(destination, hw_obj.Disk):
|
|
# Return empty string
|
|
return result
|
|
|
|
# Run safety checks
|
|
try:
|
|
destination.safety_checks()
|
|
except hw_obj.CriticalHardwareError:
|
|
result = 'Critical hardware error detected on destination'
|
|
except hw_obj.SMARTSelfTestInProgressError:
|
|
result = 'SMART self-test in progress on destination'
|
|
except hw_obj.SMARTNotSupportedError:
|
|
pass
|
|
|
|
# Done
|
|
return result
|
|
|
|
|
|
def clean_working_dir(working_dir):
|
|
"""Clean working directory to ensure a fresh recovery session.
|
|
|
|
NOTE: Data from previous sessions will be preserved
|
|
in a backup directory.
|
|
"""
|
|
backup_dir = pathlib.Path(f'{working_dir}/prev')
|
|
backup_dir = io.non_clobber_path(backup_dir)
|
|
backup_dir.mkdir()
|
|
|
|
# Move settings, maps, etc to backup_dir
|
|
for entry in os.scandir(working_dir):
|
|
if entry.name.endswith(('.dd', '.json', '.map')):
|
|
new_path = f'{backup_dir}/{entry.name}'
|
|
new_path = io.non_clobber_path(new_path)
|
|
shutil.move(entry.path, new_path)
|
|
|
|
|
|
def format_status_string(status, width):
|
|
"""Format colored status string, returns str."""
|
|
color = None
|
|
percent = -1
|
|
status_str = str(status)
|
|
|
|
# Check if status is percentage
|
|
try:
|
|
percent = float(status_str)
|
|
except ValueError:
|
|
# Assuming status is text
|
|
pass
|
|
|
|
# Format status
|
|
if percent >= 0:
|
|
# Percentage
|
|
color = get_percent_color(percent)
|
|
status_str = f'{percent:{width-2}.2f} %'
|
|
if '100.00' in status_str and percent < 100:
|
|
# Always round down to 99.99%
|
|
LOG.warning('Rounding down to 99.99 from %s', percent)
|
|
status_str = f'{"99.99 %":>{width}}'
|
|
else:
|
|
# Text
|
|
color = STATUS_COLORS.get(status_str, None)
|
|
status_str = f'{status_str:>{width}}'
|
|
|
|
# Add color if necessary
|
|
if color:
|
|
status_str = std.color_string(status_str, color)
|
|
|
|
# Done
|
|
return status_str
|
|
|
|
|
|
def fstype_is_ok(path, map_dir=False):
|
|
"""Check if filesystem type is acceptable, returns bool."""
|
|
is_ok = False
|
|
fstype = None
|
|
|
|
# Get fstype
|
|
if PLATFORM == 'Darwin':
|
|
# Check all parent dirs until a mountpoint is found
|
|
test_path = pathlib.Path(path)
|
|
while test_path:
|
|
fstype = get_fstype_macos(test_path)
|
|
if fstype != 'UNKNOWN':
|
|
break
|
|
fstype = None
|
|
test_path = test_path.parent
|
|
elif PLATFORM == 'Linux':
|
|
cmd = [
|
|
'findmnt',
|
|
'--noheadings',
|
|
'--output', 'FSTYPE',
|
|
'--target', path,
|
|
]
|
|
proc = exe.run_program(cmd, check=False)
|
|
fstype = proc.stdout
|
|
fstype = fstype.strip().lower()
|
|
|
|
# Check fstype
|
|
if map_dir:
|
|
is_ok = RECOMMENDED_MAP_FSTYPES.match(fstype)
|
|
else:
|
|
is_ok = RECOMMENDED_FSTYPES.match(fstype)
|
|
|
|
# Done
|
|
return is_ok
|
|
|
|
|
|
def get_ddrescue_settings(settings_menu):
|
|
"""Get ddrescue settings from menu selections, returns list."""
|
|
settings = []
|
|
|
|
# Check menu selections
|
|
for name, details in settings_menu.options.items():
|
|
if name == '--skip-size':
|
|
continue
|
|
if details['Selected']:
|
|
if 'Value' in details:
|
|
settings.append(f'{name}={details["Value"]}')
|
|
else:
|
|
settings.append(name)
|
|
|
|
# Done
|
|
return settings
|
|
|
|
|
|
def get_etoc():
|
|
"""Get EToC from ddrescue output, returns str."""
|
|
delta = None
|
|
delta_dict = {}
|
|
etoc = 'Unknown'
|
|
now = datetime.datetime.now(tz=TIMEZONE)
|
|
output = tmux.capture_pane()
|
|
|
|
# Search for EToC delta
|
|
matches = re.findall(r'remaining time:.*$', output, re.MULTILINE)
|
|
if matches:
|
|
match = REGEX_REMAINING_TIME.search(matches[-1])
|
|
if match.group('na'):
|
|
etoc = 'N/A'
|
|
else:
|
|
for key in ('days', 'hours', 'minutes', 'seconds'):
|
|
delta_dict[key] = match.group(key)
|
|
delta_dict = {k: int(v) if v else 0 for k, v in delta_dict.items()}
|
|
delta = datetime.timedelta(**delta_dict)
|
|
|
|
# Calc EToC if delta found
|
|
if delta:
|
|
etoc_datetime = now + delta
|
|
etoc = etoc_datetime.strftime('%Y-%m-%d %H:%M %Z')
|
|
|
|
# Done
|
|
return etoc
|
|
|
|
|
|
def get_fstype_macos(path):
|
|
"""Get fstype for path under macOS, returns str."""
|
|
fstype = 'UNKNOWN'
|
|
proc = exe.run_program(['mount'], check=False)
|
|
|
|
# Bail early
|
|
if proc.returncode:
|
|
return fstype
|
|
|
|
# Parse output
|
|
match = re.search(rf'{path} \((\w+)', proc.stdout)
|
|
if match:
|
|
fstype = match.group(1)
|
|
|
|
# Done
|
|
return fstype
|
|
|
|
|
|
def get_object(path):
|
|
"""Get object based on path, returns obj."""
|
|
obj = None
|
|
|
|
# Bail early
|
|
if not path:
|
|
return obj
|
|
|
|
# Check path
|
|
path = pathlib.Path(path).resolve()
|
|
if path.is_block_device() or path.is_char_device():
|
|
obj = hw_obj.Disk(path)
|
|
|
|
# Child/Parent check
|
|
parent = obj.details['parent']
|
|
if parent:
|
|
std.print_warning(f'"{obj.path}" is a child device')
|
|
if std.ask(f'Use parent device "{parent}" instead?'):
|
|
obj = hw_obj.Disk(parent)
|
|
elif path.is_dir():
|
|
obj = path
|
|
elif path.is_file():
|
|
# Assuming file is a raw image, mounting
|
|
loop_path = mount_raw_image(path)
|
|
obj = hw_obj.Disk(loop_path)
|
|
|
|
# Abort if obj not set
|
|
if not obj:
|
|
std.print_error(f'Invalid source/dest path: {path}')
|
|
raise std.GenericAbort()
|
|
|
|
# Done
|
|
return obj
|
|
|
|
|
|
def get_partition_separator(name):
|
|
"""Get partition separator based on device name, returns str."""
|
|
separator = ''
|
|
if re.search(r'(loop|mmc|nvme)', name, re.IGNORECASE):
|
|
separator = 'p'
|
|
|
|
return separator
|
|
|
|
|
|
def get_percent_color(percent):
|
|
"""Get color based on percentage, returns str."""
|
|
color = None
|
|
if percent > 100:
|
|
color = 'PURPLE'
|
|
elif percent >= 99:
|
|
color = 'GREEN'
|
|
elif percent >= 90:
|
|
color = 'YELLOW'
|
|
elif percent > 0:
|
|
color = 'RED'
|
|
|
|
# Done
|
|
return color
|
|
|
|
|
|
def get_table_type(disk):
|
|
"""Get disk partition table type, returns str.
|
|
|
|
NOTE: If resulting table type is not GPT or MBR
|
|
then an exception is raised.
|
|
"""
|
|
table_type = str(disk.details.get('pttype', '')).upper()
|
|
table_type = table_type.replace('DOS', 'MBR')
|
|
|
|
# Check type
|
|
if table_type not in ('GPT', 'MBR'):
|
|
std.print_error(f'Unsupported partition table type: {table_type}')
|
|
raise std.GenericAbort()
|
|
|
|
# Done
|
|
return table_type
|
|
|
|
|
|
def get_working_dir(mode, destination, force_local=False):
|
|
"""Get working directory using mode and destination, returns path."""
|
|
ticket_id = None
|
|
working_dir = None
|
|
|
|
# Set ticket ID
|
|
while ticket_id is None:
|
|
ticket_id = std.input_text(
|
|
prompt='Please enter ticket ID:',
|
|
allow_empty_response=False,
|
|
)
|
|
ticket_id = ticket_id.replace(' ', '_')
|
|
if not re.match(r'^\d+', ticket_id):
|
|
ticket_id = None
|
|
|
|
# Use preferred path if possible
|
|
if mode == 'Image':
|
|
try:
|
|
path = pathlib.Path(destination).resolve()
|
|
except TypeError as err:
|
|
std.print_error(f'Invalid destination: {destination}')
|
|
raise std.GenericAbort() from err
|
|
if path.exists() and fstype_is_ok(path, map_dir=False):
|
|
working_dir = path
|
|
elif mode == 'Clone' and not force_local:
|
|
std.print_info('Mounting backup shares...')
|
|
net.mount_backup_shares(read_write=True)
|
|
for server in cfg.net.BACKUP_SERVERS:
|
|
path = pathlib.Path(
|
|
f'/{"Volumes" if PLATFORM == "Darwin" else "Backups"}/{server}',
|
|
)
|
|
if path.exists() and fstype_is_ok(path, map_dir=True):
|
|
# Acceptable path found
|
|
working_dir = path
|
|
break
|
|
|
|
# Default to current dir if necessary
|
|
if not working_dir:
|
|
LOG.error('Failed to set preferred working directory')
|
|
working_dir = pathlib.Path(os.getcwd())
|
|
|
|
# Set subdir using ticket ID
|
|
if mode == 'Clone':
|
|
working_dir = working_dir.joinpath(ticket_id)
|
|
|
|
# Create directory
|
|
working_dir.mkdir(parents=True, exist_ok=True)
|
|
os.chdir(working_dir)
|
|
|
|
# Done
|
|
LOG.info('Set working directory to: %s', working_dir)
|
|
return working_dir
|
|
|
|
|
|
def is_missing_source_or_destination(state):
|
|
"""Check if source or destination dissapeared, returns bool."""
|
|
missing = False
|
|
items = {
|
|
'Source': state.source,
|
|
'Destination': state.destination,
|
|
}
|
|
|
|
# Check items
|
|
for name, item in items.items():
|
|
if not item:
|
|
continue
|
|
if hasattr(item, 'path'):
|
|
if not item.path.exists():
|
|
missing = True
|
|
std.print_error(f'{name} disappeared')
|
|
elif hasattr(item, 'exists'):
|
|
if not item.exists():
|
|
missing = True
|
|
std.print_error(f'{name} disappeared')
|
|
else:
|
|
LOG.error('Unknown %s type: %s', name, item)
|
|
|
|
# Update top panes
|
|
state.update_top_panes()
|
|
|
|
# Done
|
|
return missing
|
|
|
|
|
|
def source_or_destination_changed(state):
|
|
"""Verify the source and destination objects are still valid."""
|
|
changed = False
|
|
|
|
# Compare objects
|
|
for obj in (state.source, state.destination):
|
|
if not obj:
|
|
changed = True
|
|
elif hasattr(obj, 'exists'):
|
|
# Assuming dest path
|
|
changed = changed or not obj.exists()
|
|
elif isinstance(obj, hw_obj.Disk):
|
|
compare_dev = hw_obj.Disk(obj.path)
|
|
for key in ('model', 'serial'):
|
|
changed = changed or obj.details[key] != compare_dev.details[key]
|
|
|
|
# Update top panes
|
|
state.update_top_panes()
|
|
|
|
# Done
|
|
if changed:
|
|
std.print_error('Source and/or Destination changed')
|
|
return changed
|
|
|
|
|
|
def main():
|
|
# pylint: disable=too-many-branches
|
|
"""Main function for ddrescue TUI."""
|
|
args = docopt(DOCSTRING)
|
|
log.update_log_path(dest_name='ddrescue-TUI', timestamp=True)
|
|
|
|
# Check if running inside tmux
|
|
if 'TMUX' not in os.environ:
|
|
LOG.error('tmux session not found')
|
|
raise RuntimeError('tmux session not found')
|
|
|
|
# Init
|
|
atexit.register(tmux.kill_all_panes)
|
|
main_menu = build_main_menu()
|
|
settings_menu = build_settings_menu()
|
|
state = State()
|
|
try:
|
|
state.init_recovery(args)
|
|
except (FileNotFoundError, std.GenericAbort):
|
|
is_missing_source_or_destination(state)
|
|
std.abort()
|
|
|
|
# Show menu
|
|
while True:
|
|
selection = main_menu.advanced_select()
|
|
|
|
# Change settings
|
|
if 'Change settings' in selection[0]:
|
|
while True:
|
|
selection = settings_menu.settings_select()
|
|
if 'Load Preset' in selection:
|
|
# Rebuild settings menu using preset
|
|
settings_menu = build_settings_menu(silent=False)
|
|
else:
|
|
break
|
|
|
|
# Detect drives
|
|
if 'Detect drives' in selection[0]:
|
|
std.clear_screen()
|
|
std.print_warning(DETECT_DRIVES_NOTICE)
|
|
if std.ask('Are you sure you proceed?'):
|
|
std.print_standard('Forcing controllers to rescan for devices...')
|
|
cmd = 'echo "- - -" | sudo tee /sys/class/scsi_host/host*/scan'
|
|
exe.run_program(cmd, check=False, shell=True)
|
|
if source_or_destination_changed(state):
|
|
std.abort()
|
|
|
|
# Start recovery
|
|
if 'Start' in selection:
|
|
std.clear_screen()
|
|
run_recovery(state, main_menu, settings_menu, dry_run=args['--dry-run'])
|
|
|
|
# Quit
|
|
if 'Quit' in selection:
|
|
total_percent = state.get_percent_recovered()
|
|
if total_percent == 100:
|
|
break
|
|
|
|
# Recovey < 100%
|
|
std.print_warning('Recovery is less than 100%')
|
|
if std.ask('Are you sure you want to quit?'):
|
|
break
|
|
|
|
# Save results to log
|
|
LOG.info('')
|
|
for line in state.generate_report():
|
|
LOG.info(' %s', std.strip_colors(line))
|
|
|
|
|
|
def mount_raw_image(path):
|
|
"""Mount raw image using OS specific methods, returns pathlib.Path."""
|
|
loopback_path = None
|
|
|
|
if PLATFORM == 'Darwin':
|
|
loopback_path = mount_raw_image_macos(path)
|
|
elif PLATFORM == 'Linux':
|
|
loopback_path = mount_raw_image_linux(path)
|
|
|
|
# Check
|
|
if not loopback_path:
|
|
std.print_error(f'Failed to mount image: {path}')
|
|
|
|
# Register unmount atexit
|
|
atexit.register(unmount_loopback_device, loopback_path)
|
|
|
|
# Done
|
|
return loopback_path
|
|
|
|
|
|
def mount_raw_image_linux(path):
|
|
"""Mount raw image using losetup, returns pathlib.Path."""
|
|
loopback_path = None
|
|
|
|
# Mount using losetup
|
|
cmd = [
|
|
'sudo',
|
|
'losetup',
|
|
'--find',
|
|
'--partscan',
|
|
'--show',
|
|
path,
|
|
]
|
|
proc = exe.run_program(cmd, check=False)
|
|
|
|
# Check result
|
|
if proc.returncode == 0:
|
|
loopback_path = proc.stdout.strip()
|
|
|
|
# Done
|
|
return loopback_path
|
|
|
|
def mount_raw_image_macos(path):
|
|
"""Mount raw image using hdiutil, returns pathlib.Path."""
|
|
loopback_path = None
|
|
plist_data = {}
|
|
|
|
# Mount using hdiutil
|
|
# plistdata['system-entities'][{}...]
|
|
cmd = [
|
|
'hdiutil', 'attach',
|
|
'-imagekey', 'diskimage-class=CRawDiskImage',
|
|
'-nomount',
|
|
'-plist',
|
|
'-readonly',
|
|
path,
|
|
]
|
|
proc = exe.run_program(cmd, check=False, encoding=None, errors=None)
|
|
|
|
# Check result
|
|
try:
|
|
plist_data = plistlib.loads(proc.stdout)
|
|
except plistlib.InvalidFileException:
|
|
return None
|
|
for dev in plist_data.get('system-entities', []):
|
|
dev_path = dev.get('dev-entry', '')
|
|
if re.match(r'^/dev/disk\d+$', dev_path):
|
|
loopback_path = dev_path
|
|
|
|
# Done
|
|
return loopback_path
|
|
|
|
|
|
def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
|
|
# pylint: disable=too-many-statements
|
|
"""Run ddrescue using passed settings."""
|
|
cmd = build_ddrescue_cmd(block_pair, pass_name, settings)
|
|
poweroff_source_after_idle = True
|
|
state.update_progress_pane('Active')
|
|
std.clear_screen()
|
|
warning_message = ''
|
|
|
|
def _poweroff_source_drive(idle_minutes):
|
|
"""Power off source drive after a while."""
|
|
source_dev = state.source.path
|
|
|
|
# Bail early
|
|
if PLATFORM == 'Darwin':
|
|
return
|
|
|
|
# Sleep
|
|
i = 0
|
|
while i < idle_minutes*60:
|
|
if not poweroff_source_after_idle:
|
|
# Countdown canceled, exit without powering-down drives
|
|
return
|
|
if i % 600 == 0 and i > 0:
|
|
if i == 600:
|
|
std.print_standard(' ', flush=True)
|
|
std.print_warning(
|
|
f'Powering off source in {int((idle_minutes*60-i)/60)} minutes...',
|
|
)
|
|
std.sleep(5)
|
|
i += 5
|
|
|
|
# Power off drive
|
|
cmd = ['sudo', 'hdparm', '-Y', source_dev]
|
|
proc = exe.run_program(cmd, check=False)
|
|
if proc.returncode:
|
|
std.print_error(f'Failed to poweroff source {source_dev}')
|
|
else:
|
|
std.print_warning(f'Powered off source {source_dev}')
|
|
std.print_standard(
|
|
'Press Enter to return to main menu...', end='', flush=True,
|
|
)
|
|
|
|
def _update_smart_pane():
|
|
"""Update SMART pane every 30 seconds."""
|
|
state.source.update_smart_details()
|
|
now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z')
|
|
with open(f'{state.log_dir}/smart.out', 'w', encoding='utf-8') as _f:
|
|
_f.write(
|
|
std.color_string(
|
|
['SMART Attributes', f'Updated: {now}\n'],
|
|
['BLUE', 'YELLOW'],
|
|
sep='\t\t',
|
|
),
|
|
)
|
|
_f.write('\n'.join(state.source.generate_report(header=False)))
|
|
|
|
# Dry run
|
|
if dry_run:
|
|
LOG.info('ddrescue cmd: %s', cmd)
|
|
return
|
|
|
|
# Start ddrescue and ddrescueview (if enabled)
|
|
proc = exe.popen_program(cmd)
|
|
if block_pair.view_map:
|
|
exe.popen_program(
|
|
['ddrescueview', '-r', '5s', block_pair.map_path],
|
|
pipe=True,
|
|
)
|
|
|
|
# ddrescue loop
|
|
_i = 0
|
|
while True:
|
|
if _i % 30 == 0:
|
|
# Update SMART pane
|
|
_update_smart_pane()
|
|
|
|
# Check destination
|
|
warning_message = check_destination_health(state.destination)
|
|
if warning_message:
|
|
# Error detected on destination, stop recovery
|
|
exe.stop_process(proc)
|
|
std.print_error(warning_message)
|
|
break
|
|
|
|
if _i % 60 == 0:
|
|
# Clear ddrescue pane
|
|
tmux.clear_pane()
|
|
_i += 1
|
|
|
|
# Update progress
|
|
block_pair.update_progress(pass_name)
|
|
state.update_progress_pane('Active')
|
|
|
|
# Check if complete
|
|
try:
|
|
proc.wait(timeout=1)
|
|
break
|
|
except KeyboardInterrupt:
|
|
# Wait a bit to let ddrescue exit safely
|
|
LOG.warning('ddrescue stopped by user')
|
|
warning_message = 'Aborted'
|
|
std.sleep(2)
|
|
exe.stop_process(proc, graceful=False)
|
|
break
|
|
except subprocess.TimeoutExpired:
|
|
# Continue to next loop to update panes
|
|
pass
|
|
else:
|
|
# Done
|
|
std.sleep(1)
|
|
break
|
|
|
|
# Update progress
|
|
# NOTE: Using 'Active' here to avoid flickering between block pairs
|
|
block_pair.update_progress(pass_name)
|
|
state.update_progress_pane('Active')
|
|
|
|
# Check result
|
|
if proc.poll():
|
|
# True if return code is non-zero (poll() returns None if still running)
|
|
poweroff_thread = exe.start_thread(
|
|
_poweroff_source_drive,
|
|
[cfg.ddrescue.DRIVE_POWEROFF_TIMEOUT],
|
|
)
|
|
warning_message = 'Error(s) encountered, see message above'
|
|
state.update_top_panes()
|
|
if warning_message:
|
|
print(' ')
|
|
print(' ')
|
|
std.print_error('DDRESCUE PROCESS HALTED')
|
|
print(' ')
|
|
std.print_warning(warning_message)
|
|
|
|
# Needs attention?
|
|
if str(proc.poll()) != '0':
|
|
state.update_progress_pane('NEEDS ATTENTION')
|
|
std.pause('Press Enter to return to main menu...')
|
|
|
|
# Stop source poweroff countdown
|
|
std.print_standard('Stopping device poweroff countdown...', flush=True)
|
|
poweroff_source_after_idle = False
|
|
poweroff_thread.join()
|
|
|
|
# Done
|
|
raise std.GenericAbort()
|
|
|
|
|
|
def run_recovery(state, main_menu, settings_menu, dry_run=True):
|
|
# pylint: disable=too-many-branches
|
|
"""Run recovery passes."""
|
|
atexit.register(state.save_debug_reports)
|
|
attempted_recovery = False
|
|
auto_continue = False
|
|
|
|
# Bail early
|
|
if is_missing_source_or_destination(state):
|
|
std.print_standard('')
|
|
std.pause('Press Enter to return to main menu...')
|
|
return
|
|
if source_or_destination_changed(state):
|
|
std.print_standard('')
|
|
std.abort()
|
|
|
|
# Get settings
|
|
for name, details in main_menu.toggles.items():
|
|
if 'Auto continue' in name and details['Selected']:
|
|
auto_continue = True
|
|
if 'Retry' in name and details['Selected']:
|
|
details['Selected'] = False
|
|
state.retry_all_passes()
|
|
|
|
# Start SMART/Journal
|
|
state.panes['SMART'] = tmux.split_window(
|
|
behind=True, lines=12, vertical=True,
|
|
watch_file=f'{state.log_dir}/smart.out',
|
|
)
|
|
if PLATFORM != 'Darwin':
|
|
state.panes['Journal'] = tmux.split_window(
|
|
lines=4, vertical=True, cmd='journalctl --dmesg --follow',
|
|
)
|
|
|
|
# Run pass(es)
|
|
for pass_name in ('read-skip', 'read-full', 'trim', 'scrape'):
|
|
abort = False
|
|
|
|
# Skip to next pass
|
|
if state.pass_complete(pass_name):
|
|
# NOTE: This bypasses auto_continue
|
|
state.skip_pass(pass_name)
|
|
continue
|
|
|
|
# Run ddrescue
|
|
for pair in state.block_pairs:
|
|
if not pair.pass_complete(pass_name):
|
|
attempted_recovery = True
|
|
state.mark_started()
|
|
try:
|
|
run_ddrescue(state, pair, pass_name, settings_menu, dry_run=dry_run)
|
|
except (FileNotFoundError, KeyboardInterrupt, std.GenericAbort):
|
|
is_missing_source_or_destination(state)
|
|
abort = True
|
|
break
|
|
|
|
# Continue or return to menu
|
|
all_complete = state.pass_complete(pass_name)
|
|
all_above_threshold = state.pass_above_threshold(pass_name)
|
|
if abort or not (all_complete and all_above_threshold and auto_continue):
|
|
LOG.warning('Recovery halted')
|
|
break
|
|
|
|
# Stop SMART/Journal
|
|
for pane in ('SMART', 'Journal'):
|
|
if pane in state.panes:
|
|
tmux.kill_pane(state.panes.pop(pane))
|
|
|
|
# Show warning if nothing was done
|
|
if not attempted_recovery:
|
|
std.print_warning('No actions performed')
|
|
std.print_standard(' ')
|
|
std.pause('Press Enter to return to main menu...')
|
|
|
|
# Done
|
|
state.save_debug_reports()
|
|
atexit.unregister(state.save_debug_reports)
|
|
state.update_progress_pane('Idle')
|
|
|
|
|
|
def select_disk(prompt, skip_disk=None):
|
|
"""Select disk from list, returns Disk()."""
|
|
std.print_info('Scanning disks...')
|
|
disks = hw_obj.get_disks()
|
|
menu = std.Menu(
|
|
title=std.color_string(f'ddrescue TUI: {prompt} Selection', 'GREEN'),
|
|
)
|
|
menu.disabled_str = 'Already selected'
|
|
menu.separator = ' '
|
|
menu.add_action('Quit')
|
|
for disk in disks:
|
|
disable_option = False
|
|
size = disk.details["size"]
|
|
|
|
# Check if option should be disabled
|
|
if skip_disk:
|
|
parent = skip_disk.details.get('parent', None)
|
|
if (disk.path.samefile(skip_disk.path)
|
|
or (parent and disk.path.samefile(parent))):
|
|
disable_option = True
|
|
|
|
# Add to menu
|
|
menu.add_option(
|
|
name=(
|
|
f'{str(disk.path):<12} '
|
|
f'{disk.details["bus"]:<5} '
|
|
f'{std.bytes_to_string(size, decimals=1, use_binary=False):<8} '
|
|
f'{disk.details["model"]} '
|
|
f'{disk.details["serial"]}'
|
|
),
|
|
details={'Disabled': disable_option, 'Object': disk},
|
|
)
|
|
|
|
# Get selection
|
|
selection = menu.simple_select()
|
|
if 'Quit' in selection:
|
|
raise std.GenericAbort()
|
|
|
|
# Done
|
|
return selection[-1]['Object']
|
|
|
|
|
|
def select_disk_parts(prompt, disk):
|
|
"""Select disk parts from list, returns list of Disk()."""
|
|
title = std.color_string('ddrescue TUI: Partition Selection', 'GREEN')
|
|
title += f'\n\nDisk: {disk.path} {disk.description}'
|
|
menu = std.Menu(title)
|
|
menu.separator = ' '
|
|
menu.add_action('All')
|
|
menu.add_action('None')
|
|
menu.add_action('Proceed', {'Separator': True})
|
|
menu.add_action('Quit')
|
|
object_list = []
|
|
|
|
def _select_parts(menu):
|
|
"""Loop over selection menu until at least one partition selected."""
|
|
while True:
|
|
selection = menu.advanced_select(
|
|
f'Please select the parts to {prompt.lower()}: ',
|
|
)
|
|
if 'All' in selection:
|
|
for option in menu.options.values():
|
|
option['Selected'] = True
|
|
elif 'None' in selection:
|
|
for option in menu.options.values():
|
|
option['Selected'] = False
|
|
elif 'Proceed' in selection:
|
|
if any(option['Selected'] for option in menu.options.values()):
|
|
# At least one partition/device selected/device selected
|
|
break
|
|
elif 'Quit' in selection:
|
|
raise std.GenericAbort()
|
|
|
|
# Bail early if running under macOS
|
|
if PLATFORM == 'Darwin':
|
|
return [disk]
|
|
|
|
# Bail early if child device selected
|
|
if disk.details.get('parent', False):
|
|
return [disk]
|
|
|
|
# Add parts
|
|
whole_disk_str = f'{str(disk.path):<14} (Whole device)'
|
|
for part in disk.details.get('children', []):
|
|
size = part["size"]
|
|
name = (
|
|
f'{str(part["path"]):<14} '
|
|
f'({std.bytes_to_string(size, decimals=1, use_binary=False):>6})'
|
|
)
|
|
menu.add_option(name, details={'Selected': True, 'Path': part['path']})
|
|
|
|
# Add whole disk if necessary
|
|
if not menu.options:
|
|
menu.add_option(whole_disk_str, {'Selected': True, 'Path': disk.path})
|
|
menu.title += '\n\n'
|
|
menu.title += std.color_string(' No partitions detected.', 'YELLOW')
|
|
|
|
# Get selection
|
|
_select_parts(menu)
|
|
|
|
# Build list of Disk() object_list
|
|
for option in menu.options.values():
|
|
if option['Selected']:
|
|
object_list.append(option['Path'])
|
|
|
|
# Check if whole disk selected
|
|
if len(object_list) == len(disk.details.get('children', [])):
|
|
# NOTE: This is not true if the disk has no partitions
|
|
msg = f'Preserve partition table and unused space in {prompt.lower()}?'
|
|
if std.ask(msg):
|
|
# Replace part list with whole disk obj
|
|
object_list = [disk.path]
|
|
|
|
# Convert object_list to hw_obj.Disk() objects
|
|
print(' ')
|
|
std.print_info('Getting disk/partition details...')
|
|
object_list = [hw_obj.Disk(path) for path in object_list]
|
|
|
|
# Done
|
|
return object_list
|
|
|
|
|
|
def select_path(prompt):
|
|
"""Select path, returns pathlib.Path."""
|
|
invalid = False
|
|
menu = std.Menu(
|
|
title=std.color_string(f'ddrescue TUI: {prompt} Path Selection', 'GREEN'),
|
|
)
|
|
menu.separator = ' '
|
|
menu.add_action('Quit')
|
|
menu.add_option('Current directory')
|
|
menu.add_option('Enter manually')
|
|
path = None
|
|
|
|
# Make selection
|
|
selection = menu.simple_select()
|
|
if 'Current directory' in selection:
|
|
path = os.getcwd()
|
|
elif 'Enter manually' in selection:
|
|
path = std.input_text('Please enter path: ')
|
|
elif 'Quit' in selection:
|
|
raise std.GenericAbort()
|
|
|
|
# Check
|
|
try:
|
|
path = pathlib.Path(path).resolve()
|
|
except TypeError:
|
|
invalid = True
|
|
if invalid or not path.is_dir():
|
|
std.print_error(f'Invalid path: {path}')
|
|
raise std.GenericAbort()
|
|
|
|
# Done
|
|
return path
|
|
|
|
|
|
def set_mode(docopt_args):
|
|
"""Set mode from docopt_args or user selection, returns str."""
|
|
mode = None
|
|
|
|
# Check docopt_args
|
|
if docopt_args['clone']:
|
|
mode = 'Clone'
|
|
elif docopt_args['image']:
|
|
mode = 'Image'
|
|
|
|
# Ask user if necessary
|
|
if not mode:
|
|
answer = std.choice(['C', 'I'], 'Are we cloning or imaging?')
|
|
if answer == 'C':
|
|
mode = 'Clone'
|
|
else:
|
|
mode = 'Image'
|
|
|
|
# Done
|
|
return mode
|
|
|
|
|
|
def unmount_loopback_device(path):
|
|
"""Unmount loopback device using OS specific methods."""
|
|
cmd = []
|
|
|
|
# Build OS specific cmd
|
|
if PLATFORM == 'Darwin':
|
|
cmd = ['hdiutil', 'detach', path]
|
|
elif PLATFORM == 'Linux':
|
|
cmd = ['sudo', 'losetup', '--detach', path]
|
|
|
|
# Unmount loopback device
|
|
exe.run_program(cmd, check=False)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
print("This file is not meant to be called directly.")
|