From 7cb5ecd09f8ea3ade8d9b8564fd48ab840f36254 Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Fri, 13 Sep 2019 22:35:39 -0700 Subject: [PATCH] Added TryAndPrint() class * Replaces try_and_print() function * Moved several functions to TryAndPrint() class * _format_exception_message() * _format_function_output() * _get_exception() * Separates the formatting settings and the function paramters --- scripts/wk/std.py | 389 ++++++++++++++++++++++++---------------------- 1 file changed, 200 insertions(+), 189 deletions(-) diff --git a/scripts/wk/std.py b/scripts/wk/std.py index 9d3f77f6..82dcd8eb 100644 --- a/scripts/wk/std.py +++ b/scripts/wk/std.py @@ -348,6 +348,206 @@ class Menu(): return self._resolve_selection(user_selection) +class TryAndPrint(): + """Object used to standardize running functions and returning the result. + + The errors and warning attributes are used to allow fine-tuned results + based on exception names. + """ + def __init__(self, msg_bad='FAILED', msg_good='SUCCESS'): + self.indent = INDENT + self.msg_bad = msg_bad + self.msg_good = msg_good + self.width = WIDTH + self.list_errors = ['GenericError'] + self.list_warnings = ['GenericWarning'] + + def _format_exception_message(self, _exception): + """Format using the exception's args or name, returns str.""" + # pylint: disable=broad-except + LOG.debug( + 'Formatting exception: %s', + _exception.__class__.__name__, + ) + message = None + + # Use known argument index or first string found + try: + if isinstance(_exception, subprocess.CalledProcessError): + message = _exception.stderr + if not isinstance(message, str): + message = message.decode('utf-8') + message = message.strip() + elif isinstance(_exception, FileNotFoundError): + message = _exception.args[1] + elif isinstance(_exception, ZeroDivisionError): + message = 'ZeroDivisionError' + else: + for arg in _exception.args: + if isinstance(arg, str): + message = arg + break + except Exception: + # Just use the exception name instead + pass + + # Safety check + if not message: + try: + message = _exception.__class__.__name__ + except Exception: + message = 'UNKNOWN ERROR' + + # Fix multi-line messages + if '\n' in message: + try: + lines = [ + f'{" "*(self.indent+self.width)}{line.strip()}' + for line in message.splitlines() if line.strip() + ] + lines[0] = lines[0].strip() + message = '\n'.join(lines) + except Exception: + pass + + # Done + return message + + def _format_function_output(self, output): + """Format function output for use in try_and_print(), returns str.""" + LOG.debug('Formatting output: %s', output) + + if not output: + raise GenericWarning('No output') + + # Ensure we're working with a list + if isinstance(output, subprocess.CompletedProcess): + stdout = output.stdout + if not isinstance(stdout, str): + stdout = stdout.decode('utf8') + output = stdout.strip().splitlines() + else: + output = list(output) + + # Safety check + if not output: + # Going to ignore empty function output for now + LOG.error('Output is empty') + return 'UNKNOWN' + + # Build result_msg + result_msg = f'{output.pop(0)}' + if output: + output = [f'{" "*(self.indent+self.width)}{line}' for line in output] + result_msg += '\n' + '\n'.join(output) + + # Done + return result_msg + + def _get_exception(self, name): + # pylint: disable=no-self-use + """Get exception by name, returns exception object. + + [Doctest] + >>> self._get_exception('AttributeError') + + >>> self._get_exception('CalledProcessError') + + >>> self._get_exception('GenericError') + + """ + LOG.debug('Getting exception: %s', name) + try: + obj = getattr(sys.modules[__name__], name) + except AttributeError: + # Try builtin classes + obj = getattr(sys.modules['builtins'], name) + return obj + + def add_error(self, exception_name): + """Add exception name to error list.""" + if exception_name not in self.list_errors: + self.list_errors.append(exception_name) + + def add_warning(self, exception_name): + """Add exception name to warning list.""" + if exception_name not in self.list_warnings: + self.list_warnings.append(exception_name) + + def run_function( + self, message, function, *args, + catch_all=True, print_return=False, verbose=False, **kwargs): + # pylint: disable=catching-non-exception + """Run a function and print the results, returns results as dict. + + If catch_all is True then (nearly) all exceptions will be caught. + Otherwise if an exception occurs that wasn't specified it will be + re-raised. + + If print_return is True then the output from the function will be used + instead of msg_good, msg_bad, or exception text. The output should be + a list or a subprocess.CompletedProcess object. + + If verbose is True then exception names or messages will be used for + the result message. Otherwise it will simply be set to result_bad. + + args and kwargs are passed to the function. + """ + LOG.debug('function: %s.%s', function.__module__, function.__name__) + LOG.debug('args: %s', args) + LOG.debug('kwargs: %s', kwargs) + LOG.debug( + 'catch_all: %s, print_return: %s, verbose: %s', + catch_all, + print_return, + verbose, + ) + f_exception = None + output = None + result_msg = 'UNKNOWN' + + # Build exception tuples + e_exceptions = tuple(self._get_exception(e) for e in self.list_errors) + w_exceptions = tuple(self._get_exception(e) for e in self.list_warnings) + + # Run function and catch exceptions + print(f'{" "*self.indent}{message:<{self.width}}', end='', flush=True) + LOG.info('Running function: %s.%s', function.__module__, function.__name__) + try: + output = function(*args, **kwargs) + if print_return: + result_msg = self._format_function_output(output) + print(result_msg) + else: + print_success(self.msg_good) + except w_exceptions as _exception: + result_msg = self._format_exception_message(_exception) + print_warning(result_msg) + f_exception = _exception + except e_exceptions as _exception: + result_msg = self._format_exception_message(_exception) + print_error(result_msg) + f_exception = _exception + except Exception as _exception: # pylint: disable=broad-except + if verbose: + result_msg = self._format_exception_message(_exception) + else: + result_msg = self.msg_bad + print_error(result_msg) + f_exception = _exception + if not catch_all: + # Re-raise error as necessary + raise + + # Done + LOG.info('Result: %s', result_msg.strip()) + return { + 'Failed': bool(f_exception), + 'Exception': f_exception, + 'Output': output, + } + + # Functions def abort(prompt='Aborted.', show_prompt=True, return_code=1): """Abort script.""" @@ -473,110 +673,6 @@ def clear_screen(): subprocess.run(cmd, check=False, shell=True, stderr=subprocess.PIPE) -def format_exception_message(_exception, indent=INDENT, width=WIDTH): - """Format using the exception's args or name, returns str.""" - # pylint: disable=broad-except - LOG.debug( - 'Formatting exception: %s', - _exception.__class__.__name__, - ) - message = None - - # Use known argument index or first string found - try: - if isinstance(_exception, subprocess.CalledProcessError): - message = _exception.stderr - if not isinstance(message, str): - message = message.decode('utf-8') - message = message.strip() - elif isinstance(_exception, FileNotFoundError): - message = _exception.args[1] - elif isinstance(_exception, ZeroDivisionError): - message = 'ZeroDivisionError' - else: - for arg in _exception.args: - if isinstance(arg, str): - message = arg - break - except Exception: - # Just use the exception name instead - pass - - # Safety check - if not message: - try: - message = _exception.__class__.__name__ - except Exception: - message = 'UNKNOWN ERROR' - - # Fix multi-line messages - if '\n' in message: - try: - lines = [ - f'{" "*(indent+width)}{line.strip()}' - for line in message.splitlines() if line.strip() - ] - lines[0] = lines[0].strip() - message = '\n'.join(lines) - except Exception: - pass - - # Done - return message - - -def format_function_output(output, indent=INDENT, width=WIDTH): - """Format function output for use in try_and_print(), returns str.""" - LOG.debug('Formatting output: %s', output) - - if not output: - raise GenericWarning('No output') - - # Ensure we're working with a list - if isinstance(output, subprocess.CompletedProcess): - stdout = output.stdout - if not isinstance(stdout, str): - stdout = stdout.decode('utf8') - output = stdout.strip().splitlines() - else: - output = list(output) - - # Safety check - if not output: - # Going to ignore empty function output for now - LOG.error('Output is empty') - return 'UNKNOWN' - - # Build result_msg - result_msg = f'{output.pop(0)}' - if output: - output = [f'{" "*(indent+width)}{line}' for line in output] - result_msg += '\n' + '\n'.join(output) - - # Done - return result_msg - - -def get_exception(name): - """Get exception by name, returns exception object. - - [Doctest] - >>> get_exception('AttributeError') - - >>> get_exception('CalledProcessError') - - >>> get_exception('GenericError') - - """ - LOG.debug('Getting exception: %s', name) - try: - obj = getattr(sys.modules[__name__], name) - except AttributeError: - # Try builtin classes - obj = getattr(sys.modules['builtins'], name) - return obj - - def get_log_filepath(): """Get the log filepath from the root logger, returns pathlib.Path obj. @@ -801,91 +897,6 @@ def strip_colors(string): return string -def try_and_print( - message, function, *args, - msg_good='CS', msg_bad='NS', indent=INDENT, width=WIDTH, - w_exceptions=None, e_exceptions=None, - catch_all=True, print_return=False, verbose=False, - **kwargs): - # pylint: disable=catching-non-exception,unused-argument,too-many-locals - """Run a function and print the results, returns results as dict. - - If catch_all is True then (nearly) all exceptions will be caught. - Otherwise if an exception occurs that wasn't specified it will be - re-raised. - - If print_return is True then the output from the function will be used - instead of msg_good, msg_bad, or exception text. The output should be - a list or a subprocess.CompletedProcess object. - - If verbose is True then exception names or messages will be used for - the result message. Otherwise it will simply be set to result_bad. - - If specified w_exceptions and e_exceptions should be lists of - exception class names. Details from the excceptions will be used to - format more clear result messages. - """ - LOG.debug('function: %s.%s', function.__module__, function.__name__) - LOG.debug('args: %s', args) - LOG.debug('kwargs: %s', kwargs) - LOG.debug('w_exceptions: %s', w_exceptions) - LOG.debug('e_exceptions: %s', e_exceptions) - LOG.debug( - 'catch_all: %s, print_return: %s, verbose: %s', - catch_all, - print_return, - verbose, - ) - f_exception = None - output = None - result_msg = 'UNKNOWN' - - # Build tuples of exceptions - if not w_exceptions: - w_exceptions = ('GenericWarning',) - if not e_exceptions: - e_exceptions = ('GenericError',) - w_exceptions = tuple(get_exception(e) for e in w_exceptions) - e_exceptions = tuple(get_exception(e) for e in e_exceptions) - - # Run function and catch exceptions - print(f'{" "*indent}{message:<{width}}', end='', flush=True) - LOG.info('Running function: %s.%s', function.__module__, function.__name__) - try: - output = function(*args, **kwargs) - if print_return: - result_msg = format_function_output(output, indent, width) - print(result_msg) - else: - print_success(msg_good) - except w_exceptions as _exception: - result_msg = format_exception_message(_exception, indent, width) - print_warning(result_msg) - f_exception = _exception - except e_exceptions as _exception: - result_msg = format_exception_message(_exception, indent, width) - print_error(result_msg) - f_exception = _exception - except Exception as _exception: # pylint: disable=broad-except - if verbose: - result_msg = format_exception_message(_exception, indent, width) - else: - result_msg = msg_bad - print_error(result_msg) - f_exception = _exception - if not catch_all: - # Re-raise error as necessary - raise - - # Done - LOG.info('Result: %s', result_msg.strip()) - return { - 'Failed': bool(f_exception), - 'Exception': f_exception, - 'Output': output, - } - - def upload_debug_report(report, compress=True, reason='DEBUG'): """Upload debug report to CRASH_SERVER as specified in wk.cfg.main.""" LOG.info('Uploading debug report to %s', CRASH_SERVER.get('Name', '?'))