Moved exe functions to a separate file

This commit is contained in:
2Shirt 2019-09-18 18:49:56 -07:00
parent 2678ce77da
commit 304d811698
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
6 changed files with 125 additions and 118 deletions

View file

@ -25,7 +25,7 @@ def main():
# Done
print('Done.')
wk.std.pause('Press Enter to reboot...')
wk.std.run_program('shutdown -r -t 3'.split(), check=False)
wk.exe.run_program('shutdown -r -t 3'.split(), check=False)
if __name__ == '__main__':

View file

@ -25,7 +25,7 @@ def main():
# Done
print('Done.')
wk.std.pause('Press Enter to reboot...')
wk.std.run_program('shutdown -r -t 3'.split(), check=False)
wk.exe.run_program('shutdown -r -t 3'.split(), check=False)
if __name__ == '__main__':

View file

@ -4,6 +4,7 @@
from sys import version_info as version
from wk import cfg
from wk import exe
from wk import hw
from wk import io
from wk import kit

120
scripts/wk/exe.py Normal file
View file

@ -0,0 +1,120 @@
"""WizardKit: Executable functions"""
#vim: sts=2 sw=2 ts=2
import re
import subprocess
import psutil
# Functions
def build_cmd_kwargs(cmd, minimized=False, pipe=True, shell=False, **kwargs):
"""Build kwargs for use by subprocess functions, returns dict.
Specifically subprocess.run() and subprocess.Popen().
NOTE: If no encoding specified then UTF-8 will be used.
"""
cmd_kwargs = {
'args': cmd,
'shell': shell,
}
# Add additional kwargs if applicable
for key in ('check', 'cwd', 'encoding', 'errors', 'stderr', 'stdout'):
if key in kwargs:
cmd_kwargs[key] = kwargs[key]
# Default to UTF-8 encoding
if not ('encoding' in cmd_kwargs or 'errors' in cmd_kwargs):
cmd_kwargs['encoding'] = 'utf-8'
cmd_kwargs['errors'] = 'ignore'
# Start minimized
if minimized:
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = 6
cmd_kwargs['startupinfo'] = startupinfo
# Pipe output
if pipe:
cmd_kwargs['stderr'] = subprocess.PIPE
cmd_kwargs['stdout'] = subprocess.PIPE
# Done
return cmd_kwargs
def get_procs(name, exact=True):
"""Get process object(s) based on name, returns list of proc objects."""
processes = []
regex = f'^{name}$' if exact else name
# Iterate over all processes
for proc in psutil.process_iter():
if re.search(regex, proc.name(), re.IGNORECASE):
processes.append(proc)
# Done
return processes
def kill_procs(name, exact=True, force=False, timeout=30):
"""Kill all processes matching name (case-insensitively).
NOTE: Under Posix systems this will send SIGINT to allow processes
to gracefully exit.
If force is True then it will wait until timeout specified and then
send SIGKILL to any processes still alive.
"""
target_procs = get_procs(name, exact=exact)
for proc in target_procs:
proc.terminate()
# Force kill if necesary
if force:
results = psutil.wait_procs(target_procs, timeout=timeout)
for proc in results[1]: # Alive processes
proc.kill()
def popen_program(cmd, pipe=False, minimized=False, shell=False, **kwargs):
"""Run program and return a subprocess.Popen object."""
cmd_kwargs = build_cmd_kwargs(
cmd,
minimized=minimized,
pipe=pipe,
shell=shell,
**kwargs)
# Ready to run program
return subprocess.Popen(**cmd_kwargs)
def run_program(cmd, check=True, pipe=True, shell=False, **kwargs):
"""Run program and return a subprocess.CompletedProcess object."""
cmd_kwargs = build_cmd_kwargs(
cmd,
check=check,
pipe=pipe,
shell=shell,
**kwargs)
# Ready to run program
return subprocess.run(**cmd_kwargs)
def wait_for_procs(name, exact=True, timeout=None):
"""Wait for all process matching name."""
target_procs = get_procs(name, exact=exact)
results = psutil.wait_procs(target_procs, timeout=timeout)
# Raise exception if necessary
if results[1]: # Alive processes
raise psutil.TimeoutExpired(name=name, seconds=timeout)
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -8,12 +8,9 @@ import re
import time
from wk import cfg
from wk.exe import run_program
from wk.io import non_clobber_path
from wk.std import (
GenericError,
GenericWarning,
run_program,
)
from wk.std import GenericError, GenericWarning
# STATIC VARIABLES
LOG = logging.getLogger(__name__)

View file

