From 89fd64779263d6175ef69cb953ebdd17ab9c5b07 Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Sat, 1 Apr 2023 22:14:03 -0700 Subject: [PATCH] Split wk.std into debug, std, and ui sections --- scripts/auto_repairs.py | 6 +- scripts/auto_setup.py | 4 +- scripts/build-ufd | 2 +- scripts/build_kit_windows.py | 4 +- scripts/ddrescue-tui.py | 4 +- scripts/embedded_python_env.py | 2 +- scripts/hw-diags.py | 4 +- scripts/hw-sensors | 6 +- scripts/launch_sdio.py | 14 +- scripts/mount-all-volumes | 18 +- scripts/mount-backup-shares | 6 +- scripts/msword-search | 14 +- scripts/pyproject.toml | 1 + scripts/unmount-backup-shares | 6 +- scripts/upload-logs | 14 +- scripts/wk/__init__.py | 10 +- scripts/wk/clone/ddrescue.py | 223 ++++---- scripts/wk/debug.py | 121 ++++ scripts/wk/graph.py | 2 +- scripts/wk/hw/benchmark.py | 4 +- scripts/wk/hw/cpu.py | 6 +- scripts/wk/hw/diags.py | 100 ++-- scripts/wk/hw/disk.py | 3 +- scripts/wk/hw/keyboard.py | 3 +- scripts/wk/hw/network.py | 3 +- scripts/wk/hw/sensors.py | 3 +- scripts/wk/hw/smart.py | 3 +- scripts/wk/hw/surface_scan.py | 8 +- scripts/wk/hw/system.py | 8 +- scripts/wk/kit/build_win.py | 5 +- scripts/wk/kit/ufd.py | 90 +-- scripts/wk/log.py | 18 + scripts/wk/net.py | 3 +- scripts/wk/os/linux.py | 3 +- scripts/wk/os/win.py | 3 +- scripts/wk/repairs/win.py | 5 +- scripts/wk/setup/win.py | 5 +- scripts/wk/std.py | 976 --------------------------------- scripts/wk/ui/__init__.py | 3 + scripts/wk/ui/cli.py | 862 +++++++++++++++++++++++++++++ scripts/wk_debug.py | 8 +- 41 files changed, 1312 insertions(+), 1271 deletions(-) create mode 100644 scripts/wk/ui/__init__.py create mode 100644 scripts/wk/ui/cli.py diff --git a/scripts/auto_repairs.py b/scripts/auto_repairs.py index c01f1735..0615f115 100644 --- a/scripts/auto_repairs.py +++ b/scripts/auto_repairs.py @@ -5,7 +5,7 @@ import wk # Classes -REBOOT_STR = wk.std.color_string('Reboot', 'YELLOW') +REBOOT_STR = wk.ui.cli.color_string('Reboot', 'YELLOW') class MenuEntry(): """Simple class to allow cleaner code below.""" def __init__(self, name, function=None, selected=True, **kwargs): @@ -87,8 +87,8 @@ if __name__ == '__main__': try: wk.repairs.win.run_auto_repairs(BASE_MENUS, PRESETS) except KeyboardInterrupt: - wk.std.abort() + wk.ui.cli.abort() except SystemExit: raise except: # noqa: E722 - wk.std.major_exception() + wk.ui.cli.major_exception() diff --git a/scripts/auto_setup.py b/scripts/auto_setup.py index 071ab8a3..f76ee648 100644 --- a/scripts/auto_setup.py +++ b/scripts/auto_setup.py @@ -154,8 +154,8 @@ if __name__ == '__main__': try: wk.setup.win.run_auto_setup(BASE_MENUS, PRESETS) except KeyboardInterrupt: - wk.std.abort() + wk.ui.cli.abort() except SystemExit: raise except: # noqa: E722 - wk.std.major_exception() + wk.ui.cli.major_exception() diff --git a/scripts/build-ufd b/scripts/build-ufd index 8bb04f63..3dacb7f3 100755 --- a/scripts/build-ufd +++ b/scripts/build-ufd @@ -11,4 +11,4 @@ if __name__ == '__main__': except SystemExit: raise except: # noqa: E722 - wk.std.major_exception() + wk.ui.cli.major_exception() diff --git a/scripts/build_kit_windows.py b/scripts/build_kit_windows.py index e739014b..a9323fc1 100644 --- a/scripts/build_kit_windows.py +++ b/scripts/build_kit_windows.py @@ -8,8 +8,8 @@ if __name__ == '__main__': try: wk.kit.build.build_kit() except KeyboardInterrupt: - wk.std.abort() + wk.ui.cli.abort() except SystemExit: raise except: # noqa: E722 - wk.std.major_exception() + wk.ui.cli.major_exception() diff --git a/scripts/ddrescue-tui.py b/scripts/ddrescue-tui.py index 183595a3..8e273dda 100755 --- a/scripts/ddrescue-tui.py +++ b/scripts/ddrescue-tui.py @@ -12,7 +12,7 @@ if __name__ == '__main__': docopt(wk.clone.ddrescue.DOCSTRING) except SystemExit: print('') - wk.std.pause('Press Enter to exit...') + wk.ui.cli.pause('Press Enter to exit...') raise try: @@ -20,4 +20,4 @@ if __name__ == '__main__': except SystemExit: raise except: # noqa: E722 - wk.std.major_exception() + wk.ui.cli.major_exception() diff --git a/scripts/embedded_python_env.py b/scripts/embedded_python_env.py index 7ebf7b98..1c63099b 100644 --- a/scripts/embedded_python_env.py +++ b/scripts/embedded_python_env.py @@ -8,7 +8,7 @@ python.exe -i embedded_python_env.py import wk -wk.std.print_colored( +wk.ui.cli.print_colored( (wk.cfg.main.KIT_NAME_FULL, ': ', 'Debug Console'), ('GREEN', None, 'YELLOW'), sep='', diff --git a/scripts/hw-diags.py b/scripts/hw-diags.py index 13c456a3..e2d00a41 100755 --- a/scripts/hw-diags.py +++ b/scripts/hw-diags.py @@ -12,7 +12,7 @@ if __name__ == '__main__': docopt(wk.hw.diags.DOCSTRING) except SystemExit: print('') - wk.std.pause('Press Enter to exit...') + wk.ui.cli.pause('Press Enter to exit...') raise try: @@ -20,4 +20,4 @@ if __name__ == '__main__': except SystemExit: raise except: # noqa: E722 - wk.std.major_exception() + wk.ui.cli.major_exception() diff --git a/scripts/hw-sensors b/scripts/hw-sensors index 1f1393c9..55dd31d9 100755 --- a/scripts/hw-sensors +++ b/scripts/hw-sensors @@ -11,11 +11,11 @@ def main(): """Show sensor data on screen.""" sensors = wk.hw.sensors.Sensors() if platform.system() == 'Darwin': - wk.std.clear_screen() + wk.ui.cli.clear_screen() while True: print('\033[100A', end='') sensors.update_sensor_data() - wk.std.print_report(sensors.generate_report('Current', 'Max')) + wk.ui.cli.print_report(sensors.generate_report('Current', 'Max')) wk.std.sleep(1) elif platform.system() == 'Linux': proc = wk.exe.run_program(cmd=['mktemp']) @@ -43,4 +43,4 @@ if __name__ == '__main__': except SystemExit: raise except: # noqa: E722 - wk.std.major_exception() + wk.ui.cli.major_exception() diff --git a/scripts/launch_sdio.py b/scripts/launch_sdio.py index 03d4070c..8827c4d8 100644 --- a/scripts/launch_sdio.py +++ b/scripts/launch_sdio.py @@ -22,10 +22,10 @@ SDIO_REMOTE_PATH = wk.io.get_path_obj( # Functions def try_again(): """Ask to try again or quit.""" - if wk.std.ask(' Try again?'): + if wk.ui.cli.ask(' Try again?'): return True - if not wk.std.ask(' Use local version?'): - wk.std.abort() + if not wk.ui.cli.ask(' Use local version?'): + wk.ui.cli.abort() return False @@ -47,7 +47,7 @@ def use_network_sdio(): except KeyboardInterrupt: break except MOUNT_EXCEPTIONS as err: - wk.std.print_error(f' {err}') + wk.ui.cli.print_error(f' {err}') if not try_again(): break else: @@ -57,7 +57,7 @@ def use_network_sdio(): break # Failed to mount - wk.std.print_error(' Failed to mount server') + wk.ui.cli.print_error(' Failed to mount server') if not try_again(): break @@ -66,7 +66,7 @@ def use_network_sdio(): if __name__ == '__main__': - wk.std.set_title( + wk.ui.cli.set_title( f'{wk.cfg.main.KIT_NAME_FULL}: Snappy Driver Installer Origin Launcher', ) log_dir = wk.log.format_log_path(tool=True).parent @@ -76,7 +76,7 @@ if __name__ == '__main__': try: USE_NETWORK = use_network_sdio() except KeyboardInterrupt: - wk.std.abort() + wk.ui.cli.abort() # Run SDIO EXE_PATH = SDIO_LOCAL_PATH diff --git a/scripts/mount-all-volumes b/scripts/mount-all-volumes index 3adde487..53eb3a26 100755 --- a/scripts/mount-all-volumes +++ b/scripts/mount-all-volumes @@ -10,32 +10,32 @@ import wk # Functions def main(): """Mount all volumes and show results.""" - wk.std.print_standard(f'{wk.cfg.main.KIT_NAME_FULL}: Volume mount tool') - wk.std.print_standard(' ') + wk.ui.cli.print_standard(f'{wk.cfg.main.KIT_NAME_FULL}: Volume mount tool') + wk.ui.cli.print_standard(' ') # Mount volumes and get report - wk.std.print_standard('Mounting volumes...') + wk.ui.cli.print_standard('Mounting volumes...') wk.os.linux.mount_volumes() report = wk.os.linux.build_volume_report() # Show results - wk.std.print_info('Results') - wk.std.print_report(report) + wk.ui.cli.print_info('Results') + wk.ui.cli.print_report(report) # GUI mode if 'gui' in sys.argv: - wk.std.pause('Press Enter to exit...') + wk.ui.cli.pause('Press Enter to exit...') wk.exe.popen_program(['nohup', 'thunar', '/media']) if __name__ == '__main__': if wk.std.PLATFORM != 'Linux': os_name = wk.std.PLATFORM.replace('Darwin', 'macOS') - wk.std.print_error(f'This script is not supported under {os_name}.') - wk.std.abort() + wk.ui.cli.print_error(f'This script is not supported under {os_name}.') + wk.ui.cli.abort() try: main() except SystemExit: raise except: # noqa: E722 - wk.std.major_exception() + wk.ui.cli.major_exception() diff --git a/scripts/mount-backup-shares b/scripts/mount-backup-shares index d19ffaf6..bf528771 100755 --- a/scripts/mount-backup-shares +++ b/scripts/mount-backup-shares @@ -8,7 +8,7 @@ import wk # Functions def main(): """Attempt to mount backup shares and print report.""" - wk.std.print_info('Mounting Backup Shares') + wk.ui.cli.print_info('Mounting Backup Shares') report = wk.net.mount_backup_shares() for line in report: color = 'GREEN' @@ -17,7 +17,7 @@ def main(): color = 'RED' elif 'Already' in line: color = 'YELLOW' - print(wk.std.color_string(line, color)) + print(wk.ui.cli.color_string(line, color)) if __name__ == '__main__': @@ -26,4 +26,4 @@ if __name__ == '__main__': except SystemExit: raise except: # noqa: E722 - wk.std.major_exception() + wk.ui.cli.major_exception() diff --git a/scripts/msword-search b/scripts/msword-search index a5aac0b6..14062725 100755 --- a/scripts/msword-search +++ b/scripts/msword-search @@ -46,13 +46,13 @@ def scan_file(file_path, search): if __name__ == '__main__': try: # Prep - wk.std.clear_screen() + wk.ui.cli.clear_screen() terms = [re.sub(r'\s+', r'\s*', t) for t in sys.argv[1:]] search = '({})'.format('|'.join(terms)) if len(sys.argv) == 1: # Print usage - wk.std.print_standard(USAGE) + wk.ui.cli.print_standard(USAGE) else: matches = [] for entry in scan_for_docs(SCANDIR): @@ -60,20 +60,20 @@ if __name__ == '__main__': # Strip None values (i.e. non-matching entries) matches = [m for m in matches if m] if matches: - wk.std.print_success('Found {} {}:'.format( + wk.ui.cli.print_success('Found {} {}:'.format( len(matches), 'Matches' if len(matches) > 1 else 'Match')) for match in matches: - wk.std.print_standard(match) + wk.ui.cli.print_standard(match) else: - wk.std.print_error('No matches found.') + wk.ui.cli.print_error('No matches found.') # Done - wk.std.print_standard('\nDone.') + wk.ui.cli.print_standard('\nDone.') #pause("Press Enter to exit...") except SystemExit: raise except: # noqa: E722 - wk.std.major_exception() + wk.ui.cli.major_exception() # vim: sts=2 sw=2 ts=2 diff --git a/scripts/pyproject.toml b/scripts/pyproject.toml index 05f92df9..04adb66c 100644 --- a/scripts/pyproject.toml +++ b/scripts/pyproject.toml @@ -8,6 +8,7 @@ "wk/os/__init__.py" = ["F401"] "wk/repairs/__init__.py" = ["F401"] "wk/setup/__init__.py" = ["F401"] +"wk/ui/__init__.py" = ["F401"] # Long lines "wk/borrowed/acpi.py" = ["E501", "F841"] diff --git a/scripts/unmount-backup-shares b/scripts/unmount-backup-shares index e7375c08..57e7e880 100755 --- a/scripts/unmount-backup-shares +++ b/scripts/unmount-backup-shares @@ -8,14 +8,14 @@ import wk # Functions def main(): """Attempt to mount backup shares and print report.""" - wk.std.print_info('Unmounting Backup Shares') + wk.ui.cli.print_info('Unmounting Backup Shares') report = wk.net.unmount_backup_shares() for line in report: color = 'GREEN' line = f' {line}' if 'Not mounted' in line: color = 'YELLOW' - print(wk.std.color_string(line, color)) + print(wk.ui.cli.color_string(line, color)) if __name__ == '__main__': @@ -24,4 +24,4 @@ if __name__ == '__main__': except SystemExit: raise except: # noqa: E722 - wk.std.major_exception() + wk.ui.cli.major_exception() diff --git a/scripts/upload-logs b/scripts/upload-logs index 61ba67bf..6ea27f14 100755 --- a/scripts/upload-logs +++ b/scripts/upload-logs @@ -28,21 +28,21 @@ if PLATFORM not in ('macOS', 'Linux'): def main(): """Upload logs for review.""" lines = [] - try_and_print = wk.std.TryAndPrint() + try_and_print = wk.ui.cli.TryAndPrint() # Set log wk.log.update_log_path(dest_name='Upload-Logs', timestamp=True) # Instructions - wk.std.print_success(f'{wk.cfg.main.KIT_NAME_FULL}: Upload Logs') - wk.std.print_standard('') - wk.std.print_standard('Please state the reason for the review.') - wk.std.print_info(' End note with an empty line.') - wk.std.print_standard('') + wk.ui.cli.print_success(f'{wk.cfg.main.KIT_NAME_FULL}: Upload Logs') + wk.ui.cli.print_standard('') + wk.ui.cli.print_standard('Please state the reason for the review.') + wk.ui.cli.print_info(' End note with an empty line.') + wk.ui.cli.print_standard('') # Get reason note while True: - text = wk.std.input_text('> ') + text = wk.ui.cli.input_text('> ') if not text: lines.append('') break diff --git a/scripts/wk/__init__.py b/scripts/wk/__init__.py index e6877c23..5bf5d701 100644 --- a/scripts/wk/__init__.py +++ b/scripts/wk/__init__.py @@ -1,7 +1,7 @@ """WizardKit: wk module init""" # vim: sts=2 sw=2 ts=2 -from sys import version_info as version +from sys import stderr, version_info from . import cfg from . import clone @@ -18,20 +18,22 @@ from . import repairs from . import setup from . import std from . import tmux +from . import ui # Check env -if version < (3, 7): +if version_info < (3, 7): # Unsupported raise RuntimeError( - f'This package is unsupported on Python {version.major}.{version.minor}' + 'This package is unsupported on Python ' + f'{version_info.major}.{version_info.minor}' ) # Init try: log.start() except UserWarning as err: - std.print_warning(err) + print(err, file=stderr) if __name__ == '__main__': diff --git a/scripts/wk/clone/ddrescue.py b/scripts/wk/clone/ddrescue.py index 29667bd9..d0255360 100644 --- a/scripts/wk/clone/ddrescue.py +++ b/scripts/wk/clone/ddrescue.py @@ -33,6 +33,7 @@ from wk.hw.smart import ( smart_status_ok, update_smart_details, ) +from wk.ui import cli as ui # TODO: This is lazy # STATIC VARIABLES @@ -89,8 +90,8 @@ REGEX_REMAINING_TIME = re.compile( LOG = logging.getLogger(__name__) MENU_ACTIONS = ( 'Start', - f'Change settings {std.color_string("(experts only)", "YELLOW")}', - f'Detect drives {std.color_string("(experts only)", "YELLOW")}', + f'Change settings {ui.color_string("(experts only)", "YELLOW")}', + f'Detect drives {ui.color_string("(experts only)", "YELLOW")}', 'Quit') MENU_TOGGLES = { 'Auto continue (if recovery % over threshold)': True, @@ -296,7 +297,7 @@ class BlockPair(): # Check destination size if cloning if not self.destination.is_file() and dest_size < self.size: - std.print_error(f'Invalid destination: {self.destination}') + ui.print_error(f'Invalid destination: {self.destination}') raise std.GenericAbort() def set_initial_status(self): @@ -420,7 +421,7 @@ class State(): self.panes['Started'] = tmux.split_window( lines=cfg.ddrescue.TMUX_SIDE_WIDTH, target_id=self.panes['Source'], - text=std.color_string( + text=ui.color_string( ['Started', time.strftime("%Y-%m-%d %H:%M %Z")], ['BLUE', None], sep='\n', @@ -442,7 +443,7 @@ class State(): settings = json.loads(_f.read()) except (OSError, json.JSONDecodeError) as err: LOG.error('Failed to load clone settings') - std.print_error('Invalid clone settings detected.') + ui.print_error('Invalid clone settings detected.') raise std.GenericAbort() from err # Check settings @@ -454,10 +455,10 @@ class State(): bail = False for key in ('model', 'serial'): if settings['Source'][key] != getattr(self.source, key): - std.print_error(f"Clone settings don't match source {key}") + ui.print_error(f"Clone settings don't match source {key}") bail = True if settings['Destination'][key] != getattr(self.destination, key): - std.print_error(f"Clone settings don't match destination {key}") + ui.print_error(f"Clone settings don't match destination {key}") bail = True if bail: raise std.GenericAbort() @@ -488,7 +489,7 @@ class State(): with open(settings_file, 'w', encoding='utf-8') as _f: json.dump(settings, _f) except OSError as err: - std.print_error('Failed to save clone settings') + ui.print_error('Failed to save clone settings') raise std.GenericAbort() from err def add_clone_block_pairs(self): @@ -522,7 +523,7 @@ class State(): # New run, use new settings file settings['Needs Format'] = True offset = 0 - user_choice = std.choice( + user_choice = ui.choice( ['G', 'M', 'S'], 'Format clone using GPT, MBR, or match Source type?', ) @@ -533,7 +534,7 @@ class State(): else: # Match source type settings['Table Type'] = get_table_type(self.source.path) - if std.ask('Create an empty Windows boot partition on the clone?'): + if ui.ask('Create an empty Windows boot partition on the clone?'): settings['Create Boot Partition'] = True offset = 2 if settings['Table Type'] == 'GPT' else 1 @@ -566,14 +567,14 @@ class State(): report = [] # Source - report.append(std.color_string('Source', 'GREEN')) + report.append(ui.color_string('Source', 'GREEN')) report.extend(build_object_report(self.source)) report.append(' ') # Destination - report.append(std.color_string('Destination', 'GREEN')) + report.append(ui.color_string('Destination', 'GREEN')) if self.mode == 'Clone': - report[-1] += std.color_string(' (ALL DATA WILL BE DELETED)', 'RED') + report[-1] += ui.color_string(' (ALL DATA WILL BE DELETED)', 'RED') report.extend(build_object_report(self.destination)) report.append(' ') @@ -581,12 +582,12 @@ class State(): # NOTE: The check for block_pairs is to limit this section # to the second confirmation if self.mode == 'Clone' and self.block_pairs: - report.append(std.color_string('WARNING', 'YELLOW')) + report.append(ui.color_string('WARNING', 'YELLOW')) report.append( 'All data will be deleted from the destination listed above.', ) report.append( - std.color_string( + ui.color_string( ['This is irreversible and will lead to', 'DATA LOSS.'], ['YELLOW', 'RED'], ), @@ -605,18 +606,18 @@ class State(): # Map dir if self.working_dir: - report.append(std.color_string('Map Save Directory', 'GREEN')) + report.append(ui.color_string('Map Save Directory', 'GREEN')) report.append(f'{self.working_dir}/') report.append(' ') if not fstype_is_ok(self.working_dir, map_dir=True): report.append( - std.color_string( + ui.color_string( 'Map file(s) are being saved to a non-recommended filesystem.', 'YELLOW', ), ) report.append( - std.color_string( + ui.color_string( ['This is strongly discouraged and may lead to', 'DATA LOSS'], [None, 'RED'], ), @@ -625,11 +626,11 @@ class State(): # Source part(s) selected if source_parts: - report.append(std.color_string('Source Part(s) selected', 'GREEN')) + report.append(ui.color_string('Source Part(s) selected', 'GREEN')) if self.source.path.samefile(source_parts[0].path): report.append('Whole Disk') else: - report.append(std.color_string(f'{"NAME":<9} SIZE', 'BLUE')) + report.append(ui.color_string(f'{"NAME":<9} SIZE', 'BLUE')) for part in source_parts: report.append( f'{part.path.name:<9} ' @@ -638,9 +639,9 @@ class State(): report.append(' ') # Prompt user - std.clear_screen() - std.print_report(report) - if not std.ask(prompt): + ui.clear_screen() + ui.print_report(report) + if not ui.ask(prompt): raise std.GenericAbort() def generate_report(self): @@ -661,7 +662,7 @@ class State(): error_size = self.get_error_size() error_size_str = std.bytes_to_string(error_size, decimals=2) if error_size > 0: - error_size_str = std.color_string(error_size_str, 'YELLOW') + error_size_str = ui.color_string(error_size_str, 'YELLOW') percent = self.get_percent_recovered() percent = format_status_string(percent, width=0) report.append(f'Overall rescued: {percent}, error size: {error_size_str}') @@ -673,7 +674,7 @@ class State(): error_size = pair.get_error_size() error_size_str = std.bytes_to_string(error_size, decimals=2) if error_size > 0: - error_size_str = std.color_string(error_size_str, 'YELLOW') + error_size_str = ui.color_string(error_size_str, 'YELLOW') pair_size = std.bytes_to_string(pair.size, decimals=2) percent = pair.get_percent_recovered() percent = format_status_string(percent, width=0) @@ -704,7 +705,7 @@ class State(): def init_recovery(self, docopt_args): """Select source/dest and set env.""" - std.clear_screen() + ui.clear_screen() source_parts = [] # Set log @@ -795,8 +796,8 @@ class State(): try: exe.run_program(cmd) except subprocess.CalledProcessError: - std.print_error('Failed to unmount source and/or destination') - std.abort() + ui.print_error('Failed to unmount source and/or destination') + ui.abort() # Prep destination if self.mode == 'Clone': @@ -928,7 +929,7 @@ class State(): check=False, ) if proc.returncode != 0: - std.print_error('Error(s) encoundtered while formatting destination') + ui.print_error('Error(s) encoundtered while formatting destination') raise std.GenericAbort() # Update settings @@ -969,13 +970,13 @@ class State(): # Check for critical errors if not smart_status_ok(self.destination): - std.print_error( + ui.print_error( f'Critical error(s) detected for: {self.destination.path}', ) # Check for minor errors if not check_attributes(self.destination, only_blocking=False): - std.print_warning( + ui.print_warning( f'Attribute error(s) detected for: {self.destination.path}', ) @@ -1026,7 +1027,7 @@ class State(): destination_size *= 1.05 error_msg = 'Not enough free space on the destination' if required_size > destination_size: - std.print_error(error_msg) + ui.print_error(error_msg) raise std.GenericAbort() def save_debug_reports(self): @@ -1037,7 +1038,7 @@ class State(): debug_dir.mkdir() # State (self) - std.save_pickles({'state': self}, debug_dir) + debug.save_pickles({'state': self}, debug_dir) with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f: _f.write('[Debug report]\n') _f.write('\n'.join(debug.generate_object_report(self))) @@ -1064,10 +1065,10 @@ class State(): width = cfg.ddrescue.TMUX_SIDE_WIDTH # Status - report.append(std.color_string(f'{"Status":^{width}}', 'BLUE')) + report.append(ui.color_string(f'{"Status":^{width}}', 'BLUE')) if 'NEEDS ATTENTION' in overall_status: report.append( - std.color_string(f'{overall_status:^{width}}', 'YELLOW_BLINK'), + ui.color_string(f'{overall_status:^{width}}', 'YELLOW_BLINK'), ) else: report.append(f'{overall_status:^{width}}') @@ -1077,12 +1078,12 @@ class State(): if self.block_pairs: total_rescued = self.get_rescued_size() percent = self.get_percent_recovered() - report.append(std.color_string('Overall Progress', 'BLUE')) + report.append(ui.color_string('Overall Progress', 'BLUE')) report.append( f'Rescued: {format_status_string(percent, width=width-9)}', ) report.append( - std.color_string( + ui.color_string( [f'{std.bytes_to_string(total_rescued, decimals=2):>{width}}'], [get_percent_color(percent)], ), @@ -1091,7 +1092,7 @@ class State(): # Block pair progress for pair in self.block_pairs: - report.append(std.color_string(pair.source, 'BLUE')) + report.append(ui.color_string(pair.source, 'BLUE')) for name, status in pair.status.items(): name = name.title() report.append( @@ -1103,9 +1104,9 @@ class State(): if overall_status in ('Active', 'NEEDS ATTENTION'): etoc = get_etoc() report.append(separator) - report.append(std.color_string('Estimated Pass Finish', 'BLUE')) + report.append(ui.color_string('Estimated Pass Finish', 'BLUE')) if overall_status == 'NEEDS ATTENTION' or etoc == 'N/A': - report.append(std.color_string('N/A', 'YELLOW')) + report.append(ui.color_string('N/A', 'YELLOW')) else: report.append(etoc) @@ -1166,7 +1167,7 @@ class State(): source_str = _format_string(self.source, width) tmux.respawn_pane( self.panes['Source'], - text=std.color_string( + text=ui.color_string( ['Source', '' if source_exists else ' (Missing)', '\n', source_str], ['BLUE', 'RED', None, None], sep='', @@ -1181,7 +1182,7 @@ class State(): percent=50, vertical=False, target_id=self.panes['Source'], - text=std.color_string( + text=ui.color_string( ['Destination', '' if dest_exists else ' (Missing)', '\n', dest_str], ['BLUE', 'RED', None, None], sep='', @@ -1195,7 +1196,7 @@ def build_block_pair_report(block_pairs, settings): report = [] notes = [] if block_pairs: - report.append(std.color_string('Block Pairs', 'GREEN')) + report.append(ui.color_string('Block Pairs', 'GREEN')) else: # Bail early return report @@ -1214,7 +1215,7 @@ def build_block_pair_report(block_pairs, settings): if settings: if not settings['First Run']: notes.append( - std.color_string( + ui.color_string( ['NOTE:', 'Clone settings loaded from previous run.'], ['BLUE', None], ), @@ -1222,14 +1223,14 @@ def build_block_pair_report(block_pairs, settings): if settings['Needs Format'] and settings['Table Type']: msg = f'Destination will be formatted using {settings["Table Type"]}' notes.append( - std.color_string( + ui.color_string( ['NOTE:', msg], ['BLUE', None], ), ) if any(pair.get_rescued_size() > 0 for pair in block_pairs): notes.append( - std.color_string( + ui.color_string( ['NOTE:', 'Resume data loaded from map file(s).'], ['BLUE', None], ), @@ -1311,12 +1312,12 @@ def build_directory_report(path): for line in proc.stdout.splitlines(): line = line.replace('\n', '') if 'FSTYPE' in line: - line = std.color_string(f'{"PATH":<{width}}{line}', 'BLUE') + line = ui.color_string(f'{"PATH":<{width}}{line}', 'BLUE') else: line = f'{path:<{width}}{line}' report.append(line) else: - report.append(std.color_string('PATH', 'BLUE')) + report.append(ui.color_string('PATH', 'BLUE')) report.append(str(path)) # Done @@ -1352,7 +1353,7 @@ def build_disk_report(dev): # Partition details report.append( - std.color_string( + ui.color_string( ( f'{"NAME":<{widths["name"]}}' f'{" " if dev.children else ""}' @@ -1397,8 +1398,8 @@ def build_disk_report(dev): def build_main_menu(): - """Build main menu, returns wk.std.Menu.""" - menu = std.Menu(title=std.color_string('ddrescue TUI: Main Menu', 'GREEN')) + """Build main menu, returns wk.ui.cli.Menu.""" + menu = ui.Menu(title=ui.color_string('ddrescue TUI: Main Menu', 'GREEN')) menu.separator = ' ' # Add actions, options, etc @@ -1429,23 +1430,23 @@ def build_object_report(obj): def build_settings_menu(silent=True): - """Build settings menu, returns wk.std.Menu.""" + """Build settings menu, returns wk.ui.cli.Menu.""" title_text = [ - std.color_string('ddrescue TUI: Expert Settings', 'GREEN'), + ui.color_string('ddrescue TUI: Expert Settings', 'GREEN'), ' ', - std.color_string( + ui.color_string( ['These settings can cause', 'MAJOR DAMAGE', 'to drives'], ['YELLOW', 'RED', 'YELLOW'], ), 'Please read the manual before making changes', ] - menu = std.Menu(title='\n'.join(title_text)) + menu = ui.Menu(title='\n'.join(title_text)) menu.separator = ' ' preset = 'Default' if not silent: # Ask which preset to use print(f'Available ddrescue presets: {" / ".join(SETTING_PRESETS)}') - preset = std.choice(SETTING_PRESETS, 'Please select a preset:') + preset = ui.choice(SETTING_PRESETS, 'Please select a preset:') # Fix selection for _p in SETTING_PRESETS: @@ -1494,7 +1495,7 @@ def build_sfdisk_partition_line(table_type, dev_path, size, details): # Safety Check if not dest_type: - std.print_error(f'Failed to determine partition type for: {dev_path}') + ui.print_error(f'Failed to determine partition type for: {dev_path}') raise std.GenericAbort() # Add extra details @@ -1576,7 +1577,7 @@ def format_status_string(status, width): # Add color if necessary if color: - status_str = std.color_string(status_str, color) + status_str = ui.color_string(status_str, color) # Done return status_str @@ -1698,8 +1699,8 @@ def get_object(path): # Child/Parent check if obj.parent: - std.print_warning(f'"{obj.path}" is a child device') - if std.ask(f'Use parent device "{obj.parent}" instead?'): + ui.print_warning(f'"{obj.path}" is a child device') + if ui.ask(f'Use parent device "{obj.parent}" instead?'): obj = hw_disk.Disk(obj.parent) elif path.is_dir(): obj = path @@ -1710,7 +1711,7 @@ def get_object(path): # Abort if obj not set if not obj: - std.print_error(f'Invalid source/dest path: {path}') + ui.print_error(f'Invalid source/dest path: {path}') raise std.GenericAbort() # Done @@ -1775,7 +1776,7 @@ def get_table_type(disk_path): # Check type if table_type not in ('GPT', 'MBR'): - std.print_error(f'Unsupported partition table type: {table_type}') + ui.print_error(f'Unsupported partition table type: {table_type}') raise std.GenericAbort() # Done @@ -1789,7 +1790,7 @@ def get_working_dir(mode, destination, force_local=False): # Set ticket ID while ticket_id is None: - ticket_id = std.input_text( + ticket_id = ui.input_text( prompt='Please enter ticket ID:', allow_empty_response=False, ) @@ -1802,12 +1803,12 @@ def get_working_dir(mode, destination, force_local=False): try: path = pathlib.Path(destination).resolve() except TypeError as err: - std.print_error(f'Invalid destination: {destination}') + ui.print_error(f'Invalid destination: {destination}') raise std.GenericAbort() from err if path.exists() and fstype_is_ok(path, map_dir=False): working_dir = path elif mode == 'Clone' and not force_local: - std.print_info('Mounting backup shares...') + ui.print_info('Mounting backup shares...') net.mount_backup_shares(read_write=True) for server in cfg.net.BACKUP_SERVERS: path = pathlib.Path( @@ -1851,11 +1852,11 @@ def is_missing_source_or_destination(state): if hasattr(item, 'path'): if not item.path.exists(): missing = True - std.print_error(f'{name} disappeared') + ui.print_error(f'{name} disappeared') elif hasattr(item, 'exists'): if not item.exists(): missing = True - std.print_error(f'{name} disappeared') + ui.print_error(f'{name} disappeared') else: LOG.error('Unknown %s type: %s', name, item) @@ -1887,7 +1888,7 @@ def source_or_destination_changed(state): # Done if changed: - std.print_error('Source and/or Destination changed') + ui.print_error('Source and/or Destination changed') return changed @@ -1910,7 +1911,7 @@ def main(): state.init_recovery(args) except (FileNotFoundError, std.GenericAbort): is_missing_source_or_destination(state) - std.abort() + ui.abort() # Show menu while True: @@ -1928,18 +1929,18 @@ def main(): # Detect drives if 'Detect drives' in selection[0]: - std.clear_screen() - std.print_warning(DETECT_DRIVES_NOTICE) - if std.ask('Are you sure you proceed?'): - std.print_standard('Forcing controllers to rescan for devices...') + ui.clear_screen() + ui.print_warning(DETECT_DRIVES_NOTICE) + if ui.ask('Are you sure you proceed?'): + ui.print_standard('Forcing controllers to rescan for devices...') cmd = 'echo "- - -" | sudo tee /sys/class/scsi_host/host*/scan' exe.run_program(cmd, check=False, shell=True) if source_or_destination_changed(state): - std.abort() + ui.abort() # Start recovery if 'Start' in selection: - std.clear_screen() + ui.clear_screen() run_recovery(state, main_menu, settings_menu, dry_run=args['--dry-run']) # Quit @@ -1949,14 +1950,14 @@ def main(): break # Recovey < 100% - std.print_warning('Recovery is less than 100%') - if std.ask('Are you sure you want to quit?'): + ui.print_warning('Recovery is less than 100%') + if ui.ask('Are you sure you want to quit?'): break # Save results to log LOG.info('') for line in state.generate_report(): - LOG.info(' %s', std.strip_colors(line)) + LOG.info(' %s', ui.strip_colors(line)) def mount_raw_image(path): @@ -1970,7 +1971,7 @@ def mount_raw_image(path): # Check if not loopback_path: - std.print_error(f'Failed to mount image: {path}') + ui.print_error(f'Failed to mount image: {path}') # Register unmount atexit atexit.register(unmount_loopback_device, loopback_path) @@ -2037,7 +2038,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True): cmd = build_ddrescue_cmd(block_pair, pass_name, settings) poweroff_source_after_idle = True state.update_progress_pane('Active') - std.clear_screen() + ui.clear_screen() warning_message = '' def _poweroff_source_drive(idle_minutes): @@ -2056,8 +2057,8 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True): return if i % 600 == 0 and i > 0: if i == 600: - std.print_standard(' ', flush=True) - std.print_warning( + ui.print_standard(' ', flush=True) + ui.print_warning( f'Powering off source in {int((idle_minutes*60-i)/60)} minutes...', ) std.sleep(5) @@ -2067,10 +2068,10 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True): cmd = ['sudo', 'hdparm', '-Y', source_dev] proc = exe.run_program(cmd, check=False) if proc.returncode: - std.print_error(f'Failed to poweroff source {source_dev}') + ui.print_error(f'Failed to poweroff source {source_dev}') else: - std.print_warning(f'Powered off source {source_dev}') - std.print_standard( + ui.print_warning(f'Powered off source {source_dev}') + ui.print_standard( 'Press Enter to return to main menu...', end='', flush=True, ) @@ -2080,7 +2081,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True): now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z') with open(f'{state.log_dir}/smart.out', 'w', encoding='utf-8') as _f: _f.write( - std.color_string( + ui.color_string( ['SMART Attributes', f'Updated: {now}\n'], ['BLUE', 'YELLOW'], sep='\t\t', @@ -2113,7 +2114,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True): if warning_message: # Error detected on destination, stop recovery exe.stop_process(proc) - std.print_error(warning_message) + ui.print_error(warning_message) break if _i % 60 == 0: @@ -2161,17 +2162,17 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True): if warning_message: print(' ') print(' ') - std.print_error('DDRESCUE PROCESS HALTED') + ui.print_error('DDRESCUE PROCESS HALTED') print(' ') - std.print_warning(warning_message) + ui.print_warning(warning_message) # Needs attention? if str(proc.poll()) != '0': state.update_progress_pane('NEEDS ATTENTION') - std.pause('Press Enter to return to main menu...') + ui.pause('Press Enter to return to main menu...') # Stop source poweroff countdown - std.print_standard('Stopping device poweroff countdown...', flush=True) + ui.print_standard('Stopping device poweroff countdown...', flush=True) poweroff_source_after_idle = False poweroff_thread.join() @@ -2187,12 +2188,12 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True): # Bail early if is_missing_source_or_destination(state): - std.print_standard('') - std.pause('Press Enter to return to main menu...') + ui.print_standard('') + ui.pause('Press Enter to return to main menu...') return if source_or_destination_changed(state): - std.print_standard('') - std.abort() + ui.print_standard('') + ui.abort() # Get settings for name, details in main_menu.toggles.items(): @@ -2248,9 +2249,9 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True): # Show warning if nothing was done if not attempted_recovery: - std.print_warning('No actions performed') - std.print_standard(' ') - std.pause('Press Enter to return to main menu...') + ui.print_warning('No actions performed') + ui.print_standard(' ') + ui.pause('Press Enter to return to main menu...') # Done state.save_debug_reports() @@ -2260,10 +2261,10 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True): def select_disk(prompt, skip_disk=None): """Select disk from list, returns Disk().""" - std.print_info('Scanning disks...') + ui.print_info('Scanning disks...') disks = hw_disk.get_disks() - menu = std.Menu( - title=std.color_string(f'ddrescue TUI: {prompt} Selection', 'GREEN'), + menu = ui.Menu( + title=ui.color_string(f'ddrescue TUI: {prompt} Selection', 'GREEN'), ) menu.disabled_str = 'Already selected' menu.separator = ' ' @@ -2305,9 +2306,9 @@ def select_disk(prompt, skip_disk=None): def select_disk_parts(prompt, disk): """Select disk parts from list, returns list of Disk().""" - title = std.color_string('ddrescue TUI: Partition Selection', 'GREEN') + title = ui.color_string('ddrescue TUI: Partition Selection', 'GREEN') title += f'\n\nDisk: {disk.path} {disk.description}' - menu = std.Menu(title) + menu = ui.Menu(title) menu.separator = ' ' menu.add_action('All') menu.add_action('None') @@ -2356,7 +2357,7 @@ def select_disk_parts(prompt, disk): if not menu.options: menu.add_option(whole_disk_str, {'Selected': True, 'Path': disk.path}) menu.title += '\n\n' - menu.title += std.color_string(' No partitions detected.', 'YELLOW') + menu.title += ui.color_string(' No partitions detected.', 'YELLOW') # Get selection _select_parts(menu) @@ -2370,13 +2371,13 @@ def select_disk_parts(prompt, disk): if len(object_list) == len(disk.children): # NOTE: This is not true if the disk has no partitions msg = f'Preserve partition table and unused space in {prompt.lower()}?' - if std.ask(msg): + if ui.ask(msg): # Replace part list with whole disk obj object_list = [disk.path] # Convert object_list to hw_disk.Disk() objects print(' ') - std.print_info('Getting disk/partition details...') + ui.print_info('Getting disk/partition details...') object_list = [hw_disk.Disk(path) for path in object_list] # Done @@ -2386,8 +2387,8 @@ def select_disk_parts(prompt, disk): def select_path(prompt): """Select path, returns pathlib.Path.""" invalid = False - menu = std.Menu( - title=std.color_string(f'ddrescue TUI: {prompt} Path Selection', 'GREEN'), + menu = ui.Menu( + title=ui.color_string(f'ddrescue TUI: {prompt} Path Selection', 'GREEN'), ) menu.separator = ' ' menu.add_action('Quit') @@ -2400,7 +2401,7 @@ def select_path(prompt): if 'Current directory' in selection: path = os.getcwd() elif 'Enter manually' in selection: - path = std.input_text('Please enter path: ') + path = ui.input_text('Please enter path: ') elif 'Quit' in selection: raise std.GenericAbort() @@ -2410,7 +2411,7 @@ def select_path(prompt): except TypeError: invalid = True if invalid or not path.is_dir(): - std.print_error(f'Invalid path: {path}') + ui.print_error(f'Invalid path: {path}') raise std.GenericAbort() # Done @@ -2429,7 +2430,7 @@ def set_mode(docopt_args): # Ask user if necessary if not mode: - answer = std.choice(['C', 'I'], 'Are we cloning or imaging?') + answer = ui.choice(['C', 'I'], 'Are we cloning or imaging?') if answer == 'C': mode = 'Clone' else: diff --git a/scripts/wk/debug.py b/scripts/wk/debug.py index 11da914d..69f400d8 100644 --- a/scripts/wk/debug.py +++ b/scripts/wk/debug.py @@ -1,6 +1,21 @@ """WizardKit: Debug Functions""" # vim: sts=2 sw=2 ts=2 +import inspect +import logging +import lzma +import os +import pickle +import platform +import re +import socket +import sys +import time + +import requests + +from wk.cfg.net import CRASH_SERVER +from wk.log import get_log_filepath, get_root_logger_path # Classes class Debug(): @@ -10,11 +25,55 @@ class Debug(): # STATIC VARIABLES +LOG = logging.getLogger(__name__) DEBUG_CLASS = Debug() METHOD_TYPE = type(DEBUG_CLASS.method) # Functions +def generate_debug_report(): + """Generate debug report, returns str.""" + platform_function_list = ( + 'architecture', + 'machine', + 'platform', + 'python_version', + ) + report = [] + + # Logging data + log_path = get_log_filepath() + if log_path: + report.append('------ Start Log -------') + report.append('') + with open(log_path, 'r', encoding='utf-8') as log_file: + report.extend(log_file.read().splitlines()) + report.append('') + report.append('------- End Log --------') + + # System + report.append('--- Start debug info ---') + report.append('') + report.append('[System]') + report.append(f' {"FQDN":<24} {socket.getfqdn()}') + for func in platform_function_list: + func_name = func.replace('_', ' ').capitalize() + func_result = getattr(platform, func)() + report.append(f' {func_name:<24} {func_result}') + report.append(f' {"Python sys.argv":<24} {sys.argv}') + report.append('') + + # Environment + report.append('[Environment Variables]') + for key, value in sorted(os.environ.items()): + report.append(f' {key:<24} {value}') + report.append('') + + # Done + report.append('---- End debug info ----') + return '\n'.join(report) + + def generate_object_report(obj, indent=0): """Generate debug report for obj, returns list.""" report = [] @@ -46,5 +105,67 @@ def generate_object_report(obj, indent=0): return report +def save_pickles(obj_dict, out_path=None): + """Save dict of objects using pickle.""" + LOG.info('Saving pickles') + + # Set path + if not out_path: + out_path = get_root_logger_path() + out_path = out_path.parent.joinpath('../debug').resolve() + + # Save pickles + try: + for name, obj in obj_dict.copy().items(): + if name.startswith('__') or inspect.ismodule(obj): + continue + with open(f'{out_path}/{name}.pickle', 'wb') as _f: + pickle.dump(obj, _f, protocol=pickle.HIGHEST_PROTOCOL) + except Exception: + LOG.error('Failed to save all the pickles', exc_info=True) + + +def upload_debug_report(report, compress=True, reason='DEBUG'): + """Upload debug report to CRASH_SERVER as specified in wk.cfg.main.""" + LOG.info('Uploading debug report to %s', CRASH_SERVER.get('Name', '?')) + headers = CRASH_SERVER.get('Headers', {'X-Requested-With': 'XMLHttpRequest'}) + if compress: + headers['Content-Type'] = 'application/octet-stream' + + # Check if the required server details are available + if not all(CRASH_SERVER.get(key, False) for key in ('Name', 'Url', 'User')): + msg = 'Server details missing, aborting upload.' + print(msg) + raise RuntimeError(msg) + + # Set filename (based on the logging config if possible) + filename = 'Unknown' + log_path = get_log_filepath() + if log_path: + # Strip everything but the prefix + filename = re.sub(r'^(.*)_(\d{4}-\d{2}-\d{2}.*)', r'\1', log_path.name) + filename = f'{filename}_{reason}_{time.strftime("%Y-%m-%d_%H%M%S%z")}.log' + LOG.debug('filename: %s', filename) + + # Compress report + if compress: + filename += '.xz' + xz_report = lzma.compress(report.encode('utf8')) + + # Upload report + url = f'{CRASH_SERVER["Url"]}/{filename}' + response = requests.put( + url, + auth=(CRASH_SERVER['User'], CRASH_SERVER.get('Pass', '')), + data=xz_report if compress else report, + headers=headers, + timeout=60, + ) + + # Check response + if not response.ok: + raise RuntimeError('Failed to upload report') + + if __name__ == '__main__': print("This file is not meant to be called directly.") diff --git a/scripts/wk/graph.py b/scripts/wk/graph.py index 885cfe8e..fe717545 100644 --- a/scripts/wk/graph.py +++ b/scripts/wk/graph.py @@ -3,7 +3,7 @@ import logging -from wk.std import color_string +from wk.ui.cli import color_string # TODO: This is lazy # STATIC VARIABLES diff --git a/scripts/wk/hw/benchmark.py b/scripts/wk/hw/benchmark.py index 8aebf51d..b336da5c 100644 --- a/scripts/wk/hw/benchmark.py +++ b/scripts/wk/hw/benchmark.py @@ -21,8 +21,8 @@ from wk.cfg.hw import ( THRESH_SSD_MIN, ) from wk.exe import run_program -from wk.std import ( - PLATFORM, +from wk.std import PLATFORM +from wk.ui.cli import ( # TODO: This is lazy strip_colors, color_string, ) diff --git a/scripts/wk/hw/cpu.py b/scripts/wk/hw/cpu.py index 8458b918..826e0fb9 100644 --- a/scripts/wk/hw/cpu.py +++ b/scripts/wk/hw/cpu.py @@ -10,13 +10,13 @@ from typing import TextIO from wk import exe from wk.cfg.hw import CPU_FAILURE_TEMP from wk.os.mac import set_fans as macos_set_fans -from wk.std import ( - PLATFORM, +from wk.std import PLATFORM +from wk.tmux import respawn_pane as tmux_respawn_pane +from wk.ui.cli import ( # TODO: This is lazy color_string, print_error, print_warning, ) -from wk.tmux import respawn_pane as tmux_respawn_pane # STATIC VARIABLES diff --git a/scripts/wk/hw/diags.py b/scripts/wk/hw/diags.py index 7b7cb755..6c7ead59 100644 --- a/scripts/wk/hw/diags.py +++ b/scripts/wk/hw/diags.py @@ -25,6 +25,8 @@ from wk.hw.network import network_test from wk.hw.screensavers import screensaver from wk.hw.test import Test, TestGroup +from wk.ui import cli as ui # TODO: This is lazy + # STATIC VARIABLES DOCSTRING = f'''{cfg.main.KIT_NAME_FULL}: Hardware Diagnostics @@ -86,9 +88,9 @@ class State(): self.panes = {} self.system = None self.test_groups = [] - self.top_text = std.color_string('Hardware Diagnostics', 'GREEN') + self.top_text = ui.color_string('Hardware Diagnostics', 'GREEN') if test_mode: - self.top_text += std.color_string(' (Test Mode)', 'YELLOW') + self.top_text += ui.color_string(' (Test Mode)', 'YELLOW') # Init tmux and start a background process to maintain layout self.init_tmux() @@ -160,8 +162,8 @@ class State(): keep_history=False, timestamp=False, ) - std.clear_screen() - std.print_info('Initializing...') + ui.clear_screen() + ui.print_info('Initializing...') # Progress Pane self.update_progress_pane() @@ -226,7 +228,7 @@ class State(): self.panes['Started'] = tmux.split_window( lines=cfg.hw.TMUX_SIDE_WIDTH, target_id=self.panes['Top'], - text=std.color_string( + text=ui.color_string( ['Started', time.strftime("%Y-%m-%d %H:%M %Z")], ['BLUE', None], sep='\n', @@ -247,7 +249,7 @@ class State(): debug_dir.mkdir() # State (self) - std.save_pickles({'state': self}, debug_dir) + debug.save_pickles({'state': self}, debug_dir) with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f: _f.write('\n'.join(debug.generate_object_report(self))) @@ -289,7 +291,7 @@ class State(): """Update 'Started' pane following clock sync.""" tmux.respawn_pane( pane_id=self.panes['Started'], - text=std.color_string( + text=ui.color_string( ['Started', time.strftime("%Y-%m-%d %H:%M %Z")], ['BLUE', None], sep='\n', @@ -302,9 +304,9 @@ class State(): width = cfg.hw.TMUX_SIDE_WIDTH for group in self.test_groups: - report.append(std.color_string(group.name, 'BLUE')) + report.append(ui.color_string(group.name, 'BLUE')) for test in group.test_objects: - report.append(std.color_string( + report.append(ui.color_string( [test.label, f'{test.status:>{width-len(test.label)}}'], [None, STATUS_COLORS.get(test.status, None)], sep='', @@ -324,9 +326,9 @@ class State(): # Functions -def build_menu(cli_mode=False, quick_mode=False) -> std.Menu: - """Build main menu, returns wk.std.Menu.""" - menu = std.Menu(title=None) +def build_menu(cli_mode=False, quick_mode=False) -> ui.Menu: + """Build main menu, returns wk.ui.cli.Menu.""" + menu = ui.Menu(title=None) # Add actions, options, etc for action in MENU_ACTIONS: @@ -418,11 +420,11 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None: state.layout['Current'] = {'height': 3, 'Check': True} # Get idle temps - std.print_standard('Saving idle temps...') + ui.print_standard('Saving idle temps...') sensors.save_average_temps(temp_label='Idle', seconds=5) # Stress CPU - std.print_info('Running stress test') + ui.print_info('Running stress test') hw_cpu.set_apple_fan_speed('max') proc_mprime = hw_cpu.start_mprime(state.log_dir, prime_log) @@ -443,14 +445,14 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None: state.update_progress_pane() # Get cooldown temp - std.clear_screen() - std.print_standard('Letting CPU cooldown...') + ui.clear_screen() + ui.print_standard('Letting CPU cooldown...') std.sleep(5) - std.print_standard('Saving cooldown temps...') + ui.print_standard('Saving cooldown temps...') sensors.save_average_temps(temp_label='Cooldown', seconds=5) # Check Prime95 results - test_mprime_obj.report.append(std.color_string('Prime95', 'BLUE')) + test_mprime_obj.report.append(ui.color_string('Prime95', 'BLUE')) hw_cpu.check_mprime_results( test_obj=test_mprime_obj, working_dir=state.log_dir, ) @@ -461,10 +463,10 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None: ) if run_sysbench: LOG.info('CPU Test (Sysbench)') - std.print_standard('Letting CPU cooldown more...') + ui.print_standard('Letting CPU cooldown more...') std.sleep(30) - std.clear_screen() - std.print_info('Running alternate stress test') + ui.clear_screen() + ui.print_info('Running alternate stress test') print('') proc_sysbench, filehandle_sysbench = hw_cpu.start_sysbench( sensors, @@ -490,7 +492,7 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None: state.update_progress_pane() # Check Cooling results - test_cooling_obj.report.append(std.color_string('Temps', 'BLUE')) + test_cooling_obj.report.append(ui.color_string('Temps', 'BLUE')) hw_cpu.check_cooling_results(test_cooling_obj, sensors, run_sysbench) # Cleanup @@ -545,8 +547,8 @@ def disk_io_benchmark( continue # Start benchmark - std.clear_screen() - std.print_report(test.dev.generate_report()) + ui.clear_screen() + ui.print_report(test.dev.generate_report()) test.set_status('Working') test_log = f'{state.log_dir}/{test.dev.path.name}_benchmark.out' tmux.respawn_pane( @@ -563,12 +565,12 @@ def disk_io_benchmark( # Something went wrong LOG.error('%s', err) test.set_status('ERROR') - test.report.append(std.color_string(' Unknown Error', 'RED')) + test.report.append(ui.color_string(' Unknown Error', 'RED')) # Mark test(s) aborted if necessary if aborted: test.set_status('Aborted') - test.report.append(std.color_string(' Aborted', 'YELLOW')) + test.report.append(ui.color_string(' Aborted', 'YELLOW')) break # Update progress after each test @@ -594,7 +596,7 @@ def disk_self_test(state, test_objects, test_mode=False) -> None: state.update_top_pane( f'Disk self-test{"s" if len(test_objects) > 1 else ""}', ) - std.print_info(f'Starting self-test{"s" if len(test_objects) > 1 else ""}') + ui.print_info(f'Starting self-test{"s" if len(test_objects) > 1 else ""}') show_failed_attributes(state) for test in reversed(test_objects): if test.disabled: @@ -691,7 +693,7 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None: state.update_top_pane( f'Disk Surface Scan{"s" if len(test_objects) > 1 else ""}', ) - std.print_info( + ui.print_info( f'Starting disk surface scan{"s" if len(test_objects) > 1 else ""}', ) show_failed_attributes(state) @@ -731,7 +733,7 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None: for test in test_objects: if not (test.disabled or test.passed or test.failed): test.set_status('Aborted') - test.report.append(std.color_string(' Aborted', 'YELLOW')) + test.report.append(ui.color_string(' Aborted', 'YELLOW')) # Cleanup state.update_progress_pane() @@ -785,9 +787,9 @@ def main() -> None: try: action() except KeyboardInterrupt: - std.print_warning('Aborted.') - std.print_standard('') - std.pause('Press Enter to return to main menu...') + ui.print_warning('Aborted.') + ui.print_standard('') + ui.pause('Press Enter to return to main menu...') if 'Clock Sync' in selection: state.update_clock() @@ -852,8 +854,8 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None: # Just return if no tests were selected if not state.test_groups: - std.print_warning('No tests selected?') - std.pause() + ui.print_warning('No tests selected?') + ui.pause() return # Run tests @@ -864,7 +866,7 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None: args = [group.test_objects] if group.name == 'Disk I/O Benchmark': args.append(menu.toggles['Skip USB Benchmarks']['Selected']) - std.clear_screen() + ui.clear_screen() try: function(state, *args, test_mode=test_mode) except (KeyboardInterrupt, std.GenericAbort): @@ -891,25 +893,25 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None: state.save_debug_reports() atexit.unregister(state.save_debug_reports) if quick_mode: - std.pause('Press Enter to exit...') + ui.pause('Press Enter to exit...') else: - std.pause('Press Enter to return to main menu...') + ui.pause('Press Enter to return to main menu...') def show_failed_attributes(state) -> None: """Show failed attributes for all disks.""" for dev in state.disks: - std.print_colored([dev.name, dev.description], ['CYAN', None]) - std.print_report( + ui.print_colored([dev.name, dev.description], ['CYAN', None]) + ui.print_report( hw_smart.generate_attribute_report(dev, only_failed=True), ) - std.print_standard('') + ui.print_standard('') def show_results(state) -> None: """Show test results by device.""" std.sleep(0.5) - std.clear_screen() + ui.clear_screen() state.update_top_pane('Results') # CPU Tests @@ -917,22 +919,22 @@ def show_results(state) -> None: group.name for group in state.test_groups if 'CPU' in group.name ] if cpu_tests_enabled: - std.print_success('CPU:') - std.print_report(state.system.generate_report()) - std.print_standard(' ') + ui.print_success('CPU:') + ui.print_report(state.system.generate_report()) + ui.print_standard(' ') # Disk Tests disk_tests_enabled = [ group.name for group in state.test_groups if 'Disk' in group.name ] if disk_tests_enabled: - std.print_success(f'Disk{"s" if len(state.disks) > 1 else ""}:') + ui.print_success(f'Disk{"s" if len(state.disks) > 1 else ""}:') for disk in state.disks: - std.print_report(disk.generate_report()) - std.print_standard(' ') + ui.print_report(disk.generate_report()) + ui.print_standard(' ') if not state.disks: - std.print_warning('No devices') - std.print_standard(' ') + ui.print_warning('No devices') + ui.print_standard(' ') def sync_clock() -> None: diff --git a/scripts/wk/hw/disk.py b/scripts/wk/hw/disk.py index 3c7c727c..13bd821a 100644 --- a/scripts/wk/hw/disk.py +++ b/scripts/wk/hw/disk.py @@ -19,7 +19,8 @@ from wk.hw.smart import ( generate_attribute_report, get_known_disk_attributes, ) -from wk.std import PLATFORM, color_string, strip_colors +from wk.std import PLATFORM +from wk.ui.cli import color_string, strip_colors # TODO: This is lazy # STATIC VARIABLES diff --git a/scripts/wk/hw/keyboard.py b/scripts/wk/hw/keyboard.py index 68e2d0a6..686fd52b 100644 --- a/scripts/wk/hw/keyboard.py +++ b/scripts/wk/hw/keyboard.py @@ -4,7 +4,8 @@ import logging from wk.exe import run_program -from wk.std import PLATFORM, print_warning +from wk.std import PLATFORM +from wk.ui.cli import print_warning # TODO: This is lazy # STATIC VARIABLES diff --git a/scripts/wk/hw/network.py b/scripts/wk/hw/network.py index 700ebfea..29a7f8ba 100644 --- a/scripts/wk/hw/network.py +++ b/scripts/wk/hw/network.py @@ -9,7 +9,8 @@ from wk.net import ( show_valid_addresses, speedtest, ) -from wk.std import ( +from wk.ui.cli import ( + # TODO: This is lazy TryAndPrint, pause, print_warning, diff --git a/scripts/wk/hw/sensors.py b/scripts/wk/hw/sensors.py index 49d693f0..3ce1ded1 100644 --- a/scripts/wk/hw/sensors.py +++ b/scripts/wk/hw/sensors.py @@ -12,7 +12,8 @@ from typing import Any from wk.cfg.hw import CPU_CRITICAL_TEMP, SMC_IDS, TEMP_COLORS from wk.exe import run_program, start_thread from wk.io import non_clobber_path -from wk.std import PLATFORM, color_string, sleep +from wk.std import PLATFORM, sleep +from wk.ui.cli import color_string # TODO: This is lazy # STATIC VARIABLES diff --git a/scripts/wk/hw/smart.py b/scripts/wk/hw/smart.py index d9f47f7a..e56db1ea 100644 --- a/scripts/wk/hw/smart.py +++ b/scripts/wk/hw/smart.py @@ -18,7 +18,8 @@ from wk.cfg.hw import ( SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS, ) from wk.exe import get_json_from_command, run_program -from wk.std import bytes_to_string, color_string, sleep +from wk.std import bytes_to_string, sleep +from wk.ui.cli import color_string # TODO: This is lazy # STATIC VARIABLES diff --git a/scripts/wk/hw/surface_scan.py b/scripts/wk/hw/surface_scan.py index 49afd25c..b2dc99c6 100644 --- a/scripts/wk/hw/surface_scan.py +++ b/scripts/wk/hw/surface_scan.py @@ -14,12 +14,8 @@ from wk.cfg.hw import ( TEST_MODE_BADBLOCKS_LIMIT, ) from wk.exe import run_program -from wk.std import ( - PLATFORM, - bytes_to_string, - color_string, - strip_colors, - ) +from wk.std import PLATFORM, bytes_to_string +from wk.ui.cli import color_string, strip_colors # TODO: This is lazy # STATIC VARIABLES diff --git a/scripts/wk/hw/system.py b/scripts/wk/hw/system.py index 8da2ed15..6634b326 100644 --- a/scripts/wk/hw/system.py +++ b/scripts/wk/hw/system.py @@ -12,12 +12,8 @@ from wk.cfg.hw import KNOWN_RAM_VENDOR_IDS from wk.cfg.python import DATACLASS_DECORATOR_KWARGS from wk.exe import get_json_from_command, run_program from wk.hw.test import Test -from wk.std import ( - PLATFORM, - bytes_to_string, - color_string, - string_to_bytes, - ) +from wk.std import PLATFORM, bytes_to_string, string_to_bytes +from wk.ui.cli import color_string # TODO: This is lazy # STATIC VARIABLES diff --git a/scripts/wk/kit/build_win.py b/scripts/wk/kit/build_win.py index ff377790..bb363430 100644 --- a/scripts/wk/kit/build_win.py +++ b/scripts/wk/kit/build_win.py @@ -21,8 +21,9 @@ from wk.kit.tools import ( get_tool_path, ) from wk.log import update_log_path -from wk.std import ( - GenericError, +from wk.std import GenericError +from wk.ui.cli import ( + # TODO: This is lazy TryAndPrint, clear_screen, pause, diff --git a/scripts/wk/kit/ufd.py b/scripts/wk/kit/ufd.py index cc5e9969..f0cfa767 100644 --- a/scripts/wk/kit/ufd.py +++ b/scripts/wk/kit/ufd.py @@ -10,7 +10,7 @@ from subprocess import CalledProcessError from collections import OrderedDict from docopt import docopt -from wk import io, log, std +from wk import io, log from wk.cfg.main import KIT_NAME_FULL, KIT_NAME_SHORT from wk.cfg.ufd import ( BOOT_ENTRIES, @@ -23,6 +23,8 @@ from wk.cfg.ufd import ( from wk.exe import get_json_from_command, run_program from wk.os import linux +from wk.ui import cli as ui # TODO: This is lazy + # STATIC VARIABLES DOCSTRING = '''WizardKit: Build UFD @@ -93,10 +95,10 @@ def build_ufd(): if args['--debug']: log.enable_debug_mode() if args['--update'] and args['EXTRA_IMAGES']: - std.print_warning('Extra images are ignored when updating') + ui.print_warning('Extra images are ignored when updating') args['EXTRA_IMAGES'] = [] log.update_log_path(dest_name='build-ufd', timestamp=True) - try_print = std.TryAndPrint() + try_print = ui.TryAndPrint() try_print.add_error('FileNotFoundError') try_print.catch_all = False try_print.indent = 2 @@ -104,9 +106,9 @@ def build_ufd(): try_print.width = 64 # Show header - std.print_success(KIT_NAME_FULL) - std.print_warning('UFD Build Tool') - std.print_warning(' ') + ui.print_success(KIT_NAME_FULL) + ui.print_warning('UFD Build Tool') + ui.print_warning(' ') # Verify selections ufd_dev = verify_ufd(args['--ufd-device']) @@ -118,7 +120,7 @@ def build_ufd(): # Prep UFD if not args['--update']: - std.print_info('Prep UFD') + ui.print_info('Prep UFD') try_print.run( message='Zeroing first 64MiB...', function=zero_device, @@ -170,8 +172,8 @@ def build_ufd(): ) # Copy sources - std.print_standard(' ') - std.print_info('Copy Sources') + ui.print_standard(' ') + ui.print_info('Copy Sources') try_print.run( 'Copying Memtest86...', io.recursive_copy, '/usr/share/memtest86-efi/', '/mnt/UFD/EFI/Memtest86/', overwrite=True, @@ -187,8 +189,8 @@ def build_ufd(): # Apply extra images if not args['--update']: - std.print_standard(' ') - std.print_info('Apply Extra Images') + ui.print_standard(' ') + ui.print_info('Apply Extra Images') for part_num, image_path in enumerate(extra_images): try_print.run( message=f'Applying {image_path.name}...', @@ -203,8 +205,8 @@ def build_ufd(): _f.write('\n'.join([image.name for image in extra_images])) # Update boot entries - std.print_standard(' ') - std.print_info('Boot Setup') + ui.print_standard(' ') + ui.print_info('Boot Setup') try_print.run( message='Updating boot entries...', function=update_boot_entries, @@ -235,8 +237,8 @@ def build_ufd(): ) # Hide items - std.print_standard(' ') - std.print_info('Final Touches') + ui.print_standard(' ') + ui.print_info('Final Touches') try_print.run( message='Hiding items...', function=hide_items, @@ -245,30 +247,30 @@ def build_ufd(): ) # Done - std.print_standard('\nDone.') + ui.print_standard('\nDone.') if not args['--force']: - std.pause('Press Enter to exit...') + ui.pause('Press Enter to exit...') def confirm_selections(update=False): """Ask tech to confirm selections, twice if necessary.""" - if not std.ask('Is the above information correct?'): - std.abort() + if not ui.ask('Is the above information correct?'): + ui.abort() # Safety check if not update: - std.print_standard(' ') - std.print_warning('SAFETY CHECK') - std.print_standard( + ui.print_standard(' ') + ui.print_warning('SAFETY CHECK') + ui.print_standard( 'All data will be DELETED from the disk and partition(s) listed above.') - std.print_colored( + ui.print_colored( ['This is irreversible and will lead to', 'DATA LOSS'], [None, 'RED'], ) - if not std.ask('Asking again to confirm, is this correct?'): - std.abort() + if not ui.ask('Asking again to confirm, is this correct?'): + ui.abort() - std.print_standard(' ') + ui.print_standard(' ') def copy_source(source, items, overwrite=False): @@ -481,12 +483,12 @@ def show_selections(args, sources, ufd_dev, ufd_sources, extra_images): """Show selections including non-specified options.""" # Sources - std.print_info('Sources') + ui.print_info('Sources') for label in ufd_sources.keys(): if label in sources: - std.print_standard(f' {label+":":<18} {sources[label]}') + ui.print_standard(f' {label+":":<18} {sources[label]}') else: - std.print_colored( + ui.print_colored( [f' {label+":":<18}', 'Not Specified'], [None, 'YELLOW'], ) @@ -498,15 +500,15 @@ def show_selections(args, sources, ufd_dev, ufd_sources, extra_images): print(f' {" ":<18} {image}') # Destination - std.print_standard(' ') - std.print_info('Destination') + ui.print_standard(' ') + ui.print_info('Destination') cmd = [ 'lsblk', '--nodeps', '--noheadings', '--paths', '--output', 'NAME,FSTYPE,TRAN,SIZE,VENDOR,MODEL,SERIAL', ufd_dev, ] proc = run_program(cmd, check=False) - std.print_standard(proc.stdout.strip()) + ui.print_standard(proc.stdout.strip()) cmd = [ 'lsblk', '--noheadings', '--paths', '--output', 'NAME,SIZE,FSTYPE,LABEL,MOUNTPOINT', @@ -514,14 +516,14 @@ def show_selections(args, sources, ufd_dev, ufd_sources, extra_images): ] proc = run_program(cmd, check=False) for line in proc.stdout.splitlines()[1:]: - std.print_standard(line) + ui.print_standard(line) # Notes if args['--update']: - std.print_warning('Updating kit in-place') + ui.print_warning('Updating kit in-place') elif args['--use-mbr']: - std.print_warning('Formatting using legacy MBR') - std.print_standard(' ') + ui.print_warning('Formatting using legacy MBR') + ui.print_standard(' ') def update_boot_entries(ufd_dev, images=None): @@ -621,11 +623,11 @@ def verify_sources(args, ufd_sources): try: s_path_obj = io.case_insensitive_path(s_path) except FileNotFoundError: - std.print_error(f'ERROR: {label} not found: {s_path}') - std.abort() + ui.print_error(f'ERROR: {label} not found: {s_path}') + ui.abort() if not is_valid_path(s_path_obj, data['Type']): - std.print_error(f'ERROR: Invalid {label} source: {s_path}') - std.abort() + ui.print_error(f'ERROR: Invalid {label} source: {s_path}') + ui.abort() sources[label] = s_path_obj return sources @@ -638,12 +640,12 @@ def verify_ufd(dev_path): try: ufd_dev = io.case_insensitive_path(dev_path) except FileNotFoundError: - std.print_error(f'ERROR: UFD device not found: {dev_path}') - std.abort() + ui.print_error(f'ERROR: UFD device not found: {dev_path}') + ui.abort() if not is_valid_path(ufd_dev, 'UFD'): - std.print_error(f'ERROR: Invalid UFD device: {ufd_dev}') - std.abort() + ui.print_error(f'ERROR: Invalid UFD device: {ufd_dev}') + ui.abort() return ufd_dev diff --git a/scripts/wk/log.py b/scripts/wk/log.py index b69afb1f..0b3e7e06 100644 --- a/scripts/wk/log.py +++ b/scripts/wk/log.py @@ -61,6 +61,24 @@ def format_log_path( return log_path +def get_log_filepath(): + """Get the log filepath from the root logger, returns pathlib.Path obj. + + NOTE: This will use the first handler baseFilename it finds (if any). + """ + log_filepath = None + root_logger = logging.getLogger() + + # Check handlers + for handler in root_logger.handlers: + if hasattr(handler, 'baseFilename'): + log_filepath = pathlib.Path(handler.baseFilename).resolve() + break + + # Done + return log_filepath + + def get_root_logger_path(): """Get path to log file from root logger, returns pathlib.Path obj.""" log_path = None diff --git a/scripts/wk/net.py b/scripts/wk/net.py index 3e9382a9..1c4c3383 100644 --- a/scripts/wk/net.py +++ b/scripts/wk/net.py @@ -8,9 +8,10 @@ import re import psutil from wk.exe import get_json_from_command, run_program -from wk.std import PLATFORM, GenericError, show_data +from wk.std import PLATFORM, GenericError from wk.cfg.net import BACKUP_SERVERS +from wk.ui.cli import show_data # TODO: This is lazy # REGEX diff --git a/scripts/wk/os/linux.py b/scripts/wk/os/linux.py index 481e687c..1c134f78 100644 --- a/scripts/wk/os/linux.py +++ b/scripts/wk/os/linux.py @@ -10,7 +10,8 @@ import subprocess from wk.cfg.hw import VOLUME_FAILURE_THRESHOLD, VOLUME_WARNING_THRESHOLD from wk.exe import get_json_from_command, popen_program, run_program from wk.log import format_log_path -from wk.std import bytes_to_string, color_string +from wk.std import bytes_to_string +from wk.ui.cli import color_string # TODO: This is lazy # STATIC VARIABLES diff --git a/scripts/wk/os/win.py b/scripts/wk/os/win.py index 3b5c80e3..bdf424ad 100644 --- a/scripts/wk/os/win.py +++ b/scripts/wk/os/win.py @@ -30,10 +30,9 @@ from wk.std import ( GenericError, GenericWarning, bytes_to_string, - color_string, - input_text, sleep, ) +from wk.ui.cli import color_string, input_text # TODO: This is lazy # STATIC VARIABLES diff --git a/scripts/wk/repairs/win.py b/scripts/wk/repairs/win.py index 938fb3b3..f468be33 100644 --- a/scripts/wk/repairs/win.py +++ b/scripts/wk/repairs/win.py @@ -58,6 +58,10 @@ from wk.os.win import ( from wk.std import ( GenericError, GenericWarning, + sleep, + ) +from wk.ui.cli import ( + # TODO: This is lazy Menu, TryAndPrint, abort, @@ -70,7 +74,6 @@ from wk.std import ( print_warning, set_title, show_data, - sleep, strip_colors, ) diff --git a/scripts/wk/setup/win.py b/scripts/wk/setup/win.py index 362c7b1a..ad388192 100644 --- a/scripts/wk/setup/win.py +++ b/scripts/wk/setup/win.py @@ -60,6 +60,10 @@ from wk.repairs.win import ( from wk.std import ( GenericError, GenericWarning, + sleep, + ) +from wk.ui.cli import ( + # TODO: This is lazy Menu, TryAndPrint, abort, @@ -73,7 +77,6 @@ from wk.std import ( print_warning, set_title, show_data, - sleep, strip_colors, ) diff --git a/scripts/wk/std.py b/scripts/wk/std.py index 65fd69f0..cea6b2e1 100644 --- a/scripts/wk/std.py +++ b/scripts/wk/std.py @@ -1,55 +1,13 @@ """WizardKit: Standard Functions""" # vim: sts=2 sw=2 ts=2 -import inspect -import itertools import logging -import lzma -import os -import pathlib -import pickle import platform import re -import socket -import subprocess -import sys import time -import traceback - -from collections import OrderedDict - -try: - from functools import cache -except ImportError: - # Assuming Python is < 3.9 - from functools import lru_cache as cache - -import requests - -from wk.cfg.main import ( - ENABLED_UPLOAD_DATA, - INDENT, - SUPPORT_MESSAGE, - WIDTH, - ) -from wk.cfg.net import CRASH_SERVER -from wk.log import get_root_logger_path # STATIC VARIABLES -COLORS = { - 'CLEAR': '\033[0m', - 'RED': '\033[31m', - 'RED_BLINK': '\033[31;5m', - 'ORANGE': '\033[31;1m', - 'ORANGE_RED': '\033[1;31;41m', - 'GREEN': '\033[32m', - 'YELLOW': '\033[33m', - 'YELLOW_BLINK': '\033[33;5m', - 'BLUE': '\033[34m', - 'PURPLE': '\033[35m', - 'CYAN': '\033[36m', - } LOG = logging.getLogger(__name__) PLATFORM = platform.system() REGEX_SIZE_STRING = re.compile( @@ -72,574 +30,7 @@ class GenericWarning(Exception): """ -# Classes -class Menu(): - """Object for tracking menu specific data and methods. - - Menu items are added to an OrderedDict so the order is preserved. - - ASSUMPTIONS: - 1. All entry names are unique. - 2. All action entry names start with different letters. - """ - def __init__(self, title='[Untitled Menu]'): - self.actions = OrderedDict() - self.options = OrderedDict() - self.sets = OrderedDict() - self.toggles = OrderedDict() - self.disabled_str = 'Disabled' - self.separator = '─' - self.title = title - - def _generate_menu_text(self): - """Generate menu text, returns str.""" - separator_string = self._get_separator_string() - menu_lines = [self.title, separator_string] if self.title else [] - - # Sets & toggles - for section in (self.sets, self.toggles): - for details in section.values(): - if details.get('Hidden', False): - continue - if details.get('Separator', False): - menu_lines.append(separator_string) - menu_lines.append(details['Display Name']) - if self.sets or self.toggles: - menu_lines.append(separator_string) - - # Options - for details in self.options.values(): - if details.get('Hidden', False): - continue - if details.get('Separator', False): - menu_lines.append(separator_string) - menu_lines.append(details['Display Name']) - if self.options: - menu_lines.append(separator_string) - - # Actions - for details in self.actions.values(): - if details.get('Hidden', False): - continue - if details.get('Separator', False): - menu_lines.append(separator_string) - menu_lines.append(details['Display Name']) - - # Show menu - menu_lines.append('') - menu_lines = [str(line) for line in menu_lines] - return '\n'.join(menu_lines) - - def _get_display_name( - self, name, details, - index=None, no_checkboxes=True, setting_item=False): - """Format display name based on details and args, returns str.""" - disabled = details.get('Disabled', False) - if setting_item and not details['Selected']: - # Display item in YELLOW - disabled = True - checkmark = '*' - if 'DISPLAY' in os.environ or PLATFORM == 'Darwin': - checkmark = '✓' - display_name = f'{index if index else name[:1].upper()}: ' - if not (index and index >= 10): - display_name = f' {display_name}' - if setting_item and 'Value' in details: - name = f'{name} = {details["Value"]}' - - # Add enabled status if necessary - if not no_checkboxes: - display_name += f'[{checkmark if details["Selected"] else " "}] ' - - # Add name - if disabled: - display_name += color_string(f'{name} ({self.disabled_str})', 'YELLOW') - else: - display_name += name - - # Done - return display_name - - def _get_separator_string(self): - """Format separator length based on name lengths, returns str.""" - separator_length = 0 - - # Check title line(s) - if self.title: - for line in self.title.split('\n'): - separator_length = max(separator_length, len(strip_colors(line))) - - # Loop over all item names - for section in (self.actions, self.options, self.sets, self.toggles): - for details in section.values(): - if details.get('Hidden', False): - # Skip hidden lines - continue - line = strip_colors(details['Display Name']) - separator_length = max(separator_length, len(line)) - separator_length += 1 - - # Done - return self.separator * separator_length - - def _get_valid_answers(self): - """Get valid answers based on menu items, returns list.""" - valid_answers = [] - - # Numbered items - index = 0 - for section in (self.sets, self.toggles, self.options): - for details in section.values(): - if details.get('Hidden', False): - # Don't increment index or add to valid_answers - continue - index += 1 - if not details.get('Disabled', False): - valid_answers.append(str(index)) - - # Action items - for name, details in self.actions.items(): - if not details.get('Disabled', False): - valid_answers.append(name[:1].upper()) - - # Done - return valid_answers - - def _resolve_selection(self, selection): - """Get menu item based on user selection, returns tuple.""" - offset = 1 - resolved_selection = None - if selection.isnumeric(): - # Enumerate over numbered entries - entries = [ - *self.sets.items(), - *self.toggles.items(), - *self.options.items(), - ] - for _i, details in enumerate(entries): - if details[1].get('Hidden', False): - offset -= 1 - elif str(_i+offset) == selection: - resolved_selection = (details) - break - else: - # Just check actions - for action, details in self.actions.items(): - if action.lower().startswith(selection.lower()): - resolved_selection = (action, details) - break - - # Done - return resolved_selection - - def _update(self, single_selection=True, settings_mode=False): - """Update menu items in preparation for printing to screen.""" - index = 0 - - # Fix selection status for sets - for set_details in self.sets.values(): - set_selected = True - set_targets = set_details['Targets'] - for option, option_details in self.options.items(): - if option in set_targets and not option_details['Selected']: - set_selected = False - elif option not in set_targets and option_details['Selected']: - set_selected = False - set_details['Selected'] = set_selected - - # Numbered sections - for section in (self.sets, self.toggles, self.options): - for name, details in section.items(): - if details.get('Hidden', False): - # Skip hidden lines and don't increment index - continue - index += 1 - details['Display Name'] = self._get_display_name( - name, - details, - index=index, - no_checkboxes=single_selection, - setting_item=settings_mode, - ) - - # Actions - for name, details in self.actions.items(): - details['Display Name'] = self._get_display_name( - name, - details, - no_checkboxes=True, - ) - - def _update_entry_selection_status(self, entry, toggle=True, status=None): - """Update entry selection status either directly or by toggling.""" - if entry in self.sets: - # Update targets not the set itself - new_status = not self.sets[entry]['Selected'] if toggle else status - targets = self.sets[entry]['Targets'] - self._update_set_selection_status(targets, new_status) - for section in (self.toggles, self.options, self.actions): - if entry in section: - if toggle: - section[entry]['Selected'] = not section[entry]['Selected'] - else: - section[entry]['Selected'] = status - - def _update_set_selection_status(self, targets, status): - """Select or deselect options based on targets and status.""" - for option, details in self.options.items(): - # If (new) status is True and this option is a target then select - # Otherwise deselect - details['Selected'] = status and option in targets - - def _user_select(self, prompt): - """Show menu and select an entry, returns str.""" - menu_text = self._generate_menu_text() - valid_answers = self._get_valid_answers() - - # Menu loop - while True: - clear_screen() - print(menu_text) - sleep(0.01) - answer = input_text(prompt).strip() - if answer.upper() in valid_answers: - break - - # Done - return answer - - def add_action(self, name, details=None): - """Add action to menu.""" - details = details if details else {} - details['Selected'] = details.get('Selected', False) - self.actions[name] = details - - def add_option(self, name, details=None): - """Add option to menu.""" - details = details if details else {} - details['Selected'] = details.get('Selected', False) - self.options[name] = details - - def add_set(self, name, details=None): - """Add set to menu.""" - details = details if details else {} - details['Selected'] = details.get('Selected', False) - - # Safety check - if 'Targets' not in details: - raise KeyError('Menu set has no targets') - - # Add set - self.sets[name] = details - - def add_toggle(self, name, details=None): - """Add toggle to menu.""" - details = details if details else {} - details['Selected'] = details.get('Selected', False) - self.toggles[name] = details - - def advanced_select(self, prompt='Please make a selection: '): - """Display menu and make multiple selections, returns tuple. - - NOTE: Menu is displayed until an action entry is selected. - """ - while True: - self._update(single_selection=False) - user_selection = self._user_select(prompt) - selected_entry = self._resolve_selection(user_selection) - if user_selection.isnumeric(): - # Update selection(s) - self._update_entry_selection_status(selected_entry[0]) - else: - # Action selected - break - - # Done - return selected_entry - - def settings_select(self, prompt='Please make a selection: '): - """Display menu and make multiple selections, returns tuple. - - NOTE: Menu is displayed until an action entry is selected. - """ - choice_kwargs = { - 'choices': ['T', 'C'], - 'prompt': 'Toggle or change value?', - } - - while True: - self._update(single_selection=True, settings_mode=True) - user_selection = self._user_select(prompt) - selected_entry = self._resolve_selection(user_selection) - if user_selection.isnumeric(): - if 'Value' in selected_entry[-1] and choice(**choice_kwargs) == 'C': - # Change - selected_entry[-1]['Value'] = input_text('Enter new value: ') - else: - # Toggle - self._update_entry_selection_status(selected_entry[0]) - else: - # Action selected - break - - # Done - return selected_entry - - def simple_select(self, prompt='Please make a selection: ', update=True): - """Display menu and make a single selection, returns tuple.""" - if update: - self._update() - user_selection = self._user_select(prompt) - return self._resolve_selection(user_selection) - - def update(self): - """Update menu with default settings.""" - self._update() - - -class TryAndPrint(): - """Object used to standardize running functions and returning the result. - - The errors and warning attributes are used to allow fine-tuned results - based on exception names. - """ - def __init__(self, msg_bad='FAILED', msg_good='SUCCESS'): - self.catch_all = True - self.indent = INDENT - self.list_errors = ['GenericError'] - self.list_warnings = ['GenericWarning'] - self.msg_bad = msg_bad - self.msg_good = msg_good - self.verbose = False - self.width = WIDTH - - def _format_exception_message(self, _exception): - """Format using the exception's args or name, returns str.""" - LOG.debug( - 'Formatting exception: %s, %s', - _exception.__class__.__name__, - _exception, - ) - message = '' - - # Format message string from _exception - try: - if isinstance(_exception, subprocess.CalledProcessError): - message = _exception.stderr - if not isinstance(message, str): - message = message.decode('utf-8') - message = message.strip() - elif isinstance(_exception, ZeroDivisionError): - # Skip and just use exception name below - pass - else: - message = str(_exception) - except Exception: - # Just use the exception name instead - pass - - # Prepend exception name - if _exception.__class__.__name__ not in ('GenericError', 'GenericWarning'): - try: - message = f'{_exception.__class__.__name__}: {message}' - except Exception: - message = f'UNKNOWN ERROR: {message}' - - # Fix multi-line messages - if '\n' in message: - try: - lines = [ - f'{" "*(self.indent+self.width)}{line.strip()}' - for line in message.splitlines() if line.strip() - ] - lines[0] = lines[0].strip() - message = '\n'.join(lines) - except Exception: - pass - - # Done - return message - - def _format_function_output(self, output, msg_good): - """Format function output for use in try_and_print(), returns str.""" - LOG.debug('Formatting output: %s', output) - - if not output: - raise GenericWarning('No output') - - # Ensure we're working with a list - if isinstance(output, subprocess.CompletedProcess): - stdout = output.stdout - if not isinstance(stdout, str): - stdout = stdout.decode('utf8') - output = stdout.strip().splitlines() - if not output: - # Going to treat these as successes (for now) - LOG.warning('Program output was empty, assuming good result.') - return color_string(msg_good, 'GREEN') - else: - try: - output = list(output) - except TypeError: - output = [output] - - # Safety check - if not output: - # Going to ignore empty function output for now - LOG.error('Output is empty') - return 'UNKNOWN' - - # Build result_msg - result_msg = f'{output.pop(0)}' - if output: - output = [f'{" "*(self.indent+self.width)}{line}' for line in output] - result_msg += '\n' + '\n'.join(output) - - # Done - return result_msg - - def _log_result(self, message, result_msg): - """Log result text without color formatting.""" - log_text = f'{" "*self.indent}{message:<{self.width}}{result_msg}' - for line in log_text.splitlines(): - line = strip_colors(line) - LOG.info(line) - - def add_error(self, exception_name): - """Add exception name to error list.""" - if exception_name not in self.list_errors: - self.list_errors.append(exception_name) - - def add_warning(self, exception_name): - """Add exception name to warning list.""" - if exception_name not in self.list_warnings: - self.list_warnings.append(exception_name) - - def run( - self, message, function, *args, - catch_all=None, msg_good=None, verbose=None, **kwargs): - """Run a function and print the results, returns results as dict. - - If catch_all is True then (nearly) all exceptions will be caught. - Otherwise if an exception occurs that wasn't specified it will be - re-raised. - - If the function returns data it will be used instead of msg_good, - msg_bad, or exception text. - The output should be a list or a subprocess.CompletedProcess object. - - If msg_good is passed it will override self.msg_good for this call. - - If verbose is True then exception names or messages will be used for - the result message. Otherwise it will simply be set to result_bad. - - If catch_all and/or verbose are passed it will override - self.catch_all and/or self.verbose for this call. - - args and kwargs are passed to the function. - """ - LOG.debug('function: %s.%s', function.__module__, function.__name__) - LOG.debug('args: %s', args) - LOG.debug('kwargs: %s', kwargs) - LOG.debug( - 'catch_all: %s, msg_good: %s, verbose: %s', - catch_all, - msg_good, - verbose, - ) - f_exception = None - catch_all = catch_all if catch_all is not None else self.catch_all - msg_good = msg_good if msg_good is not None else self.msg_good - output = None - result_msg = 'UNKNOWN' - verbose = verbose if verbose is not None else self.verbose - - # Build exception tuples - e_exceptions = tuple(get_exception(e) for e in self.list_errors) - w_exceptions = tuple(get_exception(e) for e in self.list_warnings) - - # Run function and catch exceptions - print(f'{" "*self.indent}{message:<{self.width}}', end='', flush=True) - LOG.debug('Running function: %s.%s', function.__module__, function.__name__) - try: - output = function(*args, **kwargs) - except w_exceptions as _exception: - # Warnings - result_msg = self._format_exception_message(_exception) - print_warning(result_msg, log=False) - f_exception = _exception - except e_exceptions as _exception: - # Exceptions - result_msg = self._format_exception_message(_exception) - print_error(result_msg, log=False) - f_exception = _exception - except Exception as _exception: - # Unexpected exceptions - if verbose: - result_msg = self._format_exception_message(_exception) - else: - result_msg = self.msg_bad - print_error(result_msg, log=False) - f_exception = _exception - if not catch_all: - # Re-raise error as necessary - raise - else: - # Success - if output: - result_msg = self._format_function_output(output, msg_good) - print(result_msg) - else: - result_msg = msg_good - print_success(result_msg, log=False) - - # Done - self._log_result(message, result_msg) - return { - 'Exception': f_exception, - 'Failed': bool(f_exception), - 'Message': result_msg, - 'Output': output, - } - - # Functions -def abort(prompt='Aborted.', show_prompt=True, return_code=1): - """Abort script.""" - print_warning(prompt) - if show_prompt: - sleep(0.5) - pause(prompt='Press Enter to exit... ') - sys.exit(return_code) - - -def ask(prompt='Kotaero!'): - """Prompt the user with a Y/N question, returns bool.""" - answer = None - prompt = f'{prompt} [Y/N]: ' - - # Loop until acceptable answer is given - while answer is None: - tmp = input_text(prompt) - if re.search(r'^y(es|up|)$', tmp, re.IGNORECASE): - answer = True - elif re.search(r'^n(o|ope|)$', tmp, re.IGNORECASE): - answer = False - - # Done - LOG.info('%s%s', prompt, 'Yes' if answer else 'No') - return answer - - -def beep(repeat=1): - """Play system bell with optional repeat.""" - while repeat >= 1: - # Print bell char without a newline - print('\a', end='', flush=True) - sleep(0.5) - repeat -= 1 - - def bytes_to_string(size, decimals=0, use_binary=True): """Convert size into a human-readable format, returns str. @@ -682,323 +73,6 @@ def bytes_to_string(size, decimals=0, use_binary=True): return size_str -def choice(choices, prompt='答えろ!'): - """Choose an option from a provided list, returns str. - - Choices provided will be converted to uppercase and returned as such. - Similar to the commands choice (Windows) and select (Linux). - """ - LOG.debug('choices: %s, prompt: %s', choices, prompt) - answer = None - choices = [str(c).upper()[:1] for c in choices] - prompt = f'{prompt} [{"/".join(choices)}]' - regex = f'^({"|".join(choices)})$' - - # Loop until acceptable answer is given - while answer is None: - tmp = input_text(prompt=prompt) - if re.search(regex, tmp, re.IGNORECASE): - answer = tmp.upper() - - # Done - LOG.info('%s %s', prompt, answer) - return answer - - -def clear_screen(): - """Simple wrapper for clear/cls.""" - cmd = 'cls' if os.name == 'nt' else 'clear' - proc = subprocess.run(cmd, check=False, shell=True, stderr=subprocess.PIPE) - - # Workaround for live macOS env - if proc.returncode != 0: - print('\033c') - - -def color_string(strings, colors, sep=' '): - """Build colored string using ANSI escapes, returns str.""" - clear_code = COLORS['CLEAR'] - msg = [] - - # Convert to tuples if necessary - if isinstance(strings, (str, pathlib.Path)): - strings = (strings,) - if isinstance(colors, (str, pathlib.Path)): - colors = (colors,) - - # Convert to strings if necessary - try: - iter(strings) - except TypeError: - # Assuming single element passed, convert to string - strings = (str(strings),) - try: - iter(colors) - except TypeError: - # Assuming single element passed, convert to string - colors = (str(colors),) - - # Build new string with color escapes added - for string, color in itertools.zip_longest(strings, colors): - color_code = COLORS.get(color, clear_code) - msg.append(f'{color_code}{string}{clear_code}') - - # Done - return sep.join(msg) - - -def generate_debug_report(): - """Generate debug report, returns str.""" - platform_function_list = ( - 'architecture', - 'machine', - 'platform', - 'python_version', - ) - report = [] - - # Logging data - log_path = get_log_filepath() - if log_path: - report.append('------ Start Log -------') - report.append('') - with open(log_path, 'r', encoding='utf-8') as log_file: - report.extend(log_file.read().splitlines()) - report.append('') - report.append('------- End Log --------') - - # System - report.append('--- Start debug info ---') - report.append('') - report.append('[System]') - report.append(f' {"FQDN":<24} {socket.getfqdn()}') - for func in platform_function_list: - func_name = func.replace('_', ' ').capitalize() - func_result = getattr(platform, func)() - report.append(f' {func_name:<24} {func_result}') - report.append(f' {"Python sys.argv":<24} {sys.argv}') - report.append('') - - # Environment - report.append('[Environment Variables]') - for key, value in sorted(os.environ.items()): - report.append(f' {key:<24} {value}') - report.append('') - - # Done - report.append('---- End debug info ----') - return '\n'.join(report) - - -@cache -def get_exception(name): - """Get exception by name, returns exception object. - - [Doctest] - >>> t = TryAndPrint() - >>> t._get_exception('AttributeError') - - >>> t._get_exception('CalledProcessError') - - >>> t._get_exception('GenericError') - - """ - LOG.debug('Getting exception: %s', name) - obj = getattr(sys.modules[__name__], name, None) - if obj: - return obj - - # Try builtin classes - obj = getattr(sys.modules['builtins'], name, None) - if obj: - return obj - - # Try all modules - for _mod in sys.modules.values(): - obj = getattr(_mod, name, None) - if obj: - break - - # Check if not found - if not obj: - raise AttributeError(f'Failed to find exception: {name}') - - # Done - return obj - - -def get_log_filepath(): - """Get the log filepath from the root logger, returns pathlib.Path obj. - - NOTE: This will use the first handler baseFilename it finds (if any). - """ - log_filepath = None - root_logger = logging.getLogger() - - # Check handlers - for handler in root_logger.handlers: - if hasattr(handler, 'baseFilename'): - log_filepath = pathlib.Path(handler.baseFilename).resolve() - break - - # Done - return log_filepath - - -def input_text(prompt='Enter text', allow_empty_response=True): - """Get text from user, returns string.""" - prompt = str(prompt) - response = None - if prompt[-1:] != ' ': - prompt += ' ' - print(prompt, end='', flush=True) - - while response is None: - try: - response = input() - LOG.debug('%s%s', prompt, response) - except EOFError: - # Ignore and try again - LOG.warning('Exception occured', exc_info=True) - print('', flush=True) - if not allow_empty_response: - if response is None or not response.strip(): - # The None check here is used to avoid a TypeError if response is None - print(f'\r{prompt}', end='', flush=True) - response = None - - return response - - -def major_exception(): - """Display traceback, optionally upload detailes, and exit.""" - LOG.critical('Major exception encountered', exc_info=True) - print_error('Major exception', log=False) - print_warning(SUPPORT_MESSAGE) - if ENABLED_UPLOAD_DATA: - print_warning('Also, please run upload-logs to help debugging!') - print(traceback.format_exc()) - - # Done - pause('Press Enter to exit... ') - raise SystemExit(1) - - -def pause(prompt='Press Enter to continue... '): - """Simple pause implementation.""" - input_text(prompt) - - -def print_colored(strings, colors, log=False, sep=' ', **kwargs): - """Prints strings in the colors specified.""" - LOG.debug( - 'strings: %s, colors: %s, sep: %s, kwargs: %s', - strings, colors, sep, kwargs, - ) - msg = color_string(strings, colors, sep=sep) - print_options = { - 'end': kwargs.get('end', '\n'), - 'file': kwargs.get('file', sys.stdout), - 'flush': kwargs.get('flush', False), - } - - print(msg, **print_options) - if log: - LOG.info(strip_colors(msg)) - - -def print_error(msg, log=True, **kwargs): - """Prints message in RED and log as ERROR.""" - if 'file' not in kwargs: - # Only set if not specified - kwargs['file'] = sys.stderr - print_colored(msg, 'RED', **kwargs) - if log: - LOG.error(msg) - - -def print_info(msg, log=True, **kwargs): - """Prints message in BLUE and log as INFO.""" - print_colored(msg, 'BLUE', **kwargs) - if log: - LOG.info(msg) - - -def print_report(report, indent=None, log=True): - """Print report to screen and optionally to log.""" - for line in report: - if indent: - line = f'{" "*indent}{line}' - print(line) - if log: - LOG.info(strip_colors(line)) - - -def print_standard(msg, log=True, **kwargs): - """Prints message and log as INFO.""" - print(msg, **kwargs) - if log: - LOG.info(msg) - - -def print_success(msg, log=True, **kwargs): - """Prints message in GREEN and log as INFO.""" - print_colored(msg, 'GREEN', **kwargs) - if log: - LOG.info(msg) - - -def print_warning(msg, log=True, **kwargs): - """Prints message in YELLOW and log as WARNING.""" - if 'file' not in kwargs: - # Only set if not specified - kwargs['file'] = sys.stderr - print_colored(msg, 'YELLOW', **kwargs) - if log: - LOG.warning(msg) - - -def save_pickles(obj_dict, out_path=None): - """Save dict of objects using pickle.""" - LOG.info('Saving pickles') - - # Set path - if not out_path: - out_path = pathlib.Path(f'{get_root_logger_path().parent}/debug') - - # Save pickles - try: - for name, obj in obj_dict.copy().items(): - if name.startswith('__') or inspect.ismodule(obj): - continue - with open(f'{out_path}/{name}.pickle', 'wb') as _f: - pickle.dump(obj, _f, protocol=pickle.HIGHEST_PROTOCOL) - except Exception: - LOG.error('Failed to save all the pickles', exc_info=True) - - -def set_title(title): - """Set window title.""" - LOG.debug('title: %s', title) - if os.name == 'nt': - os.system(f'title {title}') - else: - print_error('Setting the title is only supported under Windows.') - - -def show_data(message, data, color=None, indent=None, width=None): - """Display info using default or provided indent and width.""" - colors = (None, color if color else None) - indent = INDENT if indent is None else indent - width = WIDTH if width is None else width - print_colored( - (f'{" "*indent}{message:<{width}}', data), - colors, - log=True, - sep='', - ) - - def sleep(seconds=2): """Simple wrapper for time.sleep.""" time.sleep(seconds) @@ -1053,55 +127,5 @@ def string_to_bytes(size, assume_binary=False): return size -def strip_colors(string): - """Strip known ANSI color escapes from string, returns str.""" - LOG.debug('string: %s', string) - for color in COLORS.values(): - string = string.replace(color, '') - return string - - -def upload_debug_report(report, compress=True, reason='DEBUG'): - """Upload debug report to CRASH_SERVER as specified in wk.cfg.main.""" - LOG.info('Uploading debug report to %s', CRASH_SERVER.get('Name', '?')) - headers = CRASH_SERVER.get('Headers', {'X-Requested-With': 'XMLHttpRequest'}) - if compress: - headers['Content-Type'] = 'application/octet-stream' - - # Check if the required server details are available - if not all(CRASH_SERVER.get(key, False) for key in ('Name', 'Url', 'User')): - msg = 'Server details missing, aborting upload.' - print_error(msg) - raise RuntimeError(msg) - - # Set filename (based on the logging config if possible) - filename = 'Unknown' - log_path = get_log_filepath() - if log_path: - # Strip everything but the prefix - filename = re.sub(r'^(.*)_(\d{4}-\d{2}-\d{2}.*)', r'\1', log_path.name) - filename = f'{filename}_{reason}_{time.strftime("%Y-%m-%d_%H%M%S%z")}.log' - LOG.debug('filename: %s', filename) - - # Compress report - if compress: - filename += '.xz' - xz_report = lzma.compress(report.encode('utf8')) - - # Upload report - url = f'{CRASH_SERVER["Url"]}/{filename}' - response = requests.put( - url, - auth=(CRASH_SERVER['User'], CRASH_SERVER.get('Pass', '')), - data=xz_report if compress else report, - headers=headers, - timeout=60, - ) - - # Check response - if not response.ok: - raise RuntimeError('Failed to upload report') - - if __name__ == '__main__': print("This file is not meant to be called directly.") diff --git a/scripts/wk/ui/__init__.py b/scripts/wk/ui/__init__.py new file mode 100644 index 00000000..eb6fa151 --- /dev/null +++ b/scripts/wk/ui/__init__.py @@ -0,0 +1,3 @@ +"""WizardKit: ui module init""" + +from . import cli diff --git a/scripts/wk/ui/cli.py b/scripts/wk/ui/cli.py new file mode 100644 index 00000000..ecd8965a --- /dev/null +++ b/scripts/wk/ui/cli.py @@ -0,0 +1,862 @@ +"""WizardKit: CLI functions""" +# vim: sts=2 sw=2 ts=2 + +import itertools +import logging +import os +import pathlib +import platform +import re +import subprocess +import sys +import traceback + +from collections import OrderedDict + +try: + from functools import cache +except ImportError: + # Assuming Python is < 3.9 + from functools import lru_cache as cache + +from wk.cfg.main import ( + ENABLED_UPLOAD_DATA, + INDENT, + SUPPORT_MESSAGE, + WIDTH, + ) +from wk.std import (sleep, GenericWarning) + +# STATIC VARIABLES +COLORS = { + 'CLEAR': '\033[0m', + 'RED': '\033[31m', + 'RED_BLINK': '\033[31;5m', + 'ORANGE': '\033[31;1m', + 'ORANGE_RED': '\033[1;31;41m', + 'GREEN': '\033[32m', + 'YELLOW': '\033[33m', + 'YELLOW_BLINK': '\033[33;5m', + 'BLUE': '\033[34m', + 'PURPLE': '\033[35m', + 'CYAN': '\033[36m', + } +LOG = logging.getLogger(__name__) +PLATFORM = platform.system() + + +# Classes +class Menu(): + """Object for tracking menu specific data and methods. + + Menu items are added to an OrderedDict so the order is preserved. + + ASSUMPTIONS: + 1. All entry names are unique. + 2. All action entry names start with different letters. + """ + def __init__(self, title='[Untitled Menu]'): + self.actions = OrderedDict() + self.options = OrderedDict() + self.sets = OrderedDict() + self.toggles = OrderedDict() + self.disabled_str = 'Disabled' + self.separator = '─' + self.title = title + + def _generate_menu_text(self): + """Generate menu text, returns str.""" + separator_string = self._get_separator_string() + menu_lines = [self.title, separator_string] if self.title else [] + + # Sets & toggles + for section in (self.sets, self.toggles): + for details in section.values(): + if details.get('Hidden', False): + continue + if details.get('Separator', False): + menu_lines.append(separator_string) + menu_lines.append(details['Display Name']) + if self.sets or self.toggles: + menu_lines.append(separator_string) + + # Options + for details in self.options.values(): + if details.get('Hidden', False): + continue + if details.get('Separator', False): + menu_lines.append(separator_string) + menu_lines.append(details['Display Name']) + if self.options: + menu_lines.append(separator_string) + + # Actions + for details in self.actions.values(): + if details.get('Hidden', False): + continue + if details.get('Separator', False): + menu_lines.append(separator_string) + menu_lines.append(details['Display Name']) + + # Show menu + menu_lines.append('') + menu_lines = [str(line) for line in menu_lines] + return '\n'.join(menu_lines) + + def _get_display_name( + self, name, details, + index=None, no_checkboxes=True, setting_item=False): + """Format display name based on details and args, returns str.""" + disabled = details.get('Disabled', False) + if setting_item and not details['Selected']: + # Display item in YELLOW + disabled = True + checkmark = '*' + if 'DISPLAY' in os.environ or PLATFORM == 'Darwin': + checkmark = '✓' + display_name = f'{index if index else name[:1].upper()}: ' + if not (index and index >= 10): + display_name = f' {display_name}' + if setting_item and 'Value' in details: + name = f'{name} = {details["Value"]}' + + # Add enabled status if necessary + if not no_checkboxes: + display_name += f'[{checkmark if details["Selected"] else " "}] ' + + # Add name + if disabled: + display_name += color_string(f'{name} ({self.disabled_str})', 'YELLOW') + else: + display_name += name + + # Done + return display_name + + def _get_separator_string(self): + """Format separator length based on name lengths, returns str.""" + separator_length = 0 + + # Check title line(s) + if self.title: + for line in self.title.split('\n'): + separator_length = max(separator_length, len(strip_colors(line))) + + # Loop over all item names + for section in (self.actions, self.options, self.sets, self.toggles): + for details in section.values(): + if details.get('Hidden', False): + # Skip hidden lines + continue + line = strip_colors(details['Display Name']) + separator_length = max(separator_length, len(line)) + separator_length += 1 + + # Done + return self.separator * separator_length + + def _get_valid_answers(self): + """Get valid answers based on menu items, returns list.""" + valid_answers = [] + + # Numbered items + index = 0 + for section in (self.sets, self.toggles, self.options): + for details in section.values(): + if details.get('Hidden', False): + # Don't increment index or add to valid_answers + continue + index += 1 + if not details.get('Disabled', False): + valid_answers.append(str(index)) + + # Action items + for name, details in self.actions.items(): + if not details.get('Disabled', False): + valid_answers.append(name[:1].upper()) + + # Done + return valid_answers + + def _resolve_selection(self, selection): + """Get menu item based on user selection, returns tuple.""" + offset = 1 + resolved_selection = None + if selection.isnumeric(): + # Enumerate over numbered entries + entries = [ + *self.sets.items(), + *self.toggles.items(), + *self.options.items(), + ] + for _i, details in enumerate(entries): + if details[1].get('Hidden', False): + offset -= 1 + elif str(_i+offset) == selection: + resolved_selection = (details) + break + else: + # Just check actions + for action, details in self.actions.items(): + if action.lower().startswith(selection.lower()): + resolved_selection = (action, details) + break + + # Done + return resolved_selection + + def _update(self, single_selection=True, settings_mode=False): + """Update menu items in preparation for printing to screen.""" + index = 0 + + # Fix selection status for sets + for set_details in self.sets.values(): + set_selected = True + set_targets = set_details['Targets'] + for option, option_details in self.options.items(): + if option in set_targets and not option_details['Selected']: + set_selected = False + elif option not in set_targets and option_details['Selected']: + set_selected = False + set_details['Selected'] = set_selected + + # Numbered sections + for section in (self.sets, self.toggles, self.options): + for name, details in section.items(): + if details.get('Hidden', False): + # Skip hidden lines and don't increment index + continue + index += 1 + details['Display Name'] = self._get_display_name( + name, + details, + index=index, + no_checkboxes=single_selection, + setting_item=settings_mode, + ) + + # Actions + for name, details in self.actions.items(): + details['Display Name'] = self._get_display_name( + name, + details, + no_checkboxes=True, + ) + + def _update_entry_selection_status(self, entry, toggle=True, status=None): + """Update entry selection status either directly or by toggling.""" + if entry in self.sets: + # Update targets not the set itself + new_status = not self.sets[entry]['Selected'] if toggle else status + targets = self.sets[entry]['Targets'] + self._update_set_selection_status(targets, new_status) + for section in (self.toggles, self.options, self.actions): + if entry in section: + if toggle: + section[entry]['Selected'] = not section[entry]['Selected'] + else: + section[entry]['Selected'] = status + + def _update_set_selection_status(self, targets, status): + """Select or deselect options based on targets and status.""" + for option, details in self.options.items(): + # If (new) status is True and this option is a target then select + # Otherwise deselect + details['Selected'] = status and option in targets + + def _user_select(self, prompt): + """Show menu and select an entry, returns str.""" + menu_text = self._generate_menu_text() + valid_answers = self._get_valid_answers() + + # Menu loop + while True: + clear_screen() + print(menu_text) + sleep(0.01) + answer = input_text(prompt).strip() + if answer.upper() in valid_answers: + break + + # Done + return answer + + def add_action(self, name, details=None): + """Add action to menu.""" + details = details if details else {} + details['Selected'] = details.get('Selected', False) + self.actions[name] = details + + def add_option(self, name, details=None): + """Add option to menu.""" + details = details if details else {} + details['Selected'] = details.get('Selected', False) + self.options[name] = details + + def add_set(self, name, details=None): + """Add set to menu.""" + details = details if details else {} + details['Selected'] = details.get('Selected', False) + + # Safety check + if 'Targets' not in details: + raise KeyError('Menu set has no targets') + + # Add set + self.sets[name] = details + + def add_toggle(self, name, details=None): + """Add toggle to menu.""" + details = details if details else {} + details['Selected'] = details.get('Selected', False) + self.toggles[name] = details + + def advanced_select(self, prompt='Please make a selection: '): + """Display menu and make multiple selections, returns tuple. + + NOTE: Menu is displayed until an action entry is selected. + """ + while True: + self._update(single_selection=False) + user_selection = self._user_select(prompt) + selected_entry = self._resolve_selection(user_selection) + if user_selection.isnumeric(): + # Update selection(s) + self._update_entry_selection_status(selected_entry[0]) + else: + # Action selected + break + + # Done + return selected_entry + + def settings_select(self, prompt='Please make a selection: '): + """Display menu and make multiple selections, returns tuple. + + NOTE: Menu is displayed until an action entry is selected. + """ + choice_kwargs = { + 'choices': ['T', 'C'], + 'prompt': 'Toggle or change value?', + } + + while True: + self._update(single_selection=True, settings_mode=True) + user_selection = self._user_select(prompt) + selected_entry = self._resolve_selection(user_selection) + if user_selection.isnumeric(): + if 'Value' in selected_entry[-1] and choice(**choice_kwargs) == 'C': + # Change + selected_entry[-1]['Value'] = input_text('Enter new value: ') + else: + # Toggle + self._update_entry_selection_status(selected_entry[0]) + else: + # Action selected + break + + # Done + return selected_entry + + def simple_select(self, prompt='Please make a selection: ', update=True): + """Display menu and make a single selection, returns tuple.""" + if update: + self._update() + user_selection = self._user_select(prompt) + return self._resolve_selection(user_selection) + + def update(self): + """Update menu with default settings.""" + self._update() + + +class TryAndPrint(): + """Object used to standardize running functions and returning the result. + + The errors and warning attributes are used to allow fine-tuned results + based on exception names. + """ + def __init__(self, msg_bad='FAILED', msg_good='SUCCESS'): + self.catch_all = True + self.indent = INDENT + self.list_errors = ['GenericError'] + self.list_warnings = ['GenericWarning'] + self.msg_bad = msg_bad + self.msg_good = msg_good + self.verbose = False + self.width = WIDTH + + def _format_exception_message(self, _exception): + """Format using the exception's args or name, returns str.""" + LOG.debug( + 'Formatting exception: %s, %s', + _exception.__class__.__name__, + _exception, + ) + message = '' + + # Format message string from _exception + try: + if isinstance(_exception, subprocess.CalledProcessError): + message = _exception.stderr + if not isinstance(message, str): + message = message.decode('utf-8') + message = message.strip() + elif isinstance(_exception, ZeroDivisionError): + # Skip and just use exception name below + pass + else: + message = str(_exception) + except Exception: + # Just use the exception name instead + pass + + # Prepend exception name + if _exception.__class__.__name__ not in ('GenericError', 'GenericWarning'): + try: + message = f'{_exception.__class__.__name__}: {message}' + except Exception: + message = f'UNKNOWN ERROR: {message}' + + # Fix multi-line messages + if '\n' in message: + try: + lines = [ + f'{" "*(self.indent+self.width)}{line.strip()}' + for line in message.splitlines() if line.strip() + ] + lines[0] = lines[0].strip() + message = '\n'.join(lines) + except Exception: + pass + + # Done + return message + + def _format_function_output(self, output, msg_good): + """Format function output for use in try_and_print(), returns str.""" + LOG.debug('Formatting output: %s', output) + + if not output: + raise GenericWarning('No output') + + # Ensure we're working with a list + if isinstance(output, subprocess.CompletedProcess): + stdout = output.stdout + if not isinstance(stdout, str): + stdout = stdout.decode('utf8') + output = stdout.strip().splitlines() + if not output: + # Going to treat these as successes (for now) + LOG.warning('Program output was empty, assuming good result.') + return color_string(msg_good, 'GREEN') + else: + try: + output = list(output) + except TypeError: + output = [output] + + # Safety check + if not output: + # Going to ignore empty function output for now + LOG.error('Output is empty') + return 'UNKNOWN' + + # Build result_msg + result_msg = f'{output.pop(0)}' + if output: + output = [f'{" "*(self.indent+self.width)}{line}' for line in output] + result_msg += '\n' + '\n'.join(output) + + # Done + return result_msg + + def _log_result(self, message, result_msg): + """Log result text without color formatting.""" + log_text = f'{" "*self.indent}{message:<{self.width}}{result_msg}' + for line in log_text.splitlines(): + line = strip_colors(line) + LOG.info(line) + + def add_error(self, exception_name): + """Add exception name to error list.""" + if exception_name not in self.list_errors: + self.list_errors.append(exception_name) + + def add_warning(self, exception_name): + """Add exception name to warning list.""" + if exception_name not in self.list_warnings: + self.list_warnings.append(exception_name) + + def run( + self, message, function, *args, + catch_all=None, msg_good=None, verbose=None, **kwargs): + """Run a function and print the results, returns results as dict. + + If catch_all is True then (nearly) all exceptions will be caught. + Otherwise if an exception occurs that wasn't specified it will be + re-raised. + + If the function returns data it will be used instead of msg_good, + msg_bad, or exception text. + The output should be a list or a subprocess.CompletedProcess object. + + If msg_good is passed it will override self.msg_good for this call. + + If verbose is True then exception names or messages will be used for + the result message. Otherwise it will simply be set to result_bad. + + If catch_all and/or verbose are passed it will override + self.catch_all and/or self.verbose for this call. + + args and kwargs are passed to the function. + """ + LOG.debug('function: %s.%s', function.__module__, function.__name__) + LOG.debug('args: %s', args) + LOG.debug('kwargs: %s', kwargs) + LOG.debug( + 'catch_all: %s, msg_good: %s, verbose: %s', + catch_all, + msg_good, + verbose, + ) + f_exception = None + catch_all = catch_all if catch_all is not None else self.catch_all + msg_good = msg_good if msg_good is not None else self.msg_good + output = None + result_msg = 'UNKNOWN' + verbose = verbose if verbose is not None else self.verbose + + # Build exception tuples + e_exceptions = tuple(get_exception(e) for e in self.list_errors) + w_exceptions = tuple(get_exception(e) for e in self.list_warnings) + + # Run function and catch exceptions + print(f'{" "*self.indent}{message:<{self.width}}', end='', flush=True) + LOG.debug('Running function: %s.%s', function.__module__, function.__name__) + try: + output = function(*args, **kwargs) + except w_exceptions as _exception: + # Warnings + result_msg = self._format_exception_message(_exception) + print_warning(result_msg, log=False) + f_exception = _exception + except e_exceptions as _exception: + # Exceptions + result_msg = self._format_exception_message(_exception) + print_error(result_msg, log=False) + f_exception = _exception + except Exception as _exception: + # Unexpected exceptions + if verbose: + result_msg = self._format_exception_message(_exception) + else: + result_msg = self.msg_bad + print_error(result_msg, log=False) + f_exception = _exception + if not catch_all: + # Re-raise error as necessary + raise + else: + # Success + if output: + result_msg = self._format_function_output(output, msg_good) + print(result_msg) + else: + result_msg = msg_good + print_success(result_msg, log=False) + + # Done + self._log_result(message, result_msg) + return { + 'Exception': f_exception, + 'Failed': bool(f_exception), + 'Message': result_msg, + 'Output': output, + } + + +# Functions +def abort(prompt='Aborted.', show_prompt=True, return_code=1): + """Abort script.""" + print_warning(prompt) + if show_prompt: + sleep(0.5) + pause(prompt='Press Enter to exit... ') + sys.exit(return_code) + + +def ask(prompt='Kotaero!'): + """Prompt the user with a Y/N question, returns bool.""" + answer = None + prompt = f'{prompt} [Y/N]: ' + + # Loop until acceptable answer is given + while answer is None: + tmp = input_text(prompt) + if re.search(r'^y(es|up|)$', tmp, re.IGNORECASE): + answer = True + elif re.search(r'^n(o|ope|)$', tmp, re.IGNORECASE): + answer = False + + # Done + LOG.info('%s%s', prompt, 'Yes' if answer else 'No') + return answer + + +def beep(repeat=1): + """Play system bell with optional repeat.""" + while repeat >= 1: + # Print bell char without a newline + print('\a', end='', flush=True) + sleep(0.5) + repeat -= 1 + + +def choice(choices, prompt='答えろ!'): + """Choose an option from a provided list, returns str. + + Choices provided will be converted to uppercase and returned as such. + Similar to the commands choice (Windows) and select (Linux). + """ + LOG.debug('choices: %s, prompt: %s', choices, prompt) + answer = None + choices = [str(c).upper()[:1] for c in choices] + prompt = f'{prompt} [{"/".join(choices)}]' + regex = f'^({"|".join(choices)})$' + + # Loop until acceptable answer is given + while answer is None: + tmp = input_text(prompt=prompt) + if re.search(regex, tmp, re.IGNORECASE): + answer = tmp.upper() + + # Done + LOG.info('%s %s', prompt, answer) + return answer + + +def clear_screen(): + """Simple wrapper for clear/cls.""" + cmd = 'cls' if os.name == 'nt' else 'clear' + proc = subprocess.run(cmd, check=False, shell=True, stderr=subprocess.PIPE) + + # Workaround for live macOS env + if proc.returncode != 0: + print('\033c') + + +def color_string(strings, colors, sep=' '): + """Build colored string using ANSI escapes, returns str.""" + clear_code = COLORS['CLEAR'] + msg = [] + + # Convert to tuples if necessary + if isinstance(strings, (str, pathlib.Path)): + strings = (strings,) + if isinstance(colors, (str, pathlib.Path)): + colors = (colors,) + + # Convert to strings if necessary + try: + iter(strings) + except TypeError: + # Assuming single element passed, convert to string + strings = (str(strings),) + try: + iter(colors) + except TypeError: + # Assuming single element passed, convert to string + colors = (str(colors),) + + # Build new string with color escapes added + for string, color in itertools.zip_longest(strings, colors): + color_code = COLORS.get(color, clear_code) + msg.append(f'{color_code}{string}{clear_code}') + + # Done + return sep.join(msg) + + +@cache +def get_exception(name): + """Get exception by name, returns exception object. + + [Doctest] + >>> t = TryAndPrint() + >>> t._get_exception('AttributeError') + + >>> t._get_exception('CalledProcessError') + + >>> t._get_exception('GenericError') + + """ + LOG.debug('Getting exception: %s', name) + obj = getattr(sys.modules[__name__], name, None) + if obj: + return obj + + # Try builtin classes + obj = getattr(sys.modules['builtins'], name, None) + if obj: + return obj + + # Try all modules + for _mod in sys.modules.values(): + obj = getattr(_mod, name, None) + if obj: + break + + # Check if not found + if not obj: + raise AttributeError(f'Failed to find exception: {name}') + + # Done + return obj + + +def input_text(prompt='Enter text', allow_empty_response=True): + """Get text from user, returns string.""" + prompt = str(prompt) + response = None + if prompt[-1:] != ' ': + prompt += ' ' + print(prompt, end='', flush=True) + + while response is None: + try: + response = input() + LOG.debug('%s%s', prompt, response) + except EOFError: + # Ignore and try again + LOG.warning('Exception occured', exc_info=True) + print('', flush=True) + if not allow_empty_response: + if response is None or not response.strip(): + # The None check here is used to avoid a TypeError if response is None + print(f'\r{prompt}', end='', flush=True) + response = None + + return response + + +def major_exception(): + """Display traceback, optionally upload detailes, and exit.""" + LOG.critical('Major exception encountered', exc_info=True) + print_error('Major exception', log=False) + print_warning(SUPPORT_MESSAGE) + if ENABLED_UPLOAD_DATA: + print_warning('Also, please run upload-logs to help debugging!') + print(traceback.format_exc()) + + # Done + pause('Press Enter to exit... ') + raise SystemExit(1) + + +def pause(prompt='Press Enter to continue... '): + """Simple pause implementation.""" + input_text(prompt) + + +def print_colored(strings, colors, log=False, sep=' ', **kwargs): + """Prints strings in the colors specified.""" + LOG.debug( + 'strings: %s, colors: %s, sep: %s, kwargs: %s', + strings, colors, sep, kwargs, + ) + msg = color_string(strings, colors, sep=sep) + print_options = { + 'end': kwargs.get('end', '\n'), + 'file': kwargs.get('file', sys.stdout), + 'flush': kwargs.get('flush', False), + } + + print(msg, **print_options) + if log: + LOG.info(strip_colors(msg)) + + +def print_error(msg, log=True, **kwargs): + """Prints message in RED and log as ERROR.""" + if 'file' not in kwargs: + # Only set if not specified + kwargs['file'] = sys.stderr + print_colored(msg, 'RED', **kwargs) + if log: + LOG.error(msg) + + +def print_info(msg, log=True, **kwargs): + """Prints message in BLUE and log as INFO.""" + print_colored(msg, 'BLUE', **kwargs) + if log: + LOG.info(msg) + + +def print_report(report, indent=None, log=True): + """Print report to screen and optionally to log.""" + for line in report: + if indent: + line = f'{" "*indent}{line}' + print(line) + if log: + LOG.info(strip_colors(line)) + + +def print_standard(msg, log=True, **kwargs): + """Prints message and log as INFO.""" + print(msg, **kwargs) + if log: + LOG.info(msg) + + +def print_success(msg, log=True, **kwargs): + """Prints message in GREEN and log as INFO.""" + print_colored(msg, 'GREEN', **kwargs) + if log: + LOG.info(msg) + + +def print_warning(msg, log=True, **kwargs): + """Prints message in YELLOW and log as WARNING.""" + if 'file' not in kwargs: + # Only set if not specified + kwargs['file'] = sys.stderr + print_colored(msg, 'YELLOW', **kwargs) + if log: + LOG.warning(msg) + + +def set_title(title): + """Set window title.""" + LOG.debug('title: %s', title) + if os.name == 'nt': + os.system(f'title {title}') + else: + print_error('Setting the title is only supported under Windows.') + + +def show_data(message, data, color=None, indent=None, width=None): + """Display info using default or provided indent and width.""" + colors = (None, color if color else None) + indent = INDENT if indent is None else indent + width = WIDTH if width is None else width + print_colored( + (f'{" "*indent}{message:<{width}}', data), + colors, + log=True, + sep='', + ) + + +def strip_colors(string): + """Strip known ANSI color escapes from string, returns str.""" + LOG.debug('string: %s', string) + for color in COLORS.values(): + string = string.replace(color, '') + return string + + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/scripts/wk_debug.py b/scripts/wk_debug.py index 86900004..1808be89 100755 --- a/scripts/wk_debug.py +++ b/scripts/wk_debug.py @@ -17,7 +17,7 @@ OPTIONS = { def get_debug_prefix() -> str: """Ask what we're debugging, returns log dir prefix.""" - menu = wk.std.Menu(title=f'{wk.cfg.main.KIT_NAME_FULL}: Debugging Menu\n') + menu = wk.ui.cli.Menu(title=f'{wk.cfg.main.KIT_NAME_FULL}: Debugging Menu\n') for name, prefix in OPTIONS.items(): menu.add_option(name, {'Prefix': prefix}) selection = menu.simple_select() @@ -38,14 +38,14 @@ def get_debug_path() -> pathlib.Path: # Safety check if not debug_paths: - wk.std.abort('No logs found, aborting.') + wk.ui.cli.abort('No logs found, aborting.') # Use latest option - if wk.std.ask('Use latest session?'): + if wk.ui.cli.ask('Use latest session?'): return debug_paths[-1] # Select from list - menu = wk.std.Menu(title=f'{wk.cfg.main.KIT_NAME_FULL}: Debugging Menu\n') + menu = wk.ui.cli.Menu(title=f'{wk.cfg.main.KIT_NAME_FULL}: Debugging Menu\n') for item in debug_paths: menu.add_option(item.parent.name, {'Path': item}) selection = menu.simple_select()