Merge branch 'type-hinting' into dev

This commit is contained in:
2Shirt 2023-05-29 16:26:16 -07:00
commit 386a8b7000
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
32 changed files with 772 additions and 568 deletions

View file

@ -7,7 +7,7 @@ import platform
import wk
def main():
def main() -> None:
"""Show sensor data on screen."""
sensors = wk.hw.sensors.Sensors()
if platform.system() == 'Darwin':

View file

@ -1,6 +1,8 @@
"""WizardKit: Launch Snappy Driver Installer Origin"""
# vim: sts=2 sw=2 ts=2
from subprocess import CompletedProcess
import wk
from wk.cfg.net import SDIO_SERVER
@ -20,7 +22,7 @@ SDIO_REMOTE_PATH = wk.io.get_path_obj(
)
# Functions
def try_again():
def try_again() -> bool:
"""Ask to try again or quit."""
if wk.ui.cli.ask(' Try again?'):
return True
@ -29,10 +31,10 @@ def try_again():
return False
def use_network_sdio():
def use_network_sdio() -> bool:
"""Try to mount SDIO server."""
use_network = False
def _mount_server():
def _mount_server() -> CompletedProcess:
print('Connecting to server... (Press CTRL+c to use local copy)')
return wk.net.mount_network_share(SDIO_SERVER, read_write=False)

View file

@ -5,10 +5,12 @@ import json
import re
import subprocess
from typing import Any
CPU_REGEX = re.compile(r'(core|k\d+)temp', re.IGNORECASE)
NON_TEMP_REGEX = re.compile(r'^(fan|in|curr)', re.IGNORECASE)
def get_data():
def get_data() -> dict[Any, Any]:
cmd = ('sensors', '-j')
data = {}
raw_data = []
@ -38,7 +40,7 @@ def get_data():
return data
def get_max_temp(data):
def get_max_temp(data) -> str:
cpu_temps = []
max_cpu_temp = '??° C'
for adapter, sources in data.items():

View file

@ -8,7 +8,7 @@ import wk
# Functions
def main():
def main() -> None:
"""Mount all volumes and show results."""
wk.ui.cli.print_standard(f'{wk.cfg.main.KIT_NAME_FULL}: Volume mount tool')
wk.ui.cli.print_standard(' ')

View file

@ -6,7 +6,7 @@ import wk
# Functions
def main():
def main() -> None:
"""Attempt to mount backup shares and print report."""
wk.ui.cli.print_info('Mounting Backup Shares')
report = wk.net.mount_backup_shares()

View file

@ -6,7 +6,7 @@ import wk
# Functions
def main():
def main() -> None:
"""Attempt to mount backup shares and print report."""
wk.ui.cli.print_info('Unmounting Backup Shares')
report = wk.net.unmount_backup_shares()
@ -15,7 +15,7 @@ def main():
line = f' {line}'
if 'Not mounted' in line:
color = 'YELLOW'
print(wk.ansi.color_string(line, color))
print(wk.ui.ansi.color_string(line, color))
if __name__ == '__main__':

View file

@ -25,7 +25,7 @@ if PLATFORM not in ('macOS', 'Linux'):
# Functions
def main():
def main() -> None:
"""Upload logs for review."""
lines = []
try_and_print = wk.ui.cli.TryAndPrint()
@ -60,7 +60,7 @@ def main():
raise SystemExit(1)
def upload_log_dir(reason='Testing'):
def upload_log_dir(reason='Testing') -> None:
"""Upload compressed log_dir to the crash server."""
server = wk.cfg.net.CRASH_SERVER
dest = pathlib.Path(f'~/{reason}_{NOW.strftime("%Y-%m-%dT%H%M%S%z")}.txz')

View file

