diff --git a/scripts/wk/debug.py b/scripts/wk/debug.py index e11406c9..6b4e3445 100644 --- a/scripts/wk/debug.py +++ b/scripts/wk/debug.py @@ -5,6 +5,7 @@ import inspect import logging import lzma import os +import pathlib import pickle import platform import re @@ -12,6 +13,8 @@ import socket import sys import time +from typing import Any + import requests from wk.cfg.net import CRASH_SERVER @@ -78,7 +81,7 @@ def generate_debug_report() -> str: return '\n'.join(report) -def generate_object_report(obj, indent=0) -> list[str]: +def generate_object_report(obj: Any, indent: int = 0) -> list[str]: """Generate debug report for obj, returns list.""" report = [] attr_list = [] @@ -109,7 +112,10 @@ def generate_object_report(obj, indent=0) -> list[str]: return report -def save_pickles(obj_dict, out_path=None) -> None: +def save_pickles( + obj_dict: dict[Any, Any], + out_path: pathlib.Path | str | None = None, + ) -> None: """Save dict of objects using pickle.""" LOG.info('Saving pickles') @@ -129,7 +135,11 @@ def save_pickles(obj_dict, out_path=None) -> None: LOG.error('Failed to save all the pickles', exc_info=True) -def upload_debug_report(report, compress=True, reason='DEBUG') -> None: +def upload_debug_report( + report: str, + compress: bool = True, + reason: str = 'DEBUG', + ) -> None: """Upload debug report to CRASH_SERVER as specified in wk.cfg.main.""" LOG.info('Uploading debug report to %s', CRASH_SERVER.get('Name', '?')) headers = CRASH_SERVER.get('Headers', {'X-Requested-With': 'XMLHttpRequest'}) diff --git a/scripts/wk/exe.py b/scripts/wk/exe.py index 167225f5..00ab62d6 100644 --- a/scripts/wk/exe.py +++ b/scripts/wk/exe.py @@ -4,13 +4,15 @@ import json import logging import os +import pathlib import re import subprocess import time +from io import BufferedReader, TextIOWrapper from queue import Queue, Empty from threading import Thread -from typing import Any +from typing import Any, Callable, Iterable import psutil @@ -26,11 +28,11 @@ class NonBlockingStreamReader(): ## https://gist.github.com/EyalAr/7915597 ## https://stackoverflow.com/a/4896288 - def __init__(self, stream): + def __init__(self, stream: BufferedReader | TextIOWrapper): self.stream = stream self.queue = Queue() - def populate_queue(stream, queue) -> None: + def populate_queue(stream: BufferedReader | TextIOWrapper, queue: Queue) -> None: """Collect lines from stream and put them in queue.""" while not stream.closed: try: @@ -50,14 +52,14 @@ class NonBlockingStreamReader(): """Stop reading from input stream.""" self.stream.close() - def read(self, timeout=None) -> Any: + def read(self, timeout: float | int | None = None) -> Any: """Read from queue if possible, returns item from queue.""" try: return self.queue.get(block=timeout is not None, timeout=timeout) except Empty: return None - def save_to_file(self, proc, out_path) -> None: + def save_to_file(self, proc: subprocess.Popen, out_path: pathlib.Path | str) -> None: """Continuously save output to file while proc is running.""" LOG.debug('Saving process %s output to %s', proc, out_path) while proc.poll() is None: @@ -76,7 +78,11 @@ class NonBlockingStreamReader(): # Functions def build_cmd_kwargs( - cmd, minimized=False, pipe=True, shell=False, **kwargs) -> dict[str, Any]: + cmd: list[str], + minimized: bool = False, + pipe: bool = True, + shell: bool = False, + **kwargs) -> dict[str, Any]: """Build kwargs for use by subprocess functions, returns dict. Specifically subprocess.run() and subprocess.Popen(). @@ -125,7 +131,11 @@ def build_cmd_kwargs( def get_json_from_command( - cmd, check=True, encoding='utf-8', errors='ignore') -> dict[Any, Any]: + cmd: list[str], + check: bool = True, + encoding: str = 'utf-8', + errors: str = 'ignore', + ) -> dict[Any, Any]: """Capture JSON content from cmd output, returns dict. If the data can't be decoded then either an exception is raised @@ -144,7 +154,11 @@ def get_json_from_command( return json_data -def get_procs(name, exact=True, try_again=True) -> list[psutil.Process]: +def get_procs( + name: str, + exact: bool = True, + try_again: bool = True, + ) -> list[psutil.Process]: """Get process object(s) based on name, returns list of proc objects.""" LOG.debug('name: %s, exact: %s', name, exact) processes = [] @@ -164,7 +178,12 @@ def get_procs(name, exact=True, try_again=True) -> list[psutil.Process]: return processes -def kill_procs(name, exact=True, force=False, timeout=30) -> None: +def kill_procs( + name: str, + exact: bool = True, + force: bool = False, + timeout: float | int = 30, + ) -> None: """Kill all processes matching name (case-insensitively). NOTE: Under Posix systems this will send SIGINT to allow processes @@ -189,7 +208,12 @@ def kill_procs(name, exact=True, force=False, timeout=30) -> None: def popen_program( - cmd, minimized=False, pipe=False, shell=False, **kwargs) -> subprocess.Popen: + cmd: list[str], + minimized: bool = False, + pipe: bool = False, + shell: bool = False, + **kwargs: dict[Any, Any], + ) -> subprocess.Popen: """Run program and return a subprocess.Popen object.""" LOG.debug( 'cmd: %s, minimized: %s, pipe: %s, shell: %s', @@ -214,7 +238,12 @@ def popen_program( def run_program( - cmd, check=True, pipe=True, shell=False, **kwargs) -> subprocess.CompletedProcess: + cmd: list[str], + check: bool = True, + pipe: bool = True, + shell: bool = False, + **kwargs: dict[Any, Any], + ) -> subprocess.CompletedProcess: """Run program and return a subprocess.CompletedProcess object.""" LOG.debug( 'cmd: %s, check: %s, pipe: %s, shell: %s', @@ -238,7 +267,11 @@ def run_program( return proc -def start_thread(function, args=None, daemon=True) -> Thread: +def start_thread( + function: Callable, + args: Iterable[Any] | None = None, + daemon: bool = True, + ) -> Thread: """Run function as thread in background, returns Thread object.""" LOG.debug( 'Starting background thread for function: %s, args: %s, daemon: %s', @@ -250,7 +283,7 @@ def start_thread(function, args=None, daemon=True) -> Thread: return thread -def stop_process(proc, graceful=True) -> None: +def stop_process(proc: subprocess.Popen, graceful: bool = True) -> None: """Stop process. NOTES: proc should be a subprocess.Popen obj. @@ -272,7 +305,11 @@ def stop_process(proc, graceful=True) -> None: proc.kill() -def wait_for_procs(name, exact=True, timeout=None) -> None: +def wait_for_procs( + name: str, + exact: bool = True, + timeout: float | int | None = None, + ) -> None: """Wait for all process matching name.""" LOG.debug('name: %s, exact: %s, timeout: %s', name, exact, timeout) target_procs = get_procs(name, exact=exact)