diff --git a/scripts/wk/clone/block_pair.py b/scripts/wk/clone/block_pair.py index bfe4f8a6..0c98e33d 100644 --- a/scripts/wk/clone/block_pair.py +++ b/scripts/wk/clone/block_pair.py @@ -25,6 +25,7 @@ DDRESCUE_LOG_REGEX = re.compile( re.IGNORECASE, ) + # Classes class BlockPair(): """Object for tracking source to dest recovery data.""" diff --git a/scripts/wk/clone/ddrescue.py b/scripts/wk/clone/ddrescue.py index e8a6f1fb..0ebb2fcc 100644 --- a/scripts/wk/clone/ddrescue.py +++ b/scripts/wk/clone/ddrescue.py @@ -7,10 +7,9 @@ import logging import os import pathlib import subprocess -import time -from random import randint from typing import Any +from random import randint import pytz @@ -245,44 +244,15 @@ def is_missing_source_or_destination(state) -> bool: return missing -def source_or_destination_changed(state) -> bool: - """Verify the source and destination objects are still valid.""" - changed = False - - # Compare objects - for obj in (state.source, state.destination): - if not obj: - changed = True - elif hasattr(obj, 'exists'): - # Assuming dest path - changed = changed or not obj.exists() - elif isinstance(obj, hw_disk.Disk): - compare_dev = hw_disk.Disk(obj.path) - for key in ('model', 'serial'): - changed = changed or getattr(obj, key) != getattr(compare_dev, key) - - # Update top panes - state.update_top_panes() - - # Done - if changed: - cli.print_error('Source and/or Destination changed') - return changed - - def main() -> None: """Main function for ddrescue TUI.""" args = docopt(DOCSTRING) # Log setup - log_dir = log.format_log_path() - log_dir = pathlib.Path( - f'{log_dir.parent}/' - f'ddrescue-TUI_{time.strftime("%Y-%m-%d_%H%M%S%z")}/' - ) + log_path = log.format_log_path(log_name='main', sub_dir='ddrescue-TUI') log.update_log_path( - dest_dir=log_dir, - dest_name='main', + dest_dir=log_path.parent, + dest_name=log_path.stem, keep_history=False, timestamp=False, ) @@ -294,18 +264,10 @@ def main() -> None: raise RuntimeError('tmux session not found') # Init - main_menu = menus.main() - state = State() + state = State(log_dir=log_path.parent) if not args['--force-local-map']: state.ost.select_ticket() - if state.ost.disabled: - main_menu.actions['Add tech note']['Disabled'] = True - main_menu.actions['Add tech note']['Hidden'] = True - # TODO: Remove this ugly call - main_menu.actions[menus.MENU_ACTIONS[2]]['Separator'] = True - else: - main_menu.actions['Add tech note']['Separator'] = True - state.update_top_panes() + state.update_top_panes() try: state.init_recovery(args) except (FileNotFoundError, std.GenericAbort): @@ -313,6 +275,7 @@ def main() -> None: cli.abort() # Show menu + main_menu = menus.main(ost_disabled=state.ost.disabled) settings_menu = menus.settings(state.mode) while True: selection = main_menu.advanced_select() @@ -648,6 +611,31 @@ def run_recovery(state: State, main_menu, settings_menu, dry_run=True) -> None: state.update_progress_pane('Idle') +def source_or_destination_changed(state) -> bool: + """Verify the source and destination objects are still valid.""" + changed = False + + # Compare objects + for obj in (state.source, state.destination): + if not obj: + changed = True + elif hasattr(obj, 'exists'): + # Assuming dest path + changed = changed or not obj.exists() + elif isinstance(obj, hw_disk.Disk): + compare_dev = hw_disk.Disk(obj.path) + for key in ('model', 'serial'): + changed = changed or getattr(obj, key) != getattr(compare_dev, key) + + # Update top panes + state.update_top_panes() + + # Done + if changed: + cli.print_error('Source and/or Destination changed') + return changed + + def zero_fill_destination(state: State, dry_run: bool = True) -> None: """Zero-fill any gaps and space on destination beyond the source size.""" full_disk_clone = False diff --git a/scripts/wk/clone/menus.py b/scripts/wk/clone/menus.py index 9cea1947..0a1797f9 100644 --- a/scripts/wk/clone/menus.py +++ b/scripts/wk/clone/menus.py @@ -46,7 +46,7 @@ SETTING_PRESETS = ( # Functions -def main() -> cli.Menu: +def main(ost_disabled: bool) -> cli.Menu: """Main menu, returns wk.ui.cli.Menu.""" menu = cli.Menu(title=ansi.color_string('ddrescue TUI: Main Menu', 'GREEN')) menu.separator = ' ' @@ -58,6 +58,15 @@ def main() -> cli.Menu: for toggle, selected in MENU_TOGGLES.items(): menu.add_toggle(toggle, {'Selected': selected}) + # osTicket actions + if ost_disabled: + menu.actions['Add tech note']['Disabled'] = True + menu.actions['Add tech note']['Hidden'] = True + # TODO: Remove this ugly call + menu.actions[MENU_ACTIONS[2]]['Separator'] = True + else: + menu.actions['Add tech note']['Separator'] = True + # Done return menu diff --git a/scripts/wk/clone/state.py b/scripts/wk/clone/state.py index 161c1680..152a5350 100644 --- a/scripts/wk/clone/state.py +++ b/scripts/wk/clone/state.py @@ -9,14 +9,13 @@ import pathlib import re import shutil import subprocess -import time from typing import Any import psutil import pytz -from wk import cfg, debug, exe, io, log, net, osticket, std +from wk import cfg, debug, exe, io, net, osticket, std from wk.clone import menus from wk.clone.block_pair import ( BlockPair, @@ -78,13 +77,10 @@ TIMEZONE = pytz.timezone(cfg.main.LINUX_TIME_ZONE) # Classes class State(): """Object for tracking hardware diagnostic data.""" - def __init__(self): + def __init__(self, log_dir: pathlib.Path): self.block_pairs: list[BlockPair] = [] self.destination: hw_disk.Disk | pathlib.Path = pathlib.Path('/dev/null') - self.log_dir: pathlib.Path = log.format_log_path() - self.log_dir = self.log_dir.parent.joinpath( - f'ddrescue-TUI_{time.strftime("%Y-%m-%d_%H%M%S%z")}/', - ) + self.log_dir: pathlib.Path = log_dir self.ost = osticket.osTicket() self.progress_out: pathlib.Path = self.log_dir.joinpath('progress.out') self.mode: str = '?' @@ -93,73 +89,73 @@ class State(): self.working_dir: pathlib.Path | None = None self.ui: tui.TUI = tui.TUI('Source') - def _get_clone_settings_path(self) -> pathlib.Path: - """get Clone settings file path, returns pathlib.Path obj.""" - description = self.source.model - if not description: - description = self.source.path.name - return pathlib.Path(f'{self.working_dir}/Clone_{description}.json') + def _check_dest_size(self) -> None: + """Run size safety check and abort if necessary.""" + required_size = sum(pair.size for pair in self.block_pairs) + settings = self.load_settings() if self.mode == 'Clone' else {} - def load_settings(self, discard_unused_settings: bool = False) -> dict[Any, Any]: - """Load settings from previous run, returns dict.""" - settings = {} - settings_file = self._get_clone_settings_path() - - # Try loading JSON data - if settings_file.exists(): - with open(settings_file, 'r', encoding='utf-8') as _f: - try: - settings = json.loads(_f.read()) - except (OSError, json.JSONDecodeError) as err: - LOG.error('Failed to load clone settings') - cli.print_error('Invalid clone settings detected.') - raise std.GenericAbort() from err - - # Check settings - if settings: - if settings['First Run'] and discard_unused_settings: - # Previous run aborted before starting recovery, discard settings - settings = {} + # Increase required_size if necessary + if self.mode == 'Clone' and settings.get('Needs Format', False): + if settings['Table Type'] == 'GPT': + # Below is the size calculation for the GPT + # 1 LBA for the protective MBR + # 33 LBAs each for the primary and backup GPT tables + # Source: https://en.wikipedia.org/wiki/GUID_Partition_Table + required_size += (1 + 33 + 33) * self.destination.phy_sec + if settings['Create Boot Partition']: + # 260MiB EFI System Partition and a 16MiB MS Reserved partition + required_size += (260 + 16) * 1024**2 else: - bail = False - for key in ('model', 'serial'): - if settings['Source'][key] != getattr(self.source, key): - cli.print_error(f"Clone settings don't match source {key}") - bail = True - if settings['Destination'][key] != getattr(self.destination, key): - cli.print_error(f"Clone settings don't match destination {key}") - bail = True - if bail: - raise std.GenericAbort() + # MBR only requires one LBA but adding a full 4096 bytes anyway + required_size += 4096 + if settings['Create Boot Partition']: + # 100MiB System Reserved partition + required_size += 100 * 1024**2 - # Update settings - if not settings: - settings = CLONE_SETTINGS.copy() - if not settings['Source']: - settings['Source'] = { - 'model': self.source.model, - 'serial': self.source.serial, - } - if not settings['Destination']: - settings['Destination'] = { - 'model': self.destination.model, - 'serial': self.destination.serial, - } + # Reduce required_size if necessary + if self.mode == 'Image': + for pair in self.block_pairs: + if pair.destination.exists(): + # NOTE: This uses the "max space" of the destination + # i.e. not the apparent size which is smaller for sparse files + # While this can result in an out-of-space error it's better + # than nothing. + required_size -= pair.destination.stat().st_size + + # Check destination size + if self.mode == 'Clone': + destination_size = self.destination.size + error_msg = 'A larger destination disk is required' + else: + # NOTE: Adding an extra 5% here to better ensure it will fit + destination_size = psutil.disk_usage(self.destination).free + destination_size *= 1.05 + error_msg = 'Not enough free space on the destination' + if required_size > destination_size: + cli.print_error(error_msg) + raise std.GenericAbort() + + def _check_dest_smart(self) -> None: + """Check SMART for destination.""" + errors_detected = False + + # Check for critical errors + if not smart_status_ok(self.destination): + cli.print_error( + f'Critical error(s) detected for: {self.destination.path}', + ) + errors_detected = True + + # Check for minor errors + if not check_attributes(self.destination, only_blocking=True): + cli.print_warning( + f'Attribute error(s) detected for: {self.destination.path}', + ) + errors_detected = True # Done - return settings - - def save_settings(self, settings: dict[Any, Any]) -> None: - """Save settings for future runs.""" - settings_file = self._get_clone_settings_path() - - # Try saving JSON data - try: - with open(settings_file, 'w', encoding='utf-8') as _f: - json.dump(settings, _f) - except OSError as err: - cli.print_error('Failed to save clone settings') - raise std.GenericAbort() from err + if errors_detected: + raise std.GenericAbort() def add_block_pair(self, source: hw_disk.Disk, destination: pathlib.Path) -> None: """Add BlockPair object and run safety checks.""" @@ -350,6 +346,13 @@ class State(): # Done return report + def get_clone_settings_path(self) -> pathlib.Path: + """get Clone settings file path, returns pathlib.Path obj.""" + description = self.source.model + if not description: + description = self.source.path.name + return pathlib.Path(f'{self.working_dir}/Clone_{description}.json') + def get_error_size(self) -> int: """Get total error size from block_pairs in bytes, returns int.""" return self.get_total_size() - self.get_rescued_size() @@ -378,6 +381,7 @@ class State(): # Select source self.source = select_disk_obj('source', disk_menu, docopt_args['']) + self.update_top_panes() if self.source.trim: cli.print_warning('Source device supports TRIM') if not cli.ask(' Proceed with recovery?'): @@ -398,6 +402,7 @@ class State(): else: self.destination = menus.select_path('Destination') self.ui.add_title_pane('Destination', self.destination) + self.update_top_panes() # Update details self.source.update_details(skip_children=False) @@ -455,9 +460,7 @@ class State(): update_smart_details(dev) # Safety Checks #1 - if self.mode == 'Clone': - self.safety_check_destination() - self.safety_check_size() + self.safety_check_destination() # Confirmation #2 self.update_progress_pane('Idle') @@ -485,6 +488,55 @@ class State(): for pair in self.block_pairs: pair.safety_check() + def load_settings(self, discard_unused_settings: bool = False) -> dict[Any, Any]: + """Load settings from previous run, returns dict.""" + settings = {} + settings_file = self.get_clone_settings_path() + + # Try loading JSON data + if settings_file.exists(): + with open(settings_file, 'r', encoding='utf-8') as _f: + try: + settings = json.loads(_f.read()) + except (OSError, json.JSONDecodeError) as err: + LOG.error('Failed to load clone settings') + cli.print_error('Invalid clone settings detected.') + raise std.GenericAbort() from err + + # Check settings + if settings: + if settings['First Run'] and discard_unused_settings: + # Previous run aborted before starting recovery, discard settings + settings = {} + else: + bail = False + for key in ('model', 'serial'): + if settings['Source'][key] != getattr(self.source, key): + cli.print_error(f"Clone settings don't match source {key}") + bail = True + if settings['Destination'][key] != getattr(self.destination, key): + cli.print_error(f"Clone settings don't match destination {key}") + bail = True + if bail: + raise std.GenericAbort() + + # Update settings + if not settings: + settings = CLONE_SETTINGS.copy() + if not settings['Source']: + settings['Source'] = { + 'model': self.source.model, + 'serial': self.source.serial, + } + if not settings['Destination']: + settings['Destination'] = { + 'model': self.destination.model, + 'serial': self.destination.serial, + } + + # Done + return settings + def mark_started(self) -> None: """Edit clone settings, if applicable, to mark recovery as started.""" # Skip if not cloning @@ -570,69 +622,9 @@ class State(): def safety_check_destination(self) -> None: """Run safety checks for destination and abort if necessary.""" - errors_detected = False - - # Check for critical errors - if not smart_status_ok(self.destination): - cli.print_error( - f'Critical error(s) detected for: {self.destination.path}', - ) - - # Check for minor errors - if not check_attributes(self.destination, only_blocking=True): - cli.print_warning( - f'Attribute error(s) detected for: {self.destination.path}', - ) - - # Done - if errors_detected: - raise std.GenericAbort() - - def safety_check_size(self) -> None: - """Run size safety check and abort if necessary.""" - required_size = sum(pair.size for pair in self.block_pairs) - settings = self.load_settings() if self.mode == 'Clone' else {} - - # Increase required_size if necessary - if self.mode == 'Clone' and settings.get('Needs Format', False): - if settings['Table Type'] == 'GPT': - # Below is the size calculation for the GPT - # 1 LBA for the protective MBR - # 33 LBAs each for the primary and backup GPT tables - # Source: https://en.wikipedia.org/wiki/GUID_Partition_Table - required_size += (1 + 33 + 33) * self.destination.phy_sec - if settings['Create Boot Partition']: - # 260MiB EFI System Partition and a 16MiB MS Reserved partition - required_size += (260 + 16) * 1024**2 - else: - # MBR only requires one LBA but adding a full 4096 bytes anyway - required_size += 4096 - if settings['Create Boot Partition']: - # 100MiB System Reserved partition - required_size += 100 * 1024**2 - - # Reduce required_size if necessary - if self.mode == 'Image': - for pair in self.block_pairs: - if pair.destination.exists(): - # NOTE: This uses the "max space" of the destination - # i.e. not the apparent size which is smaller for sparse files - # While this can result in an out-of-space error it's better - # than nothing. - required_size -= pair.destination.stat().st_size - - # Check destination size if self.mode == 'Clone': - destination_size = self.destination.size - error_msg = 'A larger destination disk is required' - else: - # NOTE: Adding an extra 5% here to better ensure it will fit - destination_size = psutil.disk_usage(self.destination).free - destination_size *= 1.05 - error_msg = 'Not enough free space on the destination' - if required_size > destination_size: - cli.print_error(error_msg) - raise std.GenericAbort() + self._check_dest_smart() + self._check_dest_size() def save_debug_reports(self) -> None: """Save debug reports to disk.""" @@ -656,6 +648,18 @@ class State(): _f.write('\n'.join(debug.generate_object_report(_bp))) _f.write('\n') + def save_settings(self, settings: dict[Any, Any]) -> None: + """Save settings for future runs.""" + settings_file = self.get_clone_settings_path() + + # Try saving JSON data + try: + with open(settings_file, 'w', encoding='utf-8') as _f: + json.dump(settings, _f) + except OSError as err: + cli.print_error('Failed to save clone settings') + raise std.GenericAbort() from err + def skip_pass(self, pass_name: str) -> None: """Mark block_pairs as skipped if applicable.""" for pair in self.block_pairs: @@ -1185,24 +1189,15 @@ def select_disk_obj(label:str, disk_menu: cli.Menu, disk_path: str) -> hw_disk.D def set_mode(docopt_args) -> str: """Set mode from docopt_args or user selection, returns str.""" - mode = '?' - - # Check docopt_args if docopt_args['clone']: - mode = 'Clone' - elif docopt_args['image']: - mode = 'Image' + return 'Clone' + + if docopt_args['image']: + return 'Image' # Ask user if necessary - if not mode: - answer = cli.choice('Are we cloning or imaging?', ['C', 'I']) - if answer == 'C': - mode = 'Clone' - else: - mode = 'Image' - - # Done - return mode + answer = cli.choice('Are we cloning or imaging?', ['C', 'I']) + return 'Clone' if answer == 'C' else 'Image' if __name__ == '__main__': diff --git a/scripts/wk/hw/diags.py b/scripts/wk/hw/diags.py index d5d9403a..d01ae06f 100644 --- a/scripts/wk/hw/diags.py +++ b/scripts/wk/hw/diags.py @@ -7,7 +7,6 @@ import os import pathlib import platform import subprocess -import time from docopt import docopt @@ -155,17 +154,17 @@ class State(): self.ost.disabled = not menu.toggles['osTicket Integration']['Selected'] # Set log - self.log_dir = log.format_log_path() - self.log_dir = pathlib.Path( - f'{self.log_dir.parent}/' - f'Hardware-Diagnostics_{time.strftime("%Y-%m-%d_%H%M%S%z")}/' + self.log_dir = log.format_log_path( + log_name='main', + sub_dir='Hardware-Diagnostics', ) log.update_log_path( - dest_dir=self.log_dir, - dest_name='main', + dest_dir=self.log_dir.parent, + dest_name=self.log_dir.stem, keep_history=False, timestamp=False, ) + self.log_dir = self.log_dir.parent cli.clear_screen() cli.print_info('Initializing...') @@ -276,7 +275,7 @@ class State(): proc = exe.run_program(['smc', '-l']) data.extend(proc.stdout.splitlines()) except Exception: - LOG.ERROR('Error(s) encountered while exporting SMC data') + LOG.error('Error(s) encountered while exporting SMC data') data = [line.strip() for line in data] with open(f'{debug_dir}/smc.data', 'a', encoding='utf-8') as _f: _f.write('\n'.join(data)) diff --git a/scripts/wk/log.py b/scripts/wk/log.py index 768b9f13..31733fa2 100644 --- a/scripts/wk/log.py +++ b/scripts/wk/log.py @@ -39,16 +39,21 @@ def enable_debug_mode() -> None: def format_log_path( - log_dir: None | pathlib.Path | str = None, - log_name: None | str = None, + log_dir: pathlib.Path | str | None = None, + log_name: str | None = None, + append: bool = False, + kit: bool = False, + sub_dir: str | None = None, timestamp: bool = False, - kit: bool = False, tool: bool = False, append: bool = False, + tool: bool = False, ) -> pathlib.Path: """Format path based on args passed, returns pathlib.Path obj.""" log_path = pathlib.Path( f'{log_dir if log_dir else DEFAULT_LOG_DIR}/' f'{cfg.main.KIT_NAME_FULL+"/" if kit else ""}' f'{"Tools/" if tool else ""}' + f'{sub_dir+"_" if sub_dir else ""}' + f'{time.strftime("%Y-%m-%d_%H%M%S%z") if sub_dir else ""}/' f'{log_name if log_name else DEFAULT_LOG_NAME}' f'{"_" if timestamp else ""}' f'{time.strftime("%Y-%m-%d_%H%M%S%z") if timestamp else ""}' @@ -127,9 +132,11 @@ def start(config: dict[str, str] | None = None) -> None: def update_log_path( - dest_dir: None | pathlib.Path | str = None, - dest_name: None | str = None, - keep_history: bool = True, timestamp: bool = True, append: bool = False, + dest_dir: None | pathlib.Path | str = None, + dest_name: None | str = None, + append: bool = False, + keep_history: bool = True, + timestamp: bool = True, ) -> None: """Moves current log file to new path and updates the root logger.""" root_logger = logging.getLogger() diff --git a/setup/build_linux b/setup/build_linux index 5870dffd..26def869 100755 --- a/setup/build_linux +++ b/setup/build_linux @@ -80,6 +80,10 @@ function copy_live_env() { mkdir -p "$PROFILE_DIR/airootfs/usr/local/bin" rsync -aI "$ROOT_DIR/scripts/" "$PROFILE_DIR/airootfs/usr/local/bin/" + # Pre-compile Python scripts + unset PYTHONPYCACHEPREFIX + python -m compileall "$PROFILE_DIR/airootfs/usr/local/bin/" + # Update profiledef.sh to set proper permissions for executable files for _file in $(find "$PROFILE_DIR/airootfs" -executable -type f | sed "s%$PROFILE_DIR/airootfs%%" | sort); do sed -i "\$i\ [\"$_file\"]=\"0:0:755\"" "$PROFILE_DIR/profiledef.sh" diff --git a/setup/linux/profile/airootfs/etc/skel/.aliases b/setup/linux/profile/airootfs/etc/skel/.aliases index d3c75407..0c796bdd 100644 --- a/setup/linux/profile/airootfs/etc/skel/.aliases +++ b/setup/linux/profile/airootfs/etc/skel/.aliases @@ -12,7 +12,6 @@ alias fix-perms='find -type d -exec chmod 755 "{}" \; && find -type f -exec chmo alias hexedit='hexedit --color' alias hw-info='sudo hw-info | less -S' alias ip='ip -br -c' -alias journalctl-datarec="echo -e 'Monitoring journal output...\n' && journalctl -kf | grep -Ei 'ata|nvme|scsi|sd[a..z]+|usb|comreset|critical|error'" alias less='less -S' alias ls='ls --color=auto' alias mkdir='mkdir -p'