@ -21,7 +21,7 @@ from . import ui
# Check env
if version_info < (3, 7):
if version_info < (3, 10):
# Unsupported
raise RuntimeError(
'This package is unsupported on Python '

View file

@ -15,11 +15,12 @@ import subprocess
import time
from collections import OrderedDict
from docopt import docopt
import psutil
import pytz
from docopt import docopt
from wk import cfg, debug, exe, io, log, net, std
from wk.cfg.ddrescue import (
DDRESCUE_MAP_TEMPLATE,
@ -190,15 +191,15 @@ class BlockPair():
# Set initial status
self.set_initial_status()
def get_error_size(self):
def get_error_size(self) -> int:
"""Get error size in bytes, returns int."""
return self.size - self.get_rescued_size()
def get_percent_recovered(self):
def get_percent_recovered(self) -> float:
"""Get percent rescued from map_data, returns float."""
return 100 * self.map_data.get('rescued', 0) / self.size
def get_rescued_size(self):
def get_rescued_size(self) -> float | int:
"""Get rescued size using map data.
NOTE: Returns 0 if no map data is available.
@ -206,7 +207,7 @@ class BlockPair():
self.load_map_data()
return self.map_data.get('rescued', 0)
def load_map_data(self):
def load_map_data(self) -> None:
"""Load map data from file.
NOTE: If the file is missing it is assumed that recovery hasn't
@ -252,7 +253,7 @@ class BlockPair():
# Done
self.map_data.update(data)
def pass_complete(self, pass_name):
def pass_complete(self, pass_name) -> bool:
"""Check if pass_name is complete based on map data, returns bool."""
pending_size = self.map_data['non-tried']
@ -282,7 +283,7 @@ class BlockPair():
# This should never be reached
return False
def safety_check(self):
def safety_check(self) -> None:
"""Run safety check and abort if necessary."""
# TODO: Expand section to support non-Linux systems
dest_size = -1
@ -301,7 +302,7 @@ class BlockPair():
ui.print_error(f'Invalid destination: {self.destination}')
raise std.GenericAbort()
def set_initial_status(self):
def set_initial_status(self) -> None:
"""Read map data and set initial statuses."""
self.load_map_data()
percent = self.get_percent_recovered()
@ -314,12 +315,12 @@ class BlockPair():
self.status[name] = percent
break
def skip_pass(self, pass_name):
def skip_pass(self, pass_name) -> None:
"""Mark pass as skipped if applicable."""
if self.status[pass_name] == 'Pending':
self.status[pass_name] = 'Skipped'
def update_progress(self, pass_name):
def update_progress(self, pass_name) -> None:
"""Update progress via map data."""
self.load_map_data()
@ -350,7 +351,7 @@ class State():
self._init_tmux()
exe.start_thread(self._fix_tmux_layout_loop)
def _add_block_pair(self, source, destination):
def _add_block_pair(self, source, destination) -> None:
"""Add BlockPair object and run safety checks."""
self.block_pairs.append(
BlockPair(
@ -360,14 +361,14 @@ class State():
working_dir=self.working_dir,
))
def _get_clone_settings_path(self):
def _get_clone_settings_path(self) -> pathlib.Path:
"""get Clone settings file path, returns pathlib.Path obj."""
description = self.source.model
if not description:
description = self.source.path.name
return pathlib.Path(f'{self.working_dir}/Clone_{description}.json')
def _fix_tmux_layout(self, forced=True):
def _fix_tmux_layout(self, forced=True) -> None:
"""Fix tmux layout based on cfg.ddrescue.TMUX_LAYOUT."""
layout = cfg.ddrescue.TMUX_LAYOUT
needs_fixed = tmux.layout_needs_fixed(self.panes, layout)
@ -397,7 +398,7 @@ class State():
if 'Journal' in self.panes:
tmux.resize_pane(self.panes['Journal'], height=p_ratios[2])
def _fix_tmux_layout_loop(self):
def _fix_tmux_layout_loop(self) -> None:
"""Fix tmux layout on a loop.
NOTE: This should be called as a thread.
@ -406,7 +407,7 @@ class State():
self._fix_tmux_layout(forced=False)
std.sleep(1)
def _init_tmux(self):
def _init_tmux(self) -> None:
"""Initialize tmux layout."""
tmux.kill_all_panes()
@ -432,7 +433,7 @@ class State():
# Source / Dest
self.update_top_panes()
def _load_settings(self, discard_unused_settings=False):
def _load_settings(self, discard_unused_settings=False) -> dict:
"""Load settings from previous run, returns dict."""
settings = {}
settings_file = self._get_clone_settings_path()
@ -481,7 +482,7 @@ class State():
# Done
return settings
def _save_settings(self, settings):
def _save_settings(self, settings) -> None:
"""Save settings for future runs."""
settings_file = self._get_clone_settings_path()
@ -493,7 +494,7 @@ class State():
ui.print_error('Failed to save clone settings')
raise std.GenericAbort() from err
def add_clone_block_pairs(self):
def add_clone_block_pairs(self) -> None:
"""Add device to device block pairs and set settings if necessary."""
source_sep = get_partition_separator(self.source.path.name)
dest_sep = get_partition_separator(self.destination.path.name)
@ -557,13 +558,13 @@ class State():
# Done
return source_parts
def add_image_block_pairs(self, source_parts):
def add_image_block_pairs(self, source_parts) -> None:
"""Add device to image file block pairs."""
for part in source_parts:
bp_dest = self.destination
self._add_block_pair(part, bp_dest)
def confirm_selections(self, prompt_msg, source_parts=None):
def confirm_selections(self, prompt_msg, source_parts=None) -> None:
"""Show selection details and prompt for confirmation."""
report = []
@ -645,7 +646,7 @@ class State():
if not ui.ask(prompt_msg):
raise std.GenericAbort()
def generate_report(self):
def generate_report(self) -> list[str]:
"""Generate report of overall and per block_pair results, returns list."""
report = []
@ -688,23 +689,23 @@ class State():
# Done
return report
def get_error_size(self):
def get_error_size(self) -> int:
"""Get total error size from block_pairs in bytes, returns int."""
return self.get_total_size() - self.get_rescued_size()
def get_percent_recovered(self):
def get_percent_recovered(self) -> float:
"""Get total percent rescued from block_pairs, returns float."""
return 100 * self.get_rescued_size() / self.get_total_size()
def get_rescued_size(self):
def get_rescued_size(self) -> int:
"""Get total rescued size from all block pairs, returns int."""
return sum(pair.get_rescued_size() for pair in self.block_pairs)
def get_total_size(self):
def get_total_size(self) -> int:
"""Get total size of all block_pairs in bytes, returns int."""
return sum(pair.size for pair in self.block_pairs)
def init_recovery(self, docopt_args):
def init_recovery(self, docopt_args) -> None:
"""Select source/dest and set env."""
ui.clear_screen()
source_parts = []
@ -809,7 +810,7 @@ class State():
for pair in self.block_pairs:
pair.safety_check()
def mark_started(self):
def mark_started(self) -> None:
"""Edit clone settings, if applicable, to mark recovery as started."""
# Skip if not cloning
if self.mode != 'Clone':
@ -826,18 +827,18 @@ class State():
settings['First Run'] = False
self._save_settings(settings)
def pass_above_threshold(self, pass_name):
def pass_above_threshold(self, pass_name) -> bool:
"""Check if all block_pairs meet the pass threshold, returns bool."""
threshold = cfg.ddrescue.AUTO_PASS_THRESHOLDS[pass_name]
return all(
p.get_percent_recovered() >= threshold for p in self.block_pairs
)
def pass_complete(self, pass_name):
def pass_complete(self, pass_name) -> bool:
"""Check if all block_pairs completed pass_name, returns bool."""
return all(p.pass_complete(pass_name) for p in self.block_pairs)
def prep_destination(self, source_parts, dry_run=True):
def prep_destination(self, source_parts, dry_run=True) -> None:
"""Prep destination as necessary."""
# TODO: Split into Linux and macOS
# logical sector size is not easily found under macOS
@ -937,7 +938,7 @@ class State():
settings['Needs Format'] = False
self._save_settings(settings)
def retry_all_passes(self):
def retry_all_passes(self) -> None:
"""Prep block_pairs for a retry recovery attempt."""
bad_statuses = ('*', '/', '-')
LOG.warning('Updating block_pairs for retry')
@ -965,7 +966,7 @@ class State():
# Reinitialize status
pair.set_initial_status()
def safety_check_destination(self):
def safety_check_destination(self) -> None:
"""Run safety checks for destination and abort if necessary."""
errors_detected = False
@ -985,7 +986,7 @@ class State():
if errors_detected:
raise std.GenericAbort()
def safety_check_size(self):
def safety_check_size(self) -> None:
"""Run size safety check and abort if necessary."""
required_size = sum(pair.size for pair in self.block_pairs)
settings = self._load_settings() if self.mode == 'Clone' else {}
@ -1031,7 +1032,7 @@ class State():
ui.print_error(error_msg)
raise std.GenericAbort()
def save_debug_reports(self):
def save_debug_reports(self) -> None:
"""Save debug reports to disk."""
LOG.info('Saving debug reports')
debug_dir = pathlib.Path(f'{self.log_dir}/debug')
@ -1053,13 +1054,13 @@ class State():
_f.write('\n'.join(debug.generate_object_report(_bp)))
_f.write('\n')
def skip_pass(self, pass_name):
def skip_pass(self, pass_name) -> None:
"""Mark block_pairs as skipped if applicable."""
for pair in self.block_pairs:
if pair.status[pass_name] == 'Pending':
pair.status[pass_name] = 'Skipped'
def update_progress_pane(self, overall_status):
def update_progress_pane(self, overall_status) -> None:
"""Update progress pane."""
report = []
separator = '─────────────────────'
@ -1116,14 +1117,14 @@ class State():
with open(out_path, 'w', encoding='utf-8') as _f:
_f.write('\n'.join(report))
def update_top_panes(self):
def update_top_panes(self) -> None:
"""(Re)create top source/destination panes."""
source_exists = True
dest_exists = True
width = tmux.get_pane_size()[0]
width = int(width / 2) - 1
def _format_string(obj, width):
def _format_string(obj, width) -> str:
"""Format source/dest string using obj and width, returns str."""
string = ''
@ -1192,7 +1193,7 @@ class State():
# Functions
def build_block_pair_report(block_pairs, settings):
def build_block_pair_report(block_pairs, settings) -> list:
"""Build block pair report, returns list."""
report = []
notes = []
@ -1246,7 +1247,7 @@ def build_block_pair_report(block_pairs, settings):
return report
def build_ddrescue_cmd(block_pair, pass_name, settings_menu):
def build_ddrescue_cmd(block_pair, pass_name, settings_menu) -> list[str]:
"""Build ddrescue cmd using passed details, returns list."""
cmd = ['sudo', 'ddrescue']
if (block_pair.destination.is_block_device()
@ -1296,7 +1297,7 @@ def build_ddrescue_cmd(block_pair, pass_name, settings_menu):
return cmd
def build_directory_report(path):
def build_directory_report(path) -> list[str]:
"""Build directory report, returns list."""
path = f'{path}/'
report = []
@ -1325,7 +1326,7 @@ def build_directory_report(path):
return report
def build_disk_report(dev):
def build_disk_report(dev) -> list[str]:
"""Build device report, returns list."""
report = []
@ -1398,7 +1399,7 @@ def build_disk_report(dev):
return report
def build_main_menu():
def build_main_menu() -> ui.Menu:
"""Build main menu, returns wk.ui.cli.Menu."""
menu = ui.Menu(title=ansi.color_string('ddrescue TUI: Main Menu', 'GREEN'))
menu.separator = ' '
@ -1414,7 +1415,7 @@ def build_main_menu():
return menu
def build_object_report(obj):
def build_object_report(obj) -> list[str]:
"""Build object report, returns list."""
report = []
@ -1430,7 +1431,7 @@ def build_object_report(obj):
return report
def build_settings_menu(silent=True):
def build_settings_menu(silent=True) -> ui.Menu:
"""Build settings menu, returns wk.ui.cli.Menu."""
title_text = [
ansi.color_string('ddrescue TUI: Expert Settings', 'GREEN'),
@ -1471,7 +1472,7 @@ def build_settings_menu(silent=True):
return menu
def build_sfdisk_partition_line(table_type, dev_path, size, details):
def build_sfdisk_partition_line(table_type, dev_path, size, details) -> str:
"""Build sfdisk partition line using passed details, returns str."""
line = f'{dev_path} : size={size}'
dest_type = ''
@ -1512,7 +1513,7 @@ def build_sfdisk_partition_line(table_type, dev_path, size, details):
return line
def check_destination_health(destination):
def check_destination_health(destination) -> str:
"""Check destination health, returns str."""
result = ''
@ -1533,7 +1534,7 @@ def check_destination_health(destination):
return result
def clean_working_dir(working_dir):
def clean_working_dir(working_dir) -> None:
"""Clean working directory to ensure a fresh recovery session.
NOTE: Data from previous sessions will be preserved
@ -1551,7 +1552,7 @@ def clean_working_dir(working_dir):
shutil.move(entry.path, new_path)
def format_status_string(status, width):
def format_status_string(status, width) -> str:
"""Format colored status string, returns str."""
color = None
percent = -1
@ -1586,7 +1587,7 @@ def format_status_string(status, width):
return status_str
def fstype_is_ok(path, map_dir=False):
def fstype_is_ok(path, map_dir=False) -> bool:
"""Check if filesystem type is acceptable, returns bool."""
is_ok = False
fstype = None
@ -1622,7 +1623,7 @@ def fstype_is_ok(path, map_dir=False):
return is_ok
def get_ddrescue_settings(settings_menu):
def get_ddrescue_settings(settings_menu) -> list:
"""Get ddrescue settings from menu selections, returns list."""
settings = []
@ -1640,7 +1641,7 @@ def get_ddrescue_settings(settings_menu):
return settings
def get_etoc():
def get_etoc() -> str:
"""Get EToC from ddrescue output, returns str."""
delta = None
delta_dict = {}
@ -1669,7 +1670,7 @@ def get_etoc():
return etoc
def get_fstype_macos(path):
def get_fstype_macos(path) -> str:
"""Get fstype for path under macOS, returns str."""
fstype = 'UNKNOWN'
proc = exe.run_program(['mount'], check=False)
@ -1687,8 +1688,9 @@ def get_fstype_macos(path):
return fstype
def get_object(path):
def get_object(path) -> hw_disk.Disk | None | pathlib.Path:
"""Get object based on path, returns obj."""
# TODO: Refactor to avoid returning None
obj = None
# Bail early
@ -1721,7 +1723,7 @@ def get_object(path):
return obj
def get_partition_separator(name):
def get_partition_separator(name) -> str:
"""Get partition separator based on device name, returns str."""
separator = ''
if re.search(r'(loop|mmc|nvme)', name, re.IGNORECASE):
@ -1730,7 +1732,7 @@ def get_partition_separator(name):
return separator
def get_percent_color(percent):
def get_percent_color(percent) -> str:
"""Get color based on percentage, returns str."""
color = None
if percent > 100:
@ -1746,7 +1748,7 @@ def get_percent_color(percent):
return color
def get_table_type(disk_path):
def get_table_type(disk_path) -> str:
"""Get disk partition table type, returns str.
NOTE: If resulting table type is not GPT or MBR
@ -1786,7 +1788,7 @@ def get_table_type(disk_path):
return table_type
def get_working_dir(mode, destination, force_local=False):
def get_working_dir(mode, destination, force_local=False) -> pathlib.Path:
"""Get working directory using mode and destination, returns path."""
ticket_id = ui.get_ticket_id()
working_dir = None
@ -1830,7 +1832,7 @@ def get_working_dir(mode, destination, force_local=False):
return working_dir
def is_missing_source_or_destination(state):
def is_missing_source_or_destination(state) -> bool:
"""Check if source or destination dissapeared, returns bool."""
missing = False
items = {
@ -1860,7 +1862,7 @@ def is_missing_source_or_destination(state):
return missing
def source_or_destination_changed(state):
def source_or_destination_changed(state) -> bool:
"""Verify the source and destination objects are still valid."""
changed = False
@ -1885,7 +1887,7 @@ def source_or_destination_changed(state):
return changed
def main():
def main() -> None:
"""Main function for ddrescue TUI."""
args = docopt(DOCSTRING)
log.update_log_path(dest_name='ddrescue-TUI', timestamp=True)
@ -1953,7 +1955,7 @@ def main():
LOG.info(' %s', ansi.strip_colors(line))
def mount_raw_image(path):
def mount_raw_image(path) -> pathlib.Path:
"""Mount raw image using OS specific methods, returns pathlib.Path."""
loopback_path = None
@ -1973,7 +1975,7 @@ def mount_raw_image(path):
return loopback_path
def mount_raw_image_linux(path):
def mount_raw_image_linux(path) -> pathlib.Path:
"""Mount raw image using losetup, returns pathlib.Path."""
loopback_path = None
@ -1995,7 +1997,7 @@ def mount_raw_image_linux(path):
# Done
return loopback_path
def mount_raw_image_macos(path):
def mount_raw_image_macos(path) -> pathlib.Path:
"""Mount raw image using hdiutil, returns pathlib.Path."""
loopback_path = None
plist_data = {}
@ -2026,7 +2028,7 @@ def mount_raw_image_macos(path):
return loopback_path
def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True) -> None:
"""Run ddrescue using passed settings."""
cmd = build_ddrescue_cmd(block_pair, pass_name, settings)
poweroff_source_after_idle = True
@ -2034,7 +2036,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
ui.clear_screen()
warning_message = ''
def _poweroff_source_drive(idle_minutes):
def _poweroff_source_drive(idle_minutes) -> None:
"""Power off source drive after a while."""
source_dev = state.source.path
@ -2068,7 +2070,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
'Press Enter to return to main menu...', end='', flush=True,
)
def _update_smart_pane():
def _update_smart_pane() -> None:
"""Update SMART pane every 30 seconds."""
update_smart_details(state.source)
now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z')
@ -2173,7 +2175,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
raise std.GenericAbort()
def run_recovery(state, main_menu, settings_menu, dry_run=True):
def run_recovery(state, main_menu, settings_menu, dry_run=True) -> None:
"""Run recovery passes."""
atexit.register(state.save_debug_reports)
attempted_recovery = False
@ -2252,7 +2254,7 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True):
state.update_progress_pane('Idle')
def select_disk(prompt_msg, skip_disk=None):
def select_disk(prompt_msg, skip_disk=None) -> hw_disk.Disk:
"""Select disk from list, returns Disk()."""
ui.print_info('Scanning disks...')
disks = hw_disk.get_disks()
@ -2297,7 +2299,7 @@ def select_disk(prompt_msg, skip_disk=None):
return selected_disk
def select_disk_parts(prompt_msg, disk):
def select_disk_parts(prompt_msg, disk) -> hw_disk.Disk:
"""Select disk parts from list, returns list of Disk()."""
title = ansi.color_string('ddrescue TUI: Partition Selection', 'GREEN')
title += f'\n\nDisk: {disk.path} {disk.description}'
@ -2309,7 +2311,7 @@ def select_disk_parts(prompt_msg, disk):
menu.add_action('Quit')
object_list = []
def _select_parts(menu):
def _select_parts(menu) -> None:
"""Loop over selection menu until at least one partition selected."""
while True:
selection = menu.advanced_select(
@ -2377,7 +2379,7 @@ def select_disk_parts(prompt_msg, disk):
return object_list
def select_path(prompt_msg):
def select_path(prompt_msg) -> pathlib.Path:
"""Select path, returns pathlib.Path."""
invalid = False
menu = ui.Menu(
@ -2411,7 +2413,7 @@ def select_path(prompt_msg):
return path
def set_mode(docopt_args):
def set_mode(docopt_args) -> str:
"""Set mode from docopt_args or user selection, returns str."""
mode = None
@ -2433,7 +2435,7 @@ def set_mode(docopt_args):
return mode
def unmount_loopback_device(path):
def unmount_loopback_device(path) -> None:
"""Unmount loopback device using OS specific methods."""
cmd = []

View file

@ -5,6 +5,7 @@ import inspect
import logging
import lzma
import os
import pathlib
import pickle
import platform
import re
@ -12,6 +13,8 @@ import socket
import sys
import time
from typing import Any
import requests
from wk.cfg.net import CRASH_SERVER
@ -20,7 +23,7 @@ from wk.log import get_root_logger_path
# Classes
class Debug():
"""Object used when dumping debug data."""
def method(self):
def method(self) -> None:
"""Dummy method used to identify functions vs data."""
@ -31,7 +34,7 @@ METHOD_TYPE = type(DEBUG_CLASS.method)
# Functions
def generate_debug_report():
def generate_debug_report() -> str:
"""Generate debug report, returns str."""
platform_function_list = (
'architecture',
@ -78,7 +81,7 @@ def generate_debug_report():
return '\n'.join(report)
def generate_object_report(obj, indent=0):
def generate_object_report(obj: Any, indent: int = 0) -> list[str]:
"""Generate debug report for obj, returns list."""
report = []
attr_list = []
@ -109,7 +112,10 @@ def generate_object_report(obj, indent=0):
return report
def save_pickles(obj_dict, out_path=None):
def save_pickles(
obj_dict: dict[Any, Any],
out_path: pathlib.Path | str | None = None,
) -> None:
"""Save dict of objects using pickle."""
LOG.info('Saving pickles')
@ -129,7 +135,11 @@ def save_pickles(obj_dict, out_path=None):
LOG.error('Failed to save all the pickles', exc_info=True)
def upload_debug_report(report, compress=True, reason='DEBUG'):
def upload_debug_report(
report: str,
compress: bool = True,
reason: str = 'DEBUG',
) -> None:
"""Upload debug report to CRASH_SERVER as specified in wk.cfg.main."""
LOG.info('Uploading debug report to %s', CRASH_SERVER.get('Name', '?'))
headers = CRASH_SERVER.get('Headers', {'X-Requested-With': 'XMLHttpRequest'})

View file

@ -4,12 +4,15 @@
import json
import logging
import os
import pathlib
import re
import subprocess
import time
from threading import Thread
from io import BufferedReader, TextIOWrapper
from queue import Queue, Empty
from threading import Thread
from typing import Any, Callable, Iterable
import psutil
@ -25,11 +28,11 @@ class NonBlockingStreamReader():
## https://gist.github.com/EyalAr/7915597
## https://stackoverflow.com/a/4896288
def __init__(self, stream):
self.stream = stream
self.queue = Queue()
def __init__(self, stream: BufferedReader | TextIOWrapper):
self.stream: BufferedReader | TextIOWrapper = stream
self.queue: Queue = Queue()
def populate_queue(stream, queue):
def populate_queue(stream: BufferedReader | TextIOWrapper, queue: Queue) -> None:
"""Collect lines from stream and put them in queue."""
while not stream.closed:
try:
@ -45,18 +48,18 @@ class NonBlockingStreamReader():
args=(self.stream, self.queue),
)
def stop(self):
def stop(self) -> None:
"""Stop reading from input stream."""
self.stream.close()
def read(self, timeout=None):
def read(self, timeout: float | int | None = None) -> Any:
"""Read from queue if possible, returns item from queue."""
try:
return self.queue.get(block=timeout is not None, timeout=timeout)
except Empty:
return None
def save_to_file(self, proc, out_path):
def save_to_file(self, proc: subprocess.Popen, out_path: pathlib.Path | str) -> None:
"""Continuously save output to file while proc is running."""
LOG.debug('Saving process %s output to %s', proc, out_path)
while proc.poll() is None:
@ -74,7 +77,12 @@ class NonBlockingStreamReader():
# Functions
def build_cmd_kwargs(cmd, minimized=False, pipe=True, shell=False, **kwargs):
def build_cmd_kwargs(
cmd: list[str],
minimized: bool = False,
pipe: bool = True,
shell: bool = False,
**kwargs) -> dict[str, Any]:
"""Build kwargs for use by subprocess functions, returns dict.
Specifically subprocess.run() and subprocess.Popen().
@ -122,7 +130,12 @@ def build_cmd_kwargs(cmd, minimized=False, pipe=True, shell=False, **kwargs):
return cmd_kwargs
def get_json_from_command(cmd, check=True, encoding='utf-8', errors='ignore'):
def get_json_from_command(
cmd: list[str],
check: bool = True,
encoding: str = 'utf-8',
errors: str = 'ignore',
) -> dict[Any, Any]:
"""Capture JSON content from cmd output, returns dict.
If the data can't be decoded then either an exception is raised
@ -141,7 +154,11 @@ def get_json_from_command(cmd, check=True, encoding='utf-8', errors='ignore'):
return json_data
def get_procs(name, exact=True, try_again=True):
def get_procs(
name: str,
exact: bool = True,
try_again: bool = True,
) -> list[psutil.Process]:
"""Get process object(s) based on name, returns list of proc objects."""
LOG.debug('name: %s, exact: %s', name, exact)
processes = []
@ -161,7 +178,12 @@ def get_procs(name, exact=True, try_again=True):
return processes
def kill_procs(name, exact=True, force=False, timeout=30):
def kill_procs(
name: str,
exact: bool = True,
force: bool = False,
timeout: float | int = 30,
) -> None:
"""Kill all processes matching name (case-insensitively).
NOTE: Under Posix systems this will send SIGINT to allow processes
@ -185,7 +207,13 @@ def kill_procs(name, exact=True, force=False, timeout=30):
proc.kill()
def popen_program(cmd, minimized=False, pipe=False, shell=False, **kwargs):
def popen_program(
cmd: list[str],
minimized: bool = False,
pipe: bool = False,
shell: bool = False,
**kwargs,
) -> subprocess.Popen:
"""Run program and return a subprocess.Popen object."""
LOG.debug(
'cmd: %s, minimized: %s, pipe: %s, shell: %s',
@ -209,7 +237,13 @@ def popen_program(cmd, minimized=False, pipe=False, shell=False, **kwargs):
return proc
def run_program(cmd, check=True, pipe=True, shell=False, **kwargs):
def run_program(
cmd: list[str],
check: bool = True,
pipe: bool = True,
shell: bool = False,
**kwargs,
) -> subprocess.CompletedProcess:
"""Run program and return a subprocess.CompletedProcess object."""
LOG.debug(
'cmd: %s, check: %s, pipe: %s, shell: %s',
@ -233,7 +267,11 @@ def run_program(cmd, check=True, pipe=True, shell=False, **kwargs):
return proc
def start_thread(function, args=None, daemon=True):
def start_thread(
function: Callable,
args: Iterable[Any] | None = None,
daemon: bool = True,
) -> Thread:
"""Run function as thread in background, returns Thread object."""
LOG.debug(
'Starting background thread for function: %s, args: %s, daemon: %s',
@ -245,7 +283,7 @@ def start_thread(function, args=None, daemon=True):
return thread
def stop_process(proc, graceful=True):
def stop_process(proc: subprocess.Popen, graceful: bool = True) -> None:
"""Stop process.
NOTES: proc should be a subprocess.Popen obj.
@ -267,7 +305,11 @@ def stop_process(proc, graceful=True):
proc.kill()
def wait_for_procs(name, exact=True, timeout=None):
def wait_for_procs(
name: str,
exact: bool = True,
timeout: float | int | None = None,
) -> None:
"""Wait for all process matching name."""
LOG.debug('name: %s, exact: %s, timeout: %s', name, exact, timeout)
target_procs = get_procs(name, exact=exact)

View file

@ -33,7 +33,10 @@ THRESH_GREAT = 750 * 1024**2
# Functions
def generate_horizontal_graph(rate_list, graph_width=40, oneline=False):
def generate_horizontal_graph(
rate_list: list[float],
graph_width: int = 40,
oneline: bool = False) -> list[str]:
"""Generate horizontal graph from rate_list, returns list."""
graph = ['', '', '', '']
scale = 8 if oneline else 32
@ -80,7 +83,7 @@ def generate_horizontal_graph(rate_list, graph_width=40, oneline=False):
return graph
def get_graph_step(rate, scale=16):
def get_graph_step(rate: float, scale: int = 16) -> int:
"""Get graph step based on rate and scale, returns int."""
rate_in_mb = rate / (1024**2)
step = 0
@ -95,14 +98,17 @@ def get_graph_step(rate, scale=16):
return step
def merge_rates(rates, graph_width=40):
def merge_rates(
rates: list[float],
graph_width: int = 40,
) -> list[int | float]:
"""Merge rates to have entries equal to the width, returns list."""
merged_rates = []
offset = 0
slice_width = int(len(rates) / graph_width)
# Merge rates
for _i in range(graph_width):
for _ in range(graph_width):
merged_rates.append(sum(rates[offset:offset+slice_width])/slice_width)
offset += slice_width
@ -110,7 +116,7 @@ def merge_rates(rates, graph_width=40):
return merged_rates
def vertical_graph_line(percent, rate, scale=32):
def vertical_graph_line(percent: float, rate: float, scale: int = 32) -> str:
"""Build colored graph string using thresholds, returns str."""
color_bar = None
color_rate = None

View file

@ -82,15 +82,15 @@ PLATFORM = std.PLATFORM
class State():
"""Object for tracking hardware diagnostic data."""
def __init__(self, test_mode=False):
self.disks = []
self.log_dir = None
self.progress_file = None
self.system = None
self.test_groups = []
self.title_text = ansi.color_string('Hardware Diagnostics', 'GREEN')
self.disks: list[hw_disk.Disk] = []
self.log_dir: pathlib.Path | None = None
self.progress_file: pathlib.Path | None = None
self.system: hw_system.System | None = None
self.test_groups: list[TestGroup] = []
self.title_text: str = ansi.color_string('Hardware Diagnostics', 'GREEN')
if test_mode:
self.title_text += ansi.color_string(' (Test Mode)', 'YELLOW')
self.ui = tui.TUI(f'{self.title_text}\nMain Menu')
self.ui: tui.TUI = tui.TUI(f'{self.title_text}\nMain Menu')
def abort_testing(self) -> None:
"""Set unfinished tests as aborted and cleanup panes."""

View file

@ -9,7 +9,7 @@ import plistlib
import re
from dataclasses import dataclass, field
from typing import Any, Union
from typing import Any
from wk.cfg.main import KIT_NAME_SHORT
from wk.cfg.python import DATACLASS_DECORATOR_KWARGS
@ -46,7 +46,7 @@ class Disk:
model: str = field(init=False)
name: str = field(init=False)
notes: list[str] = field(init=False, default_factory=list)
path: Union[pathlib.Path, str]
path: pathlib.Path | str
parent: str = field(init=False)
phy_sec: int = field(init=False)
raw_details: dict[str, Any] = field(init=False)
@ -57,7 +57,7 @@ class Disk:
tests: list[Test] = field(init=False, default_factory=list)
use_sat: bool = field(init=False, default=False)
def __post_init__(self) -> None:
def __post_init__(self):
self.path = pathlib.Path(self.path).resolve()
self.update_details()
self.set_description()

View file

@ -7,6 +7,7 @@ import pathlib
import re
from subprocess import CalledProcessError
from threading import Thread
from typing import Any
from wk.cfg.hw import CPU_CRITICAL_TEMP, SMC_IDS, TEMP_COLORS
@ -37,9 +38,9 @@ class ThermalLimitReachedError(RuntimeError):
class Sensors():
"""Class for holding sensor specific data."""
def __init__(self):
self.background_thread = None
self.data = get_sensor_data()
self.out_path = None
self.background_thread: Thread | None = None
self.data: dict[Any, Any] = get_sensor_data()
self.out_path: pathlib.Path | str | None = None
def clear_temps(self) -> None:
"""Clear saved temps but keep structure"""

View file

@ -237,7 +237,10 @@ def get_smart_self_test_last_result(dev) -> str:
result = 'Unknown'
# Parse SMART data
data = dev.raw_smartctl.get('ata_smart_self_test_log', {}).get('standard', {}).get('table', [])
data = dev.raw_smartctl.get(
'ata_smart_self_test_log', {}).get(
'standard', {}).get(
'table', [])
if not data:
# No results found
return result

View file

@ -13,7 +13,7 @@ LOG = logging.getLogger(__name__)
# Functions
def case_insensitive_path(path):
def case_insensitive_path(path: pathlib.Path | str) -> pathlib.Path:
"""Find path case-insensitively, returns pathlib.Path obj."""
given_path = pathlib.Path(path).resolve()
real_path = None
@ -37,7 +37,8 @@ def case_insensitive_path(path):
return real_path
def case_insensitive_search(path, item):
def case_insensitive_search(
path: pathlib.Path | str, item: str) -> pathlib.Path:
"""Search path for item case insensitively, returns pathlib.Path obj."""
path = pathlib.Path(path).resolve()
given_path = path.joinpath(item)
@ -61,7 +62,10 @@ def case_insensitive_search(path, item):
return real_path
def copy_file(source, dest, overwrite=False):
def copy_file(
source: pathlib.Path | str,
dest: pathlib.Path | str,
overwrite: bool = False) -> None:
"""Copy file and optionally overwrite the destination."""
source = case_insensitive_path(source)
dest = pathlib.Path(dest).resolve()
@ -72,7 +76,7 @@ def copy_file(source, dest, overwrite=False):
shutil.copy2(source, dest)
def delete_empty_folders(path):
def delete_empty_folders(path: pathlib.Path | str) -> None:
"""Recursively delete all empty folders in path."""
LOG.debug('path: %s', path)
@ -89,7 +93,11 @@ def delete_empty_folders(path):
pass
def delete_folder(path, force=False, ignore_errors=False):
def delete_folder(
path: pathlib.Path | str,
force: bool = False,
ignore_errors: bool = False,
) -> None:
"""Delete folder if empty or if forced.
NOTE: Exceptions are not caught by this function,
@ -106,7 +114,11 @@ def delete_folder(path, force=False, ignore_errors=False):
os.rmdir(path)
def delete_item(path, force=False, ignore_errors=False):
def delete_item(
path: pathlib.Path | str,
force: bool = False,
ignore_errors: bool = False,
) -> None:
"""Delete file or folder, optionally recursively.
NOTE: Exceptions are not caught by this function,
@ -124,7 +136,11 @@ def delete_item(path, force=False, ignore_errors=False):
os.remove(path)
def get_path_obj(path, expanduser=True, resolve=True):
def get_path_obj(
path: pathlib.Path | str,
expanduser: bool = True,
resolve: bool = True,
) -> pathlib.Path:
"""Get based on path, returns pathlib.Path."""
path = pathlib.Path(path)
if expanduser:
@ -134,7 +150,7 @@ def get_path_obj(path, expanduser=True, resolve=True):
return path
def non_clobber_path(path):
def non_clobber_path(path: pathlib.Path | str) -> pathlib.Path:
"""Update path as needed to non-existing path, returns pathlib.Path."""
LOG.debug('path: %s', path)
path = pathlib.Path(path)
@ -163,7 +179,10 @@ def non_clobber_path(path):
return new_path
def recursive_copy(source, dest, overwrite=False):
def recursive_copy(
source: pathlib.Path | str,
dest: pathlib.Path | str,
overwrite: bool = False) -> None:
"""Copy source to dest recursively.
NOTE: This uses rsync style source/dest syntax.
@ -213,7 +232,10 @@ def recursive_copy(source, dest, overwrite=False):
raise FileExistsError(f'Refusing to delete file: {dest}')
def rename_item(path, new_path):
def rename_item(
path: pathlib.Path | str,
new_path: pathlib.Path | str,
) -> pathlib.Path:
"""Rename item, returns pathlib.Path."""
path = pathlib.Path(path)
return path.rename(new_path)

View file

@ -6,6 +6,7 @@ NOTE: This script is meant to be called from within a new kit in ConEmu.
import logging
import os
import pathlib
import re
from wk.cfg.launchers import LAUNCHERS
@ -44,7 +45,7 @@ WIDTH = 50
# Functions
def compress_cbin_dirs():
def compress_cbin_dirs() -> None:
"""Compress CBIN_DIR items using ARCHIVE_PASSWORD."""
current_dir = os.getcwd()
for item in CBIN_DIR.iterdir():
@ -62,25 +63,25 @@ def compress_cbin_dirs():
delete_item(item, force=True, ignore_errors=True)
def delete_from_temp(item_path):
def delete_from_temp(item_path) -> None:
"""Delete item from temp."""
delete_item(TMP_DIR.joinpath(item_path), force=True, ignore_errors=True)
def download_to_temp(filename, source_url, referer=None):
def download_to_temp(filename, source_url, referer=None) -> pathlib.Path:
"""Download file to temp dir, returns pathlib.Path."""
out_path = TMP_DIR.joinpath(filename)
download_file(out_path, source_url, referer=referer)
return out_path
def extract_to_bin(archive, folder):
def extract_to_bin(archive, folder) -> None:
"""Extract archive to folder under BIN_DIR."""
out_path = BIN_DIR.joinpath(folder)
extract_archive(archive, out_path)
def generate_launcher(section, name, options):
def generate_launcher(section, name, options) -> None:
"""Generate launcher script."""
dest = ROOT_DIR.joinpath(f'{section+"/" if section else ""}{name}.cmd')
out_text = []
@ -107,27 +108,27 @@ def generate_launcher(section, name, options):
# Download functions
def download_adobe_reader():
def download_adobe_reader() -> None:
"""Download Adobe Reader."""
out_path = INSTALLERS_DIR.joinpath('Adobe Reader DC.exe')
download_file(out_path, SOURCES['Adobe Reader DC'])
def download_aida64():
def download_aida64() -> None:
"""Download AIDA64."""
archive = download_to_temp('AIDA64.zip', SOURCES['AIDA64'])
extract_to_bin(archive, 'AIDA64')
delete_from_temp('AIDA64.zip')
def download_autoruns():
def download_autoruns() -> None:
"""Download Autoruns."""
for item in ('Autoruns32', 'Autoruns64'):
out_path = BIN_DIR.joinpath(f'Sysinternals/{item}.exe')
download_file(out_path, SOURCES[item])
def download_bleachbit():
def download_bleachbit() -> None:
"""Download BleachBit."""
out_path = BIN_DIR.joinpath('BleachBit')
archive = download_to_temp('BleachBit.zip', SOURCES['BleachBit'])
@ -142,7 +143,7 @@ def download_bleachbit():
delete_from_temp('BleachBit.zip')
def download_bluescreenview():
def download_bluescreenview() -> None:
"""Download BlueScreenView."""
archive_32 = download_to_temp(
'bluescreenview32.zip', SOURCES['BlueScreenView32'],
@ -161,14 +162,14 @@ def download_bluescreenview():
delete_from_temp('bluescreenview64.zip')
def download_erunt():
def download_erunt() -> None:
"""Download ERUNT."""
archive = download_to_temp('erunt.zip', SOURCES['ERUNT'])
extract_to_bin(archive, 'ERUNT')
delete_from_temp('erunt.zip')
def download_everything():
def download_everything() -> None:
"""Download Everything."""
archive_32 = download_to_temp('everything32.zip', SOURCES['Everything32'])
archive_64 = download_to_temp('everything64.zip', SOURCES['Everything64'])
@ -183,7 +184,7 @@ def download_everything():
delete_from_temp('everything64.zip')
def download_fastcopy():
def download_fastcopy() -> None:
"""Download FastCopy."""
installer = download_to_temp('FastCopyInstaller.exe', SOURCES['FastCopy'])
out_path = BIN_DIR.joinpath('FastCopy')
@ -199,7 +200,7 @@ def download_fastcopy():
delete_item(BIN_DIR.joinpath('FastCopy/setup.exe'))
def download_furmark():
def download_furmark() -> None:
"""Download FurMark."""
installer = download_to_temp(
'FurMark_Setup.exe',
@ -219,20 +220,20 @@ def download_furmark():
delete_from_temp('FurMarkInstall')
def download_hwinfo():
def download_hwinfo() -> None:
"""Download HWiNFO."""
archive = download_to_temp('HWiNFO.zip', SOURCES['HWiNFO'])
extract_to_bin(archive, 'HWiNFO')
delete_from_temp('HWiNFO.zip')
def download_macs_fan_control():
def download_macs_fan_control() -> None:
"""Download Macs Fan Control."""
out_path = INSTALLERS_DIR.joinpath('Macs Fan Control.exe')
download_file(out_path, SOURCES['Macs Fan Control'])
def download_libreoffice():
def download_libreoffice() -> None:
"""Download LibreOffice."""
for arch in 32, 64:
out_path = INSTALLERS_DIR.joinpath(f'LibreOffice{arch}.msi')
@ -240,7 +241,7 @@ def download_libreoffice():
ui.sleep(1)
def download_neutron():
def download_neutron() -> None:
"""Download Neutron."""
archive = download_to_temp('neutron.zip', SOURCES['Neutron'])
out_path = BIN_DIR.joinpath('Neutron')
@ -248,7 +249,7 @@ def download_neutron():
delete_from_temp('neutron.zip')
def download_notepad_plus_plus():
def download_notepad_plus_plus() -> None:
"""Download Notepad++."""
archive = download_to_temp('npp.7z', SOURCES['Notepad++'])
extract_to_bin(archive, 'NotepadPlusPlus')
@ -260,21 +261,21 @@ def download_notepad_plus_plus():
delete_from_temp('npp.7z')
def download_openshell():
def download_openshell() -> None:
"""Download OpenShell installer and Fluent-Metro skin."""
for name in ('OpenShell.exe', 'Fluent-Metro.zip'):
out_path = BIN_DIR.joinpath(f'OpenShell/{name}')
download_file(out_path, SOURCES[name[:-4]])
def download_putty():
def download_putty() -> None:
"""Download PuTTY."""
archive = download_to_temp('putty.zip', SOURCES['PuTTY'])
extract_to_bin(archive, 'PuTTY')
delete_from_temp('putty.zip')
def download_snappy_driver_installer_origin():
def download_snappy_driver_installer_origin() -> None:
"""Download Snappy Driver Installer Origin."""
archive = download_to_temp('aria2.zip', SOURCES['Aria2'])
aria2c = TMP_DIR.joinpath('aria2/aria2c.exe')
@ -344,7 +345,7 @@ def download_snappy_driver_installer_origin():
delete_from_temp('fake.7z')
def download_uninstallview():
def download_uninstallview() -> None:
"""Download UninstallView."""
archive_32 = download_to_temp('uninstallview32.zip', SOURCES['UninstallView32'])
archive_64 = download_to_temp('uninstallview64.zip', SOURCES['UninstallView64'])
@ -359,14 +360,14 @@ def download_uninstallview():
delete_from_temp('uninstallview64.zip')
def download_wiztree():
def download_wiztree() -> None:
"""Download WizTree."""
archive = download_to_temp('wiztree.zip', SOURCES['WizTree'])
extract_to_bin(archive, 'WizTree')
delete_from_temp('wiztree.zip')
def download_xmplay():
def download_xmplay() -> None:
"""Download XMPlay."""
archives = [
download_to_temp('xmplay.zip', SOURCES['XMPlay']),
@ -394,7 +395,7 @@ def download_xmplay():
delete_from_temp('xmp-rar.zip')
delete_from_temp('Innocuous.zip')
def download_xmplay_music():
def download_xmplay_music() -> None:
"""Download XMPlay Music."""
music_tmp = TMP_DIR.joinpath('music')
music_tmp.mkdir(exist_ok=True)
@ -447,7 +448,7 @@ def download_xmplay_music():
# "Main" Function
def build_kit():
def build_kit() -> None:
"""Build Kit."""
update_log_path(dest_name='Build Tool', timestamp=True)
title = f'{KIT_NAME_FULL}: Build Tool'

View file

@ -1,11 +1,13 @@
"""WizardKit: Tool Functions"""
# vim: sts=2 sw=2 ts=2
from datetime import datetime, timedelta
import logging
import pathlib
import platform
from datetime import datetime, timedelta
from subprocess import CompletedProcess, Popen
import requests
from wk.cfg.main import ARCHIVE_PASSWORD
@ -30,7 +32,9 @@ CACHED_DIRS = {}
# Functions
def download_file(out_path, source_url, as_new=False, overwrite=False, referer=None):
def download_file(
out_path, source_url,
as_new=False, overwrite=False, referer=None) -> pathlib.Path:
"""Download a file using requests, returns pathlib.Path."""
out_path = pathlib.Path(out_path).resolve()
name = out_path.name
@ -95,7 +99,7 @@ def download_file(out_path, source_url, as_new=False, overwrite=False, referer=N
return out_path
def download_tool(folder, name, suffix=None):
def download_tool(folder, name, suffix=None) -> None:
"""Download tool."""
name_arch = f'{name}{ARCH}'
out_path = get_tool_path(folder, name, check=False, suffix=suffix)
@ -130,7 +134,7 @@ def download_tool(folder, name, suffix=None):
raise
def extract_archive(archive, out_path, *args, mode='x', silent=True):
def extract_archive(archive, out_path, *args, mode='x', silent=True) -> None:
"""Extract an archive to out_path."""
out_path = pathlib.Path(out_path).resolve()
out_path.parent.mkdir(parents=True, exist_ok=True)
@ -142,7 +146,7 @@ def extract_archive(archive, out_path, *args, mode='x', silent=True):
run_program(cmd)
def extract_tool(folder):
def extract_tool(folder) -> None:
"""Extract tool."""
extract_archive(
find_kit_dir('.cbin').joinpath(folder).with_suffix('.7z'),
@ -151,7 +155,7 @@ def extract_tool(folder):
)
def find_kit_dir(name=None):
def find_kit_dir(name=None) -> pathlib.Path:
"""Find folder in kit, returns pathlib.Path.
Search is performed in the script's path and then recursively upwards.
@ -178,7 +182,7 @@ def find_kit_dir(name=None):
return cur_path
def get_tool_path(folder, name, check=True, suffix=None):
def get_tool_path(folder, name, check=True, suffix=None) -> pathlib.Path:
"""Get tool path, returns pathlib.Path"""
bin_dir = find_kit_dir('.bin')
if not suffix:
@ -203,7 +207,7 @@ def run_tool(
folder, name, *run_args,
cbin=False, cwd=False, download=False, popen=False,
**run_kwargs,
):
) -> CompletedProcess | Popen:
"""Run tool from the kit or the Internet, returns proc obj.
proc will be either subprocess.CompletedProcess or subprocess.Popen."""

View file

@ -1,9 +1,12 @@
"""WizardKit: UFD Functions"""
# vim: sts=2 sw=2 ts=2
# TODO: Drop OrderedDict use
import logging
import math
import os
import pathlib
import shutil
from subprocess import CalledProcessError
@ -59,7 +62,7 @@ UFD_LABEL = f'{KIT_NAME_SHORT}_UFD'
# Functions
def apply_image(part_path, image_path, hide_macos_boot=True):
def apply_image(part_path, image_path, hide_macos_boot=True) -> None:
"""Apply raw image to dev_path using dd."""
cmd = [
'sudo',
@ -89,7 +92,7 @@ def apply_image(part_path, image_path, hide_macos_boot=True):
linux.unmount(source_or_mountpoint='/mnt/TMP')
def build_ufd():
def build_ufd() -> None:
"""Build UFD using selected sources."""
args = docopt(DOCSTRING)
if args['--debug']:
@ -252,7 +255,7 @@ def build_ufd():
ui.pause('Press Enter to exit...')
def confirm_selections(update=False):
def confirm_selections(update=False) -> None:
"""Ask tech to confirm selections, twice if necessary."""
if not ui.ask('Is the above information correct?'):
ui.abort()
@ -273,7 +276,7 @@ def confirm_selections(update=False):
ui.print_standard(' ')
def copy_source(source, items, overwrite=False):
def copy_source(source, items, overwrite=False) -> None:
"""Copy source items to /mnt/UFD."""
is_image = source.is_file()
items_not_found = False
@ -300,7 +303,7 @@ def copy_source(source, items, overwrite=False):
raise FileNotFoundError('One or more items not found')
def create_table(dev_path, use_mbr=False, images=None):
def create_table(dev_path, use_mbr=False, images=None) -> None:
"""Create GPT or DOS partition table."""
cmd = [
'sudo',
@ -338,7 +341,7 @@ def create_table(dev_path, use_mbr=False, images=None):
run_program(cmd)
def find_first_partition(dev_path):
def find_first_partition(dev_path) -> str:
"""Find path to first partition of dev, returns str."""
cmd = [
'lsblk',
@ -357,7 +360,7 @@ def find_first_partition(dev_path):
return part_path
def format_partition(dev_path, label):
def format_partition(dev_path, label) -> None:
"""Format first partition on device FAT32."""
cmd = [
'sudo',
@ -369,7 +372,7 @@ def format_partition(dev_path, label):
run_program(cmd)
def get_block_device_size(dev_path):
def get_block_device_size(dev_path) -> int:
"""Get block device size via lsblk, returns int."""
cmd = [
'lsblk',
@ -388,7 +391,7 @@ def get_block_device_size(dev_path):
return int(proc.stdout.strip())
def get_uuid(path):
def get_uuid(path) -> str:
"""Get filesystem UUID via findmnt, returns str."""
cmd = [
'findmnt',
@ -404,7 +407,7 @@ def get_uuid(path):
return proc.stdout.strip()
def hide_items(ufd_dev_first_partition, items):
def hide_items(ufd_dev_first_partition, items) -> None:
"""Set FAT32 hidden flag for items."""
with open('/root/.mtoolsrc', 'w', encoding='utf-8') as _f:
_f.write(f'drive U: file="{ufd_dev_first_partition}"\n')
@ -416,7 +419,7 @@ def hide_items(ufd_dev_first_partition, items):
run_program(cmd, shell=True, check=False)
def install_syslinux_to_dev(ufd_dev, use_mbr):
def install_syslinux_to_dev(ufd_dev, use_mbr) -> None:
"""Install Syslinux to UFD (dev)."""
cmd = [
'sudo',
@ -429,7 +432,7 @@ def install_syslinux_to_dev(ufd_dev, use_mbr):
run_program(cmd)
def install_syslinux_to_partition(partition):
def install_syslinux_to_partition(partition) -> None:
"""Install Syslinux to UFD (partition)."""
cmd = [
'sudo',
@ -442,7 +445,7 @@ def install_syslinux_to_partition(partition):
run_program(cmd)
def is_valid_path(path_obj, path_type):
def is_valid_path(path_obj, path_type) -> bool:
"""Verify path_obj is valid by type, returns bool."""
valid_path = False
if path_type == 'DIR':
@ -459,7 +462,7 @@ def is_valid_path(path_obj, path_type):
return valid_path
def set_boot_flag(dev_path, use_mbr=False):
def set_boot_flag(dev_path, use_mbr=False) -> None:
"""Set modern or legacy boot flag."""
cmd = [
'sudo',
@ -471,7 +474,7 @@ def set_boot_flag(dev_path, use_mbr=False):
run_program(cmd)
def remove_arch():
def remove_arch() -> None:
"""Remove arch dir from UFD.
This ensures a clean installation to the UFD and resets the boot files
@ -479,7 +482,7 @@ def remove_arch():
shutil.rmtree(io.case_insensitive_path('/mnt/UFD/arch'))
def show_selections(args, sources, ufd_dev, ufd_sources, extra_images):
def show_selections(args, sources, ufd_dev, ufd_sources, extra_images) -> None:
"""Show selections including non-specified options."""
# Sources
@ -526,7 +529,7 @@ def show_selections(args, sources, ufd_dev, ufd_sources, extra_images):
ui.print_standard(' ')
def update_boot_entries(ufd_dev, images=None):
def update_boot_entries(ufd_dev, images=None) -> None:
"""Update boot files for UFD usage"""
configs = []
uuids = [get_uuid('/mnt/UFD')]
@ -613,7 +616,7 @@ def update_boot_entries(ufd_dev, images=None):
break
def verify_sources(args, ufd_sources):
def verify_sources(args, ufd_sources) -> dict[str, pathlib.Path]:
"""Check all sources and abort if necessary, returns dict."""
sources = OrderedDict()
@ -633,7 +636,7 @@ def verify_sources(args, ufd_sources):
return sources
def verify_ufd(dev_path):
def verify_ufd(dev_path) -> pathlib.Path:
"""Check that dev_path is a valid UFD, returns pathlib.Path obj."""
ufd_dev = None
@ -647,10 +650,10 @@ def verify_ufd(dev_path):
ui.print_error(f'ERROR: Invalid UFD device: {ufd_dev}')
ui.abort()
return ufd_dev
return ufd_dev # type: ignore[reportGeneralTypeIssues]
def zero_device(dev_path):
def zero_device(dev_path) -> None:
"""Zero-out first 64MB of device."""
cmd = [
'sudo',

View file

@ -26,7 +26,7 @@ DEFAULT_LOG_NAME = cfg.main.KIT_NAME_FULL
# Functions
def enable_debug_mode():
def enable_debug_mode() -> None:
"""Configures logging for better debugging."""
root_logger = logging.getLogger()
for handler in root_logger.handlers:
@ -39,8 +39,11 @@ def enable_debug_mode():
def format_log_path(
log_dir=None, log_name=None, timestamp=False,
kit=False, tool=False, append=False):
log_dir: None | pathlib.Path | str = None,
log_name: None | str = None,
timestamp: bool = False,
kit: bool = False, tool: bool = False, append: bool = False,
) -> pathlib.Path:
"""Format path based on args passed, returns pathlib.Path obj."""
log_path = pathlib.Path(
f'{log_dir if log_dir else DEFAULT_LOG_DIR}/'
@ -61,7 +64,7 @@ def format_log_path(
return log_path
def get_root_logger_path():
def get_root_logger_path() -> pathlib.Path:
"""Get the log filepath from the root logger, returns pathlib.Path obj.
NOTE: This will use the first handler baseFilename it finds (if any).
@ -78,7 +81,7 @@ def get_root_logger_path():
raise RuntimeError('Log path not found.')
def remove_empty_log(log_path=None):
def remove_empty_log(log_path: None | pathlib.Path = None) -> None:
"""Remove log if empty.
NOTE: Under Windows an empty log is 2 bytes long.
@ -101,7 +104,7 @@ def remove_empty_log(log_path=None):
log_path.unlink()
def start(config=None):
def start(config: dict[str, str] | None = None) -> None:
"""Configure and start logging using safe defaults."""
log_path = format_log_path(timestamp=os.name != 'nt')
root_logger = logging.getLogger()
@ -124,7 +127,10 @@ def start(config=None):
def update_log_path(
dest_dir=None, dest_name=None, keep_history=True, timestamp=True, append=False):
dest_dir: None | pathlib.Path | str = None,
dest_name: None | str = None,
keep_history: bool = True, timestamp: bool = True, append: bool = False,
) -> None:
"""Moves current log file to new path and updates the root logger."""
root_logger = logging.getLogger()
new_path = format_log_path(dest_dir, dest_name, timestamp=timestamp, append=append)

View file

@ -5,6 +5,9 @@ import os
import pathlib
import re
from subprocess import CompletedProcess
from typing import Any
import psutil
from wk.exe import get_json_from_command, run_program
@ -23,7 +26,7 @@ REGEX_VALID_IP = re.compile(
# Functions
def connected_to_private_network(raise_on_error=False):
def connected_to_private_network(raise_on_error: bool = False) -> bool:
"""Check if connected to a private network, returns bool.
This checks for a valid private IP assigned to this system.
@ -49,12 +52,10 @@ def connected_to_private_network(raise_on_error=False):
raise GenericError('Not connected to a network')
# Done
if raise_on_error:
connected = None
return connected
def mount_backup_shares(read_write=False):
def mount_backup_shares(read_write: bool = False) -> list[str]:
"""Mount backup shares using OS specific methods."""
report = []
for name, details in BACKUP_SERVERS.items():
@ -97,7 +98,10 @@ def mount_backup_shares(read_write=False):
return report
def mount_network_share(details, mount_point=None, read_write=False):
def mount_network_share(
details: dict[str, Any],
mount_point: None | pathlib.Path | str = None,
read_write: bool = False) -> CompletedProcess:
"""Mount network share using OS specific methods."""
cmd = None
address = details['Address']
@ -148,7 +152,7 @@ def mount_network_share(details, mount_point=None, read_write=False):
return run_program(cmd, check=False)
def ping(addr='google.com'):
def ping(addr: str = 'google.com') -> None:
"""Attempt to ping addr."""
cmd = (
'ping',
@ -159,7 +163,7 @@ def ping(addr='google.com'):
run_program(cmd)
def share_is_mounted(details):
def share_is_mounted(details: dict[str, Any]) -> bool:
"""Check if dev/share/etc is mounted, returns bool."""
mounted = False
@ -193,8 +197,9 @@ def share_is_mounted(details):
return mounted
def show_valid_addresses():
def show_valid_addresses() -> None:
"""Show all valid private IP addresses assigned to the system."""
# TODO: Refactor to remove ui dependancy
devs = psutil.net_if_addrs()
for dev, families in sorted(devs.items()):
for family in families:
@ -203,8 +208,9 @@ def show_valid_addresses():
ui.show_data(message=dev, data=family.address)
def speedtest():
def speedtest() -> list[str]:
"""Run a network speedtest using speedtest-cli."""
# TODO: Refactor to use speedtest-cli's JSON output
cmd = ['speedtest-cli', '--simple']
proc = run_program(cmd, check=False)
output = [line.strip() for line in proc.stdout.splitlines() if line.strip()]
@ -213,7 +219,7 @@ def speedtest():
return [f'{a:<10}{b:6.2f} {c}' for a, b, c in output]
def unmount_backup_shares():
def unmount_backup_shares() -> list[str]:
"""Unmount backup shares."""
report = []
for name, details in BACKUP_SERVERS.items():
@ -242,7 +248,10 @@ def unmount_backup_shares():
return report
def unmount_network_share(details=None, mount_point=None):
def unmount_network_share(
details: dict[str, Any] | None = None,
mount_point: None | pathlib.Path | str = None,
) -> CompletedProcess:
"""Unmount network share"""
cmd = []

View file

@ -20,12 +20,12 @@ UUID_CORESTORAGE = '53746f72-6167-11aa-aa11-00306543ecac'
# Functions
def build_volume_report(device_path=None) -> list:
def build_volume_report(device_path=None) -> list[str]:
"""Build volume report using lsblk, returns list.
If device_path is provided the report is limited to that device.
"""
def _get_volumes(dev, indent=0) -> list:
def _get_volumes(dev, indent=0) -> list[dict]:
"""Convert lsblk JSON tree to a flat list of items, returns list."""
dev['name'] = f'{" "*indent}{dev["name"]}'
volumes = [dev]
@ -108,7 +108,7 @@ def build_volume_report(device_path=None) -> list:
return report
def get_user_home(user):
def get_user_home(user) -> pathlib.Path:
"""Get path to user's home dir, returns pathlib.Path obj."""
home = None
@ -129,7 +129,7 @@ def get_user_home(user):
return pathlib.Path(home)
def get_user_name():
def get_user_name() -> str:
"""Get real user name, returns str."""
user = None
@ -146,7 +146,7 @@ def get_user_name():
return user
def make_temp_file(suffix=None):
def make_temp_file(suffix=None) -> pathlib.Path:
"""Make temporary file, returns pathlib.Path() obj."""
cmd = ['mktemp']
if suffix:
@ -155,7 +155,7 @@ def make_temp_file(suffix=None):
return pathlib.Path(proc.stdout.strip())
def mount(source, mount_point=None, read_write=False):
def mount(source, mount_point=None, read_write=False) -> None:
"""Mount source (on mount_point if provided).
NOTE: If not running_as_root() then udevil will be used.
@ -178,13 +178,13 @@ def mount(source, mount_point=None, read_write=False):
raise RuntimeError(f'Failed to mount: {source} on {mount_point}')
def mount_volumes(device_path=None, read_write=False, scan_corestorage=False):
def mount_volumes(device_path=None, read_write=False, scan_corestorage=False) -> None:
"""Mount all detected volumes.
NOTE: If device_path is specified then only volumes
under that path will be mounted.
"""
def _get_volumes(dev) -> list:
def _get_volumes(dev) -> list[dict]:
"""Convert lsblk JSON tree to a flat list of items, returns list."""
volumes = [dev]
for child in dev.get('children', []):
@ -233,12 +233,12 @@ def mount_volumes(device_path=None, read_write=False, scan_corestorage=False):
pass
def running_as_root():
def running_as_root() -> bool:
"""Check if running with effective UID of 0, returns bool."""
return os.geteuid() == 0
def scan_corestorage_container(container, timeout=300):
def scan_corestorage_container(container, timeout=300) -> list[dict]:
"""Scan CoreStorage container for inner volumes, returns list."""
container_path = pathlib.Path(container)
detected_volumes = {}
@ -285,7 +285,7 @@ def scan_corestorage_container(container, timeout=300):
return inner_volumes
def unmount(source_or_mountpoint):
def unmount(source_or_mountpoint) -> None:
"""Unmount source_or_mountpoint.
NOTE: If not running_as_root() then udevil will be used.

View file

@ -13,7 +13,7 @@ REGEX_FANS = re.compile(r'^.*\(bytes (?P<bytes>.*)\)$')
# Functions
def decode_smc_bytes(text):
def decode_smc_bytes(text) -> int:
"""Decode SMC bytes, returns int."""
result = None
@ -32,7 +32,7 @@ def decode_smc_bytes(text):
return result
def set_fans(mode):
def set_fans(mode) -> None:
"""Set fans to auto or max."""
if mode == 'auto':
set_fans_auto()
@ -42,14 +42,14 @@ def set_fans(mode):
raise RuntimeError(f'Invalid fan mode: {mode}')
def set_fans_auto():
def set_fans_auto() -> None:
"""Set fans to auto."""
LOG.info('Setting fans to auto')
cmd = ['sudo', 'smc', '-k', 'FS! ', '-w', '0000']
run_program(cmd)
def set_fans_max():
def set_fans_max() -> None:
"""Set fans to their max speeds."""
LOG.info('Setting fans to max')
num_fans = 0

View file

@ -9,6 +9,8 @@ import platform
import re
from contextlib import suppress
from typing import Any
import psutil
try:
@ -92,7 +94,7 @@ else:
# Activation Functions
def activate_with_bios():
def activate_with_bios() -> None:
"""Attempt to activate Windows with a key stored in the BIOS."""
# Code borrowed from https://github.com/aeruder/get_win8key
#####################################################
@ -132,7 +134,7 @@ def activate_with_bios():
raise GenericError('Activation Failed')
def get_activation_string():
def get_activation_string() -> str:
"""Get activation status, returns str."""
cmd = ['cscript', '//nologo', SLMGR, '/xpr']
proc = run_program(cmd, check=False)
@ -142,7 +144,7 @@ def get_activation_string():
return act_str
def is_activated():
def is_activated() -> bool:
"""Check if Windows is activated via slmgr.vbs and return bool."""
act_str = get_activation_string()
@ -151,22 +153,22 @@ def is_activated():
# Date / Time functions
def get_timezone():
def get_timezone() -> str:
"""Get current timezone using tzutil, returns str."""
cmd = ['tzutil', '/g']
proc = run_program(cmd, check=False)
return proc.stdout
def set_timezone(zone):
def set_timezone(zone) -> None:
"""Set current timezone using tzutil."""
cmd = ['tzutil', '/s', zone]
run_program(cmd, check=False)
# Info Functions
def check_4k_alignment(show_alert=False):
"""Check if all partitions are 4K aligned, returns book."""
def check_4k_alignment(show_alert=False) -> list[str]:
"""Check if all partitions are 4K aligned, returns list."""
cmd = ['WMIC', 'partition', 'get', 'Caption,Size,StartingOffset']
report = []
show_alert = False
@ -204,8 +206,8 @@ def check_4k_alignment(show_alert=False):
return report
def export_bitlocker_info():
"""Get Bitlocker info and save to the current directory."""
def export_bitlocker_info() -> None:
"""Get Bitlocker info and save to the base directory of the kit."""
commands = [
['manage-bde', '-status', SYSTEMDRIVE],
['manage-bde', '-protectors', '-get', SYSTEMDRIVE],
@ -222,7 +224,7 @@ def export_bitlocker_info():
_f.write(f'{proc.stdout}\n\n')
def get_installed_antivirus():
def get_installed_antivirus() -> list[str]:
"""Get list of installed antivirus programs, returns list."""
cmd = [
'WMIC', r'/namespace:\\root\SecurityCenter2',
@ -263,8 +265,8 @@ def get_installed_antivirus():
return report
def get_installed_ram(as_list=False, raise_exceptions=False):
"""Get installed RAM."""
def get_installed_ram(as_list=False, raise_exceptions=False) -> list | str:
"""Get installed RAM, returns list or str."""
mem = psutil.virtual_memory()
mem_str = bytes_to_string(mem.total, decimals=1)
@ -279,8 +281,8 @@ def get_installed_ram(as_list=False, raise_exceptions=False):
return [mem_str] if as_list else mem_str
def get_os_activation(as_list=False, check=True):
"""Get OS activation status, returns str.
def get_os_activation(as_list=False, check=True) -> list | str:
"""Get OS activation status, returns list or str.
NOTE: If check=True then raise an exception if OS isn't activated.
"""
@ -296,7 +298,7 @@ def get_os_activation(as_list=False, check=True):
return [act_str] if as_list else act_str
def get_os_name(as_list=False, check=True):
def get_os_name(as_list=False, check=True) -> str:
"""Build OS display name, returns str.
NOTE: If check=True then an exception is raised if the OS version is
@ -322,7 +324,7 @@ def get_os_name(as_list=False, check=True):
return [display_name] if as_list else display_name
def get_raw_disks():
def get_raw_disks() -> list[str]:
"""Get all disks without a partiton table, returns list."""
script_path = find_kit_dir('Scripts').joinpath('get_raw_disks.ps1')
cmd = ['PowerShell', '-ExecutionPolicy', 'Bypass', '-File', script_path]
@ -347,7 +349,7 @@ def get_raw_disks():
return raw_disks
def get_volume_usage(use_colors=False):
def get_volume_usage(use_colors=False) -> list[str]:
"""Get space usage info for all fixed volumes, returns list."""
report = []
for disk in psutil.disk_partitions():
@ -371,7 +373,7 @@ def get_volume_usage(use_colors=False):
return report
def show_alert_box(message, title=None):
def show_alert_box(message, title=None) -> None:
"""Show Windows alert box with message."""
title = title if title else f'{KIT_NAME_FULL} Warning'
message_box = ctypes.windll.user32.MessageBoxW
@ -379,7 +381,7 @@ def show_alert_box(message, title=None):
# Registry Functions
def reg_delete_key(hive, key, recurse=False):
def reg_delete_key(hive, key, recurse=False) -> None:
"""Delete a key from the registry.
NOTE: If recurse is False then it will only work on empty keys.
@ -401,7 +403,7 @@ def reg_delete_key(hive, key, recurse=False):
except FileNotFoundError:
# Ignore
pass
except PermissionError:
except PermissionError as _e:
LOG.error(r'Failed to delete registry key: %s\%s', hive_name, key)
if recurse:
# Re-raise exception
@ -409,10 +411,10 @@ def reg_delete_key(hive, key, recurse=False):
# recurse is not True so assuming we tried to remove a non-empty key
msg = fr'Refusing to remove non-empty key: {hive_name}\{key}'
raise FileExistsError(msg)
raise FileExistsError(msg) from _e
def reg_delete_value(hive, key, value):
def reg_delete_value(hive, key, value) -> None:
"""Delete a value from the registry."""
access = winreg.KEY_ALL_ACCESS
hive = reg_get_hive(hive)
@ -436,8 +438,9 @@ def reg_delete_value(hive, key, value):
raise
def reg_get_hive(hive):
def reg_get_hive(hive) -> Any:
"""Get winreg HKEY constant from string, returns HKEY constant."""
# TODO: Fix type hint
if isinstance(hive, int):
# Assuming we're already a winreg HKEY constant
pass
@ -448,8 +451,9 @@ def reg_get_hive(hive):
return hive
def reg_get_data_type(data_type):
def reg_get_data_type(data_type) -> Any:
"""Get registry data type from string, returns winreg constant."""
# TODO: Fix type hint
if isinstance(data_type, int):
# Assuming we're already a winreg value type constant
pass
@ -460,7 +464,7 @@ def reg_get_data_type(data_type):
return data_type
def reg_key_exists(hive, key):
def reg_key_exists(hive, key) -> bool:
"""Test if the specified hive/key exists, returns bool."""
exists = False
hive = reg_get_hive(hive)
@ -478,7 +482,7 @@ def reg_key_exists(hive, key):
return exists
def reg_read_value(hive, key, value, force_32=False, force_64=False):
def reg_read_value(hive, key, value, force_32=False, force_64=False) -> Any:
"""Query value from hive/hey, returns multiple types.
NOTE: Set value='' to read the default value.
@ -502,7 +506,7 @@ def reg_read_value(hive, key, value, force_32=False, force_64=False):
return data
def reg_write_settings(settings):
def reg_write_settings(settings) -> None:
"""Set registry values in bulk from a custom data structure.
Data structure should be as follows:
@ -542,7 +546,7 @@ def reg_write_settings(settings):
reg_set_value(hive, key, *value)
def reg_set_value(hive, key, name, data, data_type, option=None):
def reg_set_value(hive, key, name, data, data_type, option=None) -> None:
"""Set value for hive/key."""
access = winreg.KEY_WRITE
data_type = reg_get_data_type(data_type)
@ -574,25 +578,25 @@ def reg_set_value(hive, key, name, data, data_type, option=None):
# Safe Mode Functions
def disable_safemode():
def disable_safemode() -> None:
"""Edit BCD to remove safeboot value."""
cmd = ['bcdedit', '/deletevalue', '{default}', 'safeboot']
run_program(cmd)
def disable_safemode_msi():
def disable_safemode_msi() -> None:
"""Disable MSI access under safemode."""
cmd = ['reg', 'delete', REG_MSISERVER, '/f']
run_program(cmd)
def enable_safemode():
def enable_safemode() -> None:
"""Edit BCD to set safeboot as default."""
cmd = ['bcdedit', '/set', '{default}', 'safeboot', 'network']
run_program(cmd)
def enable_safemode_msi():
def enable_safemode_msi() -> None:
"""Enable MSI access under safemode."""
cmd = ['reg', 'add', REG_MSISERVER, '/f']
run_program(cmd)
@ -605,7 +609,7 @@ def enable_safemode_msi():
# Secure Boot Functions
def is_booted_uefi():
def is_booted_uefi() -> bool:
"""Check if booted UEFI or legacy, returns bool."""
kernel = ctypes.windll.kernel32
firmware_type = ctypes.c_uint()
@ -621,7 +625,7 @@ def is_booted_uefi():
return firmware_type.value == 2
def is_secure_boot_enabled(raise_exceptions=False, show_alert=False):
def is_secure_boot_enabled(raise_exceptions=False, show_alert=False) -> bool:
"""Check if Secure Boot is enabled, returns bool.
If raise_exceptions is True then an exception is raised with details.
@ -671,7 +675,7 @@ def is_secure_boot_enabled(raise_exceptions=False, show_alert=False):
# Service Functions
def disable_service(service_name):
def disable_service(service_name) -> None:
"""Set service startup to disabled."""
cmd = ['sc', 'config', service_name, 'start=', 'disabled']
run_program(cmd, check=False)
@ -681,7 +685,7 @@ def disable_service(service_name):
raise GenericError(f'Failed to disable service {service_name}')
def enable_service(service_name, start_type='auto'):
def enable_service(service_name, start_type='auto') -> None:
"""Enable service by setting start type."""
cmd = ['sc', 'config', service_name, 'start=', start_type]
psutil_type = 'automatic'
@ -696,7 +700,7 @@ def enable_service(service_name, start_type='auto'):
raise GenericError(f'Failed to enable service {service_name}')
def get_service_status(service_name):
def get_service_status(service_name) -> str:
"""Get service status using psutil, returns str."""
status = 'unknown'
try:
@ -708,7 +712,7 @@ def get_service_status(service_name):
return status
def get_service_start_type(service_name):
def get_service_start_type(service_name) -> str:
"""Get service startup type using psutil, returns str."""
start_type = 'unknown'
try:
@ -720,7 +724,7 @@ def get_service_start_type(service_name):
return start_type
def start_service(service_name):
def start_service(service_name) -> None:
"""Stop service."""
cmd = ['net', 'start', service_name]
run_program(cmd, check=False)
@ -730,7 +734,7 @@ def start_service(service_name):
raise GenericError(f'Failed to start service {service_name}')
def stop_service(service_name):
def stop_service(service_name) -> None:
"""Stop service."""
cmd = ['net', 'stop', service_name]
run_program(cmd, check=False)

View file

@ -4,11 +4,13 @@
import atexit
import logging
import os
import pathlib
import re
import sys
import time
from subprocess import CalledProcessError, DEVNULL
from typing import Any
from xml.dom.minidom import parse as xml_parse
from wk.cfg.main import KIT_NAME_FULL, KIT_NAME_SHORT, WINDOWS_TIME_ZONE
@ -102,7 +104,7 @@ for error in ('CalledProcessError', 'FileNotFoundError'):
# Auto Repairs
def build_menus(base_menus, title, presets):
def build_menus(base_menus, title, presets) -> dict[str, ui.Menu]:
"""Build menus, returns dict."""
menus = {}
menus['Main'] = ui.Menu(title=f'{title}\n{ansi.color_string("Main Menu", "GREEN")}')
@ -169,7 +171,7 @@ def build_menus(base_menus, title, presets):
return menus
def update_scheduled_task():
def update_scheduled_task() -> None:
"""Create (or update) scheduled task to start repairs."""
cmd = [
'schtasks', '/create', '/f',
@ -183,7 +185,7 @@ def update_scheduled_task():
run_program(cmd)
def end_session():
def end_session() -> None:
"""End Auto Repairs session."""
# Remove logon task
cmd = [
@ -222,7 +224,7 @@ def end_session():
LOG.error('Failed to remove Auto Repairs session settings')
def get_entry_settings(group, name):
def get_entry_settings(group, name) -> dict[str, Any]:
"""Get menu entry settings from the registry, returns dict."""
key_path = fr'{AUTO_REPAIR_KEY}\{group}\{name}'
settings = {}
@ -241,7 +243,7 @@ def get_entry_settings(group, name):
return settings
def init(menus, presets):
def init(menus, presets) -> None:
"""Initialize Auto Repairs."""
session_started = is_session_started()
@ -267,7 +269,7 @@ def init(menus, presets):
print('')
def init_run(options):
def init_run(options) -> None:
"""Initialize Auto Repairs Run."""
update_scheduled_task()
if options['Kill Explorer']['Selected']:
@ -294,7 +296,7 @@ def init_run(options):
TRY_PRINT.run('Running RKill...', run_rkill, msg_good='DONE')
def init_session(options):
def init_session(options) -> None:
"""Initialize Auto Repairs session."""
reg_set_value('HKCU', AUTO_REPAIR_KEY, 'SessionStarted', 1, 'DWORD')
reg_set_value('HKCU', AUTO_REPAIR_KEY, 'LogName', get_root_logger_path().stem, 'SZ')
@ -319,7 +321,7 @@ def init_session(options):
reboot(30)
def is_autologon_enabled():
def is_autologon_enabled() -> bool:
"""Check if Autologon is enabled, returns bool."""
auto_admin_logon = False
try:
@ -337,7 +339,7 @@ def is_autologon_enabled():
return auto_admin_logon
def is_session_started():
def is_session_started() -> bool:
"""Check if session was started, returns bool."""
session_started = False
try:
@ -349,7 +351,7 @@ def is_session_started():
return session_started
def load_preset(menus, presets, enable_menu_exit=True):
def load_preset(menus, presets, enable_menu_exit=True) -> None:
"""Load menu settings from preset and ask selection question(s)."""
if not enable_menu_exit:
MENU_PRESETS.actions['Main Menu'].update({'Disabled':True, 'Hidden':True})
@ -375,7 +377,7 @@ def load_preset(menus, presets, enable_menu_exit=True):
MENU_PRESETS.actions['Main Menu'].update({'Disabled':False, 'Hidden':False})
def load_settings(menus):
def load_settings(menus) -> None:
"""Load session settings from the registry."""
for group, menu in menus.items():
if group == 'Main':
@ -384,7 +386,7 @@ def load_settings(menus):
menu.options[name].update(get_entry_settings(group, ansi.strip_colors(name)))
def run_auto_repairs(base_menus, presets):
def run_auto_repairs(base_menus, presets) -> None:
"""Run Auto Repairs."""
set_log_path()
title = f'{KIT_NAME_FULL}: Auto Repairs'
@ -443,7 +445,7 @@ def run_auto_repairs(base_menus, presets):
ui.pause('Press Enter to exit...')
def run_group(group, menu):
def run_group(group, menu) -> None:
"""Run entries in group if appropriate."""
ui.print_info(f' {group}')
for name, details in menu.options.items():
@ -487,7 +489,7 @@ def run_group(group, menu):
details['Function'](group, name)
def save_selection_settings(menus):
def save_selection_settings(menus) -> None:
"""Save selections in the registry."""
for group, menu in menus.items():
if group == 'Main':
@ -500,7 +502,7 @@ def save_selection_settings(menus):
)
def save_settings(group, name, result=None, **kwargs):
def save_settings(group, name, result=None, **kwargs) -> None:
"""Save entry settings in the registry."""
key_path = fr'{AUTO_REPAIR_KEY}\{group}\{ansi.strip_colors(name)}'
@ -528,7 +530,7 @@ def save_settings(group, name, result=None, **kwargs):
reg_set_value('HKCU', key_path, value_name, data, data_type)
def set_log_path():
def set_log_path() -> None:
"""Set log name using defaults or the saved registry value."""
try:
log_path = reg_read_value('HKCU', AUTO_REPAIR_KEY, 'LogName')
@ -544,7 +546,7 @@ def set_log_path():
)
def show_main_menu(base_menus, menus, presets, title):
def show_main_menu(base_menus, menus, presets, title) -> None:
"""Show main menu and handle actions."""
while True:
update_main_menu(menus)
@ -559,7 +561,7 @@ def show_main_menu(base_menus, menus, presets, title):
raise SystemExit
def show_sub_menu(menu):
def show_sub_menu(menu) -> None:
"""Show sub-menu and handle sub-menu actions."""
while True:
selection = menu.advanced_select()
@ -590,7 +592,7 @@ def show_sub_menu(menu):
menu.options[name][key] = value
def update_main_menu(menus):
def update_main_menu(menus) -> None:
"""Update main menu based on current selections."""
index = 1
skip = 'Reboot'
@ -609,7 +611,7 @@ def update_main_menu(menus):
# Auto Repairs: Wrapper Functions
def auto_adwcleaner(group, name):
def auto_adwcleaner(group, name) -> None:
"""Run AdwCleaner scan.
save_settings() is called first since AdwCleaner may kill this script.
@ -621,25 +623,25 @@ def auto_adwcleaner(group, name):
save_settings(group, name, result=result)
def auto_backup_browser_profiles(group, name):
def auto_backup_browser_profiles(group, name) -> None:
"""Backup browser profiles."""
backup_all_browser_profiles(use_try_print=True)
save_settings(group, name, done=True, failed=False, message='DONE')
def auto_backup_power_plans(group, name):
def auto_backup_power_plans(group, name) -> None:
"""Backup power plans."""
result = TRY_PRINT.run('Backup Power Plans...', export_power_plans)
save_settings(group, name, result=result)
def auto_backup_registry(group, name):
def auto_backup_registry(group, name) -> None:
"""Backup registry."""
result = TRY_PRINT.run('Backup Registry...', backup_registry)
save_settings(group, name, result=result)
def auto_bleachbit(group, name):
def auto_bleachbit(group, name) -> None:
"""Run BleachBit to clean files."""
result = TRY_PRINT.run(
'BleachBit...', run_bleachbit, BLEACH_BIT_CLEANERS, msg_good='DONE',
@ -647,7 +649,7 @@ def auto_bleachbit(group, name):
save_settings(group, name, result=result)
def auto_chkdsk(group, name):
def auto_chkdsk(group, name) -> None:
"""Run CHKDSK repairs."""
needs_reboot = False
result = TRY_PRINT.run(f'CHKDSK ({SYSTEMDRIVE})...', run_chkdsk_online)
@ -671,7 +673,7 @@ def auto_chkdsk(group, name):
reboot()
def auto_disable_pending_renames(group, name):
def auto_disable_pending_renames(group, name) -> None:
"""Disable pending renames."""
result = TRY_PRINT.run(
'Disabling pending renames...', disable_pending_renames,
@ -679,7 +681,7 @@ def auto_disable_pending_renames(group, name):
save_settings(group, name, result=result)
def auto_dism(group, name):
def auto_dism(group, name) -> None:
"""Run DISM repairs."""
needs_reboot = False
result = TRY_PRINT.run('DISM (RestoreHealth)...', run_dism)
@ -704,7 +706,7 @@ def auto_dism(group, name):
reboot()
def auto_enable_regback(group, name):
def auto_enable_regback(group, name) -> None:
"""Enable RegBack."""
result = TRY_PRINT.run(
'Enable RegBack...', reg_set_value, 'HKLM',
@ -714,19 +716,19 @@ def auto_enable_regback(group, name):
save_settings(group, name, result=result)
def auto_hitmanpro(group, name):
def auto_hitmanpro(group, name) -> None:
"""Run HitmanPro scan."""
result = TRY_PRINT.run('HitmanPro...', run_hitmanpro, msg_good='DONE')
save_settings(group, name, result=result)
def auto_kvrt(group, name):
def auto_kvrt(group, name) -> None:
"""Run KVRT scan."""
result = TRY_PRINT.run('KVRT...', run_kvrt, msg_good='DONE')
save_settings(group, name, result=result)
def auto_microsoft_defender(group, name):
def auto_microsoft_defender(group, name) -> None:
"""Run Microsoft Defender scan."""
result = TRY_PRINT.run(
'Microsoft Defender...', run_microsoft_defender, msg_good='DONE',
@ -734,14 +736,14 @@ def auto_microsoft_defender(group, name):
save_settings(group, name, result=result)
def auto_reboot(group, name):
def auto_reboot(group, name) -> None:
"""Reboot the system."""
save_settings(group, name, done=True, failed=False, message='DONE')
print('')
reboot(30)
def auto_remove_power_plan(group, name):
def auto_remove_power_plan(group, name) -> None:
"""Remove custom power plan and set to Balanced."""
result = TRY_PRINT.run(
'Remove Custom Power Plan...', remove_custom_power_plan,
@ -749,7 +751,7 @@ def auto_remove_power_plan(group, name):
save_settings(group, name, result=result)
def auto_repair_registry(group, name):
def auto_repair_registry(group, name) -> None:
"""Delete registry keys with embedded null characters."""
result = TRY_PRINT.run(
'Running Registry repairs...', delete_registry_null_keys,
@ -757,19 +759,19 @@ def auto_repair_registry(group, name):
save_settings(group, name, result=result)
def auto_reset_power_plans(group, name):
def auto_reset_power_plans(group, name) -> None:
"""Reset power plans."""
result = TRY_PRINT.run('Reset Power Plans...', reset_power_plans)
save_settings(group, name, result=result)
def auto_reset_proxy(group, name):
def auto_reset_proxy(group, name) -> None:
"""Reset proxy settings."""
result = TRY_PRINT.run('Clearing proxy settings...', reset_proxy)
save_settings(group, name, result=result)
def auto_reset_windows_policies(group, name):
def auto_reset_windows_policies(group, name) -> None:
"""Reset Windows policies to defaults."""
result = TRY_PRINT.run(
'Resetting Windows policies...', reset_windows_policies,
@ -777,13 +779,13 @@ def auto_reset_windows_policies(group, name):
save_settings(group, name, result=result)
def auto_restore_uac_defaults(group, name):
def auto_restore_uac_defaults(group, name) -> None:
"""Restore UAC default settings."""
result = TRY_PRINT.run('Restoring UAC defaults...', restore_uac_defaults)
save_settings(group, name, result=result)
def auto_set_custom_power_plan(group, name):
def auto_set_custom_power_plan(group, name) -> None:
"""Set custom power plan."""
result = TRY_PRINT.run(
'Set Custom Power Plan...', create_custom_power_plan,
@ -793,13 +795,13 @@ def auto_set_custom_power_plan(group, name):
save_settings(group, name, result=result)
def auto_sfc(group, name):
def auto_sfc(group, name) -> None:
"""Run SFC repairs."""
result = TRY_PRINT.run('SFC Scan...', run_sfc_scan)
save_settings(group, name, result=result)
def auto_system_restore_create(group, name):
def auto_system_restore_create(group, name) -> None:
"""Create System Restore point."""
result = TRY_PRINT.run(
'Create System Restore...', create_system_restore_point,
@ -807,7 +809,7 @@ def auto_system_restore_create(group, name):
save_settings(group, name, result=result)
def auto_system_restore_enable(group, name):
def auto_system_restore_enable(group, name) -> None:
"""Enable System Restore."""
cmd = [
'powershell', '-Command', 'Enable-ComputerRestore',
@ -817,13 +819,13 @@ def auto_system_restore_enable(group, name):
save_settings(group, name, result=result)
def auto_system_restore_set_size(group, name):
def auto_system_restore_set_size(group, name) -> None:
"""Set System Restore size."""
result = TRY_PRINT.run('Set System Restore Size...', set_system_restore_size)
save_settings(group, name, result=result)
def auto_uninstallview(group, name):
def auto_uninstallview(group, name) -> None:
"""Run UninstallView."""
result = TRY_PRINT.run(
'UninstallView...', run_uninstallview, msg_good='DONE',
@ -831,7 +833,7 @@ def auto_uninstallview(group, name):
save_settings(group, name, result=result)
def auto_windows_updates_disable(group, name):
def auto_windows_updates_disable(group, name) -> None:
"""Disable Windows Updates."""
result = TRY_PRINT.run('Disable Windows Updates...', disable_windows_updates)
if result['Failed']:
@ -840,13 +842,13 @@ def auto_windows_updates_disable(group, name):
save_settings(group, name, result=result)
def auto_windows_updates_enable(group, name):
def auto_windows_updates_enable(group, name) -> None:
"""Enable Windows Updates."""
result = TRY_PRINT.run('Enable Windows Updates...', enable_windows_updates)
save_settings(group, name, result=result)
def auto_windows_updates_reset(group, name):
def auto_windows_updates_reset(group, name) -> None:
"""Reset Windows Updates."""
result = TRY_PRINT.run('Reset Windows Updates...', reset_windows_updates)
if result['Failed']:
@ -856,12 +858,12 @@ def auto_windows_updates_reset(group, name):
# Misc Functions
def set_backup_path(name, date=False):
def set_backup_path(name, date=False) -> pathlib.Path:
"""Set backup path, returns pathlib.Path."""
return set_local_storage_path('Backups', name, date)
def set_local_storage_path(folder, name, date=False):
def set_local_storage_path(folder, name, date=False) -> pathlib.Path:
"""Get path for local storage, returns pathlib.Path."""
local_path = get_path_obj(f'{SYSTEMDRIVE}/{KIT_NAME_SHORT}/{folder}/{name}')
if date:
@ -869,13 +871,13 @@ def set_local_storage_path(folder, name, date=False):
return local_path
def set_quarantine_path(name, date=False):
def set_quarantine_path(name, date=False) -> pathlib.Path:
"""Set quarantine path, returns pathlib.Path."""
return set_local_storage_path('Quarantine', name, date)
# Tool Functions
def backup_all_browser_profiles(use_try_print=False):
def backup_all_browser_profiles(use_try_print=False) -> None:
"""Backup browser profiles for all users."""
users = get_path_obj(f'{SYSTEMDRIVE}/Users')
for userprofile in users.iterdir():
@ -884,7 +886,7 @@ def backup_all_browser_profiles(use_try_print=False):
backup_browser_profiles(userprofile, use_try_print)
def backup_browser_chromium(backup_path, browser, search_path, use_try_print):
def backup_browser_chromium(backup_path, browser, search_path, use_try_print) -> None:
"""Backup Chromium-based browser profile."""
for item in search_path.iterdir():
match = re.match(r'^(Default|Profile).*', item.name, re.IGNORECASE)
@ -914,7 +916,7 @@ def backup_browser_chromium(backup_path, browser, search_path, use_try_print):
run_program(cmd, check=False)
def backup_browser_firefox(backup_path, search_path, use_try_print):
def backup_browser_firefox(backup_path, search_path, use_try_print) -> None:
"""Backup Firefox browser profile."""
output_path = backup_path.joinpath('Firefox.7z')
@ -939,7 +941,7 @@ def backup_browser_firefox(backup_path, search_path, use_try_print):
run_program(cmd, check=False)
def backup_browser_profiles(userprofile, use_try_print=False):
def backup_browser_profiles(userprofile, use_try_print=False) -> None:
"""Backup browser profiles for userprofile."""
backup_path = set_backup_path('Browsers', date=True)
backup_path = backup_path.joinpath(userprofile.name)
@ -968,7 +970,7 @@ def backup_browser_profiles(userprofile, use_try_print=False):
pass
def backup_registry():
def backup_registry() -> None:
"""Backup Registry."""
backup_path = set_backup_path('Registry', date=True)
backup_path.parent.mkdir(parents=True, exist_ok=True)
@ -981,12 +983,12 @@ def backup_registry():
run_tool('ERUNT', 'ERUNT', backup_path, 'sysreg', 'curuser', 'otherusers')
def delete_registry_null_keys():
def delete_registry_null_keys() -> None:
"""Delete registry keys with embedded null characters."""
run_tool('RegDelNull', 'RegDelNull', '-s', '-y', download=True)
def log_kvrt_results(log_path, report_path):
def log_kvrt_results(log_path, report_path) -> None:
"""Parse KVRT report and log results in plain text."""
log_text = ''
report_file = None
@ -1027,7 +1029,7 @@ def log_kvrt_results(log_path, report_path):
log_path.write_text(log_text, encoding='utf-8')
def run_adwcleaner():
def run_adwcleaner() -> None:
"""Run AdwCleaner."""
settings_path = get_tool_path('AdwCleaner', 'AdwCleaner', check=False)
settings_path = settings_path.with_name('settings')
@ -1037,7 +1039,7 @@ def run_adwcleaner():
run_tool('AdwCleaner', 'AdwCleaner', download=True)
def run_bleachbit(cleaners, preview=True):
def run_bleachbit(cleaners, preview=True) -> None:
"""Run BleachBit to either clean or preview files."""
cmd_args = (
'--preview' if preview else '--clean',
@ -1048,11 +1050,15 @@ def run_bleachbit(cleaners, preview=True):
proc = run_tool('BleachBit', 'bleachbit_console', *cmd_args)
# Save logs
log_path.write_text(proc.stdout, encoding='utf-8')
log_path.with_suffix('.err').write_text(proc.stderr, encoding='utf-8')
log_path.write_text(
proc.stdout, encoding='utf-8', # type: ignore[reportGeneralTypeIssues]
)
log_path.with_suffix('.err').write_text(
proc.stderr, encoding='utf-8', # type: ignore[reportGeneralTypeIssues]
)
def run_hitmanpro():
def run_hitmanpro() -> None:
"""Run HitmanPro scan."""
log_path = format_log_path(log_name='HitmanPro', timestamp=True, tool=True)
log_path = log_path.with_suffix('.xml')
@ -1061,7 +1067,7 @@ def run_hitmanpro():
run_tool('HitmanPro', 'HitmanPro', *cmd_args, download=True)
def run_kvrt():
def run_kvrt() -> None:
"""Run KVRT scan."""
log_path = format_log_path(log_name='KVRT', timestamp=True, tool=True)
log_path.parent.mkdir(parents=True, exist_ok=True)
@ -1102,11 +1108,11 @@ def run_kvrt():
log_kvrt_results(log_path, report_path)
def run_microsoft_defender(full=True):
def run_microsoft_defender(full=True) -> None:
"""Run Microsoft Defender scan."""
reg_key = r'Software\Microsoft\Windows Defender'
def _get_defender_path():
def _get_defender_path() -> str:
install_path = reg_read_value('HKLM', reg_key, 'InstallLocation')
return fr'{install_path}\MpCmdRun.exe'
@ -1147,7 +1153,7 @@ def run_microsoft_defender(full=True):
raise GenericError('Failed to run scan or clean items.')
def run_rkill():
def run_rkill() -> None:
"""Run RKill scan."""
log_path = format_log_path(log_name='RKill', timestamp=True, tool=True)
log_path.parent.mkdir(parents=True, exist_ok=True)
@ -1161,7 +1167,7 @@ def run_rkill():
run_tool('RKill', 'RKill', *cmd_args, download=True)
def run_tdsskiller():
def run_tdsskiller() -> None:
"""Run TDSSKiller scan."""
log_path = format_log_path(log_name='TDSSKiller', timestamp=True, tool=True)
log_path.parent.mkdir(parents=True, exist_ok=True)
@ -1179,13 +1185,13 @@ def run_tdsskiller():
run_tool('TDSSKiller', 'TDSSKiller', *cmd_args, download=True)
def run_uninstallview():
def run_uninstallview() -> None:
"""Run UninstallView."""
run_tool('UninstallView', 'UninstallView')
# OS Built-in Functions
def create_custom_power_plan(enable_sleep=True, keep_display_on=False):
def create_custom_power_plan(enable_sleep=True, keep_display_on=False) -> None:
"""Create new power plan and set as active."""
custom_guid = POWER_PLANS['Custom']
sleep_timeouts = POWER_PLAN_SLEEP_TIMEOUTS['High Performance']
@ -1234,7 +1240,7 @@ def create_custom_power_plan(enable_sleep=True, keep_display_on=False):
run_program(cmd)
def create_system_restore_point():
def create_system_restore_point() -> None:
"""Create System Restore point."""
cmd = [
'powershell', '-Command', 'Checkpoint-Computer',
@ -1249,7 +1255,7 @@ def create_system_restore_point():
raise GenericWarning('Skipped, a restore point was created too recently')
def disable_pending_renames():
def disable_pending_renames() -> None:
"""Disable pending renames."""
reg_set_value(
'HKLM', r'SYSTEM\CurrentControlSet\Control\Session Manager',
@ -1257,18 +1263,18 @@ def disable_pending_renames():
)
def disable_windows_updates():
def disable_windows_updates() -> None:
"""Disable and stop Windows Updates."""
disable_service('wuauserv')
stop_service('wuauserv')
def enable_windows_updates():
def enable_windows_updates() -> None:
"""Enable Windows Updates."""
enable_service('wuauserv', 'demand')
def export_power_plans():
def export_power_plans() -> None:
"""Export existing power plans."""
backup_path = set_backup_path('Power Plans', date=True)
@ -1299,13 +1305,13 @@ def export_power_plans():
run_program(cmd)
def kill_explorer():
def kill_explorer() -> None:
"""Kill all Explorer processes."""
cmd = ['taskkill', '/im', 'explorer.exe', '/f']
run_program(cmd, check=False)
def reboot(timeout=10):
def reboot(timeout=10) -> None:
"""Reboot the system."""
atexit.unregister(start_explorer)
ui.print_warning(f'Rebooting the system in {timeout} seconds...')
@ -1315,7 +1321,7 @@ def reboot(timeout=10):
raise SystemExit
def remove_custom_power_plan(high_performance=False):
def remove_custom_power_plan(high_performance=False) -> None:
"""Remove custom power plan and set to a built-in plan.
If high_performance is True then set to High Performance and set
@ -1342,13 +1348,13 @@ def remove_custom_power_plan(high_performance=False):
run_program(cmd)
def reset_power_plans():
def reset_power_plans() -> None:
"""Reset power plans to their default settings."""
cmd = ['powercfg', '-RestoreDefaultSchemes']
run_program(cmd)
def reset_proxy():
def reset_proxy() -> None:
"""Reset WinHTTP proxy settings."""
cmd = ['netsh', 'winhttp', 'reset', 'proxy']
proc = run_program(cmd, check=False)
@ -1358,7 +1364,7 @@ def reset_proxy():
raise GenericError('Failed to reset proxy settings.')
def reset_windows_policies():
def reset_windows_policies() -> None:
"""Reset Windows policies to defaults."""
cmd = ['gpupdate', '/force']
proc = run_program(cmd, check=False)
@ -1368,7 +1374,7 @@ def reset_windows_policies():
raise GenericError('Failed to reset one or more policies.')
def reset_windows_updates():
def reset_windows_updates() -> None:
"""Reset Windows Updates."""
system_root = os.environ.get('SYSTEMROOT', 'C:/Windows')
src_path = f'{system_root}/SoftwareDistribution'
@ -1381,7 +1387,7 @@ def reset_windows_updates():
pass
def restore_uac_defaults():
def restore_uac_defaults() -> None:
"""Restore UAC default settings."""
settings = REG_UAC_DEFAULTS_WIN10
if OS_VERSION in (7, 8, 8.1):
@ -1390,7 +1396,7 @@ def restore_uac_defaults():
reg_write_settings(settings)
def run_chkdsk_offline():
def run_chkdsk_offline() -> None:
"""Set filesystem 'dirty bit' to force a CHKDSK during startup."""
cmd = ['fsutil', 'dirty', 'set', SYSTEMDRIVE]
proc = run_program(cmd, check=False)
@ -1400,7 +1406,7 @@ def run_chkdsk_offline():
raise GenericError('Failed to set dirty bit.')
def run_chkdsk_online():
def run_chkdsk_online() -> None:
"""Run CHKDSK.
NOTE: If run on Windows 8+ online repairs are attempted.
@ -1440,7 +1446,7 @@ def run_chkdsk_online():
raise GenericError('Issue(s) detected')
def run_dism(repair=True):
def run_dism(repair=True) -> None:
"""Run DISM to either scan or repair component store health."""
conemu_args = ['-new_console:nb', '-new_console:s33V'] if IN_CONEMU else []
@ -1479,7 +1485,7 @@ def run_dism(repair=True):
raise GenericError('Issue(s) detected')
def run_sfc_scan():
def run_sfc_scan() -> None:
"""Run SFC and save results."""
cmd = ['sfc', '/scannow']
log_path = format_log_path(log_name='SFC', timestamp=True, tool=True)
@ -1506,7 +1512,7 @@ def run_sfc_scan():
raise OSError
def set_system_restore_size(size=8):
def set_system_restore_size(size=8) -> None:
"""Set System Restore size."""
cmd = [
'vssadmin', 'Resize', 'ShadowStorage',
@ -1515,7 +1521,7 @@ def set_system_restore_size(size=8):
run_program(cmd, pipe=False, stderr=DEVNULL, stdout=DEVNULL)
def start_explorer():
def start_explorer() -> None:
"""Start Explorer."""
popen_program(['explorer.exe'])

View file

@ -8,6 +8,8 @@ import os
import re
import sys
from typing import Any
from wk.cfg.main import KIT_NAME_FULL
from wk.cfg.setup import (
BROWSER_PATHS,
@ -99,7 +101,7 @@ for error in ('CalledProcessError', 'FileNotFoundError'):
# Auto Setup
def build_menus(base_menus, title, presets):
def build_menus(base_menus, title, presets) -> dict[str, ui.Menu]:
"""Build menus, returns dict."""
menus = {}
menus['Main'] = ui.Menu(title=f'{title}\n{ansi.color_string("Main Menu", "GREEN")}')
@ -154,7 +156,7 @@ def build_menus(base_menus, title, presets):
return menus
def check_os_and_set_menu_title(title):
def check_os_and_set_menu_title(title) -> str:
"""Check OS version and update title for menus, returns str."""
color = None
os_name = get_os_name(check=False)
@ -180,7 +182,7 @@ def check_os_and_set_menu_title(title):
return f'{title} ({ansi.color_string(os_name, color)})'
def load_preset(menus, presets, title, enable_menu_exit=True):
def load_preset(menus, presets, title, enable_menu_exit=True) -> None:
"""Load menu settings from preset and ask selection question(s)."""
if not enable_menu_exit:
MENU_PRESETS.actions['Main Menu'].update({'Disabled':True, 'Hidden':True})
@ -219,7 +221,7 @@ def load_preset(menus, presets, title, enable_menu_exit=True):
menus[group_name].options[entry_name]['Selected'] = False
def run_auto_setup(base_menus, presets):
def run_auto_setup(base_menus, presets) -> None:
"""Run Auto Setup."""
update_log_path(dest_name='Auto Setup', timestamp=True)
title = f'{KIT_NAME_FULL}: Auto Setup'
@ -261,7 +263,7 @@ def run_auto_setup(base_menus, presets):
ui.pause('Press Enter to exit...')
def run_group(group, menu):
def run_group(group, menu) -> None:
"""Run entries in group if appropriate."""
ui.print_info(f' {group}')
for name, details in menu.options.items():
@ -276,7 +278,7 @@ def run_group(group, menu):
details['Function']()
def show_main_menu(base_menus, menus, presets, title):
def show_main_menu(base_menus, menus, presets, title) -> None:
"""Show main menu and handle actions."""
while True:
update_main_menu(menus)
@ -291,7 +293,7 @@ def show_main_menu(base_menus, menus, presets, title):
raise SystemExit
def show_sub_menu(menu):
def show_sub_menu(menu) -> None:
"""Show sub-menu and handle sub-menu actions."""
while True:
selection = menu.advanced_select()
@ -307,7 +309,7 @@ def show_sub_menu(menu):
menu.options[name]['Selected'] = value
def update_main_menu(menus):
def update_main_menu(menus) -> None:
"""Update main menu based on current selections."""
index = 1
skip = 'Reboot'
@ -326,37 +328,37 @@ def update_main_menu(menus):
# Auto Repairs: Wrapper Functions
def auto_backup_registry():
def auto_backup_registry() -> None:
"""Backup registry."""
TRY_PRINT.run('Backup Registry...', backup_registry)
def auto_backup_browser_profiles():
def auto_backup_browser_profiles() -> None:
"""Backup browser profiles."""
backup_all_browser_profiles(use_try_print=True)
def auto_backup_power_plans():
def auto_backup_power_plans() -> None:
"""Backup power plans."""
TRY_PRINT.run('Backup Power Plans...', export_power_plans)
def auto_reset_power_plans():
def auto_reset_power_plans() -> None:
"""Reset power plans."""
TRY_PRINT.run('Reset Power Plans...', reset_power_plans)
def auto_set_custom_power_plan():
def auto_set_custom_power_plan() -> None:
"""Set custom power plan."""
TRY_PRINT.run('Set Custom Power Plan...', create_custom_power_plan)
def auto_enable_bsod_minidumps():
def auto_enable_bsod_minidumps() -> None:
"""Enable saving minidumps during BSoDs."""
TRY_PRINT.run('Enable BSoD mini dumps...', enable_bsod_minidumps)
def auto_enable_regback():
def auto_enable_regback() -> None:
"""Enable RegBack."""
TRY_PRINT.run(
'Enable RegBack...', reg_set_value, 'HKLM',
@ -365,7 +367,7 @@ def auto_enable_regback():
)
def auto_system_restore_enable():
def auto_system_restore_enable() -> None:
"""Enable System Restore."""
cmd = [
'powershell', '-Command', 'Enable-ComputerRestore',
@ -374,28 +376,28 @@ def auto_system_restore_enable():
TRY_PRINT.run('Enable System Restore...', run_program, cmd=cmd)
def auto_system_restore_set_size():
def auto_system_restore_set_size() -> None:
"""Set System Restore size."""
TRY_PRINT.run('Set System Restore Size...', set_system_restore_size)
def auto_system_restore_create():
def auto_system_restore_create() -> None:
"""Create System Restore point."""
TRY_PRINT.run('Create System Restore...', create_system_restore_point)
def auto_windows_updates_enable():
def auto_windows_updates_enable() -> None:
"""Enable Windows Updates."""
TRY_PRINT.run('Enable Windows Updates...', enable_windows_updates)
# Auto Setup: Wrapper Functions
def auto_activate_windows():
def auto_activate_windows() -> None:
"""Attempt to activate Windows using BIOS key."""
TRY_PRINT.run('Windows Activation...', activate_with_bios)
def auto_config_browsers():
def auto_config_browsers() -> None:
"""Configure Browsers."""
prompt = ' Press Enter to continue...'
TRY_PRINT.run('Chrome Notifications...', disable_chrome_notifications)
@ -412,27 +414,27 @@ def auto_config_browsers():
print(f'\033[F\r{" "*len(prompt)}\r', end='', flush=True)
def auto_config_explorer():
def auto_config_explorer() -> None:
"""Configure Windows Explorer and restart the process."""
TRY_PRINT.run('Windows Explorer...', config_explorer)
def auto_config_open_shell():
def auto_config_open_shell() -> None:
"""Configure Open Shell."""
TRY_PRINT.run('Open Shell...', config_open_shell)
def auto_export_aida64_report():
def auto_export_aida64_report() -> None:
"""Export AIDA64 reports."""
TRY_PRINT.run('AIDA64 Report...', export_aida64_report)
def auto_install_firefox():
def auto_install_firefox() -> None:
"""Install Firefox."""
TRY_PRINT.run('Firefox...', install_firefox)
def auto_install_libreoffice():
def auto_install_libreoffice() -> None:
"""Install LibreOffice.
NOTE: It is assumed that auto_install_vcredists() will be run
@ -441,105 +443,105 @@ def auto_install_libreoffice():
TRY_PRINT.run('LibreOffice...', install_libreoffice, vcredist=False)
def auto_install_open_shell():
def auto_install_open_shell() -> None:
"""Install Open Shell."""
TRY_PRINT.run('Open Shell...', install_open_shell)
def auto_install_software_bundle():
def auto_install_software_bundle() -> None:
"""Install standard software bundle."""
TRY_PRINT.run('Software Bundle...', install_software_bundle)
def auto_install_vcredists():
def auto_install_vcredists() -> None:
"""Install latest supported Visual C++ runtimes."""
TRY_PRINT.run('Visual C++ Runtimes...', install_vcredists)
def auto_open_device_manager():
def auto_open_device_manager() -> None:
"""Open Device Manager."""
TRY_PRINT.run('Device Manager...', open_device_manager)
def auto_open_hwinfo_sensors():
def auto_open_hwinfo_sensors() -> None:
"""Open HWiNFO Sensors."""
TRY_PRINT.run('HWiNFO Sensors...', open_hwinfo_sensors)
def auto_open_snappy_driver_installer_origin():
def auto_open_snappy_driver_installer_origin() -> None:
"""Open Snappy Driver Installer Origin."""
TRY_PRINT.run('Snappy Driver Installer...', open_snappy_driver_installer_origin)
def auto_open_windows_activation():
def auto_open_windows_activation() -> None:
"""Open Windows Activation."""
if not is_activated():
TRY_PRINT.run('Windows Activation...', open_windows_activation)
def auto_open_windows_updates():
def auto_open_windows_updates() -> None:
"""Open Windows Updates."""
TRY_PRINT.run('Windows Updates...', open_windows_updates)
def auto_open_xmplay():
def auto_open_xmplay() -> None:
"""Open XMPlay."""
TRY_PRINT.run('XMPlay...', open_xmplay)
def auto_show_4k_alignment_check():
def auto_show_4k_alignment_check() -> None:
"""Display 4K alignment check."""
TRY_PRINT.run('4K alignment Check...', check_4k_alignment, show_alert=True)
def auto_show_installed_antivirus():
def auto_show_installed_antivirus() -> None:
"""Display installed antivirus."""
TRY_PRINT.run('Virus Protection...', get_installed_antivirus)
def auto_show_installed_ram():
def auto_show_installed_ram() -> None:
"""Display installed RAM."""
TRY_PRINT.run('Installed RAM...', get_installed_ram,
as_list=True, raise_exceptions=True,
)
def auto_show_os_activation():
def auto_show_os_activation() -> None:
"""Display OS activation status."""
TRY_PRINT.run('Activation...', get_os_activation, as_list=True)
def auto_show_os_name():
def auto_show_os_name() -> None:
"""Display OS Name."""
TRY_PRINT.run('Operating System...', get_os_name, as_list=True)
def auto_show_secure_boot_status():
def auto_show_secure_boot_status() -> None:
"""Display Secure Boot status."""
TRY_PRINT.run(
'Secure Boot...', check_secure_boot_status, msg_good='Enabled',
)
def auto_show_storage_status():
def auto_show_storage_status() -> None:
"""Display storage status."""
TRY_PRINT.run('Storage Status...', get_storage_status)
def auto_windows_temp_fix():
def auto_windows_temp_fix() -> None:
"""Restore default ACLs for Windows\\Temp."""
TRY_PRINT.run(r'Windows\Temp fix...', fix_windows_temp)
# Configure Functions
def config_explorer():
def config_explorer() -> None:
"""Configure Windows Explorer and restart the process."""
reg_write_settings(REG_WINDOWS_EXPLORER)
kill_procs('explorer.exe', force=True)
popen_program(['explorer.exe'])
def config_open_shell():
def config_open_shell() -> None:
"""Configure Open Shell."""
has_low_power_idle = False
@ -559,7 +561,7 @@ def config_open_shell():
reg_write_settings(REG_OPEN_SHELL_LOW_POWER_IDLE)
def disable_chrome_notifications():
def disable_chrome_notifications() -> None:
"""Disable notifications in Google Chrome."""
defaults_key = 'default_content_setting_values'
profiles = []
@ -601,13 +603,13 @@ def disable_chrome_notifications():
pref_file.write_text(json.dumps(pref_data, separators=(',', ':')))
def enable_bsod_minidumps():
def enable_bsod_minidumps() -> None:
"""Enable saving minidumps during BSoDs."""
cmd = ['wmic', 'RECOVEROS', 'set', 'DebugInfoType', '=', '3']
run_program(cmd)
def enable_ublock_origin():
def enable_ublock_origin() -> None:
"""Enable uBlock Origin in supported browsers."""
base_paths = [
PROGRAMFILES_64, PROGRAMFILES_32, os.environ.get('LOCALAPPDATA'),
@ -637,7 +639,7 @@ def enable_ublock_origin():
popen_program(cmd, pipe=True)
def fix_windows_temp():
def fix_windows_temp() -> None:
"""Restore default permissions for Windows\\Temp."""
permissions = (
'Users:(CI)(X,WD,AD)',
@ -649,7 +651,7 @@ def fix_windows_temp():
# Install Functions
def install_firefox():
def install_firefox() -> None:
"""Install Firefox.
As far as I can tell if you use the EXE installers then it will use
@ -754,7 +756,7 @@ def install_libreoffice(
run_program(cmd)
def install_open_shell():
def install_open_shell() -> None:
"""Install Open Shell (just the Start Menu)."""
skin_zip = get_tool_path('OpenShell', 'Fluent-Metro', suffix='zip')
@ -784,7 +786,7 @@ def install_open_shell():
run_program(cmd)
def install_software_bundle():
def install_software_bundle() -> None:
"""Install standard software bundle."""
download_tool('Ninite', 'Software Bundle')
installer = get_tool_path('Ninite', 'Software Bundle')
@ -809,7 +811,7 @@ def install_software_bundle():
end='', flush=True)
def install_vcredists():
def install_vcredists() -> None:
"""Install latest supported Visual C++ runtimes."""
for year in (2012, 2013, 2022):
cmd_args = ['/install', '/passive', '/norestart']
@ -826,7 +828,7 @@ def install_vcredists():
run_program([installer, *cmd_args])
def uninstall_firefox():
def uninstall_firefox() -> None:
"""Uninstall all copies of Firefox."""
json_file = format_log_path(log_name='Installed Programs', timestamp=True)
json_file = json_file.with_name(f'{json_file.stem}.json')
@ -847,13 +849,14 @@ def uninstall_firefox():
# Misc Functions
def check_secure_boot_status():
def check_secure_boot_status() -> None:
"""Check Secure Boot status."""
is_secure_boot_enabled(raise_exceptions=True, show_alert=True)
def get_firefox_default_profile(profiles_ini):
def get_firefox_default_profile(profiles_ini) -> Any:
"""Get Firefox default profile, returns(pathlib.Path, encoding) or None."""
# TODO: Refactor to remove dependancy on Any
default_profile = None
encoding = None
parser = None
@ -890,7 +893,7 @@ def get_firefox_default_profile(profiles_ini):
return (default_profile, encoding)
def get_storage_status():
def get_storage_status() -> list[str]:
"""Get storage status for fixed disks, returns list."""
report = get_volume_usage(use_colors=True)
for disk in get_raw_disks():
@ -900,14 +903,14 @@ def get_storage_status():
return report
def set_default_browser():
def set_default_browser() -> None:
"""Open Windows Settings to the default apps section."""
cmd = ['start', '', 'ms-settings:defaultapps']
popen_program(cmd, shell=True)
# Tool Functions
def export_aida64_report():
def export_aida64_report() -> None:
"""Export AIDA64 report."""
report_path = format_log_path(
log_name='AIDA64 System Report',
@ -928,12 +931,12 @@ def export_aida64_report():
raise GenericError('Error(s) encountered exporting report.')
def open_device_manager():
def open_device_manager() -> None:
"""Open Device Manager."""
popen_program(['mmc', 'devmgmt.msc'])
def open_hwinfo_sensors():
def open_hwinfo_sensors() -> None:
"""Open HWiNFO sensors."""
hwinfo_path = get_tool_path('HWiNFO', 'HWiNFO')
base_config = hwinfo_path.with_name('general.ini')
@ -949,22 +952,22 @@ def open_hwinfo_sensors():
run_tool('HWiNFO', 'HWiNFO', popen=True)
def open_snappy_driver_installer_origin():
def open_snappy_driver_installer_origin() -> None:
"""Open Snappy Driver Installer Origin."""
run_tool('SDIO', 'SDIO', cwd=True, pipe=True, popen=True)
def open_windows_activation():
def open_windows_activation() -> None:
"""Open Windows Activation."""
popen_program(['slui'])
def open_windows_updates():
def open_windows_updates() -> None:
"""Open Windows Updates."""
popen_program(['control', '/name', 'Microsoft.WindowsUpdate'])
def open_xmplay():
def open_xmplay() -> None:
"""Open XMPlay."""
sleep(2)
run_tool('XMPlay', 'XMPlay', 'music.7z', cwd=True, popen=True)

View file

@ -31,7 +31,10 @@ class GenericWarning(Exception):
# Functions
def bytes_to_string(size, decimals=0, use_binary=True):
def bytes_to_string(
size: float | int,
decimals: int = 0,
use_binary: bool = True) -> str:
"""Convert size into a human-readable format, returns str.
[Doctest]
@ -73,13 +76,13 @@ def bytes_to_string(size, decimals=0, use_binary=True):
return size_str
def sleep(seconds=2):
def sleep(seconds: int | float = 2) -> None:
"""Simple wrapper for time.sleep."""
time.sleep(seconds)
def string_to_bytes(size, assume_binary=False):
"""Convert human-readable size str to bytes and return an int."""
def string_to_bytes(size: float | int | str, assume_binary: bool = False) -> int:
"""Convert human-readable size to bytes and return an int."""
LOG.debug('size: %s, assume_binary: %s', size, assume_binary)
scale = 1000
size = str(size)

View file

@ -3,7 +3,8 @@
import itertools
import logging
import pathlib
from typing import Iterable
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
@ -23,44 +24,41 @@ COLORS = {
# Functions
def clear_screen():
def clear_screen() -> None:
"""Clear screen using ANSI escape."""
print('\033c', end='', flush=True)
def color_string(strings, colors, sep=' '):
def color_string(
strings: Iterable[str] | str,
colors: Iterable[str | None] | str,
sep=' ',
) -> str:
"""Build colored string using ANSI escapes, returns str."""
clear_code = COLORS['CLEAR']
data = {'strings': strings, 'colors': colors}
msg = []
# Convert to tuples if necessary
if isinstance(strings, (str, pathlib.Path)):
strings = (strings,)
if isinstance(colors, (str, pathlib.Path)):
colors = (colors,)
# Convert to strings if necessary
try:
iter(strings)
except TypeError:
# Assuming single element passed, convert to string
strings = (str(strings),)
try:
iter(colors)
except TypeError:
# Assuming single element passed, convert to string
colors = (str(colors),)
# Convert input to tuples of strings
for k, v in data.items():
if isinstance(v, str):
# Avoid splitting string into a list of characters
data[k] = (v,)
try:
iter(v)
except TypeError:
# Assuming single element passed, convert to string
data[k] = (str(v),)
# Build new string with color escapes added
for string, color in itertools.zip_longest(strings, colors):
color_code = COLORS.get(color, clear_code)
msg.append(f'{color_code}{string}{clear_code}')
for string, color in itertools.zip_longest(data['strings'], data['colors']):
color_code = COLORS.get(str(color), COLORS['CLEAR'])
msg.append(f'{color_code}{string}{COLORS["CLEAR"]}')
# Done
return sep.join(msg)
def strip_colors(string):
def strip_colors(string: str) -> str:
"""Strip known ANSI color escapes from string, returns str."""
LOG.debug('string: %s', string)
for color in COLORS.values():

View file

@ -9,8 +9,10 @@ import subprocess
import sys
import traceback
from collections import OrderedDict
from typing import Any, Callable, Iterable
from prompt_toolkit import prompt
from prompt_toolkit.document import Document
from prompt_toolkit.validation import Validator, ValidationError
try:
@ -36,12 +38,12 @@ PLATFORM = platform.system()
# Classes
class InputChoiceValidator(Validator):
"""Validate that input is one of the provided choices."""
def __init__(self, choices, allow_empty=False):
self.allow_empty = allow_empty
self.choices = [str(c).upper() for c in choices]
def __init__(self, choices: Iterable[str], allow_empty: bool = False):
self.allow_empty: bool = allow_empty
self.choices: list[str] = [str(c).upper() for c in choices]
super().__init__()
def validate(self, document):
def validate(self, document: Document) -> None:
text = document.text
if not (text or self.allow_empty):
raise ValidationError(
@ -56,7 +58,7 @@ class InputChoiceValidator(Validator):
class InputNotEmptyValidator(Validator):
"""Validate that input is not empty."""
def validate(self, document):
def validate(self, document: Document) -> None:
text = document.text
if not text:
raise ValidationError(
@ -66,11 +68,11 @@ class InputNotEmptyValidator(Validator):
class InputTicketIDValidator(Validator):
"""Validate that input resembles a ticket ID."""
def __init__(self, allow_empty=False):
self.allow_empty = allow_empty
def __init__(self, allow_empty: bool = False):
self.allow_empty: bool = allow_empty
super().__init__()
def validate(self, document):
def validate(self, document: Document) -> None:
text = document.text
if not (text or self.allow_empty):
raise ValidationError(
@ -85,11 +87,11 @@ class InputTicketIDValidator(Validator):
class InputYesNoValidator(Validator):
"""Validate that input is a yes or no."""
def __init__(self, allow_empty=False):
self.allow_empty = allow_empty
def __init__(self, allow_empty: bool = False):
self.allow_empty: bool = allow_empty
super().__init__()
def validate(self, document):
def validate(self, document: Document) -> None:
text = document.text
if not (text or self.allow_empty):
raise ValidationError(
@ -105,22 +107,20 @@ class InputYesNoValidator(Validator):
class Menu():
"""Object for tracking menu specific data and methods.
Menu items are added to an OrderedDict so the order is preserved.
ASSUMPTIONS:
1. All entry names are unique.
2. All action entry names start with different letters.
"""
def __init__(self, title='[Untitled Menu]'):
self.actions = OrderedDict()
self.options = OrderedDict()
self.sets = OrderedDict()
self.toggles = OrderedDict()
self.disabled_str = 'Disabled'
self.separator = ''
self.title = title
def __init__(self, title: str = '[Untitled Menu]'):
self.actions: dict[str, dict[Any, Any]] = {}
self.options: dict[str, dict[Any, Any]] = {}
self.sets: dict[str, dict[Any, Any]] = {}
self.toggles: dict[str, dict[Any, Any]] = {}
self.disabled_str: str = 'Disabled'
self.separator: str = ''
self.title: str = title
def _generate_menu_text(self):
def _generate_menu_text(self) -> str:
"""Generate menu text, returns str."""
separator_string = self._get_separator_string()
menu_lines = [self.title, separator_string] if self.title else []
@ -161,7 +161,7 @@ class Menu():
def _get_display_name(
self, name, details,
index=None, no_checkboxes=True, setting_item=False):
index=None, no_checkboxes=True, setting_item=False) -> str:
"""Format display name based on details and args, returns str."""
disabled = details.get('Disabled', False)
if setting_item and not details['Selected']:
@ -189,7 +189,7 @@ class Menu():
# Done
return display_name
def _get_separator_string(self):
def _get_separator_string(self) -> str:
"""Format separator length based on name lengths, returns str."""
separator_length = 0
@ -211,7 +211,7 @@ class Menu():
# Done
return self.separator * separator_length
def _get_valid_answers(self):
def _get_valid_answers(self) -> list[str]:
"""Get valid answers based on menu items, returns list."""
valid_answers = []
@ -234,10 +234,10 @@ class Menu():
# Done
return valid_answers
def _resolve_selection(self, selection):
def _resolve_selection(self, selection: str) -> tuple[str, dict[Any, Any]]:
"""Get menu item based on user selection, returns tuple."""
offset = 1
resolved_selection = None
resolved_selection = tuple()
if selection.isnumeric():
# Enumerate over numbered entries
entries = [
@ -249,6 +249,10 @@ class Menu():
if details[1].get('Hidden', False):
offset -= 1
elif str(_i+offset) == selection:
# TODO: Fix this typo!
# It was discovered after being in production for SEVERAL YEARS!
# Extra testing is needed to verify any calls to this function still
# depend on this functionality
resolved_selection = (details)
break
else:
@ -261,7 +265,7 @@ class Menu():
# Done
return resolved_selection
def _update(self, single_selection=True, settings_mode=False):
def _update(self, single_selection: bool = True, settings_mode: bool = False) -> None:
"""Update menu items in preparation for printing to screen."""
index = 0
@ -299,7 +303,8 @@ class Menu():
no_checkboxes=True,
)
def _update_entry_selection_status(self, entry, toggle=True, status=None):
def _update_entry_selection_status(
self, entry: str, toggle: bool = True, status: bool = False) -> None:
"""Update entry selection status either directly or by toggling."""
if entry in self.sets:
# Update targets not the set itself
@ -313,14 +318,14 @@ class Menu():
else:
section[entry]['Selected'] = status
def _update_set_selection_status(self, targets, status):
def _update_set_selection_status(self, targets: Iterable[str], status: bool) -> None:
"""Select or deselect options based on targets and status."""
for option, details in self.options.items():
# If (new) status is True and this option is a target then select
# Otherwise deselect
details['Selected'] = status and option in targets
def _user_select(self, prompt_msg):
def _user_select(self, prompt_msg: str) -> str:
"""Show menu and select an entry, returns str."""
menu_text = self._generate_menu_text()
valid_answers = self._get_valid_answers()
@ -337,19 +342,19 @@ class Menu():
# Done
return answer
def add_action(self, name, details=None):
def add_action(self, name: str, details: dict[Any, Any] | None = None) -> None:
"""Add action to menu."""
details = details if details else {}
details['Selected'] = details.get('Selected', False)
self.actions[name] = details
def add_option(self, name, details=None):
def add_option(self, name: str, details: dict[Any, Any] | None = None) -> None:
"""Add option to menu."""
details = details if details else {}
details['Selected'] = details.get('Selected', False)
self.options[name] = details
def add_set(self, name, details=None):
def add_set(self, name: str, details: dict[Any, Any] | None = None) -> None:
"""Add set to menu."""
details = details if details else {}
details['Selected'] = details.get('Selected', False)
@ -361,13 +366,16 @@ class Menu():
# Add set
self.sets[name] = details
def add_toggle(self, name, details=None):
def add_toggle(self, name: str, details: dict[Any, Any] | None = None) -> None:
"""Add toggle to menu."""
details = details if details else {}
details['Selected'] = details.get('Selected', False)
self.toggles[name] = details
def advanced_select(self, prompt_msg='Please make a selection: '):
def advanced_select(
self,
prompt_msg: str = 'Please make a selection: ',
) -> tuple[str, dict[Any, Any]]:
"""Display menu and make multiple selections, returns tuple.
NOTE: Menu is displayed until an action entry is selected.
@ -386,7 +394,10 @@ class Menu():
# Done
return selected_entry
def settings_select(self, prompt_msg='Please make a selection: '):
def settings_select(
self,
prompt_msg: str = 'Please make a selection: ',
) -> tuple[str, dict[Any, Any]]:
"""Display menu and make multiple selections, returns tuple.
NOTE: Menu is displayed until an action entry is selected.
@ -414,14 +425,18 @@ class Menu():
# Done
return selected_entry
def simple_select(self, prompt_msg='Please make a selection: ', update=True):
def simple_select(
self,
prompt_msg: str = 'Please make a selection: ',
update: bool = True,
) -> tuple[str, dict[Any, Any]]:
"""Display menu and make a single selection, returns tuple."""
if update:
self._update()
user_selection = self._user_select(prompt_msg)
return self._resolve_selection(user_selection)
def update(self):
def update(self) -> None:
"""Update menu with default settings."""
self._update()
@ -431,17 +446,17 @@ class TryAndPrint():
The errors and warning attributes are used to allow fine-tuned results
based on exception names.
"""
def __init__(self, msg_bad='FAILED', msg_good='SUCCESS'):
self.catch_all = True
self.indent = INDENT
self.list_errors = ['GenericError']
self.list_warnings = ['GenericWarning']
self.msg_bad = msg_bad
self.msg_good = msg_good
self.verbose = False
self.width = WIDTH
def __init__(self, msg_bad: str = 'FAILED', msg_good: str = 'SUCCESS'):
self.catch_all : bool = True
self.indent: int = INDENT
self.list_errors: list[str] = ['GenericError']
self.list_warnings: list[str] = ['GenericWarning']
self.msg_bad: str = msg_bad
self.msg_good: str = msg_good
self.verbose : bool = False
self.width: int = WIDTH
def _format_exception_message(self, _exception):
def _format_exception_message(self, _exception: Exception) -> str:
"""Format using the exception's args or name, returns str."""
LOG.debug(
'Formatting exception: %s, %s',
@ -488,7 +503,11 @@ class TryAndPrint():
# Done
return message
def _format_function_output(self, output, msg_good):
def _format_function_output(
self,
output: list | subprocess.CompletedProcess,
msg_good: str,
) -> str:
"""Format function output for use in try_and_print(), returns str."""
LOG.debug('Formatting output: %s', output)
@ -526,26 +545,33 @@ class TryAndPrint():
# Done
return result_msg
def _log_result(self, message, result_msg):
def _log_result(self, message: str, result_msg: str) -> None:
"""Log result text without color formatting."""
log_text = f'{" "*self.indent}{message:<{self.width}}{result_msg}'
for line in log_text.splitlines():
line = strip_colors(line)
LOG.info(line)
def add_error(self, exception_name):
def add_error(self, exception_name: str) -> None:
"""Add exception name to error list."""
if exception_name not in self.list_errors:
self.list_errors.append(exception_name)
def add_warning(self, exception_name):
def add_warning(self, exception_name: str) -> None:
"""Add exception name to warning list."""
if exception_name not in self.list_warnings:
self.list_warnings.append(exception_name)
def run(
self, message, function, *args,
catch_all=None, msg_good=None, verbose=None, **kwargs):
self,
message: str,
function: Callable,
*args: Iterable[Any],
catch_all: bool | None = None,
msg_good: str | None = None,
verbose: bool | None = None,
**kwargs,
) -> dict[str, Any]:
"""Run a function and print the results, returns results as dict.
If catch_all is True then (nearly) all exceptions will be caught.
@ -556,7 +582,7 @@ class TryAndPrint():
msg_bad, or exception text.
The output should be a list or a subprocess.CompletedProcess object.
If msg_good is passed it will override self.msg_good for this call.
If msg_good is passed it will override self.msg_good.
If verbose is True then exception names or messages will be used for
the result message. Otherwise it will simply be set to result_bad.
@ -583,8 +609,8 @@ class TryAndPrint():
verbose = verbose if verbose is not None else self.verbose
# Build exception tuples
e_exceptions = tuple(get_exception(e) for e in self.list_errors)
w_exceptions = tuple(get_exception(e) for e in self.list_warnings)
e_exceptions: tuple = tuple(get_exception(e) for e in self.list_errors)
w_exceptions: tuple = tuple(get_exception(e) for e in self.list_warnings)
# Run function and catch exceptions
print(f'{" "*self.indent}{message:<{self.width}}', end='', flush=True)
@ -632,7 +658,11 @@ class TryAndPrint():
# Functions
def abort(prompt_msg='Aborted.', show_prompt_msg=True, return_code=1):
def abort(
prompt_msg: str = 'Aborted.',
show_prompt_msg: bool = True,
return_code: int = 1,
) -> None:
"""Abort script."""
print_warning(prompt_msg)
if show_prompt_msg:
@ -641,23 +671,24 @@ def abort(prompt_msg='Aborted.', show_prompt_msg=True, return_code=1):
sys.exit(return_code)
def ask(prompt_msg):
def ask(prompt_msg: str) -> bool:
"""Prompt the user with a Y/N question, returns bool."""
validator = InputYesNoValidator()
# Show prompt
response = input_text(f'{prompt_msg} [Y/N]: ', validator=validator)
if response.upper().startswith('Y'):
answer = True
elif response.upper().startswith('N'):
answer = False
LOG.info('%sYes', prompt_msg)
return True
if response.upper().startswith('N'):
LOG.info('%sNo', prompt_msg)
return False
# Done
LOG.info('%s%s', prompt_msg, 'Yes' if answer else 'No')
return answer
# This shouldn't ever be reached
raise ValueError(f'Invalid answer given: {response}')
def beep(repeat=1):
def beep(repeat: int = 1) -> None:
"""Play system bell with optional repeat."""
while repeat >= 1:
# Print bell char without a newline
@ -666,7 +697,7 @@ def beep(repeat=1):
repeat -= 1
def choice(prompt_msg, choices):
def choice(prompt_msg: str, choices: Iterable[str]) -> str:
"""Choose an option from a provided list, returns str.
Choices provided will be converted to uppercase and returned as such.
@ -684,7 +715,7 @@ def choice(prompt_msg, choices):
return response.upper()
def fix_prompt(message):
def fix_prompt(message: str) -> str:
"""Fix prompt, returns str."""
if not message:
message = 'Input text: '
@ -695,7 +726,7 @@ def fix_prompt(message):
@cache
def get_exception(name):
def get_exception(name: str) -> Exception:
"""Get exception by name, returns exception object.
[Doctest]
@ -731,7 +762,7 @@ def get_exception(name):
return obj
def get_ticket_id():
def get_ticket_id() -> str:
"""Get ticket ID, returns str."""
prompt_msg = 'Please enter ticket ID:'
validator = InputTicketIDValidator()
@ -744,7 +775,9 @@ def get_ticket_id():
def input_text(
prompt_msg='Enter text: ', allow_empty=False, validator=None,
prompt_msg: str = 'Enter text: ',
allow_empty: bool = False,
validator: Validator | None = None,
) -> str:
"""Get input from user, returns str."""
prompt_msg = fix_prompt(prompt_msg)
@ -766,7 +799,7 @@ def input_text(
return result
def major_exception():
def major_exception() -> None:
"""Display traceback, optionally upload detailes, and exit."""
LOG.critical('Major exception encountered', exc_info=True)
print_error('Major exception', log=False)
@ -780,12 +813,18 @@ def major_exception():
raise SystemExit(1)
def pause(prompt_msg='Press Enter to continue... '):
def pause(prompt_msg: str = 'Press Enter to continue... ') -> None:
"""Simple pause implementation."""
input_text(prompt_msg, allow_empty=True)
def print_colored(strings, colors, log=False, sep=' ', **kwargs):
def print_colored(
strings: Iterable[str] | str,
colors: Iterable[str | None] | str,
log: bool = False,
sep: str = ' ',
**kwargs,
) -> None:
"""Prints strings in the colors specified."""
LOG.debug(
'strings: %s, colors: %s, sep: %s, kwargs: %s',
@ -803,7 +842,7 @@ def print_colored(strings, colors, log=False, sep=' ', **kwargs):
LOG.info(strip_colors(msg))
def print_error(msg, log=True, **kwargs):
def print_error(msg: str, log: bool = True, **kwargs) -> None:
"""Prints message in RED and log as ERROR."""
if 'file' not in kwargs:
# Only set if not specified
@ -813,14 +852,14 @@ def print_error(msg, log=True, **kwargs):
LOG.error(msg)
def print_info(msg, log=True, **kwargs):
def print_info(msg: str, log: bool = True, **kwargs) -> None:
"""Prints message in BLUE and log as INFO."""
print_colored(msg, 'BLUE', **kwargs)
if log:
LOG.info(msg)
def print_report(report, indent=None, log=True):
def print_report(report: list[str], indent=None, log: bool = True) -> None:
"""Print report to screen and optionally to log."""
for line in report:
if indent:
@ -830,21 +869,21 @@ def print_report(report, indent=None, log=True):
LOG.info(strip_colors(line))
def print_standard(msg, log=True, **kwargs):
def print_standard(msg: str, log: bool = True, **kwargs) -> None:
"""Prints message and log as INFO."""
print(msg, **kwargs)
if log:
LOG.info(msg)
def print_success(msg, log=True, **kwargs):
def print_success(msg: str, log: bool = True, **kwargs) -> None:
"""Prints message in GREEN and log as INFO."""
print_colored(msg, 'GREEN', **kwargs)
if log:
LOG.info(msg)
def print_warning(msg, log=True, **kwargs):
def print_warning(msg: str, log: bool = True, **kwargs) -> None:
"""Prints message in YELLOW and log as WARNING."""
if 'file' not in kwargs:
# Only set if not specified
@ -854,7 +893,7 @@ def print_warning(msg, log=True, **kwargs):
LOG.warning(msg)
def set_title(title):
def set_title(title: str) -> None:
"""Set window title."""
LOG.debug('title: %s', title)
if os.name == 'nt':
@ -863,14 +902,19 @@ def set_title(title):
print_error('Setting the title is only supported under Windows.')
def show_data(message, data, color=None, indent=None, width=None):
def show_data(
message: str,
data: Any,
color: str | None = None,
indent: int | None = None,
width: int | None = None,
) -> None:
"""Display info using default or provided indent and width."""
colors = (None, color if color else None)
indent = INDENT if indent is None else indent
width = WIDTH if width is None else width
print_colored(
(f'{" "*indent}{message:<{width}}', data),
colors,
(None, color if color else None),
log=True,
sep='',
)

View file

@ -4,6 +4,8 @@
import logging
import pathlib
from typing import Any
from wk.exe import run_program
from wk.std import PLATFORM
@ -13,7 +15,7 @@ LOG = logging.getLogger(__name__)
# Functions
def capture_pane(pane_id=None):
def capture_pane(pane_id: str | None = None) -> str:
"""Capture text from current or target pane, returns str."""
cmd = ['tmux', 'capture-pane', '-p']
if pane_id:
@ -24,7 +26,7 @@ def capture_pane(pane_id=None):
return proc.stdout.strip()
def clear_pane(pane_id=None):
def clear_pane(pane_id: str | None = None) -> None:
"""Clear pane buffer for current or target pane."""
commands = [
['tmux', 'send-keys', '-R'],
@ -38,7 +40,7 @@ def clear_pane(pane_id=None):
run_program(cmd, check=False)
def fix_layout(layout, forced=False):
def fix_layout(layout: dict[str, dict[str, Any]], forced: bool = False) -> None:
"""Fix pane sizes based on layout."""
resize_kwargs = []
@ -110,7 +112,7 @@ def fix_layout(layout, forced=False):
resize_pane(worker, height=layout['Workers']['height'])
def get_pane_size(pane_id=None):
def get_pane_size(pane_id: str | None = None) -> tuple[int, int]:
"""Get current or target pane size, returns tuple."""
cmd = ['tmux', 'display', '-p']
if pane_id:
@ -127,7 +129,7 @@ def get_pane_size(pane_id=None):
return (width, height)
def get_window_size():
def get_window_size() -> tuple[int, int]:
"""Get current window size, returns tuple."""
cmd = ['tmux', 'display', '-p', '#{window_width} #{window_height}']
@ -141,7 +143,7 @@ def get_window_size():
return (width, height)
def kill_all_panes(pane_id=None):
def kill_all_panes(pane_id: str | None = None) -> None:
"""Kill all panes except for the current or target pane."""
cmd = ['tmux', 'kill-pane', '-a']
if pane_id:
@ -151,7 +153,7 @@ def kill_all_panes(pane_id=None):
run_program(cmd, check=False)
def kill_pane(*pane_ids):
def kill_pane(*pane_ids: str) -> None:
"""Kill pane(s) by id."""
cmd = ['tmux', 'kill-pane', '-t']
@ -160,7 +162,7 @@ def kill_pane(*pane_ids):
run_program(cmd+[pane_id], check=False)
def layout_needs_fixed(layout):
def layout_needs_fixed(layout: dict[str, dict[str, Any]]) -> bool:
"""Check if layout needs fixed, returns bool."""
needs_fixed = False
@ -189,7 +191,7 @@ def layout_needs_fixed(layout):
return needs_fixed
def poll_pane(pane_id):
def poll_pane(pane_id: str) -> bool:
"""Check if pane exists, returns bool."""
cmd = ['tmux', 'list-panes', '-F', '#D']
@ -202,7 +204,12 @@ def poll_pane(pane_id):
def prep_action(
cmd=None, working_dir=None, text=None, watch_file=None, watch_cmd='cat'):
cmd: str | None = None,
working_dir: pathlib.Path | str | None = None,
text: str | None = None,
watch_file: pathlib.Path | str | None = None,
watch_cmd: str = 'cat',
) -> list[str]:
"""Prep action to perform during a tmux call, returns list.
This will prep for running a basic command, displaying text on screen,
@ -252,7 +259,7 @@ def prep_action(
return action_cmd
def prep_file(path):
def prep_file(path: pathlib.Path | str) -> None:
"""Check if file exists and create empty file if not."""
path = pathlib.Path(path).resolve()
try:
@ -262,7 +269,11 @@ def prep_file(path):
pass
def resize_pane(pane_id=None, width=None, height=None):
def resize_pane(
pane_id: str | None = None,
width: int | None = None,
height: int | None = None,
) -> None:
"""Resize current or target pane.
NOTE: kwargs is only here to make calling this function easier
@ -287,7 +298,7 @@ def resize_pane(pane_id=None, width=None, height=None):
run_program(cmd, check=False)
def respawn_pane(pane_id, **action):
def respawn_pane(pane_id: str, **action) -> None:
"""Respawn pane with action."""
cmd = ['tmux', 'respawn-pane', '-k', '-t', pane_id]
cmd.extend(prep_action(**action))
@ -297,9 +308,12 @@ def respawn_pane(pane_id, **action):
def split_window(
lines=None, percent=None,
behind=False, vertical=False,
target_id=None, **action):
lines: int | None = None,
percent: int | None = None,
behind: bool = False,
vertical: bool = False,
target_id: str | None = None,
**action) -> str:
"""Split tmux window, run action, and return pane_id as str."""
cmd = ['tmux', 'split-window', '-d', '-PF', '#D']
@ -332,7 +346,7 @@ def split_window(
return proc.stdout.strip()
def zoom_pane(pane_id=None):
def zoom_pane(pane_id: str | None = None) -> None:
"""Toggle zoom status for current or target pane."""
cmd = ['tmux', 'resize-pane', '-Z']
if pane_id:

View file

@ -7,6 +7,7 @@ import time
from copy import deepcopy
from os import environ
from typing import Any
from wk.exe import start_thread
from wk.std import sleep
@ -29,12 +30,12 @@ TMUX_LAYOUT = { # NOTE: This needs to be in order from top to bottom
# Classes
class TUI():
"""Object for tracking TUI elements."""
def __init__(self, title_text=None) -> None:
self.layout = deepcopy(TMUX_LAYOUT)
self.side_width = TMUX_SIDE_WIDTH
self.title_text = title_text if title_text else 'Title Text'
self.title_text_line2 = ''
self.title_colors = ['BLUE', None]
def __init__(self, title_text: str | None = None):
self.layout: dict[str, dict[str, Any]] = deepcopy(TMUX_LAYOUT)
self.side_width: int = TMUX_SIDE_WIDTH
self.title_text: str = title_text if title_text else 'Title Text'
self.title_text_line2: str = ''
self.title_colors: list[str | None] = ['BLUE', None]
# Init tmux and start a background process to maintain layout
self.init_tmux()
@ -44,7 +45,11 @@ class TUI():
atexit.register(tmux.kill_all_panes)
def add_info_pane(
self, lines=None, percent=None, update_layout=True, **tmux_args,
self,
lines: int | None = None,
percent: int | None = None,
update_layout: bool = True,
**tmux_args,
) -> None:
"""Add info pane."""
if not (lines or percent):
@ -78,7 +83,12 @@ class TUI():
# Add pane
self.layout['Info']['Panes'].append(tmux.split_window(**tmux_args))
def add_title_pane(self, line1, line2=None, colors=None) -> None:
def add_title_pane(
self,
line1: str,
line2: str | None = None,
colors: list[str] | None = None,
) -> None:
"""Add pane to title row."""
lines = [line1, line2]
colors = colors if colors else self.title_colors.copy()
@ -105,7 +115,11 @@ class TUI():
self.layout['Title']['Panes'].append(tmux.split_window(**tmux_args))
def add_worker_pane(
self, lines=None, percent=None, update_layout=True, **tmux_args,
self,
lines: int | None = None,
percent: int | None = None,
update_layout: bool = True,
**tmux_args,
) -> None:
"""Add worker pane."""
height = lines
@ -142,7 +156,7 @@ class TUI():
"""Clear current pane height and update layout."""
self.layout['Current'].pop('height', None)
def fix_layout(self, forced=True) -> None:
def fix_layout(self, forced: bool = True) -> None:
"""Fix tmux layout based on self.layout."""
try:
tmux.fix_layout(self.layout, forced=forced)
@ -208,19 +222,24 @@ class TUI():
self.layout['Workers']['Panes'].clear()
tmux.kill_pane(*panes)
def set_current_pane_height(self, height) -> None:
def set_current_pane_height(self, height: int) -> None:
"""Set current pane height and update layout."""
self.layout['Current']['height'] = height
tmux.resize_pane(height=height)
def set_progress_file(self, progress_file) -> None:
def set_progress_file(self, progress_file: str) -> None:
"""Set the file to use for the progresse pane."""
tmux.respawn_pane(
pane_id=self.layout['Progress']['Panes'][0],
watch_file=progress_file,
)
def set_title(self, line1, line2=None, colors=None) -> None:
def set_title(
self,
line1: str,
line2: str | None = None,
colors: list[str] | None = None,
) -> None:
"""Set title text."""
self.title_text = line1
self.title_text_line2 = line2 if line2 else ''
@ -251,7 +270,7 @@ class TUI():
# Functions
def fix_layout(layout, forced=False):
def fix_layout(layout, forced: bool = False) -> None:
"""Fix pane sizes based on layout."""
resize_kwargs = []
@ -320,7 +339,7 @@ def fix_layout(layout, forced=False):
tmux.resize_pane(workers[1], height=next_height)
workers.pop(0)
def layout_needs_fixed(layout):
def layout_needs_fixed(layout) -> bool:
"""Check if layout needs fixed, returns bool."""
needs_fixed = False
@ -338,7 +357,7 @@ def layout_needs_fixed(layout):
# Done
return needs_fixed
def test():
def test() -> TUI:
"""TODO: Deleteme"""
ui = TUI()
ui.add_info_pane(lines=10, text='Info One')