Merge remote-tracking branch 'upstream/dev' into dev

2Shirt 2023-08-26 13:35:27 -07:00
commit f9da0c04ec
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
8 changed files with 211 additions and 209 deletions

@@ -25,6 +25,7 @@ DDRESCUE_LOG_REGEX = re.compile(
re.IGNORECASE,
)
# Classes
class BlockPair():
"""Object for tracking source to dest recovery data."""

@@ -7,10 +7,9 @@ import logging
import os
import pathlib
import subprocess
import time
from random import randint
from typing import Any
from random import randint
import pytz
@@ -245,44 +244,15 @@ def is_missing_source_or_destination(state) -> bool:
return missing
def source_or_destination_changed(state) -> bool:
"""Verify the source and destination objects are still valid."""
changed = False
# Compare objects
for obj in (state.source, state.destination):
if not obj:
changed = True
elif hasattr(obj, 'exists'):
# Assuming dest path
changed = changed or not obj.exists()
elif isinstance(obj, hw_disk.Disk):
compare_dev = hw_disk.Disk(obj.path)
for key in ('model', 'serial'):
changed = changed or getattr(obj, key) != getattr(compare_dev, key)
# Update top panes
state.update_top_panes()
# Done
if changed:
cli.print_error('Source and/or Destination changed')
return changed
def main() -> None:
"""Main function for ddrescue TUI."""
args = docopt(DOCSTRING)
# Log setup
log_dir = log.format_log_path()
log_dir = pathlib.Path(
f'{log_dir.parent}/'
f'ddrescue-TUI_{time.strftime("%Y-%m-%d_%H%M%S%z")}/'
)
log_path = log.format_log_path(log_name='main', sub_dir='ddrescue-TUI')
log.update_log_path(
dest_dir=log_dir,
dest_name='main',
dest_dir=log_path.parent,
dest_name=log_path.stem,
keep_history=False,
timestamp=False,
)
@@ -294,18 +264,10 @@ def main() -> None:
raise RuntimeError('tmux session not found')
# Init
main_menu = menus.main()
state = State()
state = State(log_dir=log_path.parent)
if not args['--force-local-map']:
state.ost.select_ticket()
if state.ost.disabled:
main_menu.actions['Add tech note']['Disabled'] = True
main_menu.actions['Add tech note']['Hidden'] = True
# TODO: Remove this ugly call
main_menu.actions[menus.MENU_ACTIONS[2]]['Separator'] = True
else:
main_menu.actions['Add tech note']['Separator'] = True
state.update_top_panes()
state.update_top_panes()
try:
state.init_recovery(args)
except (FileNotFoundError, std.GenericAbort):
@@ -313,6 +275,7 @@ def main() -> None:
cli.abort()
# Show menu
main_menu = menus.main(ost_disabled=state.ost.disabled)
settings_menu = menus.settings(state.mode)
while True:
selection = main_menu.advanced_select()
@@ -648,6 +611,31 @@ def run_recovery(state: State, main_menu, settings_menu, dry_run=True) -> None:
state.update_progress_pane('Idle')
def source_or_destination_changed(state) -> bool:
"""Verify the source and destination objects are still valid."""
changed = False
# Compare objects
for obj in (state.source, state.destination):
if not obj:
changed = True
elif hasattr(obj, 'exists'):
# Assuming dest path
changed = changed or not obj.exists()
elif isinstance(obj, hw_disk.Disk):
compare_dev = hw_disk.Disk(obj.path)
for key in ('model', 'serial'):
changed = changed or getattr(obj, key) != getattr(compare_dev, key)
# Update top panes
state.update_top_panes()
# Done
if changed:
cli.print_error('Source and/or Destination changed')
return changed
def zero_fill_destination(state: State, dry_run: bool = True) -> None:
"""Zero-fill any gaps and space on destination beyond the source size."""
full_disk_clone = False

@@ -46,7 +46,7 @@ SETTING_PRESETS = (
# Functions
def main() -> cli.Menu:
def main(ost_disabled: bool) -> cli.Menu:
"""Main menu, returns wk.ui.cli.Menu."""
menu = cli.Menu(title=ansi.color_string('ddrescue TUI: Main Menu', 'GREEN'))
menu.separator = ' '
@@ -58,6 +58,15 @@ def main() -> cli.Menu:
for toggle, selected in MENU_TOGGLES.items():
menu.add_toggle(toggle, {'Selected': selected})
# osTicket actions
if ost_disabled:
menu.actions['Add tech note']['Disabled'] = True
menu.actions['Add tech note']['Hidden'] = True
# TODO: Remove this ugly call
menu.actions[MENU_ACTIONS[2]]['Separator'] = True
else:
menu.actions['Add tech note']['Separator'] = True
# Done
return menu
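
Note on this change: the new ost_disabled parameter moves the osTicket-specific menu tweaks out of the TUI's main() and into the menu builder itself (the caller becomes menus.main(ost_disabled=state.ost.disabled), as shown in the previous file). Below is a minimal sketch of the flag handling, using a plain dict in place of the real wk.ui.cli.Menu actions; the entry names other than 'Add tech note' are hypothetical placeholders.

# Sketch only: a plain dict stands in for menu.actions, and MENU_ACTIONS below
# is a hypothetical placeholder tuple, not the real module-level constant.
MENU_ACTIONS = ('Start recovery', 'Add tech note', 'Change settings', 'Quit')

def apply_osticket_flags(actions: dict, ost_disabled: bool) -> None:
    """Disable and hide the osTicket action when integration is unavailable."""
    if ost_disabled:
        actions['Add tech note']['Disabled'] = True
        actions['Add tech note']['Hidden'] = True
        # Re-attach the separator to a still-visible entry (index 2 here)
        actions[MENU_ACTIONS[2]]['Separator'] = True
    else:
        actions['Add tech note']['Separator'] = True

# Example usage with stand-in data
actions = {name: {} for name in MENU_ACTIONS}
apply_osticket_flags(actions, ost_disabled=True)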

@@ -9,14 +9,13 @@ import pathlib
import re
import shutil
import subprocess
import time
from typing import Any
import psutil
import pytz
from wk import cfg, debug, exe, io, log, net, osticket, std
from wk import cfg, debug, exe, io, net, osticket, std
from wk.clone import menus
from wk.clone.block_pair import (
BlockPair,
@@ -78,13 +77,10 @@ TIMEZONE = pytz.timezone(cfg.main.LINUX_TIME_ZONE)
# Classes
class State():
"""Object for tracking hardware diagnostic data."""
def __init__(self):
def __init__(self, log_dir: pathlib.Path):
self.block_pairs: list[BlockPair] = []
self.destination: hw_disk.Disk | pathlib.Path = pathlib.Path('/dev/null')
self.log_dir: pathlib.Path = log.format_log_path()
self.log_dir = self.log_dir.parent.joinpath(
f'ddrescue-TUI_{time.strftime("%Y-%m-%d_%H%M%S%z")}/',
)
self.log_dir: pathlib.Path = log_dir
self.ost = osticket.osTicket()
self.progress_out: pathlib.Path = self.log_dir.joinpath('progress.out')
self.mode: str = '?'
@@ -93,73 +89,73 @@ class State():
self.working_dir: pathlib.Path | None = None
self.ui: tui.TUI = tui.TUI('Source')
def _get_clone_settings_path(self) -> pathlib.Path:
"""get Clone settings file path, returns pathlib.Path obj."""
description = self.source.model
if not description:
description = self.source.path.name
return pathlib.Path(f'{self.working_dir}/Clone_{description}.json')
def _check_dest_size(self) -> None:
"""Run size safety check and abort if necessary."""
required_size = sum(pair.size for pair in self.block_pairs)
settings = self.load_settings() if self.mode == 'Clone' else {}
def load_settings(self, discard_unused_settings: bool = False) -> dict[Any, Any]:
"""Load settings from previous run, returns dict."""
settings = {}
settings_file = self._get_clone_settings_path()
# Try loading JSON data
if settings_file.exists():
with open(settings_file, 'r', encoding='utf-8') as _f:
try:
settings = json.loads(_f.read())
except (OSError, json.JSONDecodeError) as err:
LOG.error('Failed to load clone settings')
cli.print_error('Invalid clone settings detected.')
raise std.GenericAbort() from err
# Check settings
if settings:
if settings['First Run'] and discard_unused_settings:
# Previous run aborted before starting recovery, discard settings
settings = {}
# Increase required_size if necessary
if self.mode == 'Clone' and settings.get('Needs Format', False):
if settings['Table Type'] == 'GPT':
# Below is the size calculation for the GPT
# 1 LBA for the protective MBR
# 33 LBAs each for the primary and backup GPT tables
# Source: https://en.wikipedia.org/wiki/GUID_Partition_Table
required_size += (1 + 33 + 33) * self.destination.phy_sec
if settings['Create Boot Partition']:
# 260MiB EFI System Partition and a 16MiB MS Reserved partition
required_size += (260 + 16) * 1024**2
else:
bail = False
for key in ('model', 'serial'):
if settings['Source'][key] != getattr(self.source, key):
cli.print_error(f"Clone settings don't match source {key}")
bail = True
if settings['Destination'][key] != getattr(self.destination, key):
cli.print_error(f"Clone settings don't match destination {key}")
bail = True
if bail:
raise std.GenericAbort()
# MBR only requires one LBA but adding a full 4096 bytes anyway
required_size += 4096
if settings['Create Boot Partition']:
# 100MiB System Reserved partition
required_size += 100 * 1024**2
# Update settings
if not settings:
settings = CLONE_SETTINGS.copy()
if not settings['Source']:
settings['Source'] = {
'model': self.source.model,
'serial': self.source.serial,
}
if not settings['Destination']:
settings['Destination'] = {
'model': self.destination.model,
'serial': self.destination.serial,
}
# Reduce required_size if necessary
if self.mode == 'Image':
for pair in self.block_pairs:
if pair.destination.exists():
# NOTE: This uses the "max space" of the destination
# i.e. not the apparent size which is smaller for sparse files
# While this can result in an out-of-space error it's better
# than nothing.
required_size -= pair.destination.stat().st_size
# Check destination size
if self.mode == 'Clone':
destination_size = self.destination.size
error_msg = 'A larger destination disk is required'
else:
# NOTE: Adding an extra 5% here to better ensure it will fit
destination_size = psutil.disk_usage(self.destination).free
destination_size *= 1.05
error_msg = 'Not enough free space on the destination'
if required_size > destination_size:
cli.print_error(error_msg)
raise std.GenericAbort()
def _check_dest_smart(self) -> None:
"""Check SMART for destination."""
errors_detected = False
# Check for critical errors
if not smart_status_ok(self.destination):
cli.print_error(
f'Critical error(s) detected for: {self.destination.path}',
)
errors_detected = True
# Check for minor errors
if not check_attributes(self.destination, only_blocking=True):
cli.print_warning(
f'Attribute error(s) detected for: {self.destination.path}',
)
errors_detected = True
# Done
return settings
def save_settings(self, settings: dict[Any, Any]) -> None:
"""Save settings for future runs."""
settings_file = self._get_clone_settings_path()
# Try saving JSON data
try:
with open(settings_file, 'w', encoding='utf-8') as _f:
json.dump(settings, _f)
except OSError as err:
cli.print_error('Failed to save clone settings')
raise std.GenericAbort() from err
if errors_detected:
raise std.GenericAbort()
def add_block_pair(self, source: hw_disk.Disk, destination: pathlib.Path) -> None:
"""Add BlockPair object and run safety checks."""
@@ -350,6 +346,13 @@ class State():
# Done
return report
def get_clone_settings_path(self) -> pathlib.Path:
"""get Clone settings file path, returns pathlib.Path obj."""
description = self.source.model
if not description:
description = self.source.path.name
return pathlib.Path(f'{self.working_dir}/Clone_{description}.json')
def get_error_size(self) -> int:
"""Get total error size from block_pairs in bytes, returns int."""
return self.get_total_size() - self.get_rescued_size()
@@ -378,6 +381,7 @@ class State():
# Select source
self.source = select_disk_obj('source', disk_menu, docopt_args['<source>'])
self.update_top_panes()
if self.source.trim:
cli.print_warning('Source device supports TRIM')
if not cli.ask(' Proceed with recovery?'):
@@ -398,6 +402,7 @@ class State():
else:
self.destination = menus.select_path('Destination')
self.ui.add_title_pane('Destination', self.destination)
self.update_top_panes()
# Update details
self.source.update_details(skip_children=False)
@@ -455,9 +460,7 @@ class State():
update_smart_details(dev)
# Safety Checks #1
if self.mode == 'Clone':
self.safety_check_destination()
self.safety_check_size()
self.safety_check_destination()
# Confirmation #2
self.update_progress_pane('Idle')
@@ -485,6 +488,55 @@ class State():
for pair in self.block_pairs:
pair.safety_check()
def load_settings(self, discard_unused_settings: bool = False) -> dict[Any, Any]:
"""Load settings from previous run, returns dict."""
settings = {}
settings_file = self.get_clone_settings_path()
# Try loading JSON data
if settings_file.exists():
with open(settings_file, 'r', encoding='utf-8') as _f:
try:
settings = json.loads(_f.read())
except (OSError, json.JSONDecodeError) as err:
LOG.error('Failed to load clone settings')
cli.print_error('Invalid clone settings detected.')
raise std.GenericAbort() from err
# Check settings
if settings:
if settings['First Run'] and discard_unused_settings:
# Previous run aborted before starting recovery, discard settings
settings = {}
else:
bail = False
for key in ('model', 'serial'):
if settings['Source'][key] != getattr(self.source, key):
cli.print_error(f"Clone settings don't match source {key}")
bail = True
if settings['Destination'][key] != getattr(self.destination, key):
cli.print_error(f"Clone settings don't match destination {key}")
bail = True
if bail:
raise std.GenericAbort()
# Update settings
if not settings:
settings = CLONE_SETTINGS.copy()
if not settings['Source']:
settings['Source'] = {
'model': self.source.model,
'serial': self.source.serial,
}
if not settings['Destination']:
settings['Destination'] = {
'model': self.destination.model,
'serial': self.destination.serial,
}
# Done
return settings
def mark_started(self) -> None:
"""Edit clone settings, if applicable, to mark recovery as started."""
# Skip if not cloning
@@ -570,69 +622,9 @@ class State():
def safety_check_destination(self) -> None:
"""Run safety checks for destination and abort if necessary."""
errors_detected = False
# Check for critical errors
if not smart_status_ok(self.destination):
cli.print_error(
f'Critical error(s) detected for: {self.destination.path}',
)
# Check for minor errors
if not check_attributes(self.destination, only_blocking=True):
cli.print_warning(
f'Attribute error(s) detected for: {self.destination.path}',
)
# Done
if errors_detected:
raise std.GenericAbort()
def safety_check_size(self) -> None:
"""Run size safety check and abort if necessary."""
required_size = sum(pair.size for pair in self.block_pairs)
settings = self.load_settings() if self.mode == 'Clone' else {}
# Increase required_size if necessary
if self.mode == 'Clone' and settings.get('Needs Format', False):
if settings['Table Type'] == 'GPT':
# Below is the size calculation for the GPT
# 1 LBA for the protective MBR
# 33 LBAs each for the primary and backup GPT tables
# Source: https://en.wikipedia.org/wiki/GUID_Partition_Table
required_size += (1 + 33 + 33) * self.destination.phy_sec
if settings['Create Boot Partition']:
# 260MiB EFI System Partition and a 16MiB MS Reserved partition
required_size += (260 + 16) * 1024**2
else:
# MBR only requires one LBA but adding a full 4096 bytes anyway
required_size += 4096
if settings['Create Boot Partition']:
# 100MiB System Reserved partition
required_size += 100 * 1024**2
# Reduce required_size if necessary
if self.mode == 'Image':
for pair in self.block_pairs:
if pair.destination.exists():
# NOTE: This uses the "max space" of the destination
# i.e. not the apparent size which is smaller for sparse files
# While this can result in an out-of-space error it's better
# than nothing.
required_size -= pair.destination.stat().st_size
# Check destination size
if self.mode == 'Clone':
destination_size = self.destination.size
error_msg = 'A larger destination disk is required'
else:
# NOTE: Adding an extra 5% here to better ensure it will fit
destination_size = psutil.disk_usage(self.destination).free
destination_size *= 1.05
error_msg = 'Not enough free space on the destination'
if required_size > destination_size:
cli.print_error(error_msg)
raise std.GenericAbort()
self._check_dest_smart()
self._check_dest_size()
def save_debug_reports(self) -> None:
"""Save debug reports to disk."""
@@ -656,6 +648,18 @@ class State():
_f.write('\n'.join(debug.generate_object_report(_bp)))
_f.write('\n')
def save_settings(self, settings: dict[Any, Any]) -> None:
"""Save settings for future runs."""
settings_file = self.get_clone_settings_path()
# Try saving JSON data
try:
with open(settings_file, 'w', encoding='utf-8') as _f:
json.dump(settings, _f)
except OSError as err:
cli.print_error('Failed to save clone settings')
raise std.GenericAbort() from err
def skip_pass(self, pass_name: str) -> None:
"""Mark block_pairs as skipped if applicable."""
for pair in self.block_pairs:
@@ -1185,24 +1189,15 @@ def select_disk_obj(label:str, disk_menu: cli.Menu, disk_path: str) -> hw_disk.D
def set_mode(docopt_args) -> str:
"""Set mode from docopt_args or user selection, returns str."""
mode = '?'
# Check docopt_args
if docopt_args['clone']:
mode = 'Clone'
elif docopt_args['image']:
mode = 'Image'
return 'Clone'
if docopt_args['image']:
return 'Image'
# Ask user if necessary
if not mode:
answer = cli.choice('Are we cloning or imaging?', ['C', 'I'])
if answer == 'C':
mode = 'Clone'
else:
mode = 'Image'
# Done
return mode
answer = cli.choice('Are we cloning or imaging?', ['C', 'I'])
return 'Clone' if answer == 'C' else 'Image'
if __name__ == '__main__':
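
Aside on the size check: the overhead arithmetic is unchanged across the _check_dest_size()/safety_check_size() refactor shown above. Below is a rough standalone sketch of that arithmetic, for reference only; clone_overhead_bytes is an illustrative helper, not a function in the codebase.

# Illustrative sketch of the extra bytes the clone size check reserves on the
# destination, mirroring the comments in the diff above.
def clone_overhead_bytes(table_type: str, boot_partition: bool, phy_sec: int = 512) -> int:
    """Return the extra bytes required beyond the raw block-pair data."""
    if table_type == 'GPT':
        # 1 LBA protective MBR + 33 LBAs each for the primary and backup GPT tables
        extra = (1 + 33 + 33) * phy_sec
        if boot_partition:
            # 260 MiB EFI System Partition + 16 MiB MS Reserved partition
            extra += (260 + 16) * 1024**2
    else:
        # MBR needs a single LBA, but a full 4096 bytes is reserved anyway
        extra = 4096
        if boot_partition:
            # 100 MiB System Reserved partition
            extra += 100 * 1024**2
    return extra

# Example: GPT destination with 4096-byte physical sectors and a boot partition
print(clone_overhead_bytes('GPT', boot_partition=True, phy_sec=4096))  # 289681408

In Image mode the check instead compares against the destination's free space (via psutil.disk_usage) and credits the current on-disk size of any existing image files, as in the hunks above.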

@ -7,7 +7,6 @@ import os
import pathlib
import platform
import subprocess
import time
from docopt import docopt
@@ -155,17 +154,17 @@ class State():
self.ost.disabled = not menu.toggles['osTicket Integration']['Selected']
# Set log
self.log_dir = log.format_log_path()
self.log_dir = pathlib.Path(
f'{self.log_dir.parent}/'
f'Hardware-Diagnostics_{time.strftime("%Y-%m-%d_%H%M%S%z")}/'
self.log_dir = log.format_log_path(
log_name='main',
sub_dir='Hardware-Diagnostics',
)
log.update_log_path(
dest_dir=self.log_dir,
dest_name='main',
dest_dir=self.log_dir.parent,
dest_name=self.log_dir.stem,
keep_history=False,
timestamp=False,
)
self.log_dir = self.log_dir.parent
cli.clear_screen()
cli.print_info('Initializing...')
@@ -276,7 +275,7 @@ class State():
proc = exe.run_program(['smc', '-l'])
data.extend(proc.stdout.splitlines())
except Exception:
LOG.ERROR('Error(s) encountered while exporting SMC data')
LOG.error('Error(s) encountered while exporting SMC data')
data = [line.strip() for line in data]
with open(f'{debug_dir}/smc.data', 'a', encoding='utf-8') as _f:
_f.write('\n'.join(data))

@@ -39,16 +39,21 @@ def enable_debug_mode() -> None:
def format_log_path(
log_dir: None | pathlib.Path | str = None,
log_name: None | str = None,
log_dir: pathlib.Path | str | None = None,
log_name: str | None = None,
append: bool = False,
kit: bool = False,
sub_dir: str | None = None,
timestamp: bool = False,
kit: bool = False, tool: bool = False, append: bool = False,
tool: bool = False,
) -> pathlib.Path:
"""Format path based on args passed, returns pathlib.Path obj."""
log_path = pathlib.Path(
f'{log_dir if log_dir else DEFAULT_LOG_DIR}/'
f'{cfg.main.KIT_NAME_FULL+"/" if kit else ""}'
f'{"Tools/" if tool else ""}'
f'{sub_dir+"_" if sub_dir else ""}'
f'{time.strftime("%Y-%m-%d_%H%M%S%z") if sub_dir else ""}/'
f'{log_name if log_name else DEFAULT_LOG_NAME}'
f'{"_" if timestamp else ""}'
f'{time.strftime("%Y-%m-%d_%H%M%S%z") if timestamp else ""}'
@@ -127,9 +132,11 @@ def start(config: dict[str, str] | None = None) -> None:
def update_log_path(
dest_dir: None | pathlib.Path | str = None,
dest_name: None | str = None,
keep_history: bool = True, timestamp: bool = True, append: bool = False,
dest_dir: None | pathlib.Path | str = None,
dest_name: None | str = None,
append: bool = False,
keep_history: bool = True,
timestamp: bool = True,
) -> None:
"""Moves current log file to new path and updates the root logger."""
root_logger = logging.getLogger()
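
For context: format_log_path() now accepts sub_dir and builds the timestamped sub-directory itself, which is what lets the ddrescue-TUI and Hardware-Diagnostics hunks above replace their hand-built paths with format_log_path(log_name='main', sub_dir=...) followed by update_log_path(dest_dir=log_path.parent, dest_name=log_path.stem, ...). Below is a minimal sketch of the common-case path layout; the DEFAULT_LOG_DIR and DEFAULT_LOG_NAME values are assumptions, and the real function also handles the kit/tool/append options.

# Standalone mimic of the new path layout for the common cases only.
import pathlib
import time

DEFAULT_LOG_DIR = pathlib.Path('~/Logs').expanduser()  # placeholder value
DEFAULT_LOG_NAME = 'main'                               # placeholder value

def format_log_path_sketch(log_name=None, sub_dir=None, timestamp=False) -> pathlib.Path:
    """Mirror the f-string above, without the kit/tool handling."""
    now = time.strftime('%Y-%m-%d_%H%M%S%z')
    return pathlib.Path(
        f'{DEFAULT_LOG_DIR}/'
        f'{sub_dir + "_" + now if sub_dir else ""}/'
        f'{log_name if log_name else DEFAULT_LOG_NAME}'
        f'{"_" + now if timestamp else ""}'
    )

# e.g. format_log_path_sketch(log_name='main', sub_dir='ddrescue-TUI')
#   -> <DEFAULT_LOG_DIR>/ddrescue-TUI_<timestamp>/main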

@@ -80,6 +80,10 @@ function copy_live_env() {
mkdir -p "$PROFILE_DIR/airootfs/usr/local/bin"
rsync -aI "$ROOT_DIR/scripts/" "$PROFILE_DIR/airootfs/usr/local/bin/"
# Pre-compile Python scripts
unset PYTHONPYCACHEPREFIX
python -m compileall "$PROFILE_DIR/airootfs/usr/local/bin/"
# Update profiledef.sh to set proper permissions for executable files
for _file in $(find "$PROFILE_DIR/airootfs" -executable -type f | sed "s%$PROFILE_DIR/airootfs%%" | sort); do
sed -i "\$i\ [\"$_file\"]=\"0:0:755\"" "$PROFILE_DIR/profiledef.sh"

@@ -12,7 +12,6 @@ alias fix-perms='find -type d -exec chmod 755 "{}" \; && find -type f -exec chmo
alias hexedit='hexedit --color'
alias hw-info='sudo hw-info | less -S'
alias ip='ip -br -c'
alias journalctl-datarec="echo -e 'Monitoring journal output...\n' && journalctl -kf | grep -Ei 'ata|nvme|scsi|sd[a..z]+|usb|comreset|critical|error'"
alias less='less -S'
alias ls='ls --color=auto'
alias mkdir='mkdir -p'