From 62edaac25a800697f5738c43e5f51262c3715fe2 Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Sun, 28 May 2023 20:09:54 -0700 Subject: [PATCH] Add type hints to functions --- scripts/hw-sensors | 2 +- scripts/launch_sdio.py | 8 +- scripts/max-cpu-temp | 6 +- scripts/mount-all-volumes | 2 +- scripts/mount-backup-shares | 2 +- scripts/unmount-backup-shares | 4 +- scripts/upload-logs | 4 +- scripts/wk/clone/ddrescue.py | 151 ++++++++++++++-------------- scripts/wk/debug.py | 10 +- scripts/wk/exe.py | 33 +++--- scripts/wk/graph.py | 12 ++- scripts/wk/hw/disk.py | 2 +- scripts/wk/io.py | 20 ++-- scripts/wk/kit/build_win.py | 55 +++++----- scripts/wk/kit/tools.py | 21 ++-- scripts/wk/kit/ufd.py | 45 +++++---- scripts/wk/log.py | 13 +-- scripts/wk/net.py | 25 +++-- scripts/wk/os/linux.py | 22 ++-- scripts/wk/os/mac.py | 8 +- scripts/wk/os/win.py | 84 ++++++++-------- scripts/wk/repairs/win.py | 182 ++++++++++++++++++---------------- scripts/wk/setup/win.py | 133 +++++++++++++------------ scripts/wk/std.py | 4 +- scripts/wk/ui/ansi.py | 6 +- scripts/wk/ui/cli.py | 117 ++++++++++++---------- scripts/wk/ui/tmux.py | 31 +++--- scripts/wk/ui/tui.py | 8 +- 28 files changed, 531 insertions(+), 479 deletions(-) diff --git a/scripts/hw-sensors b/scripts/hw-sensors index 55dd31d9..5fedcf26 100755 --- a/scripts/hw-sensors +++ b/scripts/hw-sensors @@ -7,7 +7,7 @@ import platform import wk -def main(): +def main() -> None: """Show sensor data on screen.""" sensors = wk.hw.sensors.Sensors() if platform.system() == 'Darwin': diff --git a/scripts/launch_sdio.py b/scripts/launch_sdio.py index 8827c4d8..fbeaa4c0 100644 --- a/scripts/launch_sdio.py +++ b/scripts/launch_sdio.py @@ -1,6 +1,8 @@ """WizardKit: Launch Snappy Driver Installer Origin""" # vim: sts=2 sw=2 ts=2 +from subprocess import CompletedProcess + import wk from wk.cfg.net import SDIO_SERVER @@ -20,7 +22,7 @@ SDIO_REMOTE_PATH = wk.io.get_path_obj( ) # Functions -def try_again(): +def try_again() -> bool: """Ask to try again or quit.""" if wk.ui.cli.ask(' Try again?'): return True @@ -29,10 +31,10 @@ def try_again(): return False -def use_network_sdio(): +def use_network_sdio() -> bool: """Try to mount SDIO server.""" use_network = False - def _mount_server(): + def _mount_server() -> CompletedProcess: print('Connecting to server... (Press CTRL+c to use local copy)') return wk.net.mount_network_share(SDIO_SERVER, read_write=False) diff --git a/scripts/max-cpu-temp b/scripts/max-cpu-temp index 54a4ac3b..9b842538 100755 --- a/scripts/max-cpu-temp +++ b/scripts/max-cpu-temp @@ -5,10 +5,12 @@ import json import re import subprocess +from typing import Any + CPU_REGEX = re.compile(r'(core|k\d+)temp', re.IGNORECASE) NON_TEMP_REGEX = re.compile(r'^(fan|in|curr)', re.IGNORECASE) -def get_data(): +def get_data() -> dict[Any, Any]: cmd = ('sensors', '-j') data = {} raw_data = [] @@ -38,7 +40,7 @@ def get_data(): return data -def get_max_temp(data): +def get_max_temp(data) -> str: cpu_temps = [] max_cpu_temp = '??° C' for adapter, sources in data.items(): diff --git a/scripts/mount-all-volumes b/scripts/mount-all-volumes index 53eb3a26..becd7475 100755 --- a/scripts/mount-all-volumes +++ b/scripts/mount-all-volumes @@ -8,7 +8,7 @@ import wk # Functions -def main(): +def main() -> None: """Mount all volumes and show results.""" wk.ui.cli.print_standard(f'{wk.cfg.main.KIT_NAME_FULL}: Volume mount tool') wk.ui.cli.print_standard(' ') diff --git a/scripts/mount-backup-shares b/scripts/mount-backup-shares index 973b76e2..2b3dcb92 100755 --- a/scripts/mount-backup-shares +++ b/scripts/mount-backup-shares @@ -6,7 +6,7 @@ import wk # Functions -def main(): +def main() -> None: """Attempt to mount backup shares and print report.""" wk.ui.cli.print_info('Mounting Backup Shares') report = wk.net.mount_backup_shares() diff --git a/scripts/unmount-backup-shares b/scripts/unmount-backup-shares index 81188020..d7e3b24e 100755 --- a/scripts/unmount-backup-shares +++ b/scripts/unmount-backup-shares @@ -6,7 +6,7 @@ import wk # Functions -def main(): +def main() -> None: """Attempt to mount backup shares and print report.""" wk.ui.cli.print_info('Unmounting Backup Shares') report = wk.net.unmount_backup_shares() @@ -15,7 +15,7 @@ def main(): line = f' {line}' if 'Not mounted' in line: color = 'YELLOW' - print(wk.ansi.color_string(line, color)) + print(wk.ui.ansi.color_string(line, color)) if __name__ == '__main__': diff --git a/scripts/upload-logs b/scripts/upload-logs index 6ea27f14..a970fc0b 100755 --- a/scripts/upload-logs +++ b/scripts/upload-logs @@ -25,7 +25,7 @@ if PLATFORM not in ('macOS', 'Linux'): # Functions -def main(): +def main() -> None: """Upload logs for review.""" lines = [] try_and_print = wk.ui.cli.TryAndPrint() @@ -60,7 +60,7 @@ def main(): raise SystemExit(1) -def upload_log_dir(reason='Testing'): +def upload_log_dir(reason='Testing') -> None: """Upload compressed log_dir to the crash server.""" server = wk.cfg.net.CRASH_SERVER dest = pathlib.Path(f'~/{reason}_{NOW.strftime("%Y-%m-%dT%H%M%S%z")}.txz') diff --git a/scripts/wk/clone/ddrescue.py b/scripts/wk/clone/ddrescue.py index 1bc96cff..773ee510 100644 --- a/scripts/wk/clone/ddrescue.py +++ b/scripts/wk/clone/ddrescue.py @@ -15,11 +15,13 @@ import subprocess import time from collections import OrderedDict -from docopt import docopt +from typing import Union import psutil import pytz +from docopt import docopt + from wk import cfg, debug, exe, io, log, net, std from wk.cfg.ddrescue import ( DDRESCUE_MAP_TEMPLATE, @@ -190,15 +192,15 @@ class BlockPair(): # Set initial status self.set_initial_status() - def get_error_size(self): + def get_error_size(self) -> int: """Get error size in bytes, returns int.""" return self.size - self.get_rescued_size() - def get_percent_recovered(self): + def get_percent_recovered(self) -> float: """Get percent rescued from map_data, returns float.""" return 100 * self.map_data.get('rescued', 0) / self.size - def get_rescued_size(self): + def get_rescued_size(self) -> Union[float, int]: """Get rescued size using map data. NOTE: Returns 0 if no map data is available. @@ -206,7 +208,7 @@ class BlockPair(): self.load_map_data() return self.map_data.get('rescued', 0) - def load_map_data(self): + def load_map_data(self) -> None: """Load map data from file. NOTE: If the file is missing it is assumed that recovery hasn't @@ -252,7 +254,7 @@ class BlockPair(): # Done self.map_data.update(data) - def pass_complete(self, pass_name): + def pass_complete(self, pass_name) -> bool: """Check if pass_name is complete based on map data, returns bool.""" pending_size = self.map_data['non-tried'] @@ -282,7 +284,7 @@ class BlockPair(): # This should never be reached return False - def safety_check(self): + def safety_check(self) -> None: """Run safety check and abort if necessary.""" # TODO: Expand section to support non-Linux systems dest_size = -1 @@ -301,7 +303,7 @@ class BlockPair(): ui.print_error(f'Invalid destination: {self.destination}') raise std.GenericAbort() - def set_initial_status(self): + def set_initial_status(self) -> None: """Read map data and set initial statuses.""" self.load_map_data() percent = self.get_percent_recovered() @@ -314,12 +316,12 @@ class BlockPair(): self.status[name] = percent break - def skip_pass(self, pass_name): + def skip_pass(self, pass_name) -> None: """Mark pass as skipped if applicable.""" if self.status[pass_name] == 'Pending': self.status[pass_name] = 'Skipped' - def update_progress(self, pass_name): + def update_progress(self, pass_name) -> None: """Update progress via map data.""" self.load_map_data() @@ -350,7 +352,7 @@ class State(): self._init_tmux() exe.start_thread(self._fix_tmux_layout_loop) - def _add_block_pair(self, source, destination): + def _add_block_pair(self, source, destination) -> None: """Add BlockPair object and run safety checks.""" self.block_pairs.append( BlockPair( @@ -360,14 +362,14 @@ class State(): working_dir=self.working_dir, )) - def _get_clone_settings_path(self): + def _get_clone_settings_path(self) -> pathlib.Path: """get Clone settings file path, returns pathlib.Path obj.""" description = self.source.model if not description: description = self.source.path.name return pathlib.Path(f'{self.working_dir}/Clone_{description}.json') - def _fix_tmux_layout(self, forced=True): + def _fix_tmux_layout(self, forced=True) -> None: """Fix tmux layout based on cfg.ddrescue.TMUX_LAYOUT.""" layout = cfg.ddrescue.TMUX_LAYOUT needs_fixed = tmux.layout_needs_fixed(self.panes, layout) @@ -397,7 +399,7 @@ class State(): if 'Journal' in self.panes: tmux.resize_pane(self.panes['Journal'], height=p_ratios[2]) - def _fix_tmux_layout_loop(self): + def _fix_tmux_layout_loop(self) -> None: """Fix tmux layout on a loop. NOTE: This should be called as a thread. @@ -406,7 +408,7 @@ class State(): self._fix_tmux_layout(forced=False) std.sleep(1) - def _init_tmux(self): + def _init_tmux(self) -> None: """Initialize tmux layout.""" tmux.kill_all_panes() @@ -432,7 +434,7 @@ class State(): # Source / Dest self.update_top_panes() - def _load_settings(self, discard_unused_settings=False): + def _load_settings(self, discard_unused_settings=False) -> dict: """Load settings from previous run, returns dict.""" settings = {} settings_file = self._get_clone_settings_path() @@ -481,7 +483,7 @@ class State(): # Done return settings - def _save_settings(self, settings): + def _save_settings(self, settings) -> None: """Save settings for future runs.""" settings_file = self._get_clone_settings_path() @@ -493,7 +495,7 @@ class State(): ui.print_error('Failed to save clone settings') raise std.GenericAbort() from err - def add_clone_block_pairs(self): + def add_clone_block_pairs(self) -> None: """Add device to device block pairs and set settings if necessary.""" source_sep = get_partition_separator(self.source.path.name) dest_sep = get_partition_separator(self.destination.path.name) @@ -557,13 +559,13 @@ class State(): # Done return source_parts - def add_image_block_pairs(self, source_parts): + def add_image_block_pairs(self, source_parts) -> None: """Add device to image file block pairs.""" for part in source_parts: bp_dest = self.destination self._add_block_pair(part, bp_dest) - def confirm_selections(self, prompt_msg, source_parts=None): + def confirm_selections(self, prompt_msg, source_parts=None) -> None: """Show selection details and prompt for confirmation.""" report = [] @@ -645,7 +647,7 @@ class State(): if not ui.ask(prompt_msg): raise std.GenericAbort() - def generate_report(self): + def generate_report(self) -> list[str]: """Generate report of overall and per block_pair results, returns list.""" report = [] @@ -688,23 +690,23 @@ class State(): # Done return report - def get_error_size(self): + def get_error_size(self) -> int: """Get total error size from block_pairs in bytes, returns int.""" return self.get_total_size() - self.get_rescued_size() - def get_percent_recovered(self): + def get_percent_recovered(self) -> float: """Get total percent rescued from block_pairs, returns float.""" return 100 * self.get_rescued_size() / self.get_total_size() - def get_rescued_size(self): + def get_rescued_size(self) -> int: """Get total rescued size from all block pairs, returns int.""" return sum(pair.get_rescued_size() for pair in self.block_pairs) - def get_total_size(self): + def get_total_size(self) -> int: """Get total size of all block_pairs in bytes, returns int.""" return sum(pair.size for pair in self.block_pairs) - def init_recovery(self, docopt_args): + def init_recovery(self, docopt_args) -> None: """Select source/dest and set env.""" ui.clear_screen() source_parts = [] @@ -809,7 +811,7 @@ class State(): for pair in self.block_pairs: pair.safety_check() - def mark_started(self): + def mark_started(self) -> None: """Edit clone settings, if applicable, to mark recovery as started.""" # Skip if not cloning if self.mode != 'Clone': @@ -826,18 +828,18 @@ class State(): settings['First Run'] = False self._save_settings(settings) - def pass_above_threshold(self, pass_name): + def pass_above_threshold(self, pass_name) -> bool: """Check if all block_pairs meet the pass threshold, returns bool.""" threshold = cfg.ddrescue.AUTO_PASS_THRESHOLDS[pass_name] return all( p.get_percent_recovered() >= threshold for p in self.block_pairs ) - def pass_complete(self, pass_name): + def pass_complete(self, pass_name) -> bool: """Check if all block_pairs completed pass_name, returns bool.""" return all(p.pass_complete(pass_name) for p in self.block_pairs) - def prep_destination(self, source_parts, dry_run=True): + def prep_destination(self, source_parts, dry_run=True) -> None: """Prep destination as necessary.""" # TODO: Split into Linux and macOS # logical sector size is not easily found under macOS @@ -937,7 +939,7 @@ class State(): settings['Needs Format'] = False self._save_settings(settings) - def retry_all_passes(self): + def retry_all_passes(self) -> None: """Prep block_pairs for a retry recovery attempt.""" bad_statuses = ('*', '/', '-') LOG.warning('Updating block_pairs for retry') @@ -965,7 +967,7 @@ class State(): # Reinitialize status pair.set_initial_status() - def safety_check_destination(self): + def safety_check_destination(self) -> None: """Run safety checks for destination and abort if necessary.""" errors_detected = False @@ -985,7 +987,7 @@ class State(): if errors_detected: raise std.GenericAbort() - def safety_check_size(self): + def safety_check_size(self) -> None: """Run size safety check and abort if necessary.""" required_size = sum(pair.size for pair in self.block_pairs) settings = self._load_settings() if self.mode == 'Clone' else {} @@ -1031,7 +1033,7 @@ class State(): ui.print_error(error_msg) raise std.GenericAbort() - def save_debug_reports(self): + def save_debug_reports(self) -> None: """Save debug reports to disk.""" LOG.info('Saving debug reports') debug_dir = pathlib.Path(f'{self.log_dir}/debug') @@ -1053,13 +1055,13 @@ class State(): _f.write('\n'.join(debug.generate_object_report(_bp))) _f.write('\n') - def skip_pass(self, pass_name): + def skip_pass(self, pass_name) -> None: """Mark block_pairs as skipped if applicable.""" for pair in self.block_pairs: if pair.status[pass_name] == 'Pending': pair.status[pass_name] = 'Skipped' - def update_progress_pane(self, overall_status): + def update_progress_pane(self, overall_status) -> None: """Update progress pane.""" report = [] separator = '─────────────────────' @@ -1116,14 +1118,14 @@ class State(): with open(out_path, 'w', encoding='utf-8') as _f: _f.write('\n'.join(report)) - def update_top_panes(self): + def update_top_panes(self) -> None: """(Re)create top source/destination panes.""" source_exists = True dest_exists = True width = tmux.get_pane_size()[0] width = int(width / 2) - 1 - def _format_string(obj, width): + def _format_string(obj, width) -> str: """Format source/dest string using obj and width, returns str.""" string = '' @@ -1192,7 +1194,7 @@ class State(): # Functions -def build_block_pair_report(block_pairs, settings): +def build_block_pair_report(block_pairs, settings) -> list: """Build block pair report, returns list.""" report = [] notes = [] @@ -1246,7 +1248,7 @@ def build_block_pair_report(block_pairs, settings): return report -def build_ddrescue_cmd(block_pair, pass_name, settings_menu): +def build_ddrescue_cmd(block_pair, pass_name, settings_menu) -> list[str]: """Build ddrescue cmd using passed details, returns list.""" cmd = ['sudo', 'ddrescue'] if (block_pair.destination.is_block_device() @@ -1296,7 +1298,7 @@ def build_ddrescue_cmd(block_pair, pass_name, settings_menu): return cmd -def build_directory_report(path): +def build_directory_report(path) -> list[str]: """Build directory report, returns list.""" path = f'{path}/' report = [] @@ -1325,7 +1327,7 @@ def build_directory_report(path): return report -def build_disk_report(dev): +def build_disk_report(dev) -> list[str]: """Build device report, returns list.""" report = [] @@ -1398,7 +1400,7 @@ def build_disk_report(dev): return report -def build_main_menu(): +def build_main_menu() -> ui.Menu: """Build main menu, returns wk.ui.cli.Menu.""" menu = ui.Menu(title=ansi.color_string('ddrescue TUI: Main Menu', 'GREEN')) menu.separator = ' ' @@ -1414,7 +1416,7 @@ def build_main_menu(): return menu -def build_object_report(obj): +def build_object_report(obj) -> list[str]: """Build object report, returns list.""" report = [] @@ -1430,7 +1432,7 @@ def build_object_report(obj): return report -def build_settings_menu(silent=True): +def build_settings_menu(silent=True) -> ui.Menu: """Build settings menu, returns wk.ui.cli.Menu.""" title_text = [ ansi.color_string('ddrescue TUI: Expert Settings', 'GREEN'), @@ -1471,7 +1473,7 @@ def build_settings_menu(silent=True): return menu -def build_sfdisk_partition_line(table_type, dev_path, size, details): +def build_sfdisk_partition_line(table_type, dev_path, size, details) -> str: """Build sfdisk partition line using passed details, returns str.""" line = f'{dev_path} : size={size}' dest_type = '' @@ -1512,7 +1514,7 @@ def build_sfdisk_partition_line(table_type, dev_path, size, details): return line -def check_destination_health(destination): +def check_destination_health(destination) -> str: """Check destination health, returns str.""" result = '' @@ -1533,7 +1535,7 @@ def check_destination_health(destination): return result -def clean_working_dir(working_dir): +def clean_working_dir(working_dir) -> None: """Clean working directory to ensure a fresh recovery session. NOTE: Data from previous sessions will be preserved @@ -1551,7 +1553,7 @@ def clean_working_dir(working_dir): shutil.move(entry.path, new_path) -def format_status_string(status, width): +def format_status_string(status, width) -> str: """Format colored status string, returns str.""" color = None percent = -1 @@ -1586,7 +1588,7 @@ def format_status_string(status, width): return status_str -def fstype_is_ok(path, map_dir=False): +def fstype_is_ok(path, map_dir=False) -> bool: """Check if filesystem type is acceptable, returns bool.""" is_ok = False fstype = None @@ -1622,7 +1624,7 @@ def fstype_is_ok(path, map_dir=False): return is_ok -def get_ddrescue_settings(settings_menu): +def get_ddrescue_settings(settings_menu) -> list: """Get ddrescue settings from menu selections, returns list.""" settings = [] @@ -1640,7 +1642,7 @@ def get_ddrescue_settings(settings_menu): return settings -def get_etoc(): +def get_etoc() -> str: """Get EToC from ddrescue output, returns str.""" delta = None delta_dict = {} @@ -1669,7 +1671,7 @@ def get_etoc(): return etoc -def get_fstype_macos(path): +def get_fstype_macos(path) -> str: """Get fstype for path under macOS, returns str.""" fstype = 'UNKNOWN' proc = exe.run_program(['mount'], check=False) @@ -1687,8 +1689,9 @@ def get_fstype_macos(path): return fstype -def get_object(path): +def get_object(path) -> Union[hw_disk.Disk, None, pathlib.Path]: """Get object based on path, returns obj.""" + # TODO: Refactor to avoid returning None obj = None # Bail early @@ -1721,7 +1724,7 @@ def get_object(path): return obj -def get_partition_separator(name): +def get_partition_separator(name) -> str: """Get partition separator based on device name, returns str.""" separator = '' if re.search(r'(loop|mmc|nvme)', name, re.IGNORECASE): @@ -1730,7 +1733,7 @@ def get_partition_separator(name): return separator -def get_percent_color(percent): +def get_percent_color(percent) -> str: """Get color based on percentage, returns str.""" color = None if percent > 100: @@ -1746,7 +1749,7 @@ def get_percent_color(percent): return color -def get_table_type(disk_path): +def get_table_type(disk_path) -> str: """Get disk partition table type, returns str. NOTE: If resulting table type is not GPT or MBR @@ -1786,7 +1789,7 @@ def get_table_type(disk_path): return table_type -def get_working_dir(mode, destination, force_local=False): +def get_working_dir(mode, destination, force_local=False) -> pathlib.Path: """Get working directory using mode and destination, returns path.""" ticket_id = ui.get_ticket_id() working_dir = None @@ -1830,7 +1833,7 @@ def get_working_dir(mode, destination, force_local=False): return working_dir -def is_missing_source_or_destination(state): +def is_missing_source_or_destination(state) -> bool: """Check if source or destination dissapeared, returns bool.""" missing = False items = { @@ -1860,7 +1863,7 @@ def is_missing_source_or_destination(state): return missing -def source_or_destination_changed(state): +def source_or_destination_changed(state) -> bool: """Verify the source and destination objects are still valid.""" changed = False @@ -1885,7 +1888,7 @@ def source_or_destination_changed(state): return changed -def main(): +def main() -> None: """Main function for ddrescue TUI.""" args = docopt(DOCSTRING) log.update_log_path(dest_name='ddrescue-TUI', timestamp=True) @@ -1953,7 +1956,7 @@ def main(): LOG.info(' %s', ansi.strip_colors(line)) -def mount_raw_image(path): +def mount_raw_image(path) -> pathlib.Path: """Mount raw image using OS specific methods, returns pathlib.Path.""" loopback_path = None @@ -1973,7 +1976,7 @@ def mount_raw_image(path): return loopback_path -def mount_raw_image_linux(path): +def mount_raw_image_linux(path) -> pathlib.Path: """Mount raw image using losetup, returns pathlib.Path.""" loopback_path = None @@ -1995,7 +1998,7 @@ def mount_raw_image_linux(path): # Done return loopback_path -def mount_raw_image_macos(path): +def mount_raw_image_macos(path) -> pathlib.Path: """Mount raw image using hdiutil, returns pathlib.Path.""" loopback_path = None plist_data = {} @@ -2026,7 +2029,7 @@ def mount_raw_image_macos(path): return loopback_path -def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True): +def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True) -> None: """Run ddrescue using passed settings.""" cmd = build_ddrescue_cmd(block_pair, pass_name, settings) poweroff_source_after_idle = True @@ -2034,7 +2037,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True): ui.clear_screen() warning_message = '' - def _poweroff_source_drive(idle_minutes): + def _poweroff_source_drive(idle_minutes) -> None: """Power off source drive after a while.""" source_dev = state.source.path @@ -2068,7 +2071,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True): 'Press Enter to return to main menu...', end='', flush=True, ) - def _update_smart_pane(): + def _update_smart_pane() -> None: """Update SMART pane every 30 seconds.""" update_smart_details(state.source) now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z') @@ -2173,7 +2176,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True): raise std.GenericAbort() -def run_recovery(state, main_menu, settings_menu, dry_run=True): +def run_recovery(state, main_menu, settings_menu, dry_run=True) -> None: """Run recovery passes.""" atexit.register(state.save_debug_reports) attempted_recovery = False @@ -2252,7 +2255,7 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True): state.update_progress_pane('Idle') -def select_disk(prompt_msg, skip_disk=None): +def select_disk(prompt_msg, skip_disk=None) -> hw_disk.Disk: """Select disk from list, returns Disk().""" ui.print_info('Scanning disks...') disks = hw_disk.get_disks() @@ -2297,7 +2300,7 @@ def select_disk(prompt_msg, skip_disk=None): return selected_disk -def select_disk_parts(prompt_msg, disk): +def select_disk_parts(prompt_msg, disk) -> hw_disk.Disk: """Select disk parts from list, returns list of Disk().""" title = ansi.color_string('ddrescue TUI: Partition Selection', 'GREEN') title += f'\n\nDisk: {disk.path} {disk.description}' @@ -2309,7 +2312,7 @@ def select_disk_parts(prompt_msg, disk): menu.add_action('Quit') object_list = [] - def _select_parts(menu): + def _select_parts(menu) -> None: """Loop over selection menu until at least one partition selected.""" while True: selection = menu.advanced_select( @@ -2377,7 +2380,7 @@ def select_disk_parts(prompt_msg, disk): return object_list -def select_path(prompt_msg): +def select_path(prompt_msg) -> pathlib.Path: """Select path, returns pathlib.Path.""" invalid = False menu = ui.Menu( @@ -2411,7 +2414,7 @@ def select_path(prompt_msg): return path -def set_mode(docopt_args): +def set_mode(docopt_args) -> str: """Set mode from docopt_args or user selection, returns str.""" mode = None @@ -2433,7 +2436,7 @@ def set_mode(docopt_args): return mode -def unmount_loopback_device(path): +def unmount_loopback_device(path) -> None: """Unmount loopback device using OS specific methods.""" cmd = [] diff --git a/scripts/wk/debug.py b/scripts/wk/debug.py index 0a339229..e11406c9 100644 --- a/scripts/wk/debug.py +++ b/scripts/wk/debug.py @@ -20,7 +20,7 @@ from wk.log import get_root_logger_path # Classes class Debug(): """Object used when dumping debug data.""" - def method(self): + def method(self) -> None: """Dummy method used to identify functions vs data.""" @@ -31,7 +31,7 @@ METHOD_TYPE = type(DEBUG_CLASS.method) # Functions -def generate_debug_report(): +def generate_debug_report() -> str: """Generate debug report, returns str.""" platform_function_list = ( 'architecture', @@ -78,7 +78,7 @@ def generate_debug_report(): return '\n'.join(report) -def generate_object_report(obj, indent=0): +def generate_object_report(obj, indent=0) -> list[str]: """Generate debug report for obj, returns list.""" report = [] attr_list = [] @@ -109,7 +109,7 @@ def generate_object_report(obj, indent=0): return report -def save_pickles(obj_dict, out_path=None): +def save_pickles(obj_dict, out_path=None) -> None: """Save dict of objects using pickle.""" LOG.info('Saving pickles') @@ -129,7 +129,7 @@ def save_pickles(obj_dict, out_path=None): LOG.error('Failed to save all the pickles', exc_info=True) -def upload_debug_report(report, compress=True, reason='DEBUG'): +def upload_debug_report(report, compress=True, reason='DEBUG') -> None: """Upload debug report to CRASH_SERVER as specified in wk.cfg.main.""" LOG.info('Uploading debug report to %s', CRASH_SERVER.get('Name', '?')) headers = CRASH_SERVER.get('Headers', {'X-Requested-With': 'XMLHttpRequest'}) diff --git a/scripts/wk/exe.py b/scripts/wk/exe.py index 1a3ed084..167225f5 100644 --- a/scripts/wk/exe.py +++ b/scripts/wk/exe.py @@ -8,8 +8,9 @@ import re import subprocess import time -from threading import Thread from queue import Queue, Empty +from threading import Thread +from typing import Any import psutil @@ -29,7 +30,7 @@ class NonBlockingStreamReader(): self.stream = stream self.queue = Queue() - def populate_queue(stream, queue): + def populate_queue(stream, queue) -> None: """Collect lines from stream and put them in queue.""" while not stream.closed: try: @@ -45,18 +46,18 @@ class NonBlockingStreamReader(): args=(self.stream, self.queue), ) - def stop(self): + def stop(self) -> None: """Stop reading from input stream.""" self.stream.close() - def read(self, timeout=None): + def read(self, timeout=None) -> Any: """Read from queue if possible, returns item from queue.""" try: return self.queue.get(block=timeout is not None, timeout=timeout) except Empty: return None - def save_to_file(self, proc, out_path): + def save_to_file(self, proc, out_path) -> None: """Continuously save output to file while proc is running.""" LOG.debug('Saving process %s output to %s', proc, out_path) while proc.poll() is None: @@ -74,7 +75,8 @@ class NonBlockingStreamReader(): # Functions -def build_cmd_kwargs(cmd, minimized=False, pipe=True, shell=False, **kwargs): +def build_cmd_kwargs( + cmd, minimized=False, pipe=True, shell=False, **kwargs) -> dict[str, Any]: """Build kwargs for use by subprocess functions, returns dict. Specifically subprocess.run() and subprocess.Popen(). @@ -122,7 +124,8 @@ def build_cmd_kwargs(cmd, minimized=False, pipe=True, shell=False, **kwargs): return cmd_kwargs -def get_json_from_command(cmd, check=True, encoding='utf-8', errors='ignore'): +def get_json_from_command( + cmd, check=True, encoding='utf-8', errors='ignore') -> dict[Any, Any]: """Capture JSON content from cmd output, returns dict. If the data can't be decoded then either an exception is raised @@ -141,7 +144,7 @@ def get_json_from_command(cmd, check=True, encoding='utf-8', errors='ignore'): return json_data -def get_procs(name, exact=True, try_again=True): +def get_procs(name, exact=True, try_again=True) -> list[psutil.Process]: """Get process object(s) based on name, returns list of proc objects.""" LOG.debug('name: %s, exact: %s', name, exact) processes = [] @@ -161,7 +164,7 @@ def get_procs(name, exact=True, try_again=True): return processes -def kill_procs(name, exact=True, force=False, timeout=30): +def kill_procs(name, exact=True, force=False, timeout=30) -> None: """Kill all processes matching name (case-insensitively). NOTE: Under Posix systems this will send SIGINT to allow processes @@ -185,7 +188,8 @@ def kill_procs(name, exact=True, force=False, timeout=30): proc.kill() -def popen_program(cmd, minimized=False, pipe=False, shell=False, **kwargs): +def popen_program( + cmd, minimized=False, pipe=False, shell=False, **kwargs) -> subprocess.Popen: """Run program and return a subprocess.Popen object.""" LOG.debug( 'cmd: %s, minimized: %s, pipe: %s, shell: %s', @@ -209,7 +213,8 @@ def popen_program(cmd, minimized=False, pipe=False, shell=False, **kwargs): return proc -def run_program(cmd, check=True, pipe=True, shell=False, **kwargs): +def run_program( + cmd, check=True, pipe=True, shell=False, **kwargs) -> subprocess.CompletedProcess: """Run program and return a subprocess.CompletedProcess object.""" LOG.debug( 'cmd: %s, check: %s, pipe: %s, shell: %s', @@ -233,7 +238,7 @@ def run_program(cmd, check=True, pipe=True, shell=False, **kwargs): return proc -def start_thread(function, args=None, daemon=True): +def start_thread(function, args=None, daemon=True) -> Thread: """Run function as thread in background, returns Thread object.""" LOG.debug( 'Starting background thread for function: %s, args: %s, daemon: %s', @@ -245,7 +250,7 @@ def start_thread(function, args=None, daemon=True): return thread -def stop_process(proc, graceful=True): +def stop_process(proc, graceful=True) -> None: """Stop process. NOTES: proc should be a subprocess.Popen obj. @@ -267,7 +272,7 @@ def stop_process(proc, graceful=True): proc.kill() -def wait_for_procs(name, exact=True, timeout=None): +def wait_for_procs(name, exact=True, timeout=None) -> None: """Wait for all process matching name.""" LOG.debug('name: %s, exact: %s, timeout: %s', name, exact, timeout) target_procs = get_procs(name, exact=exact) diff --git a/scripts/wk/graph.py b/scripts/wk/graph.py index 95324956..24d9999e 100644 --- a/scripts/wk/graph.py +++ b/scripts/wk/graph.py @@ -3,6 +3,8 @@ import logging +from typing import Union + from wk.ui import ansi @@ -33,7 +35,7 @@ THRESH_GREAT = 750 * 1024**2 # Functions -def generate_horizontal_graph(rate_list, graph_width=40, oneline=False): +def generate_horizontal_graph(rate_list, graph_width=40, oneline=False) -> list[str]: """Generate horizontal graph from rate_list, returns list.""" graph = ['', '', '', ''] scale = 8 if oneline else 32 @@ -80,7 +82,7 @@ def generate_horizontal_graph(rate_list, graph_width=40, oneline=False): return graph -def get_graph_step(rate, scale=16): +def get_graph_step(rate, scale=16) -> int: """Get graph step based on rate and scale, returns int.""" rate_in_mb = rate / (1024**2) step = 0 @@ -95,14 +97,14 @@ def get_graph_step(rate, scale=16): return step -def merge_rates(rates, graph_width=40): +def merge_rates(rates, graph_width=40) -> list[Union[int, float]]: """Merge rates to have entries equal to the width, returns list.""" merged_rates = [] offset = 0 slice_width = int(len(rates) / graph_width) # Merge rates - for _i in range(graph_width): + for _ in range(graph_width): merged_rates.append(sum(rates[offset:offset+slice_width])/slice_width) offset += slice_width @@ -110,7 +112,7 @@ def merge_rates(rates, graph_width=40): return merged_rates -def vertical_graph_line(percent, rate, scale=32): +def vertical_graph_line(percent, rate, scale=32) -> str: """Build colored graph string using thresholds, returns str.""" color_bar = None color_rate = None diff --git a/scripts/wk/hw/disk.py b/scripts/wk/hw/disk.py index 31704c8f..5186f632 100644 --- a/scripts/wk/hw/disk.py +++ b/scripts/wk/hw/disk.py @@ -57,7 +57,7 @@ class Disk: tests: list[Test] = field(init=False, default_factory=list) use_sat: bool = field(init=False, default=False) - def __post_init__(self) -> None: + def __post_init__(self): self.path = pathlib.Path(self.path).resolve() self.update_details() self.set_description() diff --git a/scripts/wk/io.py b/scripts/wk/io.py index 8e398da4..b0e097a7 100644 --- a/scripts/wk/io.py +++ b/scripts/wk/io.py @@ -13,7 +13,7 @@ LOG = logging.getLogger(__name__) # Functions -def case_insensitive_path(path): +def case_insensitive_path(path) -> pathlib.Path: """Find path case-insensitively, returns pathlib.Path obj.""" given_path = pathlib.Path(path).resolve() real_path = None @@ -37,7 +37,7 @@ def case_insensitive_path(path): return real_path -def case_insensitive_search(path, item): +def case_insensitive_search(path, item) -> pathlib.Path: """Search path for item case insensitively, returns pathlib.Path obj.""" path = pathlib.Path(path).resolve() given_path = path.joinpath(item) @@ -61,7 +61,7 @@ def case_insensitive_search(path, item): return real_path -def copy_file(source, dest, overwrite=False): +def copy_file(source, dest, overwrite=False) -> None: """Copy file and optionally overwrite the destination.""" source = case_insensitive_path(source) dest = pathlib.Path(dest).resolve() @@ -72,7 +72,7 @@ def copy_file(source, dest, overwrite=False): shutil.copy2(source, dest) -def delete_empty_folders(path): +def delete_empty_folders(path) -> None: """Recursively delete all empty folders in path.""" LOG.debug('path: %s', path) @@ -89,7 +89,7 @@ def delete_empty_folders(path): pass -def delete_folder(path, force=False, ignore_errors=False): +def delete_folder(path, force=False, ignore_errors=False) -> None: """Delete folder if empty or if forced. NOTE: Exceptions are not caught by this function, @@ -106,7 +106,7 @@ def delete_folder(path, force=False, ignore_errors=False): os.rmdir(path) -def delete_item(path, force=False, ignore_errors=False): +def delete_item(path, force=False, ignore_errors=False) -> None: """Delete file or folder, optionally recursively. NOTE: Exceptions are not caught by this function, @@ -124,7 +124,7 @@ def delete_item(path, force=False, ignore_errors=False): os.remove(path) -def get_path_obj(path, expanduser=True, resolve=True): +def get_path_obj(path, expanduser=True, resolve=True) -> pathlib.Path: """Get based on path, returns pathlib.Path.""" path = pathlib.Path(path) if expanduser: @@ -134,7 +134,7 @@ def get_path_obj(path, expanduser=True, resolve=True): return path -def non_clobber_path(path): +def non_clobber_path(path) -> pathlib.Path: """Update path as needed to non-existing path, returns pathlib.Path.""" LOG.debug('path: %s', path) path = pathlib.Path(path) @@ -163,7 +163,7 @@ def non_clobber_path(path): return new_path -def recursive_copy(source, dest, overwrite=False): +def recursive_copy(source, dest, overwrite=False) -> None: """Copy source to dest recursively. NOTE: This uses rsync style source/dest syntax. @@ -213,7 +213,7 @@ def recursive_copy(source, dest, overwrite=False): raise FileExistsError(f'Refusing to delete file: {dest}') -def rename_item(path, new_path): +def rename_item(path, new_path) -> pathlib.Path: """Rename item, returns pathlib.Path.""" path = pathlib.Path(path) return path.rename(new_path) diff --git a/scripts/wk/kit/build_win.py b/scripts/wk/kit/build_win.py index e87d5101..cace9c69 100644 --- a/scripts/wk/kit/build_win.py +++ b/scripts/wk/kit/build_win.py @@ -6,6 +6,7 @@ NOTE: This script is meant to be called from within a new kit in ConEmu. import logging import os +import pathlib import re from wk.cfg.launchers import LAUNCHERS @@ -44,7 +45,7 @@ WIDTH = 50 # Functions -def compress_cbin_dirs(): +def compress_cbin_dirs() -> None: """Compress CBIN_DIR items using ARCHIVE_PASSWORD.""" current_dir = os.getcwd() for item in CBIN_DIR.iterdir(): @@ -62,25 +63,25 @@ def compress_cbin_dirs(): delete_item(item, force=True, ignore_errors=True) -def delete_from_temp(item_path): +def delete_from_temp(item_path) -> None: """Delete item from temp.""" delete_item(TMP_DIR.joinpath(item_path), force=True, ignore_errors=True) -def download_to_temp(filename, source_url, referer=None): +def download_to_temp(filename, source_url, referer=None) -> pathlib.Path: """Download file to temp dir, returns pathlib.Path.""" out_path = TMP_DIR.joinpath(filename) download_file(out_path, source_url, referer=referer) return out_path -def extract_to_bin(archive, folder): +def extract_to_bin(archive, folder) -> None: """Extract archive to folder under BIN_DIR.""" out_path = BIN_DIR.joinpath(folder) extract_archive(archive, out_path) -def generate_launcher(section, name, options): +def generate_launcher(section, name, options) -> None: """Generate launcher script.""" dest = ROOT_DIR.joinpath(f'{section+"/" if section else ""}{name}.cmd') out_text = [] @@ -107,27 +108,27 @@ def generate_launcher(section, name, options): # Download functions -def download_adobe_reader(): +def download_adobe_reader() -> None: """Download Adobe Reader.""" out_path = INSTALLERS_DIR.joinpath('Adobe Reader DC.exe') download_file(out_path, SOURCES['Adobe Reader DC']) -def download_aida64(): +def download_aida64() -> None: """Download AIDA64.""" archive = download_to_temp('AIDA64.zip', SOURCES['AIDA64']) extract_to_bin(archive, 'AIDA64') delete_from_temp('AIDA64.zip') -def download_autoruns(): +def download_autoruns() -> None: """Download Autoruns.""" for item in ('Autoruns32', 'Autoruns64'): out_path = BIN_DIR.joinpath(f'Sysinternals/{item}.exe') download_file(out_path, SOURCES[item]) -def download_bleachbit(): +def download_bleachbit() -> None: """Download BleachBit.""" out_path = BIN_DIR.joinpath('BleachBit') archive = download_to_temp('BleachBit.zip', SOURCES['BleachBit']) @@ -142,7 +143,7 @@ def download_bleachbit(): delete_from_temp('BleachBit.zip') -def download_bluescreenview(): +def download_bluescreenview() -> None: """Download BlueScreenView.""" archive_32 = download_to_temp( 'bluescreenview32.zip', SOURCES['BlueScreenView32'], @@ -161,14 +162,14 @@ def download_bluescreenview(): delete_from_temp('bluescreenview64.zip') -def download_erunt(): +def download_erunt() -> None: """Download ERUNT.""" archive = download_to_temp('erunt.zip', SOURCES['ERUNT']) extract_to_bin(archive, 'ERUNT') delete_from_temp('erunt.zip') -def download_everything(): +def download_everything() -> None: """Download Everything.""" archive_32 = download_to_temp('everything32.zip', SOURCES['Everything32']) archive_64 = download_to_temp('everything64.zip', SOURCES['Everything64']) @@ -183,7 +184,7 @@ def download_everything(): delete_from_temp('everything64.zip') -def download_fastcopy(): +def download_fastcopy() -> None: """Download FastCopy.""" installer = download_to_temp('FastCopyInstaller.exe', SOURCES['FastCopy']) out_path = BIN_DIR.joinpath('FastCopy') @@ -199,7 +200,7 @@ def download_fastcopy(): delete_item(BIN_DIR.joinpath('FastCopy/setup.exe')) -def download_furmark(): +def download_furmark() -> None: """Download FurMark.""" installer = download_to_temp( 'FurMark_Setup.exe', @@ -219,20 +220,20 @@ def download_furmark(): delete_from_temp('FurMarkInstall') -def download_hwinfo(): +def download_hwinfo() -> None: """Download HWiNFO.""" archive = download_to_temp('HWiNFO.zip', SOURCES['HWiNFO']) extract_to_bin(archive, 'HWiNFO') delete_from_temp('HWiNFO.zip') -def download_macs_fan_control(): +def download_macs_fan_control() -> None: """Download Macs Fan Control.""" out_path = INSTALLERS_DIR.joinpath('Macs Fan Control.exe') download_file(out_path, SOURCES['Macs Fan Control']) -def download_libreoffice(): +def download_libreoffice() -> None: """Download LibreOffice.""" for arch in 32, 64: out_path = INSTALLERS_DIR.joinpath(f'LibreOffice{arch}.msi') @@ -240,7 +241,7 @@ def download_libreoffice(): ui.sleep(1) -def download_neutron(): +def download_neutron() -> None: """Download Neutron.""" archive = download_to_temp('neutron.zip', SOURCES['Neutron']) out_path = BIN_DIR.joinpath('Neutron') @@ -248,7 +249,7 @@ def download_neutron(): delete_from_temp('neutron.zip') -def download_notepad_plus_plus(): +def download_notepad_plus_plus() -> None: """Download Notepad++.""" archive = download_to_temp('npp.7z', SOURCES['Notepad++']) extract_to_bin(archive, 'NotepadPlusPlus') @@ -260,21 +261,21 @@ def download_notepad_plus_plus(): delete_from_temp('npp.7z') -def download_openshell(): +def download_openshell() -> None: """Download OpenShell installer and Fluent-Metro skin.""" for name in ('OpenShell.exe', 'Fluent-Metro.zip'): out_path = BIN_DIR.joinpath(f'OpenShell/{name}') download_file(out_path, SOURCES[name[:-4]]) -def download_putty(): +def download_putty() -> None: """Download PuTTY.""" archive = download_to_temp('putty.zip', SOURCES['PuTTY']) extract_to_bin(archive, 'PuTTY') delete_from_temp('putty.zip') -def download_snappy_driver_installer_origin(): +def download_snappy_driver_installer_origin() -> None: """Download Snappy Driver Installer Origin.""" archive = download_to_temp('aria2.zip', SOURCES['Aria2']) aria2c = TMP_DIR.joinpath('aria2/aria2c.exe') @@ -344,7 +345,7 @@ def download_snappy_driver_installer_origin(): delete_from_temp('fake.7z') -def download_uninstallview(): +def download_uninstallview() -> None: """Download UninstallView.""" archive_32 = download_to_temp('uninstallview32.zip', SOURCES['UninstallView32']) archive_64 = download_to_temp('uninstallview64.zip', SOURCES['UninstallView64']) @@ -359,14 +360,14 @@ def download_uninstallview(): delete_from_temp('uninstallview64.zip') -def download_wiztree(): +def download_wiztree() -> None: """Download WizTree.""" archive = download_to_temp('wiztree.zip', SOURCES['WizTree']) extract_to_bin(archive, 'WizTree') delete_from_temp('wiztree.zip') -def download_xmplay(): +def download_xmplay() -> None: """Download XMPlay.""" archives = [ download_to_temp('xmplay.zip', SOURCES['XMPlay']), @@ -394,7 +395,7 @@ def download_xmplay(): delete_from_temp('xmp-rar.zip') delete_from_temp('Innocuous.zip') -def download_xmplay_music(): +def download_xmplay_music() -> None: """Download XMPlay Music.""" music_tmp = TMP_DIR.joinpath('music') music_tmp.mkdir(exist_ok=True) @@ -447,7 +448,7 @@ def download_xmplay_music(): # "Main" Function -def build_kit(): +def build_kit() -> None: """Build Kit.""" update_log_path(dest_name='Build Tool', timestamp=True) title = f'{KIT_NAME_FULL}: Build Tool' diff --git a/scripts/wk/kit/tools.py b/scripts/wk/kit/tools.py index cf0ea34d..5deda344 100644 --- a/scripts/wk/kit/tools.py +++ b/scripts/wk/kit/tools.py @@ -1,11 +1,14 @@ """WizardKit: Tool Functions""" # vim: sts=2 sw=2 ts=2 -from datetime import datetime, timedelta import logging import pathlib import platform +from datetime import datetime, timedelta +from subprocess import CompletedProcess, Popen +from typing import Union + import requests from wk.cfg.main import ARCHIVE_PASSWORD @@ -30,7 +33,9 @@ CACHED_DIRS = {} # Functions -def download_file(out_path, source_url, as_new=False, overwrite=False, referer=None): +def download_file( + out_path, source_url, + as_new=False, overwrite=False, referer=None) -> pathlib.Path: """Download a file using requests, returns pathlib.Path.""" out_path = pathlib.Path(out_path).resolve() name = out_path.name @@ -95,7 +100,7 @@ def download_file(out_path, source_url, as_new=False, overwrite=False, referer=N return out_path -def download_tool(folder, name, suffix=None): +def download_tool(folder, name, suffix=None) -> None: """Download tool.""" name_arch = f'{name}{ARCH}' out_path = get_tool_path(folder, name, check=False, suffix=suffix) @@ -130,7 +135,7 @@ def download_tool(folder, name, suffix=None): raise -def extract_archive(archive, out_path, *args, mode='x', silent=True): +def extract_archive(archive, out_path, *args, mode='x', silent=True) -> None: """Extract an archive to out_path.""" out_path = pathlib.Path(out_path).resolve() out_path.parent.mkdir(parents=True, exist_ok=True) @@ -142,7 +147,7 @@ def extract_archive(archive, out_path, *args, mode='x', silent=True): run_program(cmd) -def extract_tool(folder): +def extract_tool(folder) -> None: """Extract tool.""" extract_archive( find_kit_dir('.cbin').joinpath(folder).with_suffix('.7z'), @@ -151,7 +156,7 @@ def extract_tool(folder): ) -def find_kit_dir(name=None): +def find_kit_dir(name=None) -> pathlib.Path: """Find folder in kit, returns pathlib.Path. Search is performed in the script's path and then recursively upwards. @@ -178,7 +183,7 @@ def find_kit_dir(name=None): return cur_path -def get_tool_path(folder, name, check=True, suffix=None): +def get_tool_path(folder, name, check=True, suffix=None) -> pathlib.Path: """Get tool path, returns pathlib.Path""" bin_dir = find_kit_dir('.bin') if not suffix: @@ -203,7 +208,7 @@ def run_tool( folder, name, *run_args, cbin=False, cwd=False, download=False, popen=False, **run_kwargs, - ): + ) -> Union[CompletedProcess, Popen]: """Run tool from the kit or the Internet, returns proc obj. proc will be either subprocess.CompletedProcess or subprocess.Popen.""" diff --git a/scripts/wk/kit/ufd.py b/scripts/wk/kit/ufd.py index 64697a41..d3715a4d 100644 --- a/scripts/wk/kit/ufd.py +++ b/scripts/wk/kit/ufd.py @@ -1,9 +1,12 @@ """WizardKit: UFD Functions""" # vim: sts=2 sw=2 ts=2 +# TODO: Drop OrderedDict use + import logging import math import os +import pathlib import shutil from subprocess import CalledProcessError @@ -59,7 +62,7 @@ UFD_LABEL = f'{KIT_NAME_SHORT}_UFD' # Functions -def apply_image(part_path, image_path, hide_macos_boot=True): +def apply_image(part_path, image_path, hide_macos_boot=True) -> None: """Apply raw image to dev_path using dd.""" cmd = [ 'sudo', @@ -89,7 +92,7 @@ def apply_image(part_path, image_path, hide_macos_boot=True): linux.unmount(source_or_mountpoint='/mnt/TMP') -def build_ufd(): +def build_ufd() -> None: """Build UFD using selected sources.""" args = docopt(DOCSTRING) if args['--debug']: @@ -252,7 +255,7 @@ def build_ufd(): ui.pause('Press Enter to exit...') -def confirm_selections(update=False): +def confirm_selections(update=False) -> None: """Ask tech to confirm selections, twice if necessary.""" if not ui.ask('Is the above information correct?'): ui.abort() @@ -273,7 +276,7 @@ def confirm_selections(update=False): ui.print_standard(' ') -def copy_source(source, items, overwrite=False): +def copy_source(source, items, overwrite=False) -> None: """Copy source items to /mnt/UFD.""" is_image = source.is_file() items_not_found = False @@ -300,7 +303,7 @@ def copy_source(source, items, overwrite=False): raise FileNotFoundError('One or more items not found') -def create_table(dev_path, use_mbr=False, images=None): +def create_table(dev_path, use_mbr=False, images=None) -> None: """Create GPT or DOS partition table.""" cmd = [ 'sudo', @@ -338,7 +341,7 @@ def create_table(dev_path, use_mbr=False, images=None): run_program(cmd) -def find_first_partition(dev_path): +def find_first_partition(dev_path) -> str: """Find path to first partition of dev, returns str.""" cmd = [ 'lsblk', @@ -357,7 +360,7 @@ def find_first_partition(dev_path): return part_path -def format_partition(dev_path, label): +def format_partition(dev_path, label) -> None: """Format first partition on device FAT32.""" cmd = [ 'sudo', @@ -369,7 +372,7 @@ def format_partition(dev_path, label): run_program(cmd) -def get_block_device_size(dev_path): +def get_block_device_size(dev_path) -> int: """Get block device size via lsblk, returns int.""" cmd = [ 'lsblk', @@ -388,7 +391,7 @@ def get_block_device_size(dev_path): return int(proc.stdout.strip()) -def get_uuid(path): +def get_uuid(path) -> str: """Get filesystem UUID via findmnt, returns str.""" cmd = [ 'findmnt', @@ -404,7 +407,7 @@ def get_uuid(path): return proc.stdout.strip() -def hide_items(ufd_dev_first_partition, items): +def hide_items(ufd_dev_first_partition, items) -> None: """Set FAT32 hidden flag for items.""" with open('/root/.mtoolsrc', 'w', encoding='utf-8') as _f: _f.write(f'drive U: file="{ufd_dev_first_partition}"\n') @@ -416,7 +419,7 @@ def hide_items(ufd_dev_first_partition, items): run_program(cmd, shell=True, check=False) -def install_syslinux_to_dev(ufd_dev, use_mbr): +def install_syslinux_to_dev(ufd_dev, use_mbr) -> None: """Install Syslinux to UFD (dev).""" cmd = [ 'sudo', @@ -429,7 +432,7 @@ def install_syslinux_to_dev(ufd_dev, use_mbr): run_program(cmd) -def install_syslinux_to_partition(partition): +def install_syslinux_to_partition(partition) -> None: """Install Syslinux to UFD (partition).""" cmd = [ 'sudo', @@ -442,7 +445,7 @@ def install_syslinux_to_partition(partition): run_program(cmd) -def is_valid_path(path_obj, path_type): +def is_valid_path(path_obj, path_type) -> bool: """Verify path_obj is valid by type, returns bool.""" valid_path = False if path_type == 'DIR': @@ -459,7 +462,7 @@ def is_valid_path(path_obj, path_type): return valid_path -def set_boot_flag(dev_path, use_mbr=False): +def set_boot_flag(dev_path, use_mbr=False) -> None: """Set modern or legacy boot flag.""" cmd = [ 'sudo', @@ -471,7 +474,7 @@ def set_boot_flag(dev_path, use_mbr=False): run_program(cmd) -def remove_arch(): +def remove_arch() -> None: """Remove arch dir from UFD. This ensures a clean installation to the UFD and resets the boot files @@ -479,7 +482,7 @@ def remove_arch(): shutil.rmtree(io.case_insensitive_path('/mnt/UFD/arch')) -def show_selections(args, sources, ufd_dev, ufd_sources, extra_images): +def show_selections(args, sources, ufd_dev, ufd_sources, extra_images) -> None: """Show selections including non-specified options.""" # Sources @@ -526,7 +529,7 @@ def show_selections(args, sources, ufd_dev, ufd_sources, extra_images): ui.print_standard(' ') -def update_boot_entries(ufd_dev, images=None): +def update_boot_entries(ufd_dev, images=None) -> None: """Update boot files for UFD usage""" configs = [] uuids = [get_uuid('/mnt/UFD')] @@ -613,7 +616,7 @@ def update_boot_entries(ufd_dev, images=None): break -def verify_sources(args, ufd_sources): +def verify_sources(args, ufd_sources) -> dict[str, pathlib.Path]: """Check all sources and abort if necessary, returns dict.""" sources = OrderedDict() @@ -633,7 +636,7 @@ def verify_sources(args, ufd_sources): return sources -def verify_ufd(dev_path): +def verify_ufd(dev_path) -> pathlib.Path: """Check that dev_path is a valid UFD, returns pathlib.Path obj.""" ufd_dev = None @@ -647,10 +650,10 @@ def verify_ufd(dev_path): ui.print_error(f'ERROR: Invalid UFD device: {ufd_dev}') ui.abort() - return ufd_dev + return ufd_dev # type: ignore[reportGeneralTypeIssues] -def zero_device(dev_path): +def zero_device(dev_path) -> None: """Zero-out first 64MB of device.""" cmd = [ 'sudo', diff --git a/scripts/wk/log.py b/scripts/wk/log.py index 96ee251a..5ba49756 100644 --- a/scripts/wk/log.py +++ b/scripts/wk/log.py @@ -26,7 +26,7 @@ DEFAULT_LOG_NAME = cfg.main.KIT_NAME_FULL # Functions -def enable_debug_mode(): +def enable_debug_mode() -> None: """Configures logging for better debugging.""" root_logger = logging.getLogger() for handler in root_logger.handlers: @@ -40,7 +40,7 @@ def enable_debug_mode(): def format_log_path( log_dir=None, log_name=None, timestamp=False, - kit=False, tool=False, append=False): + kit=False, tool=False, append=False) -> pathlib.Path: """Format path based on args passed, returns pathlib.Path obj.""" log_path = pathlib.Path( f'{log_dir if log_dir else DEFAULT_LOG_DIR}/' @@ -61,7 +61,7 @@ def format_log_path( return log_path -def get_root_logger_path(): +def get_root_logger_path() -> pathlib.Path: """Get the log filepath from the root logger, returns pathlib.Path obj. NOTE: This will use the first handler baseFilename it finds (if any). @@ -78,7 +78,7 @@ def get_root_logger_path(): raise RuntimeError('Log path not found.') -def remove_empty_log(log_path=None): +def remove_empty_log(log_path=None) -> None: """Remove log if empty. NOTE: Under Windows an empty log is 2 bytes long. @@ -101,7 +101,7 @@ def remove_empty_log(log_path=None): log_path.unlink() -def start(config=None): +def start(config=None) -> None: """Configure and start logging using safe defaults.""" log_path = format_log_path(timestamp=os.name != 'nt') root_logger = logging.getLogger() @@ -124,7 +124,8 @@ def start(config=None): def update_log_path( - dest_dir=None, dest_name=None, keep_history=True, timestamp=True, append=False): + dest_dir=None, dest_name=None, keep_history=True, + timestamp=True, append=False) -> None: """Moves current log file to new path and updates the root logger.""" root_logger = logging.getLogger() new_path = format_log_path(dest_dir, dest_name, timestamp=timestamp, append=append) diff --git a/scripts/wk/net.py b/scripts/wk/net.py index 5e478e54..930369f3 100644 --- a/scripts/wk/net.py +++ b/scripts/wk/net.py @@ -5,6 +5,8 @@ import os import pathlib import re +from subprocess import CompletedProcess + import psutil from wk.exe import get_json_from_command, run_program @@ -23,7 +25,7 @@ REGEX_VALID_IP = re.compile( # Functions -def connected_to_private_network(raise_on_error=False): +def connected_to_private_network(raise_on_error=False) -> bool: """Check if connected to a private network, returns bool. This checks for a valid private IP assigned to this system. @@ -49,12 +51,10 @@ def connected_to_private_network(raise_on_error=False): raise GenericError('Not connected to a network') # Done - if raise_on_error: - connected = None return connected -def mount_backup_shares(read_write=False): +def mount_backup_shares(read_write=False) -> list[str]: """Mount backup shares using OS specific methods.""" report = [] for name, details in BACKUP_SERVERS.items(): @@ -97,7 +97,8 @@ def mount_backup_shares(read_write=False): return report -def mount_network_share(details, mount_point=None, read_write=False): +def mount_network_share( + details, mount_point=None, read_write=False) -> CompletedProcess: """Mount network share using OS specific methods.""" cmd = None address = details['Address'] @@ -148,7 +149,7 @@ def mount_network_share(details, mount_point=None, read_write=False): return run_program(cmd, check=False) -def ping(addr='google.com'): +def ping(addr='google.com') -> None: """Attempt to ping addr.""" cmd = ( 'ping', @@ -159,7 +160,7 @@ def ping(addr='google.com'): run_program(cmd) -def share_is_mounted(details): +def share_is_mounted(details) -> bool: """Check if dev/share/etc is mounted, returns bool.""" mounted = False @@ -193,8 +194,9 @@ def share_is_mounted(details): return mounted -def show_valid_addresses(): +def show_valid_addresses() -> None: """Show all valid private IP addresses assigned to the system.""" + # TODO: Refactor to remove ui dependancy devs = psutil.net_if_addrs() for dev, families in sorted(devs.items()): for family in families: @@ -203,8 +205,9 @@ def show_valid_addresses(): ui.show_data(message=dev, data=family.address) -def speedtest(): +def speedtest() -> list[str]: """Run a network speedtest using speedtest-cli.""" + # TODO: Refactor to use speedtest-cli's JSON output cmd = ['speedtest-cli', '--simple'] proc = run_program(cmd, check=False) output = [line.strip() for line in proc.stdout.splitlines() if line.strip()] @@ -213,7 +216,7 @@ def speedtest(): return [f'{a:<10}{b:6.2f} {c}' for a, b, c in output] -def unmount_backup_shares(): +def unmount_backup_shares() -> list[str]: """Unmount backup shares.""" report = [] for name, details in BACKUP_SERVERS.items(): @@ -242,7 +245,7 @@ def unmount_backup_shares(): return report -def unmount_network_share(details=None, mount_point=None): +def unmount_network_share(details=None, mount_point=None) -> CompletedProcess: """Unmount network share""" cmd = [] diff --git a/scripts/wk/os/linux.py b/scripts/wk/os/linux.py index ee17792a..ecc2eca4 100644 --- a/scripts/wk/os/linux.py +++ b/scripts/wk/os/linux.py @@ -20,12 +20,12 @@ UUID_CORESTORAGE = '53746f72-6167-11aa-aa11-00306543ecac' # Functions -def build_volume_report(device_path=None) -> list: +def build_volume_report(device_path=None) -> list[str]: """Build volume report using lsblk, returns list. If device_path is provided the report is limited to that device. """ - def _get_volumes(dev, indent=0) -> list: + def _get_volumes(dev, indent=0) -> list[dict]: """Convert lsblk JSON tree to a flat list of items, returns list.""" dev['name'] = f'{" "*indent}{dev["name"]}' volumes = [dev] @@ -108,7 +108,7 @@ def build_volume_report(device_path=None) -> list: return report -def get_user_home(user): +def get_user_home(user) -> pathlib.Path: """Get path to user's home dir, returns pathlib.Path obj.""" home = None @@ -129,7 +129,7 @@ def get_user_home(user): return pathlib.Path(home) -def get_user_name(): +def get_user_name() -> str: """Get real user name, returns str.""" user = None @@ -146,7 +146,7 @@ def get_user_name(): return user -def make_temp_file(suffix=None): +def make_temp_file(suffix=None) -> pathlib.Path: """Make temporary file, returns pathlib.Path() obj.""" cmd = ['mktemp'] if suffix: @@ -155,7 +155,7 @@ def make_temp_file(suffix=None): return pathlib.Path(proc.stdout.strip()) -def mount(source, mount_point=None, read_write=False): +def mount(source, mount_point=None, read_write=False) -> None: """Mount source (on mount_point if provided). NOTE: If not running_as_root() then udevil will be used. @@ -178,13 +178,13 @@ def mount(source, mount_point=None, read_write=False): raise RuntimeError(f'Failed to mount: {source} on {mount_point}') -def mount_volumes(device_path=None, read_write=False, scan_corestorage=False): +def mount_volumes(device_path=None, read_write=False, scan_corestorage=False) -> None: """Mount all detected volumes. NOTE: If device_path is specified then only volumes under that path will be mounted. """ - def _get_volumes(dev) -> list: + def _get_volumes(dev) -> list[dict]: """Convert lsblk JSON tree to a flat list of items, returns list.""" volumes = [dev] for child in dev.get('children', []): @@ -233,12 +233,12 @@ def mount_volumes(device_path=None, read_write=False, scan_corestorage=False): pass -def running_as_root(): +def running_as_root() -> bool: """Check if running with effective UID of 0, returns bool.""" return os.geteuid() == 0 -def scan_corestorage_container(container, timeout=300): +def scan_corestorage_container(container, timeout=300) -> list[dict]: """Scan CoreStorage container for inner volumes, returns list.""" container_path = pathlib.Path(container) detected_volumes = {} @@ -285,7 +285,7 @@ def scan_corestorage_container(container, timeout=300): return inner_volumes -def unmount(source_or_mountpoint): +def unmount(source_or_mountpoint) -> None: """Unmount source_or_mountpoint. NOTE: If not running_as_root() then udevil will be used. diff --git a/scripts/wk/os/mac.py b/scripts/wk/os/mac.py index 5c812826..6ecb9001 100644 --- a/scripts/wk/os/mac.py +++ b/scripts/wk/os/mac.py @@ -13,7 +13,7 @@ REGEX_FANS = re.compile(r'^.*\(bytes (?P.*)\)$') # Functions -def decode_smc_bytes(text): +def decode_smc_bytes(text) -> int: """Decode SMC bytes, returns int.""" result = None @@ -32,7 +32,7 @@ def decode_smc_bytes(text): return result -def set_fans(mode): +def set_fans(mode) -> None: """Set fans to auto or max.""" if mode == 'auto': set_fans_auto() @@ -42,14 +42,14 @@ def set_fans(mode): raise RuntimeError(f'Invalid fan mode: {mode}') -def set_fans_auto(): +def set_fans_auto() -> None: """Set fans to auto.""" LOG.info('Setting fans to auto') cmd = ['sudo', 'smc', '-k', 'FS! ', '-w', '0000'] run_program(cmd) -def set_fans_max(): +def set_fans_max() -> None: """Set fans to their max speeds.""" LOG.info('Setting fans to max') num_fans = 0 diff --git a/scripts/wk/os/win.py b/scripts/wk/os/win.py index 4d6d7248..8b64ed71 100644 --- a/scripts/wk/os/win.py +++ b/scripts/wk/os/win.py @@ -9,6 +9,8 @@ import platform import re from contextlib import suppress +from typing import Any, Union + import psutil try: @@ -92,7 +94,7 @@ else: # Activation Functions -def activate_with_bios(): +def activate_with_bios() -> None: """Attempt to activate Windows with a key stored in the BIOS.""" # Code borrowed from https://github.com/aeruder/get_win8key ##################################################### @@ -132,7 +134,7 @@ def activate_with_bios(): raise GenericError('Activation Failed') -def get_activation_string(): +def get_activation_string() -> str: """Get activation status, returns str.""" cmd = ['cscript', '//nologo', SLMGR, '/xpr'] proc = run_program(cmd, check=False) @@ -142,7 +144,7 @@ def get_activation_string(): return act_str -def is_activated(): +def is_activated() -> bool: """Check if Windows is activated via slmgr.vbs and return bool.""" act_str = get_activation_string() @@ -151,22 +153,22 @@ def is_activated(): # Date / Time functions -def get_timezone(): +def get_timezone() -> str: """Get current timezone using tzutil, returns str.""" cmd = ['tzutil', '/g'] proc = run_program(cmd, check=False) return proc.stdout -def set_timezone(zone): +def set_timezone(zone) -> None: """Set current timezone using tzutil.""" cmd = ['tzutil', '/s', zone] run_program(cmd, check=False) # Info Functions -def check_4k_alignment(show_alert=False): - """Check if all partitions are 4K aligned, returns book.""" +def check_4k_alignment(show_alert=False) -> list[str]: + """Check if all partitions are 4K aligned, returns list.""" cmd = ['WMIC', 'partition', 'get', 'Caption,Size,StartingOffset'] report = [] show_alert = False @@ -204,8 +206,8 @@ def check_4k_alignment(show_alert=False): return report -def export_bitlocker_info(): - """Get Bitlocker info and save to the current directory.""" +def export_bitlocker_info() -> None: + """Get Bitlocker info and save to the base directory of the kit.""" commands = [ ['manage-bde', '-status', SYSTEMDRIVE], ['manage-bde', '-protectors', '-get', SYSTEMDRIVE], @@ -222,7 +224,7 @@ def export_bitlocker_info(): _f.write(f'{proc.stdout}\n\n') -def get_installed_antivirus(): +def get_installed_antivirus() -> list[str]: """Get list of installed antivirus programs, returns list.""" cmd = [ 'WMIC', r'/namespace:\\root\SecurityCenter2', @@ -263,8 +265,8 @@ def get_installed_antivirus(): return report -def get_installed_ram(as_list=False, raise_exceptions=False): - """Get installed RAM.""" +def get_installed_ram(as_list=False, raise_exceptions=False) -> Union[list, str]: + """Get installed RAM, returns list or str.""" mem = psutil.virtual_memory() mem_str = bytes_to_string(mem.total, decimals=1) @@ -279,8 +281,8 @@ def get_installed_ram(as_list=False, raise_exceptions=False): return [mem_str] if as_list else mem_str -def get_os_activation(as_list=False, check=True): - """Get OS activation status, returns str. +def get_os_activation(as_list=False, check=True) -> Union[list, str]: + """Get OS activation status, returns list or str. NOTE: If check=True then raise an exception if OS isn't activated. """ @@ -296,7 +298,7 @@ def get_os_activation(as_list=False, check=True): return [act_str] if as_list else act_str -def get_os_name(as_list=False, check=True): +def get_os_name(as_list=False, check=True) -> str: """Build OS display name, returns str. NOTE: If check=True then an exception is raised if the OS version is @@ -322,7 +324,7 @@ def get_os_name(as_list=False, check=True): return [display_name] if as_list else display_name -def get_raw_disks(): +def get_raw_disks() -> list[str]: """Get all disks without a partiton table, returns list.""" script_path = find_kit_dir('Scripts').joinpath('get_raw_disks.ps1') cmd = ['PowerShell', '-ExecutionPolicy', 'Bypass', '-File', script_path] @@ -347,7 +349,7 @@ def get_raw_disks(): return raw_disks -def get_volume_usage(use_colors=False): +def get_volume_usage(use_colors=False) -> list[str]: """Get space usage info for all fixed volumes, returns list.""" report = [] for disk in psutil.disk_partitions(): @@ -371,7 +373,7 @@ def get_volume_usage(use_colors=False): return report -def show_alert_box(message, title=None): +def show_alert_box(message, title=None) -> None: """Show Windows alert box with message.""" title = title if title else f'{KIT_NAME_FULL} Warning' message_box = ctypes.windll.user32.MessageBoxW @@ -379,7 +381,7 @@ def show_alert_box(message, title=None): # Registry Functions -def reg_delete_key(hive, key, recurse=False): +def reg_delete_key(hive, key, recurse=False) -> None: """Delete a key from the registry. NOTE: If recurse is False then it will only work on empty keys. @@ -401,7 +403,7 @@ def reg_delete_key(hive, key, recurse=False): except FileNotFoundError: # Ignore pass - except PermissionError: + except PermissionError as _e: LOG.error(r'Failed to delete registry key: %s\%s', hive_name, key) if recurse: # Re-raise exception @@ -409,10 +411,10 @@ def reg_delete_key(hive, key, recurse=False): # recurse is not True so assuming we tried to remove a non-empty key msg = fr'Refusing to remove non-empty key: {hive_name}\{key}' - raise FileExistsError(msg) + raise FileExistsError(msg) from _e -def reg_delete_value(hive, key, value): +def reg_delete_value(hive, key, value) -> None: """Delete a value from the registry.""" access = winreg.KEY_ALL_ACCESS hive = reg_get_hive(hive) @@ -436,8 +438,9 @@ def reg_delete_value(hive, key, value): raise -def reg_get_hive(hive): +def reg_get_hive(hive) -> Any: """Get winreg HKEY constant from string, returns HKEY constant.""" + # TODO: Fix type hint if isinstance(hive, int): # Assuming we're already a winreg HKEY constant pass @@ -448,8 +451,9 @@ def reg_get_hive(hive): return hive -def reg_get_data_type(data_type): +def reg_get_data_type(data_type) -> Any: """Get registry data type from string, returns winreg constant.""" + # TODO: Fix type hint if isinstance(data_type, int): # Assuming we're already a winreg value type constant pass @@ -460,7 +464,7 @@ def reg_get_data_type(data_type): return data_type -def reg_key_exists(hive, key): +def reg_key_exists(hive, key) -> bool: """Test if the specified hive/key exists, returns bool.""" exists = False hive = reg_get_hive(hive) @@ -478,7 +482,7 @@ def reg_key_exists(hive, key): return exists -def reg_read_value(hive, key, value, force_32=False, force_64=False): +def reg_read_value(hive, key, value, force_32=False, force_64=False) -> Any: """Query value from hive/hey, returns multiple types. NOTE: Set value='' to read the default value. @@ -502,7 +506,7 @@ def reg_read_value(hive, key, value, force_32=False, force_64=False): return data -def reg_write_settings(settings): +def reg_write_settings(settings) -> None: """Set registry values in bulk from a custom data structure. Data structure should be as follows: @@ -542,7 +546,7 @@ def reg_write_settings(settings): reg_set_value(hive, key, *value) -def reg_set_value(hive, key, name, data, data_type, option=None): +def reg_set_value(hive, key, name, data, data_type, option=None) -> None: """Set value for hive/key.""" access = winreg.KEY_WRITE data_type = reg_get_data_type(data_type) @@ -574,25 +578,25 @@ def reg_set_value(hive, key, name, data, data_type, option=None): # Safe Mode Functions -def disable_safemode(): +def disable_safemode() -> None: """Edit BCD to remove safeboot value.""" cmd = ['bcdedit', '/deletevalue', '{default}', 'safeboot'] run_program(cmd) -def disable_safemode_msi(): +def disable_safemode_msi() -> None: """Disable MSI access under safemode.""" cmd = ['reg', 'delete', REG_MSISERVER, '/f'] run_program(cmd) -def enable_safemode(): +def enable_safemode() -> None: """Edit BCD to set safeboot as default.""" cmd = ['bcdedit', '/set', '{default}', 'safeboot', 'network'] run_program(cmd) -def enable_safemode_msi(): +def enable_safemode_msi() -> None: """Enable MSI access under safemode.""" cmd = ['reg', 'add', REG_MSISERVER, '/f'] run_program(cmd) @@ -605,7 +609,7 @@ def enable_safemode_msi(): # Secure Boot Functions -def is_booted_uefi(): +def is_booted_uefi() -> bool: """Check if booted UEFI or legacy, returns bool.""" kernel = ctypes.windll.kernel32 firmware_type = ctypes.c_uint() @@ -621,7 +625,7 @@ def is_booted_uefi(): return firmware_type.value == 2 -def is_secure_boot_enabled(raise_exceptions=False, show_alert=False): +def is_secure_boot_enabled(raise_exceptions=False, show_alert=False) -> bool: """Check if Secure Boot is enabled, returns bool. If raise_exceptions is True then an exception is raised with details. @@ -671,7 +675,7 @@ def is_secure_boot_enabled(raise_exceptions=False, show_alert=False): # Service Functions -def disable_service(service_name): +def disable_service(service_name) -> None: """Set service startup to disabled.""" cmd = ['sc', 'config', service_name, 'start=', 'disabled'] run_program(cmd, check=False) @@ -681,7 +685,7 @@ def disable_service(service_name): raise GenericError(f'Failed to disable service {service_name}') -def enable_service(service_name, start_type='auto'): +def enable_service(service_name, start_type='auto') -> None: """Enable service by setting start type.""" cmd = ['sc', 'config', service_name, 'start=', start_type] psutil_type = 'automatic' @@ -696,7 +700,7 @@ def enable_service(service_name, start_type='auto'): raise GenericError(f'Failed to enable service {service_name}') -def get_service_status(service_name): +def get_service_status(service_name) -> str: """Get service status using psutil, returns str.""" status = 'unknown' try: @@ -708,7 +712,7 @@ def get_service_status(service_name): return status -def get_service_start_type(service_name): +def get_service_start_type(service_name) -> str: """Get service startup type using psutil, returns str.""" start_type = 'unknown' try: @@ -720,7 +724,7 @@ def get_service_start_type(service_name): return start_type -def start_service(service_name): +def start_service(service_name) -> None: """Stop service.""" cmd = ['net', 'start', service_name] run_program(cmd, check=False) @@ -730,7 +734,7 @@ def start_service(service_name): raise GenericError(f'Failed to start service {service_name}') -def stop_service(service_name): +def stop_service(service_name) -> None: """Stop service.""" cmd = ['net', 'stop', service_name] run_program(cmd, check=False) diff --git a/scripts/wk/repairs/win.py b/scripts/wk/repairs/win.py index 51c418d4..39417083 100644 --- a/scripts/wk/repairs/win.py +++ b/scripts/wk/repairs/win.py @@ -4,11 +4,13 @@ import atexit import logging import os +import pathlib import re import sys import time from subprocess import CalledProcessError, DEVNULL +from typing import Any from xml.dom.minidom import parse as xml_parse from wk.cfg.main import KIT_NAME_FULL, KIT_NAME_SHORT, WINDOWS_TIME_ZONE @@ -102,7 +104,7 @@ for error in ('CalledProcessError', 'FileNotFoundError'): # Auto Repairs -def build_menus(base_menus, title, presets): +def build_menus(base_menus, title, presets) -> dict[str, ui.Menu]: """Build menus, returns dict.""" menus = {} menus['Main'] = ui.Menu(title=f'{title}\n{ansi.color_string("Main Menu", "GREEN")}') @@ -169,7 +171,7 @@ def build_menus(base_menus, title, presets): return menus -def update_scheduled_task(): +def update_scheduled_task() -> None: """Create (or update) scheduled task to start repairs.""" cmd = [ 'schtasks', '/create', '/f', @@ -183,7 +185,7 @@ def update_scheduled_task(): run_program(cmd) -def end_session(): +def end_session() -> None: """End Auto Repairs session.""" # Remove logon task cmd = [ @@ -222,7 +224,7 @@ def end_session(): LOG.error('Failed to remove Auto Repairs session settings') -def get_entry_settings(group, name): +def get_entry_settings(group, name) -> dict[str, Any]: """Get menu entry settings from the registry, returns dict.""" key_path = fr'{AUTO_REPAIR_KEY}\{group}\{name}' settings = {} @@ -241,7 +243,7 @@ def get_entry_settings(group, name): return settings -def init(menus, presets): +def init(menus, presets) -> None: """Initialize Auto Repairs.""" session_started = is_session_started() @@ -267,7 +269,7 @@ def init(menus, presets): print('') -def init_run(options): +def init_run(options) -> None: """Initialize Auto Repairs Run.""" update_scheduled_task() if options['Kill Explorer']['Selected']: @@ -294,7 +296,7 @@ def init_run(options): TRY_PRINT.run('Running RKill...', run_rkill, msg_good='DONE') -def init_session(options): +def init_session(options) -> None: """Initialize Auto Repairs session.""" reg_set_value('HKCU', AUTO_REPAIR_KEY, 'SessionStarted', 1, 'DWORD') reg_set_value('HKCU', AUTO_REPAIR_KEY, 'LogName', get_root_logger_path().stem, 'SZ') @@ -319,7 +321,7 @@ def init_session(options): reboot(30) -def is_autologon_enabled(): +def is_autologon_enabled() -> bool: """Check if Autologon is enabled, returns bool.""" auto_admin_logon = False try: @@ -337,7 +339,7 @@ def is_autologon_enabled(): return auto_admin_logon -def is_session_started(): +def is_session_started() -> bool: """Check if session was started, returns bool.""" session_started = False try: @@ -349,7 +351,7 @@ def is_session_started(): return session_started -def load_preset(menus, presets, enable_menu_exit=True): +def load_preset(menus, presets, enable_menu_exit=True) -> None: """Load menu settings from preset and ask selection question(s).""" if not enable_menu_exit: MENU_PRESETS.actions['Main Menu'].update({'Disabled':True, 'Hidden':True}) @@ -375,7 +377,7 @@ def load_preset(menus, presets, enable_menu_exit=True): MENU_PRESETS.actions['Main Menu'].update({'Disabled':False, 'Hidden':False}) -def load_settings(menus): +def load_settings(menus) -> None: """Load session settings from the registry.""" for group, menu in menus.items(): if group == 'Main': @@ -384,7 +386,7 @@ def load_settings(menus): menu.options[name].update(get_entry_settings(group, ansi.strip_colors(name))) -def run_auto_repairs(base_menus, presets): +def run_auto_repairs(base_menus, presets) -> None: """Run Auto Repairs.""" set_log_path() title = f'{KIT_NAME_FULL}: Auto Repairs' @@ -443,7 +445,7 @@ def run_auto_repairs(base_menus, presets): ui.pause('Press Enter to exit...') -def run_group(group, menu): +def run_group(group, menu) -> None: """Run entries in group if appropriate.""" ui.print_info(f' {group}') for name, details in menu.options.items(): @@ -487,7 +489,7 @@ def run_group(group, menu): details['Function'](group, name) -def save_selection_settings(menus): +def save_selection_settings(menus) -> None: """Save selections in the registry.""" for group, menu in menus.items(): if group == 'Main': @@ -500,7 +502,7 @@ def save_selection_settings(menus): ) -def save_settings(group, name, result=None, **kwargs): +def save_settings(group, name, result=None, **kwargs) -> None: """Save entry settings in the registry.""" key_path = fr'{AUTO_REPAIR_KEY}\{group}\{ansi.strip_colors(name)}' @@ -528,7 +530,7 @@ def save_settings(group, name, result=None, **kwargs): reg_set_value('HKCU', key_path, value_name, data, data_type) -def set_log_path(): +def set_log_path() -> None: """Set log name using defaults or the saved registry value.""" try: log_path = reg_read_value('HKCU', AUTO_REPAIR_KEY, 'LogName') @@ -544,7 +546,7 @@ def set_log_path(): ) -def show_main_menu(base_menus, menus, presets, title): +def show_main_menu(base_menus, menus, presets, title) -> None: """Show main menu and handle actions.""" while True: update_main_menu(menus) @@ -559,7 +561,7 @@ def show_main_menu(base_menus, menus, presets, title): raise SystemExit -def show_sub_menu(menu): +def show_sub_menu(menu) -> None: """Show sub-menu and handle sub-menu actions.""" while True: selection = menu.advanced_select() @@ -590,7 +592,7 @@ def show_sub_menu(menu): menu.options[name][key] = value -def update_main_menu(menus): +def update_main_menu(menus) -> None: """Update main menu based on current selections.""" index = 1 skip = 'Reboot' @@ -609,7 +611,7 @@ def update_main_menu(menus): # Auto Repairs: Wrapper Functions -def auto_adwcleaner(group, name): +def auto_adwcleaner(group, name) -> None: """Run AdwCleaner scan. save_settings() is called first since AdwCleaner may kill this script. @@ -621,25 +623,25 @@ def auto_adwcleaner(group, name): save_settings(group, name, result=result) -def auto_backup_browser_profiles(group, name): +def auto_backup_browser_profiles(group, name) -> None: """Backup browser profiles.""" backup_all_browser_profiles(use_try_print=True) save_settings(group, name, done=True, failed=False, message='DONE') -def auto_backup_power_plans(group, name): +def auto_backup_power_plans(group, name) -> None: """Backup power plans.""" result = TRY_PRINT.run('Backup Power Plans...', export_power_plans) save_settings(group, name, result=result) -def auto_backup_registry(group, name): +def auto_backup_registry(group, name) -> None: """Backup registry.""" result = TRY_PRINT.run('Backup Registry...', backup_registry) save_settings(group, name, result=result) -def auto_bleachbit(group, name): +def auto_bleachbit(group, name) -> None: """Run BleachBit to clean files.""" result = TRY_PRINT.run( 'BleachBit...', run_bleachbit, BLEACH_BIT_CLEANERS, msg_good='DONE', @@ -647,7 +649,7 @@ def auto_bleachbit(group, name): save_settings(group, name, result=result) -def auto_chkdsk(group, name): +def auto_chkdsk(group, name) -> None: """Run CHKDSK repairs.""" needs_reboot = False result = TRY_PRINT.run(f'CHKDSK ({SYSTEMDRIVE})...', run_chkdsk_online) @@ -671,7 +673,7 @@ def auto_chkdsk(group, name): reboot() -def auto_disable_pending_renames(group, name): +def auto_disable_pending_renames(group, name) -> None: """Disable pending renames.""" result = TRY_PRINT.run( 'Disabling pending renames...', disable_pending_renames, @@ -679,7 +681,7 @@ def auto_disable_pending_renames(group, name): save_settings(group, name, result=result) -def auto_dism(group, name): +def auto_dism(group, name) -> None: """Run DISM repairs.""" needs_reboot = False result = TRY_PRINT.run('DISM (RestoreHealth)...', run_dism) @@ -704,7 +706,7 @@ def auto_dism(group, name): reboot() -def auto_enable_regback(group, name): +def auto_enable_regback(group, name) -> None: """Enable RegBack.""" result = TRY_PRINT.run( 'Enable RegBack...', reg_set_value, 'HKLM', @@ -714,19 +716,19 @@ def auto_enable_regback(group, name): save_settings(group, name, result=result) -def auto_hitmanpro(group, name): +def auto_hitmanpro(group, name) -> None: """Run HitmanPro scan.""" result = TRY_PRINT.run('HitmanPro...', run_hitmanpro, msg_good='DONE') save_settings(group, name, result=result) -def auto_kvrt(group, name): +def auto_kvrt(group, name) -> None: """Run KVRT scan.""" result = TRY_PRINT.run('KVRT...', run_kvrt, msg_good='DONE') save_settings(group, name, result=result) -def auto_microsoft_defender(group, name): +def auto_microsoft_defender(group, name) -> None: """Run Microsoft Defender scan.""" result = TRY_PRINT.run( 'Microsoft Defender...', run_microsoft_defender, msg_good='DONE', @@ -734,14 +736,14 @@ def auto_microsoft_defender(group, name): save_settings(group, name, result=result) -def auto_reboot(group, name): +def auto_reboot(group, name) -> None: """Reboot the system.""" save_settings(group, name, done=True, failed=False, message='DONE') print('') reboot(30) -def auto_remove_power_plan(group, name): +def auto_remove_power_plan(group, name) -> None: """Remove custom power plan and set to Balanced.""" result = TRY_PRINT.run( 'Remove Custom Power Plan...', remove_custom_power_plan, @@ -749,7 +751,7 @@ def auto_remove_power_plan(group, name): save_settings(group, name, result=result) -def auto_repair_registry(group, name): +def auto_repair_registry(group, name) -> None: """Delete registry keys with embedded null characters.""" result = TRY_PRINT.run( 'Running Registry repairs...', delete_registry_null_keys, @@ -757,19 +759,19 @@ def auto_repair_registry(group, name): save_settings(group, name, result=result) -def auto_reset_power_plans(group, name): +def auto_reset_power_plans(group, name) -> None: """Reset power plans.""" result = TRY_PRINT.run('Reset Power Plans...', reset_power_plans) save_settings(group, name, result=result) -def auto_reset_proxy(group, name): +def auto_reset_proxy(group, name) -> None: """Reset proxy settings.""" result = TRY_PRINT.run('Clearing proxy settings...', reset_proxy) save_settings(group, name, result=result) -def auto_reset_windows_policies(group, name): +def auto_reset_windows_policies(group, name) -> None: """Reset Windows policies to defaults.""" result = TRY_PRINT.run( 'Resetting Windows policies...', reset_windows_policies, @@ -777,13 +779,13 @@ def auto_reset_windows_policies(group, name): save_settings(group, name, result=result) -def auto_restore_uac_defaults(group, name): +def auto_restore_uac_defaults(group, name) -> None: """Restore UAC default settings.""" result = TRY_PRINT.run('Restoring UAC defaults...', restore_uac_defaults) save_settings(group, name, result=result) -def auto_set_custom_power_plan(group, name): +def auto_set_custom_power_plan(group, name) -> None: """Set custom power plan.""" result = TRY_PRINT.run( 'Set Custom Power Plan...', create_custom_power_plan, @@ -793,13 +795,13 @@ def auto_set_custom_power_plan(group, name): save_settings(group, name, result=result) -def auto_sfc(group, name): +def auto_sfc(group, name) -> None: """Run SFC repairs.""" result = TRY_PRINT.run('SFC Scan...', run_sfc_scan) save_settings(group, name, result=result) -def auto_system_restore_create(group, name): +def auto_system_restore_create(group, name) -> None: """Create System Restore point.""" result = TRY_PRINT.run( 'Create System Restore...', create_system_restore_point, @@ -807,7 +809,7 @@ def auto_system_restore_create(group, name): save_settings(group, name, result=result) -def auto_system_restore_enable(group, name): +def auto_system_restore_enable(group, name) -> None: """Enable System Restore.""" cmd = [ 'powershell', '-Command', 'Enable-ComputerRestore', @@ -817,13 +819,13 @@ def auto_system_restore_enable(group, name): save_settings(group, name, result=result) -def auto_system_restore_set_size(group, name): +def auto_system_restore_set_size(group, name) -> None: """Set System Restore size.""" result = TRY_PRINT.run('Set System Restore Size...', set_system_restore_size) save_settings(group, name, result=result) -def auto_uninstallview(group, name): +def auto_uninstallview(group, name) -> None: """Run UninstallView.""" result = TRY_PRINT.run( 'UninstallView...', run_uninstallview, msg_good='DONE', @@ -831,7 +833,7 @@ def auto_uninstallview(group, name): save_settings(group, name, result=result) -def auto_windows_updates_disable(group, name): +def auto_windows_updates_disable(group, name) -> None: """Disable Windows Updates.""" result = TRY_PRINT.run('Disable Windows Updates...', disable_windows_updates) if result['Failed']: @@ -840,13 +842,13 @@ def auto_windows_updates_disable(group, name): save_settings(group, name, result=result) -def auto_windows_updates_enable(group, name): +def auto_windows_updates_enable(group, name) -> None: """Enable Windows Updates.""" result = TRY_PRINT.run('Enable Windows Updates...', enable_windows_updates) save_settings(group, name, result=result) -def auto_windows_updates_reset(group, name): +def auto_windows_updates_reset(group, name) -> None: """Reset Windows Updates.""" result = TRY_PRINT.run('Reset Windows Updates...', reset_windows_updates) if result['Failed']: @@ -856,12 +858,12 @@ def auto_windows_updates_reset(group, name): # Misc Functions -def set_backup_path(name, date=False): +def set_backup_path(name, date=False) -> pathlib.Path: """Set backup path, returns pathlib.Path.""" return set_local_storage_path('Backups', name, date) -def set_local_storage_path(folder, name, date=False): +def set_local_storage_path(folder, name, date=False) -> pathlib.Path: """Get path for local storage, returns pathlib.Path.""" local_path = get_path_obj(f'{SYSTEMDRIVE}/{KIT_NAME_SHORT}/{folder}/{name}') if date: @@ -869,13 +871,13 @@ def set_local_storage_path(folder, name, date=False): return local_path -def set_quarantine_path(name, date=False): +def set_quarantine_path(name, date=False) -> pathlib.Path: """Set quarantine path, returns pathlib.Path.""" return set_local_storage_path('Quarantine', name, date) # Tool Functions -def backup_all_browser_profiles(use_try_print=False): +def backup_all_browser_profiles(use_try_print=False) -> None: """Backup browser profiles for all users.""" users = get_path_obj(f'{SYSTEMDRIVE}/Users') for userprofile in users.iterdir(): @@ -884,7 +886,7 @@ def backup_all_browser_profiles(use_try_print=False): backup_browser_profiles(userprofile, use_try_print) -def backup_browser_chromium(backup_path, browser, search_path, use_try_print): +def backup_browser_chromium(backup_path, browser, search_path, use_try_print) -> None: """Backup Chromium-based browser profile.""" for item in search_path.iterdir(): match = re.match(r'^(Default|Profile).*', item.name, re.IGNORECASE) @@ -914,7 +916,7 @@ def backup_browser_chromium(backup_path, browser, search_path, use_try_print): run_program(cmd, check=False) -def backup_browser_firefox(backup_path, search_path, use_try_print): +def backup_browser_firefox(backup_path, search_path, use_try_print) -> None: """Backup Firefox browser profile.""" output_path = backup_path.joinpath('Firefox.7z') @@ -939,7 +941,7 @@ def backup_browser_firefox(backup_path, search_path, use_try_print): run_program(cmd, check=False) -def backup_browser_profiles(userprofile, use_try_print=False): +def backup_browser_profiles(userprofile, use_try_print=False) -> None: """Backup browser profiles for userprofile.""" backup_path = set_backup_path('Browsers', date=True) backup_path = backup_path.joinpath(userprofile.name) @@ -968,7 +970,7 @@ def backup_browser_profiles(userprofile, use_try_print=False): pass -def backup_registry(): +def backup_registry() -> None: """Backup Registry.""" backup_path = set_backup_path('Registry', date=True) backup_path.parent.mkdir(parents=True, exist_ok=True) @@ -981,12 +983,12 @@ def backup_registry(): run_tool('ERUNT', 'ERUNT', backup_path, 'sysreg', 'curuser', 'otherusers') -def delete_registry_null_keys(): +def delete_registry_null_keys() -> None: """Delete registry keys with embedded null characters.""" run_tool('RegDelNull', 'RegDelNull', '-s', '-y', download=True) -def log_kvrt_results(log_path, report_path): +def log_kvrt_results(log_path, report_path) -> None: """Parse KVRT report and log results in plain text.""" log_text = '' report_file = None @@ -1027,7 +1029,7 @@ def log_kvrt_results(log_path, report_path): log_path.write_text(log_text, encoding='utf-8') -def run_adwcleaner(): +def run_adwcleaner() -> None: """Run AdwCleaner.""" settings_path = get_tool_path('AdwCleaner', 'AdwCleaner', check=False) settings_path = settings_path.with_name('settings') @@ -1037,7 +1039,7 @@ def run_adwcleaner(): run_tool('AdwCleaner', 'AdwCleaner', download=True) -def run_bleachbit(cleaners, preview=True): +def run_bleachbit(cleaners, preview=True) -> None: """Run BleachBit to either clean or preview files.""" cmd_args = ( '--preview' if preview else '--clean', @@ -1048,11 +1050,15 @@ def run_bleachbit(cleaners, preview=True): proc = run_tool('BleachBit', 'bleachbit_console', *cmd_args) # Save logs - log_path.write_text(proc.stdout, encoding='utf-8') - log_path.with_suffix('.err').write_text(proc.stderr, encoding='utf-8') + log_path.write_text( + proc.stdout, encoding='utf-8', # type: ignore[reportGeneralTypeIssues] + ) + log_path.with_suffix('.err').write_text( + proc.stderr, encoding='utf-8', # type: ignore[reportGeneralTypeIssues] + ) -def run_hitmanpro(): +def run_hitmanpro() -> None: """Run HitmanPro scan.""" log_path = format_log_path(log_name='HitmanPro', timestamp=True, tool=True) log_path = log_path.with_suffix('.xml') @@ -1061,7 +1067,7 @@ def run_hitmanpro(): run_tool('HitmanPro', 'HitmanPro', *cmd_args, download=True) -def run_kvrt(): +def run_kvrt() -> None: """Run KVRT scan.""" log_path = format_log_path(log_name='KVRT', timestamp=True, tool=True) log_path.parent.mkdir(parents=True, exist_ok=True) @@ -1102,11 +1108,11 @@ def run_kvrt(): log_kvrt_results(log_path, report_path) -def run_microsoft_defender(full=True): +def run_microsoft_defender(full=True) -> None: """Run Microsoft Defender scan.""" reg_key = r'Software\Microsoft\Windows Defender' - def _get_defender_path(): + def _get_defender_path() -> str: install_path = reg_read_value('HKLM', reg_key, 'InstallLocation') return fr'{install_path}\MpCmdRun.exe' @@ -1147,7 +1153,7 @@ def run_microsoft_defender(full=True): raise GenericError('Failed to run scan or clean items.') -def run_rkill(): +def run_rkill() -> None: """Run RKill scan.""" log_path = format_log_path(log_name='RKill', timestamp=True, tool=True) log_path.parent.mkdir(parents=True, exist_ok=True) @@ -1161,7 +1167,7 @@ def run_rkill(): run_tool('RKill', 'RKill', *cmd_args, download=True) -def run_tdsskiller(): +def run_tdsskiller() -> None: """Run TDSSKiller scan.""" log_path = format_log_path(log_name='TDSSKiller', timestamp=True, tool=True) log_path.parent.mkdir(parents=True, exist_ok=True) @@ -1179,13 +1185,13 @@ def run_tdsskiller(): run_tool('TDSSKiller', 'TDSSKiller', *cmd_args, download=True) -def run_uninstallview(): +def run_uninstallview() -> None: """Run UninstallView.""" run_tool('UninstallView', 'UninstallView') # OS Built-in Functions -def create_custom_power_plan(enable_sleep=True, keep_display_on=False): +def create_custom_power_plan(enable_sleep=True, keep_display_on=False) -> None: """Create new power plan and set as active.""" custom_guid = POWER_PLANS['Custom'] sleep_timeouts = POWER_PLAN_SLEEP_TIMEOUTS['High Performance'] @@ -1234,7 +1240,7 @@ def create_custom_power_plan(enable_sleep=True, keep_display_on=False): run_program(cmd) -def create_system_restore_point(): +def create_system_restore_point() -> None: """Create System Restore point.""" cmd = [ 'powershell', '-Command', 'Checkpoint-Computer', @@ -1249,7 +1255,7 @@ def create_system_restore_point(): raise GenericWarning('Skipped, a restore point was created too recently') -def disable_pending_renames(): +def disable_pending_renames() -> None: """Disable pending renames.""" reg_set_value( 'HKLM', r'SYSTEM\CurrentControlSet\Control\Session Manager', @@ -1257,18 +1263,18 @@ def disable_pending_renames(): ) -def disable_windows_updates(): +def disable_windows_updates() -> None: """Disable and stop Windows Updates.""" disable_service('wuauserv') stop_service('wuauserv') -def enable_windows_updates(): +def enable_windows_updates() -> None: """Enable Windows Updates.""" enable_service('wuauserv', 'demand') -def export_power_plans(): +def export_power_plans() -> None: """Export existing power plans.""" backup_path = set_backup_path('Power Plans', date=True) @@ -1299,13 +1305,13 @@ def export_power_plans(): run_program(cmd) -def kill_explorer(): +def kill_explorer() -> None: """Kill all Explorer processes.""" cmd = ['taskkill', '/im', 'explorer.exe', '/f'] run_program(cmd, check=False) -def reboot(timeout=10): +def reboot(timeout=10) -> None: """Reboot the system.""" atexit.unregister(start_explorer) ui.print_warning(f'Rebooting the system in {timeout} seconds...') @@ -1315,7 +1321,7 @@ def reboot(timeout=10): raise SystemExit -def remove_custom_power_plan(high_performance=False): +def remove_custom_power_plan(high_performance=False) -> None: """Remove custom power plan and set to a built-in plan. If high_performance is True then set to High Performance and set @@ -1342,13 +1348,13 @@ def remove_custom_power_plan(high_performance=False): run_program(cmd) -def reset_power_plans(): +def reset_power_plans() -> None: """Reset power plans to their default settings.""" cmd = ['powercfg', '-RestoreDefaultSchemes'] run_program(cmd) -def reset_proxy(): +def reset_proxy() -> None: """Reset WinHTTP proxy settings.""" cmd = ['netsh', 'winhttp', 'reset', 'proxy'] proc = run_program(cmd, check=False) @@ -1358,7 +1364,7 @@ def reset_proxy(): raise GenericError('Failed to reset proxy settings.') -def reset_windows_policies(): +def reset_windows_policies() -> None: """Reset Windows policies to defaults.""" cmd = ['gpupdate', '/force'] proc = run_program(cmd, check=False) @@ -1368,7 +1374,7 @@ def reset_windows_policies(): raise GenericError('Failed to reset one or more policies.') -def reset_windows_updates(): +def reset_windows_updates() -> None: """Reset Windows Updates.""" system_root = os.environ.get('SYSTEMROOT', 'C:/Windows') src_path = f'{system_root}/SoftwareDistribution' @@ -1381,7 +1387,7 @@ def reset_windows_updates(): pass -def restore_uac_defaults(): +def restore_uac_defaults() -> None: """Restore UAC default settings.""" settings = REG_UAC_DEFAULTS_WIN10 if OS_VERSION in (7, 8, 8.1): @@ -1390,7 +1396,7 @@ def restore_uac_defaults(): reg_write_settings(settings) -def run_chkdsk_offline(): +def run_chkdsk_offline() -> None: """Set filesystem 'dirty bit' to force a CHKDSK during startup.""" cmd = ['fsutil', 'dirty', 'set', SYSTEMDRIVE] proc = run_program(cmd, check=False) @@ -1400,7 +1406,7 @@ def run_chkdsk_offline(): raise GenericError('Failed to set dirty bit.') -def run_chkdsk_online(): +def run_chkdsk_online() -> None: """Run CHKDSK. NOTE: If run on Windows 8+ online repairs are attempted. @@ -1440,7 +1446,7 @@ def run_chkdsk_online(): raise GenericError('Issue(s) detected') -def run_dism(repair=True): +def run_dism(repair=True) -> None: """Run DISM to either scan or repair component store health.""" conemu_args = ['-new_console:nb', '-new_console:s33V'] if IN_CONEMU else [] @@ -1479,7 +1485,7 @@ def run_dism(repair=True): raise GenericError('Issue(s) detected') -def run_sfc_scan(): +def run_sfc_scan() -> None: """Run SFC and save results.""" cmd = ['sfc', '/scannow'] log_path = format_log_path(log_name='SFC', timestamp=True, tool=True) @@ -1506,7 +1512,7 @@ def run_sfc_scan(): raise OSError -def set_system_restore_size(size=8): +def set_system_restore_size(size=8) -> None: """Set System Restore size.""" cmd = [ 'vssadmin', 'Resize', 'ShadowStorage', @@ -1515,7 +1521,7 @@ def set_system_restore_size(size=8): run_program(cmd, pipe=False, stderr=DEVNULL, stdout=DEVNULL) -def start_explorer(): +def start_explorer() -> None: """Start Explorer.""" popen_program(['explorer.exe']) diff --git a/scripts/wk/setup/win.py b/scripts/wk/setup/win.py index 97f93b5b..355e8fd3 100644 --- a/scripts/wk/setup/win.py +++ b/scripts/wk/setup/win.py @@ -8,6 +8,8 @@ import os import re import sys +from typing import Any + from wk.cfg.main import KIT_NAME_FULL from wk.cfg.setup import ( BROWSER_PATHS, @@ -99,7 +101,7 @@ for error in ('CalledProcessError', 'FileNotFoundError'): # Auto Setup -def build_menus(base_menus, title, presets): +def build_menus(base_menus, title, presets) -> dict[str, ui.Menu]: """Build menus, returns dict.""" menus = {} menus['Main'] = ui.Menu(title=f'{title}\n{ansi.color_string("Main Menu", "GREEN")}') @@ -154,7 +156,7 @@ def build_menus(base_menus, title, presets): return menus -def check_os_and_set_menu_title(title): +def check_os_and_set_menu_title(title) -> str: """Check OS version and update title for menus, returns str.""" color = None os_name = get_os_name(check=False) @@ -180,7 +182,7 @@ def check_os_and_set_menu_title(title): return f'{title} ({ansi.color_string(os_name, color)})' -def load_preset(menus, presets, title, enable_menu_exit=True): +def load_preset(menus, presets, title, enable_menu_exit=True) -> None: """Load menu settings from preset and ask selection question(s).""" if not enable_menu_exit: MENU_PRESETS.actions['Main Menu'].update({'Disabled':True, 'Hidden':True}) @@ -219,7 +221,7 @@ def load_preset(menus, presets, title, enable_menu_exit=True): menus[group_name].options[entry_name]['Selected'] = False -def run_auto_setup(base_menus, presets): +def run_auto_setup(base_menus, presets) -> None: """Run Auto Setup.""" update_log_path(dest_name='Auto Setup', timestamp=True) title = f'{KIT_NAME_FULL}: Auto Setup' @@ -261,7 +263,7 @@ def run_auto_setup(base_menus, presets): ui.pause('Press Enter to exit...') -def run_group(group, menu): +def run_group(group, menu) -> None: """Run entries in group if appropriate.""" ui.print_info(f' {group}') for name, details in menu.options.items(): @@ -276,7 +278,7 @@ def run_group(group, menu): details['Function']() -def show_main_menu(base_menus, menus, presets, title): +def show_main_menu(base_menus, menus, presets, title) -> None: """Show main menu and handle actions.""" while True: update_main_menu(menus) @@ -291,7 +293,7 @@ def show_main_menu(base_menus, menus, presets, title): raise SystemExit -def show_sub_menu(menu): +def show_sub_menu(menu) -> None: """Show sub-menu and handle sub-menu actions.""" while True: selection = menu.advanced_select() @@ -307,7 +309,7 @@ def show_sub_menu(menu): menu.options[name]['Selected'] = value -def update_main_menu(menus): +def update_main_menu(menus) -> None: """Update main menu based on current selections.""" index = 1 skip = 'Reboot' @@ -326,37 +328,37 @@ def update_main_menu(menus): # Auto Repairs: Wrapper Functions -def auto_backup_registry(): +def auto_backup_registry() -> None: """Backup registry.""" TRY_PRINT.run('Backup Registry...', backup_registry) -def auto_backup_browser_profiles(): +def auto_backup_browser_profiles() -> None: """Backup browser profiles.""" backup_all_browser_profiles(use_try_print=True) -def auto_backup_power_plans(): +def auto_backup_power_plans() -> None: """Backup power plans.""" TRY_PRINT.run('Backup Power Plans...', export_power_plans) -def auto_reset_power_plans(): +def auto_reset_power_plans() -> None: """Reset power plans.""" TRY_PRINT.run('Reset Power Plans...', reset_power_plans) -def auto_set_custom_power_plan(): +def auto_set_custom_power_plan() -> None: """Set custom power plan.""" TRY_PRINT.run('Set Custom Power Plan...', create_custom_power_plan) -def auto_enable_bsod_minidumps(): +def auto_enable_bsod_minidumps() -> None: """Enable saving minidumps during BSoDs.""" TRY_PRINT.run('Enable BSoD mini dumps...', enable_bsod_minidumps) -def auto_enable_regback(): +def auto_enable_regback() -> None: """Enable RegBack.""" TRY_PRINT.run( 'Enable RegBack...', reg_set_value, 'HKLM', @@ -365,7 +367,7 @@ def auto_enable_regback(): ) -def auto_system_restore_enable(): +def auto_system_restore_enable() -> None: """Enable System Restore.""" cmd = [ 'powershell', '-Command', 'Enable-ComputerRestore', @@ -374,28 +376,28 @@ def auto_system_restore_enable(): TRY_PRINT.run('Enable System Restore...', run_program, cmd=cmd) -def auto_system_restore_set_size(): +def auto_system_restore_set_size() -> None: """Set System Restore size.""" TRY_PRINT.run('Set System Restore Size...', set_system_restore_size) -def auto_system_restore_create(): +def auto_system_restore_create() -> None: """Create System Restore point.""" TRY_PRINT.run('Create System Restore...', create_system_restore_point) -def auto_windows_updates_enable(): +def auto_windows_updates_enable() -> None: """Enable Windows Updates.""" TRY_PRINT.run('Enable Windows Updates...', enable_windows_updates) # Auto Setup: Wrapper Functions -def auto_activate_windows(): +def auto_activate_windows() -> None: """Attempt to activate Windows using BIOS key.""" TRY_PRINT.run('Windows Activation...', activate_with_bios) -def auto_config_browsers(): +def auto_config_browsers() -> None: """Configure Browsers.""" prompt = ' Press Enter to continue...' TRY_PRINT.run('Chrome Notifications...', disable_chrome_notifications) @@ -412,27 +414,27 @@ def auto_config_browsers(): print(f'\033[F\r{" "*len(prompt)}\r', end='', flush=True) -def auto_config_explorer(): +def auto_config_explorer() -> None: """Configure Windows Explorer and restart the process.""" TRY_PRINT.run('Windows Explorer...', config_explorer) -def auto_config_open_shell(): +def auto_config_open_shell() -> None: """Configure Open Shell.""" TRY_PRINT.run('Open Shell...', config_open_shell) -def auto_export_aida64_report(): +def auto_export_aida64_report() -> None: """Export AIDA64 reports.""" TRY_PRINT.run('AIDA64 Report...', export_aida64_report) -def auto_install_firefox(): +def auto_install_firefox() -> None: """Install Firefox.""" TRY_PRINT.run('Firefox...', install_firefox) -def auto_install_libreoffice(): +def auto_install_libreoffice() -> None: """Install LibreOffice. NOTE: It is assumed that auto_install_vcredists() will be run @@ -441,105 +443,105 @@ def auto_install_libreoffice(): TRY_PRINT.run('LibreOffice...', install_libreoffice, vcredist=False) -def auto_install_open_shell(): +def auto_install_open_shell() -> None: """Install Open Shell.""" TRY_PRINT.run('Open Shell...', install_open_shell) -def auto_install_software_bundle(): +def auto_install_software_bundle() -> None: """Install standard software bundle.""" TRY_PRINT.run('Software Bundle...', install_software_bundle) -def auto_install_vcredists(): +def auto_install_vcredists() -> None: """Install latest supported Visual C++ runtimes.""" TRY_PRINT.run('Visual C++ Runtimes...', install_vcredists) -def auto_open_device_manager(): +def auto_open_device_manager() -> None: """Open Device Manager.""" TRY_PRINT.run('Device Manager...', open_device_manager) -def auto_open_hwinfo_sensors(): +def auto_open_hwinfo_sensors() -> None: """Open HWiNFO Sensors.""" TRY_PRINT.run('HWiNFO Sensors...', open_hwinfo_sensors) -def auto_open_snappy_driver_installer_origin(): +def auto_open_snappy_driver_installer_origin() -> None: """Open Snappy Driver Installer Origin.""" TRY_PRINT.run('Snappy Driver Installer...', open_snappy_driver_installer_origin) -def auto_open_windows_activation(): +def auto_open_windows_activation() -> None: """Open Windows Activation.""" if not is_activated(): TRY_PRINT.run('Windows Activation...', open_windows_activation) -def auto_open_windows_updates(): +def auto_open_windows_updates() -> None: """Open Windows Updates.""" TRY_PRINT.run('Windows Updates...', open_windows_updates) -def auto_open_xmplay(): +def auto_open_xmplay() -> None: """Open XMPlay.""" TRY_PRINT.run('XMPlay...', open_xmplay) -def auto_show_4k_alignment_check(): +def auto_show_4k_alignment_check() -> None: """Display 4K alignment check.""" TRY_PRINT.run('4K alignment Check...', check_4k_alignment, show_alert=True) -def auto_show_installed_antivirus(): +def auto_show_installed_antivirus() -> None: """Display installed antivirus.""" TRY_PRINT.run('Virus Protection...', get_installed_antivirus) -def auto_show_installed_ram(): +def auto_show_installed_ram() -> None: """Display installed RAM.""" TRY_PRINT.run('Installed RAM...', get_installed_ram, as_list=True, raise_exceptions=True, ) -def auto_show_os_activation(): +def auto_show_os_activation() -> None: """Display OS activation status.""" TRY_PRINT.run('Activation...', get_os_activation, as_list=True) -def auto_show_os_name(): +def auto_show_os_name() -> None: """Display OS Name.""" TRY_PRINT.run('Operating System...', get_os_name, as_list=True) -def auto_show_secure_boot_status(): +def auto_show_secure_boot_status() -> None: """Display Secure Boot status.""" TRY_PRINT.run( 'Secure Boot...', check_secure_boot_status, msg_good='Enabled', ) -def auto_show_storage_status(): +def auto_show_storage_status() -> None: """Display storage status.""" TRY_PRINT.run('Storage Status...', get_storage_status) -def auto_windows_temp_fix(): +def auto_windows_temp_fix() -> None: """Restore default ACLs for Windows\\Temp.""" TRY_PRINT.run(r'Windows\Temp fix...', fix_windows_temp) # Configure Functions -def config_explorer(): +def config_explorer() -> None: """Configure Windows Explorer and restart the process.""" reg_write_settings(REG_WINDOWS_EXPLORER) kill_procs('explorer.exe', force=True) popen_program(['explorer.exe']) -def config_open_shell(): +def config_open_shell() -> None: """Configure Open Shell.""" has_low_power_idle = False @@ -559,7 +561,7 @@ def config_open_shell(): reg_write_settings(REG_OPEN_SHELL_LOW_POWER_IDLE) -def disable_chrome_notifications(): +def disable_chrome_notifications() -> None: """Disable notifications in Google Chrome.""" defaults_key = 'default_content_setting_values' profiles = [] @@ -601,13 +603,13 @@ def disable_chrome_notifications(): pref_file.write_text(json.dumps(pref_data, separators=(',', ':'))) -def enable_bsod_minidumps(): +def enable_bsod_minidumps() -> None: """Enable saving minidumps during BSoDs.""" cmd = ['wmic', 'RECOVEROS', 'set', 'DebugInfoType', '=', '3'] run_program(cmd) -def enable_ublock_origin(): +def enable_ublock_origin() -> None: """Enable uBlock Origin in supported browsers.""" base_paths = [ PROGRAMFILES_64, PROGRAMFILES_32, os.environ.get('LOCALAPPDATA'), @@ -637,7 +639,7 @@ def enable_ublock_origin(): popen_program(cmd, pipe=True) -def fix_windows_temp(): +def fix_windows_temp() -> None: """Restore default permissions for Windows\\Temp.""" permissions = ( 'Users:(CI)(X,WD,AD)', @@ -649,7 +651,7 @@ def fix_windows_temp(): # Install Functions -def install_firefox(): +def install_firefox() -> None: """Install Firefox. As far as I can tell if you use the EXE installers then it will use @@ -754,7 +756,7 @@ def install_libreoffice( run_program(cmd) -def install_open_shell(): +def install_open_shell() -> None: """Install Open Shell (just the Start Menu).""" skin_zip = get_tool_path('OpenShell', 'Fluent-Metro', suffix='zip') @@ -784,7 +786,7 @@ def install_open_shell(): run_program(cmd) -def install_software_bundle(): +def install_software_bundle() -> None: """Install standard software bundle.""" download_tool('Ninite', 'Software Bundle') installer = get_tool_path('Ninite', 'Software Bundle') @@ -809,7 +811,7 @@ def install_software_bundle(): end='', flush=True) -def install_vcredists(): +def install_vcredists() -> None: """Install latest supported Visual C++ runtimes.""" for year in (2012, 2013, 2022): cmd_args = ['/install', '/passive', '/norestart'] @@ -826,7 +828,7 @@ def install_vcredists(): run_program([installer, *cmd_args]) -def uninstall_firefox(): +def uninstall_firefox() -> None: """Uninstall all copies of Firefox.""" json_file = format_log_path(log_name='Installed Programs', timestamp=True) json_file = json_file.with_name(f'{json_file.stem}.json') @@ -847,13 +849,14 @@ def uninstall_firefox(): # Misc Functions -def check_secure_boot_status(): +def check_secure_boot_status() -> None: """Check Secure Boot status.""" is_secure_boot_enabled(raise_exceptions=True, show_alert=True) -def get_firefox_default_profile(profiles_ini): +def get_firefox_default_profile(profiles_ini) -> Any: """Get Firefox default profile, returns(pathlib.Path, encoding) or None.""" + # TODO: Refactor to remove dependancy on Any default_profile = None encoding = None parser = None @@ -890,7 +893,7 @@ def get_firefox_default_profile(profiles_ini): return (default_profile, encoding) -def get_storage_status(): +def get_storage_status() -> list[str]: """Get storage status for fixed disks, returns list.""" report = get_volume_usage(use_colors=True) for disk in get_raw_disks(): @@ -900,14 +903,14 @@ def get_storage_status(): return report -def set_default_browser(): +def set_default_browser() -> None: """Open Windows Settings to the default apps section.""" cmd = ['start', '', 'ms-settings:defaultapps'] popen_program(cmd, shell=True) # Tool Functions -def export_aida64_report(): +def export_aida64_report() -> None: """Export AIDA64 report.""" report_path = format_log_path( log_name='AIDA64 System Report', @@ -928,12 +931,12 @@ def export_aida64_report(): raise GenericError('Error(s) encountered exporting report.') -def open_device_manager(): +def open_device_manager() -> None: """Open Device Manager.""" popen_program(['mmc', 'devmgmt.msc']) -def open_hwinfo_sensors(): +def open_hwinfo_sensors() -> None: """Open HWiNFO sensors.""" hwinfo_path = get_tool_path('HWiNFO', 'HWiNFO') base_config = hwinfo_path.with_name('general.ini') @@ -949,22 +952,22 @@ def open_hwinfo_sensors(): run_tool('HWiNFO', 'HWiNFO', popen=True) -def open_snappy_driver_installer_origin(): +def open_snappy_driver_installer_origin() -> None: """Open Snappy Driver Installer Origin.""" run_tool('SDIO', 'SDIO', cwd=True, pipe=True, popen=True) -def open_windows_activation(): +def open_windows_activation() -> None: """Open Windows Activation.""" popen_program(['slui']) -def open_windows_updates(): +def open_windows_updates() -> None: """Open Windows Updates.""" popen_program(['control', '/name', 'Microsoft.WindowsUpdate']) -def open_xmplay(): +def open_xmplay() -> None: """Open XMPlay.""" sleep(2) run_tool('XMPlay', 'XMPlay', 'music.7z', cwd=True, popen=True) diff --git a/scripts/wk/std.py b/scripts/wk/std.py index 85ae3f93..9e511527 100644 --- a/scripts/wk/std.py +++ b/scripts/wk/std.py @@ -33,7 +33,7 @@ class GenericWarning(Exception): # Functions -def bytes_to_string(size, decimals=0, use_binary=True): +def bytes_to_string(size, decimals=0, use_binary=True) -> str: """Convert size into a human-readable format, returns str. [Doctest] @@ -80,7 +80,7 @@ def sleep(seconds: Union[int, float] = 2) -> None: time.sleep(seconds) -def string_to_bytes(size, assume_binary=False): +def string_to_bytes(size, assume_binary=False) -> int: """Convert human-readable size str to bytes and return an int.""" LOG.debug('size: %s, assume_binary: %s', size, assume_binary) scale = 1000 diff --git a/scripts/wk/ui/ansi.py b/scripts/wk/ui/ansi.py index 2c21c0ed..c31b6459 100644 --- a/scripts/wk/ui/ansi.py +++ b/scripts/wk/ui/ansi.py @@ -23,12 +23,12 @@ COLORS = { # Functions -def clear_screen(): +def clear_screen() -> None: """Clear screen using ANSI escape.""" print('\033c', end='', flush=True) -def color_string(strings, colors, sep=' '): +def color_string(strings, colors, sep=' ') -> str: """Build colored string using ANSI escapes, returns str.""" clear_code = COLORS['CLEAR'] msg = [] @@ -60,7 +60,7 @@ def color_string(strings, colors, sep=' '): return sep.join(msg) -def strip_colors(string): +def strip_colors(string) -> str: """Strip known ANSI color escapes from string, returns str.""" LOG.debug('string: %s', string) for color in COLORS.values(): diff --git a/scripts/wk/ui/cli.py b/scripts/wk/ui/cli.py index c2f19292..14051884 100644 --- a/scripts/wk/ui/cli.py +++ b/scripts/wk/ui/cli.py @@ -10,9 +10,10 @@ import sys import traceback from collections import OrderedDict +from typing import Any + from prompt_toolkit import prompt from prompt_toolkit.validation import Validator, ValidationError -from typing import Tuple try: from functools import cache @@ -42,7 +43,7 @@ class InputChoiceValidator(Validator): self.choices = [str(c).upper() for c in choices] super().__init__() - def validate(self, document): + def validate(self, document) -> None: text = document.text if not (text or self.allow_empty): raise ValidationError( @@ -57,7 +58,7 @@ class InputChoiceValidator(Validator): class InputNotEmptyValidator(Validator): """Validate that input is not empty.""" - def validate(self, document): + def validate(self, document) -> None: text = document.text if not text: raise ValidationError( @@ -71,7 +72,7 @@ class InputTicketIDValidator(Validator): self.allow_empty = allow_empty super().__init__() - def validate(self, document): + def validate(self, document) -> None: text = document.text if not (text or self.allow_empty): raise ValidationError( @@ -90,7 +91,7 @@ class InputYesNoValidator(Validator): self.allow_empty = allow_empty super().__init__() - def validate(self, document): + def validate(self, document) -> None: text = document.text if not (text or self.allow_empty): raise ValidationError( @@ -121,7 +122,7 @@ class Menu(): self.separator = '─' self.title = title - def _generate_menu_text(self): + def _generate_menu_text(self) -> str: """Generate menu text, returns str.""" separator_string = self._get_separator_string() menu_lines = [self.title, separator_string] if self.title else [] @@ -162,7 +163,7 @@ class Menu(): def _get_display_name( self, name, details, - index=None, no_checkboxes=True, setting_item=False): + index=None, no_checkboxes=True, setting_item=False) -> str: """Format display name based on details and args, returns str.""" disabled = details.get('Disabled', False) if setting_item and not details['Selected']: @@ -190,7 +191,7 @@ class Menu(): # Done return display_name - def _get_separator_string(self): + def _get_separator_string(self) -> str: """Format separator length based on name lengths, returns str.""" separator_length = 0 @@ -212,7 +213,7 @@ class Menu(): # Done return self.separator * separator_length - def _get_valid_answers(self): + def _get_valid_answers(self) -> list[str]: """Get valid answers based on menu items, returns list.""" valid_answers = [] @@ -235,7 +236,7 @@ class Menu(): # Done return valid_answers - def _resolve_selection(self, selection) -> Tuple[str, dict]: + def _resolve_selection(self, selection) -> tuple[str, dict[Any, Any]]: """Get menu item based on user selection, returns tuple.""" offset = 1 resolved_selection = tuple() @@ -250,6 +251,10 @@ class Menu(): if details[1].get('Hidden', False): offset -= 1 elif str(_i+offset) == selection: + # TODO: Fix this typo! + # It was discovered after being in production for SEVERAL YEARS! + # Extra testing is needed to verify any calls to this function still + # depend on this functionality resolved_selection = (details) break else: @@ -262,7 +267,7 @@ class Menu(): # Done return resolved_selection - def _update(self, single_selection=True, settings_mode=False): + def _update(self, single_selection=True, settings_mode=False) -> None: """Update menu items in preparation for printing to screen.""" index = 0 @@ -300,7 +305,7 @@ class Menu(): no_checkboxes=True, ) - def _update_entry_selection_status(self, entry, toggle=True, status=None): + def _update_entry_selection_status(self, entry, toggle=True, status=None) -> None: """Update entry selection status either directly or by toggling.""" if entry in self.sets: # Update targets not the set itself @@ -314,14 +319,14 @@ class Menu(): else: section[entry]['Selected'] = status - def _update_set_selection_status(self, targets, status): + def _update_set_selection_status(self, targets, status) -> None: """Select or deselect options based on targets and status.""" for option, details in self.options.items(): # If (new) status is True and this option is a target then select # Otherwise deselect details['Selected'] = status and option in targets - def _user_select(self, prompt_msg): + def _user_select(self, prompt_msg) -> str: """Show menu and select an entry, returns str.""" menu_text = self._generate_menu_text() valid_answers = self._get_valid_answers() @@ -338,19 +343,19 @@ class Menu(): # Done return answer - def add_action(self, name, details=None): + def add_action(self, name, details=None) -> None: """Add action to menu.""" details = details if details else {} details['Selected'] = details.get('Selected', False) self.actions[name] = details - def add_option(self, name, details=None): + def add_option(self, name, details=None) -> None: """Add option to menu.""" details = details if details else {} details['Selected'] = details.get('Selected', False) self.options[name] = details - def add_set(self, name, details=None): + def add_set(self, name, details=None) -> None: """Add set to menu.""" details = details if details else {} details['Selected'] = details.get('Selected', False) @@ -362,13 +367,14 @@ class Menu(): # Add set self.sets[name] = details - def add_toggle(self, name, details=None): + def add_toggle(self, name, details=None) -> None: """Add toggle to menu.""" details = details if details else {} details['Selected'] = details.get('Selected', False) self.toggles[name] = details - def advanced_select(self, prompt_msg='Please make a selection: ') -> Tuple[str, dict]: + def advanced_select( + self, prompt_msg='Please make a selection: ') -> tuple[str, dict[Any, Any]]: """Display menu and make multiple selections, returns tuple. NOTE: Menu is displayed until an action entry is selected. @@ -387,7 +393,8 @@ class Menu(): # Done return selected_entry - def settings_select(self, prompt_msg='Please make a selection: '): + def settings_select( + self, prompt_msg='Please make a selection: ') -> tuple[str, dict[Any, Any]]: """Display menu and make multiple selections, returns tuple. NOTE: Menu is displayed until an action entry is selected. @@ -416,14 +423,15 @@ class Menu(): return selected_entry def simple_select( - self, prompt_msg='Please make a selection: ', update=True) -> Tuple[str, dict]: + self, prompt_msg='Please make a selection: ', + update=True) -> tuple[str, dict[Any, Any]]: """Display menu and make a single selection, returns tuple.""" if update: self._update() user_selection = self._user_select(prompt_msg) return self._resolve_selection(user_selection) - def update(self): + def update(self) -> None: """Update menu with default settings.""" self._update() @@ -443,7 +451,7 @@ class TryAndPrint(): self.verbose = False self.width = WIDTH - def _format_exception_message(self, _exception): + def _format_exception_message(self, _exception) -> str: """Format using the exception's args or name, returns str.""" LOG.debug( 'Formatting exception: %s, %s', @@ -490,7 +498,7 @@ class TryAndPrint(): # Done return message - def _format_function_output(self, output, msg_good): + def _format_function_output(self, output, msg_good) -> str: """Format function output for use in try_and_print(), returns str.""" LOG.debug('Formatting output: %s', output) @@ -528,26 +536,27 @@ class TryAndPrint(): # Done return result_msg - def _log_result(self, message, result_msg): + def _log_result(self, message, result_msg) -> None: """Log result text without color formatting.""" log_text = f'{" "*self.indent}{message:<{self.width}}{result_msg}' for line in log_text.splitlines(): line = strip_colors(line) LOG.info(line) - def add_error(self, exception_name): + def add_error(self, exception_name) -> None: """Add exception name to error list.""" if exception_name not in self.list_errors: self.list_errors.append(exception_name) - def add_warning(self, exception_name): + def add_warning(self, exception_name) -> None: """Add exception name to warning list.""" if exception_name not in self.list_warnings: self.list_warnings.append(exception_name) def run( self, message, function, *args, - catch_all=None, msg_good=None, verbose=None, **kwargs): + catch_all=None, msg_good=None, verbose=None, + **kwargs) -> dict[str, Any]: """Run a function and print the results, returns results as dict. If catch_all is True then (nearly) all exceptions will be caught. @@ -558,7 +567,7 @@ class TryAndPrint(): msg_bad, or exception text. The output should be a list or a subprocess.CompletedProcess object. - If msg_good is passed it will override self.msg_good for this call. + If msg_good is passed it will override self.msg_good. If verbose is True then exception names or messages will be used for the result message. Otherwise it will simply be set to result_bad. @@ -634,7 +643,8 @@ class TryAndPrint(): # Functions -def abort(prompt_msg='Aborted.', show_prompt_msg=True, return_code=1): +def abort( + prompt_msg='Aborted.', show_prompt_msg=True, return_code=1) -> None: """Abort script.""" print_warning(prompt_msg) if show_prompt_msg: @@ -643,23 +653,24 @@ def abort(prompt_msg='Aborted.', show_prompt_msg=True, return_code=1): sys.exit(return_code) -def ask(prompt_msg): +def ask(prompt_msg) -> bool: """Prompt the user with a Y/N question, returns bool.""" validator = InputYesNoValidator() # Show prompt response = input_text(f'{prompt_msg} [Y/N]: ', validator=validator) if response.upper().startswith('Y'): - answer = True - elif response.upper().startswith('N'): - answer = False + LOG.info('%sYes', prompt_msg) + return True + if response.upper().startswith('N'): + LOG.info('%sNo', prompt_msg) + return False - # Done - LOG.info('%s%s', prompt_msg, 'Yes' if answer else 'No') - return answer + # This shouldn't ever be reached + raise ValueError(f'Invalid answer given: {response}') -def beep(repeat=1): +def beep(repeat=1) -> None: """Play system bell with optional repeat.""" while repeat >= 1: # Print bell char without a newline @@ -668,7 +679,7 @@ def beep(repeat=1): repeat -= 1 -def choice(prompt_msg, choices): +def choice(prompt_msg, choices) -> str: """Choose an option from a provided list, returns str. Choices provided will be converted to uppercase and returned as such. @@ -686,7 +697,7 @@ def choice(prompt_msg, choices): return response.upper() -def fix_prompt(message): +def fix_prompt(message) -> str: """Fix prompt, returns str.""" if not message: message = 'Input text: ' @@ -697,7 +708,7 @@ def fix_prompt(message): @cache -def get_exception(name): +def get_exception(name) -> Exception: """Get exception by name, returns exception object. [Doctest] @@ -733,7 +744,7 @@ def get_exception(name): return obj -def get_ticket_id(): +def get_ticket_id() -> str: """Get ticket ID, returns str.""" prompt_msg = 'Please enter ticket ID:' validator = InputTicketIDValidator() @@ -768,7 +779,7 @@ def input_text( return result -def major_exception(): +def major_exception() -> None: """Display traceback, optionally upload detailes, and exit.""" LOG.critical('Major exception encountered', exc_info=True) print_error('Major exception', log=False) @@ -782,12 +793,12 @@ def major_exception(): raise SystemExit(1) -def pause(prompt_msg='Press Enter to continue... '): +def pause(prompt_msg='Press Enter to continue... ') -> None: """Simple pause implementation.""" input_text(prompt_msg, allow_empty=True) -def print_colored(strings, colors, log=False, sep=' ', **kwargs): +def print_colored(strings, colors, log=False, sep=' ', **kwargs) -> None: """Prints strings in the colors specified.""" LOG.debug( 'strings: %s, colors: %s, sep: %s, kwargs: %s', @@ -805,7 +816,7 @@ def print_colored(strings, colors, log=False, sep=' ', **kwargs): LOG.info(strip_colors(msg)) -def print_error(msg, log=True, **kwargs): +def print_error(msg, log=True, **kwargs) -> None: """Prints message in RED and log as ERROR.""" if 'file' not in kwargs: # Only set if not specified @@ -815,14 +826,14 @@ def print_error(msg, log=True, **kwargs): LOG.error(msg) -def print_info(msg, log=True, **kwargs): +def print_info(msg, log=True, **kwargs) -> None: """Prints message in BLUE and log as INFO.""" print_colored(msg, 'BLUE', **kwargs) if log: LOG.info(msg) -def print_report(report, indent=None, log=True): +def print_report(report, indent=None, log=True) -> None: """Print report to screen and optionally to log.""" for line in report: if indent: @@ -832,21 +843,21 @@ def print_report(report, indent=None, log=True): LOG.info(strip_colors(line)) -def print_standard(msg, log=True, **kwargs): +def print_standard(msg, log=True, **kwargs) -> None: """Prints message and log as INFO.""" print(msg, **kwargs) if log: LOG.info(msg) -def print_success(msg, log=True, **kwargs): +def print_success(msg, log=True, **kwargs) -> None: """Prints message in GREEN and log as INFO.""" print_colored(msg, 'GREEN', **kwargs) if log: LOG.info(msg) -def print_warning(msg, log=True, **kwargs): +def print_warning(msg, log=True, **kwargs) -> None: """Prints message in YELLOW and log as WARNING.""" if 'file' not in kwargs: # Only set if not specified @@ -856,7 +867,7 @@ def print_warning(msg, log=True, **kwargs): LOG.warning(msg) -def set_title(title): +def set_title(title) -> None: """Set window title.""" LOG.debug('title: %s', title) if os.name == 'nt': @@ -865,7 +876,7 @@ def set_title(title): print_error('Setting the title is only supported under Windows.') -def show_data(message, data, color=None, indent=None, width=None): +def show_data(message, data, color=None, indent=None, width=None) -> None: """Display info using default or provided indent and width.""" colors = (None, color if color else None) indent = INDENT if indent is None else indent diff --git a/scripts/wk/ui/tmux.py b/scripts/wk/ui/tmux.py index 22438c3e..bcd12759 100644 --- a/scripts/wk/ui/tmux.py +++ b/scripts/wk/ui/tmux.py @@ -13,7 +13,7 @@ LOG = logging.getLogger(__name__) # Functions -def capture_pane(pane_id=None): +def capture_pane(pane_id=None) -> str: """Capture text from current or target pane, returns str.""" cmd = ['tmux', 'capture-pane', '-p'] if pane_id: @@ -24,7 +24,7 @@ def capture_pane(pane_id=None): return proc.stdout.strip() -def clear_pane(pane_id=None): +def clear_pane(pane_id=None) -> None: """Clear pane buffer for current or target pane.""" commands = [ ['tmux', 'send-keys', '-R'], @@ -38,7 +38,7 @@ def clear_pane(pane_id=None): run_program(cmd, check=False) -def fix_layout(layout, forced=False): +def fix_layout(layout, forced=False) -> None: """Fix pane sizes based on layout.""" resize_kwargs = [] @@ -110,7 +110,7 @@ def fix_layout(layout, forced=False): resize_pane(worker, height=layout['Workers']['height']) -def get_pane_size(pane_id=None): +def get_pane_size(pane_id=None) -> tuple[int, int]: """Get current or target pane size, returns tuple.""" cmd = ['tmux', 'display', '-p'] if pane_id: @@ -127,7 +127,7 @@ def get_pane_size(pane_id=None): return (width, height) -def get_window_size(): +def get_window_size() -> tuple[int, int]: """Get current window size, returns tuple.""" cmd = ['tmux', 'display', '-p', '#{window_width} #{window_height}'] @@ -141,7 +141,7 @@ def get_window_size(): return (width, height) -def kill_all_panes(pane_id=None): +def kill_all_panes(pane_id=None) -> None: """Kill all panes except for the current or target pane.""" cmd = ['tmux', 'kill-pane', '-a'] if pane_id: @@ -151,7 +151,7 @@ def kill_all_panes(pane_id=None): run_program(cmd, check=False) -def kill_pane(*pane_ids): +def kill_pane(*pane_ids) -> None: """Kill pane(s) by id.""" cmd = ['tmux', 'kill-pane', '-t'] @@ -160,7 +160,7 @@ def kill_pane(*pane_ids): run_program(cmd+[pane_id], check=False) -def layout_needs_fixed(layout): +def layout_needs_fixed(layout) -> bool: """Check if layout needs fixed, returns bool.""" needs_fixed = False @@ -189,7 +189,7 @@ def layout_needs_fixed(layout): return needs_fixed -def poll_pane(pane_id): +def poll_pane(pane_id) -> bool: """Check if pane exists, returns bool.""" cmd = ['tmux', 'list-panes', '-F', '#D'] @@ -202,7 +202,8 @@ def poll_pane(pane_id): def prep_action( - cmd=None, working_dir=None, text=None, watch_file=None, watch_cmd='cat'): + cmd=None, working_dir=None, text=None, + watch_file=None, watch_cmd='cat') -> list[str]: """Prep action to perform during a tmux call, returns list. This will prep for running a basic command, displaying text on screen, @@ -252,7 +253,7 @@ def prep_action( return action_cmd -def prep_file(path): +def prep_file(path) -> None: """Check if file exists and create empty file if not.""" path = pathlib.Path(path).resolve() try: @@ -262,7 +263,7 @@ def prep_file(path): pass -def resize_pane(pane_id=None, width=None, height=None): +def resize_pane(pane_id=None, width=None, height=None) -> None: """Resize current or target pane. NOTE: kwargs is only here to make calling this function easier @@ -287,7 +288,7 @@ def resize_pane(pane_id=None, width=None, height=None): run_program(cmd, check=False) -def respawn_pane(pane_id, **action): +def respawn_pane(pane_id, **action) -> None: """Respawn pane with action.""" cmd = ['tmux', 'respawn-pane', '-k', '-t', pane_id] cmd.extend(prep_action(**action)) @@ -299,7 +300,7 @@ def respawn_pane(pane_id, **action): def split_window( lines=None, percent=None, behind=False, vertical=False, - target_id=None, **action): + target_id=None, **action) -> str: """Split tmux window, run action, and return pane_id as str.""" cmd = ['tmux', 'split-window', '-d', '-PF', '#D'] @@ -332,7 +333,7 @@ def split_window( return proc.stdout.strip() -def zoom_pane(pane_id=None): +def zoom_pane(pane_id=None) -> None: """Toggle zoom status for current or target pane.""" cmd = ['tmux', 'resize-pane', '-Z'] if pane_id: diff --git a/scripts/wk/ui/tui.py b/scripts/wk/ui/tui.py index 95d86ae7..ac6329b4 100644 --- a/scripts/wk/ui/tui.py +++ b/scripts/wk/ui/tui.py @@ -29,7 +29,7 @@ TMUX_LAYOUT = { # NOTE: This needs to be in order from top to bottom # Classes class TUI(): """Object for tracking TUI elements.""" - def __init__(self, title_text=None) -> None: + def __init__(self, title_text=None): self.layout = deepcopy(TMUX_LAYOUT) self.side_width = TMUX_SIDE_WIDTH self.title_text = title_text if title_text else 'Title Text' @@ -251,7 +251,7 @@ class TUI(): # Functions -def fix_layout(layout, forced=False): +def fix_layout(layout, forced=False) -> None: """Fix pane sizes based on layout.""" resize_kwargs = [] @@ -320,7 +320,7 @@ def fix_layout(layout, forced=False): tmux.resize_pane(workers[1], height=next_height) workers.pop(0) -def layout_needs_fixed(layout): +def layout_needs_fixed(layout) -> bool: """Check if layout needs fixed, returns bool.""" needs_fixed = False @@ -338,7 +338,7 @@ def layout_needs_fixed(layout): # Done return needs_fixed -def test(): +def test() -> TUI: """TODO: Deleteme""" ui = TUI() ui.add_info_pane(lines=10, text='Info One')