Add more type hints to function arguments
This commit is contained in:
parent
12326a5e2c
commit
bf9d994675
2 changed files with 64 additions and 17 deletions
|
|
@ -5,6 +5,7 @@ import inspect
|
|||
import logging
|
||||
import lzma
|
||||
import os
|
||||
import pathlib
|
||||
import pickle
|
||||
import platform
|
||||
import re
|
||||
|
|
@ -12,6 +13,8 @@ import socket
|
|||
import sys
|
||||
import time
|
||||
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
|
||||
from wk.cfg.net import CRASH_SERVER
|
||||
|
|
@ -78,7 +81,7 @@ def generate_debug_report() -> str:
|
|||
return '\n'.join(report)
|
||||
|
||||
|
||||
def generate_object_report(obj, indent=0) -> list[str]:
|
||||
def generate_object_report(obj: Any, indent: int = 0) -> list[str]:
|
||||
"""Generate debug report for obj, returns list."""
|
||||
report = []
|
||||
attr_list = []
|
||||
|
|
@ -109,7 +112,10 @@ def generate_object_report(obj, indent=0) -> list[str]:
|
|||
return report
|
||||
|
||||
|
||||
def save_pickles(obj_dict, out_path=None) -> None:
|
||||
def save_pickles(
|
||||
obj_dict: dict[Any, Any],
|
||||
out_path: pathlib.Path | str | None = None,
|
||||
) -> None:
|
||||
"""Save dict of objects using pickle."""
|
||||
LOG.info('Saving pickles')
|
||||
|
||||
|
|
@ -129,7 +135,11 @@ def save_pickles(obj_dict, out_path=None) -> None:
|
|||
LOG.error('Failed to save all the pickles', exc_info=True)
|
||||
|
||||
|
||||
def upload_debug_report(report, compress=True, reason='DEBUG') -> None:
|
||||
def upload_debug_report(
|
||||
report: str,
|
||||
compress: bool = True,
|
||||
reason: str = 'DEBUG',
|
||||
) -> None:
|
||||
"""Upload debug report to CRASH_SERVER as specified in wk.cfg.main."""
|
||||
LOG.info('Uploading debug report to %s', CRASH_SERVER.get('Name', '?'))
|
||||
headers = CRASH_SERVER.get('Headers', {'X-Requested-With': 'XMLHttpRequest'})
|
||||
|
|
|
|||
|
|
@ -4,13 +4,15 @@
|
|||
import json
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
from io import BufferedReader, TextIOWrapper
|
||||
from queue import Queue, Empty
|
||||
from threading import Thread
|
||||
from typing import Any
|
||||
from typing import Any, Callable, Iterable
|
||||
|
||||
import psutil
|
||||
|
||||
|
|
@ -26,11 +28,11 @@ class NonBlockingStreamReader():
|
|||
## https://gist.github.com/EyalAr/7915597
|
||||
## https://stackoverflow.com/a/4896288
|
||||
|
||||
def __init__(self, stream):
|
||||
def __init__(self, stream: BufferedReader | TextIOWrapper):
|
||||
self.stream = stream
|
||||
self.queue = Queue()
|
||||
|
||||
def populate_queue(stream, queue) -> None:
|
||||
def populate_queue(stream: BufferedReader | TextIOWrapper, queue: Queue) -> None:
|
||||
"""Collect lines from stream and put them in queue."""
|
||||
while not stream.closed:
|
||||
try:
|
||||
|
|
@ -50,14 +52,14 @@ class NonBlockingStreamReader():
|
|||
"""Stop reading from input stream."""
|
||||
self.stream.close()
|
||||
|
||||
def read(self, timeout=None) -> Any:
|
||||
def read(self, timeout: float | int | None = None) -> Any:
|
||||
"""Read from queue if possible, returns item from queue."""
|
||||
try:
|
||||
return self.queue.get(block=timeout is not None, timeout=timeout)
|
||||
except Empty:
|
||||
return None
|
||||
|
||||
def save_to_file(self, proc, out_path) -> None:
|
||||
def save_to_file(self, proc: subprocess.Popen, out_path: pathlib.Path | str) -> None:
|
||||
"""Continuously save output to file while proc is running."""
|
||||
LOG.debug('Saving process %s output to %s', proc, out_path)
|
||||
while proc.poll() is None:
|
||||
|
|
@ -76,7 +78,11 @@ class NonBlockingStreamReader():
|
|||
|
||||
# Functions
|
||||
def build_cmd_kwargs(
|
||||
cmd, minimized=False, pipe=True, shell=False, **kwargs) -> dict[str, Any]:
|
||||
cmd: list[str],
|
||||
minimized: bool = False,
|
||||
pipe: bool = True,
|
||||
shell: bool = False,
|
||||
**kwargs) -> dict[str, Any]:
|
||||
"""Build kwargs for use by subprocess functions, returns dict.
|
||||
|
||||
Specifically subprocess.run() and subprocess.Popen().
|
||||
|
|
@ -125,7 +131,11 @@ def build_cmd_kwargs(
|
|||
|
||||
|
||||
def get_json_from_command(
|
||||
cmd, check=True, encoding='utf-8', errors='ignore') -> dict[Any, Any]:
|
||||
cmd: list[str],
|
||||
check: bool = True,
|
||||
encoding: str = 'utf-8',
|
||||
errors: str = 'ignore',
|
||||
) -> dict[Any, Any]:
|
||||
"""Capture JSON content from cmd output, returns dict.
|
||||
|
||||
If the data can't be decoded then either an exception is raised
|
||||
|
|
@ -144,7 +154,11 @@ def get_json_from_command(
|
|||
return json_data
|
||||
|
||||
|
||||
def get_procs(name, exact=True, try_again=True) -> list[psutil.Process]:
|
||||
def get_procs(
|
||||
name: str,
|
||||
exact: bool = True,
|
||||
try_again: bool = True,
|
||||
) -> list[psutil.Process]:
|
||||
"""Get process object(s) based on name, returns list of proc objects."""
|
||||
LOG.debug('name: %s, exact: %s', name, exact)
|
||||
processes = []
|
||||
|
|
@ -164,7 +178,12 @@ def get_procs(name, exact=True, try_again=True) -> list[psutil.Process]:
|
|||
return processes
|
||||
|
||||
|
||||
def kill_procs(name, exact=True, force=False, timeout=30) -> None:
|
||||
def kill_procs(
|
||||
name: str,
|
||||
exact: bool = True,
|
||||
force: bool = False,
|
||||
timeout: float | int = 30,
|
||||
) -> None:
|
||||
"""Kill all processes matching name (case-insensitively).
|
||||
|
||||
NOTE: Under Posix systems this will send SIGINT to allow processes
|
||||
|
|
@ -189,7 +208,12 @@ def kill_procs(name, exact=True, force=False, timeout=30) -> None:
|
|||
|
||||
|
||||
def popen_program(
|
||||
cmd, minimized=False, pipe=False, shell=False, **kwargs) -> subprocess.Popen:
|
||||
cmd: list[str],
|
||||
minimized: bool = False,
|
||||
pipe: bool = False,
|
||||
shell: bool = False,
|
||||
**kwargs: dict[Any, Any],
|
||||
) -> subprocess.Popen:
|
||||
"""Run program and return a subprocess.Popen object."""
|
||||
LOG.debug(
|
||||
'cmd: %s, minimized: %s, pipe: %s, shell: %s',
|
||||
|
|
@ -214,7 +238,12 @@ def popen_program(
|
|||
|
||||
|
||||
def run_program(
|
||||
cmd, check=True, pipe=True, shell=False, **kwargs) -> subprocess.CompletedProcess:
|
||||
cmd: list[str],
|
||||
check: bool = True,
|
||||
pipe: bool = True,
|
||||
shell: bool = False,
|
||||
**kwargs: dict[Any, Any],
|
||||
) -> subprocess.CompletedProcess:
|
||||
"""Run program and return a subprocess.CompletedProcess object."""
|
||||
LOG.debug(
|
||||
'cmd: %s, check: %s, pipe: %s, shell: %s',
|
||||
|
|
@ -238,7 +267,11 @@ def run_program(
|
|||
return proc
|
||||
|
||||
|
||||
def start_thread(function, args=None, daemon=True) -> Thread:
|
||||
def start_thread(
|
||||
function: Callable,
|
||||
args: Iterable[Any] | None = None,
|
||||
daemon: bool = True,
|
||||
) -> Thread:
|
||||
"""Run function as thread in background, returns Thread object."""
|
||||
LOG.debug(
|
||||
'Starting background thread for function: %s, args: %s, daemon: %s',
|
||||
|
|
@ -250,7 +283,7 @@ def start_thread(function, args=None, daemon=True) -> Thread:
|
|||
return thread
|
||||
|
||||
|
||||
def stop_process(proc, graceful=True) -> None:
|
||||
def stop_process(proc: subprocess.Popen, graceful: bool = True) -> None:
|
||||
"""Stop process.
|
||||
|
||||
NOTES: proc should be a subprocess.Popen obj.
|
||||
|
|
@ -272,7 +305,11 @@ def stop_process(proc, graceful=True) -> None:
|
|||
proc.kill()
|
||||
|
||||
|
||||
def wait_for_procs(name, exact=True, timeout=None) -> None:
|
||||
def wait_for_procs(
|
||||
name: str,
|
||||
exact: bool = True,
|
||||
timeout: float | int | None = None,
|
||||
) -> None:
|
||||
"""Wait for all process matching name."""
|
||||
LOG.debug('name: %s, exact: %s, timeout: %s', name, exact, timeout)
|
||||
target_procs = get_procs(name, exact=exact)
|
||||
|
|
|
|||
Loading…
Reference in a new issue