Added TryAndPrint() class

* Replaces try_and_print() function
* Moved several functions to TryAndPrint() class
  * _format_exception_message()
  * _format_function_output()
  * _get_exception()
* Separates the formatting settings and the function paramters
This commit is contained in:
2Shirt 2019-09-13 22:35:39 -07:00
parent 8d9e264efc
commit 7cb5ecd09f
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C

View file

@ -348,6 +348,206 @@ class Menu():
return self._resolve_selection(user_selection)
class TryAndPrint():
"""Object used to standardize running functions and returning the result.
The errors and warning attributes are used to allow fine-tuned results
based on exception names.
"""
def __init__(self, msg_bad='FAILED', msg_good='SUCCESS'):
self.indent = INDENT
self.msg_bad = msg_bad
self.msg_good = msg_good
self.width = WIDTH
self.list_errors = ['GenericError']
self.list_warnings = ['GenericWarning']
def _format_exception_message(self, _exception):
"""Format using the exception's args or name, returns str."""
# pylint: disable=broad-except
LOG.debug(
'Formatting exception: %s',
_exception.__class__.__name__,
)
message = None
# Use known argument index or first string found
try:
if isinstance(_exception, subprocess.CalledProcessError):
message = _exception.stderr
if not isinstance(message, str):
message = message.decode('utf-8')
message = message.strip()
elif isinstance(_exception, FileNotFoundError):
message = _exception.args[1]
elif isinstance(_exception, ZeroDivisionError):
message = 'ZeroDivisionError'
else:
for arg in _exception.args:
if isinstance(arg, str):
message = arg
break
except Exception:
# Just use the exception name instead
pass
# Safety check
if not message:
try:
message = _exception.__class__.__name__
except Exception:
message = 'UNKNOWN ERROR'
# Fix multi-line messages
if '\n' in message:
try:
lines = [
f'{" "*(self.indent+self.width)}{line.strip()}'
for line in message.splitlines() if line.strip()
]
lines[0] = lines[0].strip()
message = '\n'.join(lines)
except Exception:
pass
# Done
return message
def _format_function_output(self, output):
"""Format function output for use in try_and_print(), returns str."""
LOG.debug('Formatting output: %s', output)
if not output:
raise GenericWarning('No output')
# Ensure we're working with a list
if isinstance(output, subprocess.CompletedProcess):
stdout = output.stdout
if not isinstance(stdout, str):
stdout = stdout.decode('utf8')
output = stdout.strip().splitlines()
else:
output = list(output)
# Safety check
if not output:
# Going to ignore empty function output for now
LOG.error('Output is empty')
return 'UNKNOWN'
# Build result_msg
result_msg = f'{output.pop(0)}'
if output:
output = [f'{" "*(self.indent+self.width)}{line}' for line in output]
result_msg += '\n' + '\n'.join(output)
# Done
return result_msg
def _get_exception(self, name):
# pylint: disable=no-self-use
"""Get exception by name, returns exception object.
[Doctest]
>>> self._get_exception('AttributeError')
<class 'AttributeError'>
>>> self._get_exception('CalledProcessError')
<class 'subprocess.CalledProcessError'>
>>> self._get_exception('GenericError')
<class 'std.GenericError'>
"""
LOG.debug('Getting exception: %s', name)
try:
obj = getattr(sys.modules[__name__], name)
except AttributeError:
# Try builtin classes
obj = getattr(sys.modules['builtins'], name)
return obj
def add_error(self, exception_name):
"""Add exception name to error list."""
if exception_name not in self.list_errors:
self.list_errors.append(exception_name)
def add_warning(self, exception_name):
"""Add exception name to warning list."""
if exception_name not in self.list_warnings:
self.list_warnings.append(exception_name)
def run_function(
self, message, function, *args,
catch_all=True, print_return=False, verbose=False, **kwargs):
# pylint: disable=catching-non-exception
"""Run a function and print the results, returns results as dict.
If catch_all is True then (nearly) all exceptions will be caught.
Otherwise if an exception occurs that wasn't specified it will be
re-raised.
If print_return is True then the output from the function will be used
instead of msg_good, msg_bad, or exception text. The output should be
a list or a subprocess.CompletedProcess object.
If verbose is True then exception names or messages will be used for
the result message. Otherwise it will simply be set to result_bad.
args and kwargs are passed to the function.
"""
LOG.debug('function: %s.%s', function.__module__, function.__name__)
LOG.debug('args: %s', args)
LOG.debug('kwargs: %s', kwargs)
LOG.debug(
'catch_all: %s, print_return: %s, verbose: %s',
catch_all,
print_return,
verbose,
)
f_exception = None
output = None
result_msg = 'UNKNOWN'
# Build exception tuples
e_exceptions = tuple(self._get_exception(e) for e in self.list_errors)
w_exceptions = tuple(self._get_exception(e) for e in self.list_warnings)
# Run function and catch exceptions
print(f'{" "*self.indent}{message:<{self.width}}', end='', flush=True)
LOG.info('Running function: %s.%s', function.__module__, function.__name__)
try:
output = function(*args, **kwargs)
if print_return:
result_msg = self._format_function_output(output)
print(result_msg)
else:
print_success(self.msg_good)
except w_exceptions as _exception:
result_msg = self._format_exception_message(_exception)
print_warning(result_msg)
f_exception = _exception
except e_exceptions as _exception:
result_msg = self._format_exception_message(_exception)
print_error(result_msg)
f_exception = _exception
except Exception as _exception: # pylint: disable=broad-except
if verbose:
result_msg = self._format_exception_message(_exception)
else:
result_msg = self.msg_bad
print_error(result_msg)
f_exception = _exception
if not catch_all:
# Re-raise error as necessary
raise
# Done
LOG.info('Result: %s', result_msg.strip())
return {
'Failed': bool(f_exception),
'Exception': f_exception,
'Output': output,
}
# Functions
def abort(prompt='Aborted.', show_prompt=True, return_code=1):
"""Abort script."""
@ -473,110 +673,6 @@ def clear_screen():
subprocess.run(cmd, check=False, shell=True, stderr=subprocess.PIPE)
def format_exception_message(_exception, indent=INDENT, width=WIDTH):
"""Format using the exception's args or name, returns str."""
# pylint: disable=broad-except
LOG.debug(
'Formatting exception: %s',
_exception.__class__.__name__,
)
message = None
# Use known argument index or first string found
try:
if isinstance(_exception, subprocess.CalledProcessError):
message = _exception.stderr
if not isinstance(message, str):
message = message.decode('utf-8')
message = message.strip()
elif isinstance(_exception, FileNotFoundError):
message = _exception.args[1]
elif isinstance(_exception, ZeroDivisionError):
message = 'ZeroDivisionError'
else:
for arg in _exception.args:
if isinstance(arg, str):
message = arg
break
except Exception:
# Just use the exception name instead
pass
# Safety check
if not message:
try:
message = _exception.__class__.__name__
except Exception:
message = 'UNKNOWN ERROR'
# Fix multi-line messages
if '\n' in message:
try:
lines = [
f'{" "*(indent+width)}{line.strip()}'
for line in message.splitlines() if line.strip()
]
lines[0] = lines[0].strip()
message = '\n'.join(lines)
except Exception:
pass
# Done
return message
def format_function_output(output, indent=INDENT, width=WIDTH):
"""Format function output for use in try_and_print(), returns str."""
LOG.debug('Formatting output: %s', output)
if not output:
raise GenericWarning('No output')
# Ensure we're working with a list
if isinstance(output, subprocess.CompletedProcess):
stdout = output.stdout
if not isinstance(stdout, str):
stdout = stdout.decode('utf8')
output = stdout.strip().splitlines()
else:
output = list(output)
# Safety check
if not output:
# Going to ignore empty function output for now
LOG.error('Output is empty')
return 'UNKNOWN'
# Build result_msg
result_msg = f'{output.pop(0)}'
if output:
output = [f'{" "*(indent+width)}{line}' for line in output]
result_msg += '\n' + '\n'.join(output)
# Done
return result_msg
def get_exception(name):
"""Get exception by name, returns exception object.
[Doctest]
>>> get_exception('AttributeError')
<class 'AttributeError'>
>>> get_exception('CalledProcessError')
<class 'subprocess.CalledProcessError'>
>>> get_exception('GenericError')
<class 'std.GenericError'>
"""
LOG.debug('Getting exception: %s', name)
try:
obj = getattr(sys.modules[__name__], name)
except AttributeError:
# Try builtin classes
obj = getattr(sys.modules['builtins'], name)
return obj
def get_log_filepath():
"""Get the log filepath from the root logger, returns pathlib.Path obj.
@ -801,91 +897,6 @@ def strip_colors(string):
return string
def try_and_print(
message, function, *args,
msg_good='CS', msg_bad='NS', indent=INDENT, width=WIDTH,
w_exceptions=None, e_exceptions=None,
catch_all=True, print_return=False, verbose=False,
**kwargs):
# pylint: disable=catching-non-exception,unused-argument,too-many-locals
"""Run a function and print the results, returns results as dict.
If catch_all is True then (nearly) all exceptions will be caught.
Otherwise if an exception occurs that wasn't specified it will be
re-raised.
If print_return is True then the output from the function will be used
instead of msg_good, msg_bad, or exception text. The output should be
a list or a subprocess.CompletedProcess object.
If verbose is True then exception names or messages will be used for
the result message. Otherwise it will simply be set to result_bad.
If specified w_exceptions and e_exceptions should be lists of
exception class names. Details from the excceptions will be used to
format more clear result messages.
"""
LOG.debug('function: %s.%s', function.__module__, function.__name__)
LOG.debug('args: %s', args)
LOG.debug('kwargs: %s', kwargs)
LOG.debug('w_exceptions: %s', w_exceptions)
LOG.debug('e_exceptions: %s', e_exceptions)
LOG.debug(
'catch_all: %s, print_return: %s, verbose: %s',
catch_all,
print_return,
verbose,
)
f_exception = None
output = None
result_msg = 'UNKNOWN'
# Build tuples of exceptions
if not w_exceptions:
w_exceptions = ('GenericWarning',)
if not e_exceptions:
e_exceptions = ('GenericError',)
w_exceptions = tuple(get_exception(e) for e in w_exceptions)
e_exceptions = tuple(get_exception(e) for e in e_exceptions)
# Run function and catch exceptions
print(f'{" "*indent}{message:<{width}}', end='', flush=True)
LOG.info('Running function: %s.%s', function.__module__, function.__name__)
try:
output = function(*args, **kwargs)
if print_return:
result_msg = format_function_output(output, indent, width)
print(result_msg)
else:
print_success(msg_good)
except w_exceptions as _exception:
result_msg = format_exception_message(_exception, indent, width)
print_warning(result_msg)
f_exception = _exception
except e_exceptions as _exception:
result_msg = format_exception_message(_exception, indent, width)
print_error(result_msg)
f_exception = _exception
except Exception as _exception: # pylint: disable=broad-except
if verbose:
result_msg = format_exception_message(_exception, indent, width)
else:
result_msg = msg_bad
print_error(result_msg)
f_exception = _exception
if not catch_all:
# Re-raise error as necessary
raise
# Done
LOG.info('Result: %s', result_msg.strip())
return {
'Failed': bool(f_exception),
'Exception': f_exception,
'Output': output,
}
def upload_debug_report(report, compress=True, reason='DEBUG'):
"""Upload debug report to CRASH_SERVER as specified in wk.cfg.main."""
LOG.info('Uploading debug report to %s', CRASH_SERVER.get('Name', '?'))