@ -1,5 +1,4 @@
"""WizardKit: Standard Functions"""
# pylint: disable=too-many-lines
# vim: sts=2 sw=2 ts=2
import itertools
@ -22,8 +21,6 @@ except ImportError:
# Not worried about this under Windows
raise
import psutil
from wk.cfg.main import (
CRASH_SERVER,
ENABLED_UPLOAD_DATA,
@ -589,44 +586,6 @@ def beep(repeat=1):
repeat -= 1
def build_cmd_kwargs(cmd, minimized=False, pipe=True, shell=False, **kwargs):
"""Build kwargs for use by subprocess functions, returns dict.
Specifically subprocess.run() and subprocess.Popen().
NOTE: If no encoding specified then UTF-8 will be used.
"""
cmd_kwargs = {
'args': cmd,
'shell': shell,
}
# Add additional kwargs if applicable
for key in ('check', 'cwd', 'encoding', 'errors', 'stderr', 'stdout'):
if key in kwargs:
cmd_kwargs[key] = kwargs[key]
# Default to UTF-8 encoding
if not ('encoding' in cmd_kwargs or 'errors' in cmd_kwargs):
cmd_kwargs['encoding'] = 'utf-8'
cmd_kwargs['errors'] = 'ignore'
# Start minimized
if minimized:
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = 6
cmd_kwargs['startupinfo'] = startupinfo
# Pipe output
if pipe:
cmd_kwargs['stderr'] = subprocess.PIPE
cmd_kwargs['stdout'] = subprocess.PIPE
# Done
return cmd_kwargs
def bytes_to_string(size, decimals=0, use_binary=True):
"""Convert size into a human-readable format, returns str.
@ -775,20 +734,6 @@ def get_log_filepath():
return log_filepath
def get_procs(name, exact=True):
"""Get process object(s) based on name, returns list of proc objects."""
processes = []
regex = f'^{name}$' if exact else name
# Iterate over all processes
for proc in psutil.process_iter():
if re.search(regex, proc.name(), re.IGNORECASE):
processes.append(proc)
# Done
return processes
def input_text(prompt='Enter text'):
"""Get text from user, returns string."""
prompt = str(prompt)
@ -811,26 +756,6 @@ def input_text(prompt='Enter text'):
return response
def kill_procs(name, exact=True, force=False, timeout=30):
"""Kill all processes matching name (case-insensitively).
NOTE: Under Posix systems this will send SIGINT to allow processes
to gracefully exit.
If force is True then it will wait until timeout specified and then
send SIGKILL to any processes still alive.
"""
target_procs = get_procs(name, exact=exact)
for proc in target_procs:
proc.terminate()
# Force kill if necesary
if force:
results = psutil.wait_procs(target_procs, timeout=timeout)
for proc in results[1]: # Alive processes
proc.kill()
def major_exception():
"""Display traceback, optionally upload detailes, and exit."""
LOG.critical('Major exception encountered', exc_info=True)
@ -864,19 +789,6 @@ def pause(prompt='Press Enter to continue... '):
input_text(prompt)
def popen_program(cmd, pipe=False, minimized=False, shell=False, **kwargs):
"""Run program and return a subprocess.Popen object."""
cmd_kwargs = build_cmd_kwargs(
cmd,
minimized=minimized,
pipe=pipe,
shell=shell,
**kwargs)
# Ready to run program
return subprocess.Popen(**cmd_kwargs)
def print_colored(strings, colors, **kwargs):
"""Prints strings in the colors specified."""
LOG.debug('strings: %s, colors: %s, kwargs: %s', strings, colors, kwargs)
@ -926,19 +838,6 @@ def print_warning(msg, **kwargs):
print_colored([msg], ['YELLOW'], **kwargs)
def run_program(cmd, check=True, pipe=True, shell=False, **kwargs):
"""Run program and return a subprocess.CompletedProcess object."""
cmd_kwargs = build_cmd_kwargs(
cmd,
check=check,
pipe=pipe,
shell=shell,
**kwargs)
# Ready to run program
return subprocess.run(**cmd_kwargs)
def set_title(title):
"""Set window title."""
if os.name == 'nt':
@ -1040,15 +939,5 @@ def upload_debug_report(report, compress=True, reason='DEBUG'):
raise RuntimeError('Failed to upload report')
def wait_for_procs(name, exact=True, timeout=None):
"""Wait for all process matching name."""
target_procs = get_procs(name, exact=exact)
results = psutil.wait_procs(target_procs, timeout=timeout)
# Raise exception if necessary
if results[1]: # Alive processes
raise psutil.TimeoutExpired(name=name, seconds=timeout)
if __name__ == '__main__':
print("This file is not meant to be called directly.")