"""WizardKit: CLI functions""" # vim: sts=2 sw=2 ts=2 import logging import os import platform import re import subprocess import sys import traceback from typing import Any, Callable, Iterable from prompt_toolkit import prompt from prompt_toolkit.document import Document from prompt_toolkit.validation import Validator, ValidationError try: from functools import cache except ImportError: # Assuming Python is < 3.9 from functools import lru_cache as cache from wk.cfg.main import ( ENABLED_UPLOAD_DATA, INDENT, SUPPORT_MESSAGE, WIDTH, ) from wk.std import (sleep, GenericWarning) from wk.ui.ansi import clear_screen, color_string, strip_colors # STATIC VARIABLES LOG = logging.getLogger(__name__) PLATFORM = platform.system() # Classes class InputChoiceValidator(Validator): """Validate that input is one of the provided choices.""" def __init__(self, choices: Iterable[str], allow_empty: bool = False): self.allow_empty: bool = allow_empty self.choices: list[str] = [str(c).upper() for c in choices] super().__init__() def validate(self, document: Document) -> None: text = document.text if not (text or self.allow_empty): raise ValidationError( message='This input is required!', cursor_position=len(text), ) if text and text.upper() not in self.choices: raise ValidationError( message='Invalid selection', cursor_position=len(text), ) class InputNotEmptyValidator(Validator): """Validate that input is not empty.""" def validate(self, document: Document) -> None: text = document.text if not text: raise ValidationError( message='This input is required!', cursor_position=len(text), ) class InputTicketIDValidator(Validator): """Validate that input resembles a ticket ID.""" def __init__(self, allow_empty: bool = False): self.allow_empty: bool = allow_empty super().__init__() def validate(self, document: Document) -> None: text = document.text if not (text or self.allow_empty): raise ValidationError( message='This input is required!', cursor_position=len(text), ) if text and not re.match(r'^\d', text): raise ValidationError( message='Ticket ID should start with a number!', cursor_position=len(text), ) class InputYesNoValidator(Validator): """Validate that input is a yes or no.""" def __init__(self, allow_empty: bool = False): self.allow_empty: bool = allow_empty super().__init__() def validate(self, document: Document) -> None: text = document.text if not (text or self.allow_empty): raise ValidationError( message='This input is required!', cursor_position=len(text), ) if text and not re.match(r'^(y(es|up|)|n(o|ope|))$', text, re.IGNORECASE): raise ValidationError( message='Please answer "yes" or "no"', cursor_position=len(text), ) class Menu(): """Object for tracking menu specific data and methods. ASSUMPTIONS: 1. All entry names are unique. 2. All action entry names start with different letters. """ def __init__(self, title: str = '[Untitled Menu]'): self.actions: dict[str, dict[Any, Any]] = {} self.options: dict[str, dict[Any, Any]] = {} self.sets: dict[str, dict[Any, Any]] = {} self.toggles: dict[str, dict[Any, Any]] = {} self.disabled_str: str = 'Disabled' self.separator: str = '─' self.title: str = title def _generate_menu_text(self) -> str: """Generate menu text, returns str.""" separator_string = self._get_separator_string() menu_lines = [self.title, separator_string] if self.title else [] # Sets & toggles for section in (self.sets, self.toggles): for details in section.values(): if details.get('Hidden', False): continue if details.get('Separator', False): menu_lines.append(separator_string) menu_lines.append(details['Display Name']) if self.sets or self.toggles: menu_lines.append(separator_string) # Options for details in self.options.values(): if details.get('Hidden', False): continue if details.get('Separator', False): menu_lines.append(separator_string) menu_lines.append(details['Display Name']) if self.options: menu_lines.append(separator_string) # Actions for details in self.actions.values(): if details.get('Hidden', False): continue if details.get('Separator', False): menu_lines.append(separator_string) menu_lines.append(details['Display Name']) # Show menu menu_lines.append('') menu_lines = [str(line) for line in menu_lines] return '\n'.join(menu_lines) def _get_display_name( self, name, details, index=None, no_checkboxes=True, setting_item=False) -> str: """Format display name based on details and args, returns str.""" disabled = details.get('Disabled', False) if setting_item and not details['Selected']: # Display item in YELLOW disabled = True checkmark = '*' if 'DISPLAY' in os.environ or PLATFORM == 'Darwin': checkmark = '✓' display_name = f'{index if index else name[:1].upper()}: ' if not (index and index >= 10): display_name = f' {display_name}' if setting_item and 'Value' in details: name = f'{name} = {details["Value"]}' # Add enabled status if necessary if not no_checkboxes: display_name += f'[{checkmark if details["Selected"] else " "}] ' # Add name if disabled: display_name += color_string(f'{name} ({self.disabled_str})', 'YELLOW') else: display_name += name # Done return display_name def _get_separator_string(self) -> str: """Format separator length based on name lengths, returns str.""" separator_length = 0 # Check title line(s) if self.title: for line in self.title.split('\n'): separator_length = max(separator_length, len(strip_colors(line))) # Loop over all item names for section in (self.actions, self.options, self.sets, self.toggles): for details in section.values(): if details.get('Hidden', False): # Skip hidden lines continue line = strip_colors(details['Display Name']) separator_length = max(separator_length, len(line)) separator_length += 1 # Done return self.separator * separator_length def _get_valid_answers(self) -> list[str]: """Get valid answers based on menu items, returns list.""" valid_answers = [] # Numbered items index = 0 for section in (self.sets, self.toggles, self.options): for details in section.values(): if details.get('Hidden', False): # Don't increment index or add to valid_answers continue index += 1 if not details.get('Disabled', False): valid_answers.append(str(index)) # Action items for name, details in self.actions.items(): if not details.get('Disabled', False): valid_answers.append(name[:1].upper()) # Done return valid_answers def _resolve_selection(self, selection: str) -> tuple[str, dict[Any, Any]]: """Get menu item based on user selection, returns tuple.""" offset = 1 resolved_selection = tuple() if selection.isnumeric(): # Enumerate over numbered entries entries = [ *self.sets.items(), *self.toggles.items(), *self.options.items(), ] for _i, details in enumerate(entries): if details[1].get('Hidden', False): offset -= 1 elif str(_i+offset) == selection: # TODO: Fix this typo! # It was discovered after being in production for SEVERAL YEARS! # Extra testing is needed to verify any calls to this function still # depend on this functionality resolved_selection = (details) break else: # Just check actions for action, details in self.actions.items(): if action.lower().startswith(selection.lower()): resolved_selection = (action, details) break # Done return resolved_selection def _update(self, single_selection: bool = True, settings_mode: bool = False) -> None: """Update menu items in preparation for printing to screen.""" index = 0 # Fix selection status for sets for set_details in self.sets.values(): set_selected = True set_targets = set_details['Targets'] for option, option_details in self.options.items(): if option in set_targets and not option_details['Selected']: set_selected = False elif option not in set_targets and option_details['Selected']: set_selected = False set_details['Selected'] = set_selected # Numbered sections for section in (self.sets, self.toggles, self.options): for name, details in section.items(): if details.get('Hidden', False): # Skip hidden lines and don't increment index continue index += 1 details['Display Name'] = self._get_display_name( name, details, index=index, no_checkboxes=single_selection, setting_item=settings_mode, ) # Actions for name, details in self.actions.items(): details['Display Name'] = self._get_display_name( name, details, no_checkboxes=True, ) def _update_entry_selection_status( self, entry: str, toggle: bool = True, status: bool = False) -> None: """Update entry selection status either directly or by toggling.""" if entry in self.sets: # Update targets not the set itself new_status = not self.sets[entry]['Selected'] if toggle else status targets = self.sets[entry]['Targets'] self._update_set_selection_status(targets, new_status) for section in (self.toggles, self.options, self.actions): if entry in section: if toggle: section[entry]['Selected'] = not section[entry]['Selected'] else: section[entry]['Selected'] = status def _update_set_selection_status(self, targets: Iterable[str], status: bool) -> None: """Select or deselect options based on targets and status.""" for option, details in self.options.items(): # If (new) status is True and this option is a target then select # Otherwise deselect details['Selected'] = status and option in targets def _user_select(self, prompt_msg: str) -> str: """Show menu and select an entry, returns str.""" menu_text = self._generate_menu_text() valid_answers = self._get_valid_answers() # Menu loop while True: clear_screen() print(menu_text) sleep(0.01) answer = input_text(prompt_msg).strip() if answer.upper() in valid_answers: break # Done return answer def add_action(self, name: str, details: dict[Any, Any] | None = None) -> None: """Add action to menu.""" details = details if details else {} details['Selected'] = details.get('Selected', False) self.actions[name] = details def add_option(self, name: str, details: dict[Any, Any] | None = None) -> None: """Add option to menu.""" details = details if details else {} details['Selected'] = details.get('Selected', False) self.options[name] = details def add_set(self, name: str, details: dict[Any, Any] | None = None) -> None: """Add set to menu.""" details = details if details else {} details['Selected'] = details.get('Selected', False) # Safety check if 'Targets' not in details: raise KeyError('Menu set has no targets') # Add set self.sets[name] = details def add_toggle(self, name: str, details: dict[Any, Any] | None = None) -> None: """Add toggle to menu.""" details = details if details else {} details['Selected'] = details.get('Selected', False) self.toggles[name] = details def advanced_select( self, prompt_msg: str = 'Please make a selection: ', ) -> tuple[str, dict[Any, Any]]: """Display menu and make multiple selections, returns tuple. NOTE: Menu is displayed until an action entry is selected. """ while True: self._update(single_selection=False) user_selection = self._user_select(prompt_msg) selected_entry = self._resolve_selection(user_selection) if user_selection.isnumeric(): # Update selection(s) self._update_entry_selection_status(selected_entry[0]) else: # Action selected break # Done return selected_entry def settings_select( self, prompt_msg: str = 'Please make a selection: ', ) -> tuple[str, dict[Any, Any]]: """Display menu and make multiple selections, returns tuple. NOTE: Menu is displayed until an action entry is selected. """ choice_kwargs = { 'prompt_msg': 'Toggle or change value?', 'choices': ['T', 'C'], } while True: self._update(single_selection=True, settings_mode=True) user_selection = self._user_select(prompt_msg) selected_entry = self._resolve_selection(user_selection) if user_selection.isnumeric(): if 'Value' in selected_entry[-1] and choice(**choice_kwargs) == 'C': # Change selected_entry[-1]['Value'] = input_text('Enter new value: ') else: # Toggle self._update_entry_selection_status(selected_entry[0]) else: # Action selected break # Done return selected_entry def simple_select( self, prompt_msg: str = 'Please make a selection: ', update: bool = True, ) -> tuple[str, dict[Any, Any]]: """Display menu and make a single selection, returns tuple.""" if update: self._update() user_selection = self._user_select(prompt_msg) return self._resolve_selection(user_selection) def update(self) -> None: """Update menu with default settings.""" self._update() class TryAndPrint(): """Object used to standardize running functions and returning the result. The errors and warning attributes are used to allow fine-tuned results based on exception names. """ def __init__(self, msg_bad: str = 'FAILED', msg_good: str = 'SUCCESS'): self.catch_all : bool = True self.indent: int = INDENT self.list_errors: list[str] = ['GenericError'] self.list_warnings: list[str] = ['GenericWarning'] self.msg_bad: str = msg_bad self.msg_good: str = msg_good self.verbose : bool = False self.width: int = WIDTH def _format_exception_message(self, _exception: Exception) -> str: """Format using the exception's args or name, returns str.""" LOG.debug( 'Formatting exception: %s, %s', _exception.__class__.__name__, _exception, ) message = '' # Format message string from _exception try: if isinstance(_exception, subprocess.CalledProcessError): message = _exception.stderr if not isinstance(message, str): message = message.decode('utf-8') message = message.strip() elif isinstance(_exception, ZeroDivisionError): # Skip and just use exception name below pass else: message = str(_exception) except Exception: # Just use the exception name instead pass # Prepend exception name if _exception.__class__.__name__ not in ('GenericError', 'GenericWarning'): try: message = f'{_exception.__class__.__name__}: {message}' except Exception: message = f'UNKNOWN ERROR: {message}' # Fix multi-line messages if '\n' in message: try: lines = [ f'{" "*(self.indent+self.width)}{line.strip()}' for line in message.splitlines() if line.strip() ] lines[0] = lines[0].strip() message = '\n'.join(lines) except Exception: pass # Done return message def _format_function_output( self, output: list | subprocess.CompletedProcess, msg_good: str, ) -> str: """Format function output for use in try_and_print(), returns str.""" LOG.debug('Formatting output: %s', output) if not output: raise GenericWarning('No output') # Ensure we're working with a list if isinstance(output, subprocess.CompletedProcess): stdout = output.stdout if not isinstance(stdout, str): stdout = stdout.decode('utf8') output = stdout.strip().splitlines() if not output: # Going to treat these as successes (for now) LOG.warning('Program output was empty, assuming good result.') return color_string(msg_good, 'GREEN') else: try: output = list(output) except TypeError: output = [output] # Safety check if not output: # Going to ignore empty function output for now LOG.error('Output is empty') return 'UNKNOWN' # Build result_msg result_msg = f'{output.pop(0)}' if output: output = [f'{" "*(self.indent+self.width)}{line}' for line in output] result_msg += '\n' + '\n'.join(output) # Done return result_msg def _log_result(self, message: str, result_msg: str) -> None: """Log result text without color formatting.""" log_text = f'{" "*self.indent}{message:<{self.width}}{result_msg}' for line in log_text.splitlines(): line = strip_colors(line) LOG.info(line) def add_error(self, exception_name: str) -> None: """Add exception name to error list.""" if exception_name not in self.list_errors: self.list_errors.append(exception_name) def add_warning(self, exception_name: str) -> None: """Add exception name to warning list.""" if exception_name not in self.list_warnings: self.list_warnings.append(exception_name) def run( self, message: str, function: Callable, *args: Iterable[Any], catch_all: bool | None = None, msg_good: str | None = None, verbose: bool | None = None, **kwargs, ) -> dict[str, Any]: """Run a function and print the results, returns results as dict. If catch_all is True then (nearly) all exceptions will be caught. Otherwise if an exception occurs that wasn't specified it will be re-raised. If the function returns data it will be used instead of msg_good, msg_bad, or exception text. The output should be a list or a subprocess.CompletedProcess object. If msg_good is passed it will override self.msg_good. If verbose is True then exception names or messages will be used for the result message. Otherwise it will simply be set to result_bad. If catch_all and/or verbose are passed it will override self.catch_all and/or self.verbose for this call. args and kwargs are passed to the function. """ LOG.debug('function: %s.%s', function.__module__, function.__name__) LOG.debug('args: %s', args) LOG.debug('kwargs: %s', kwargs) LOG.debug( 'catch_all: %s, msg_good: %s, verbose: %s', catch_all, msg_good, verbose, ) f_exception = None catch_all = catch_all if catch_all is not None else self.catch_all msg_good = msg_good if msg_good is not None else self.msg_good output = None result_msg = 'UNKNOWN' verbose = verbose if verbose is not None else self.verbose # Build exception tuples e_exceptions: tuple = tuple(get_exception(e) for e in self.list_errors) w_exceptions: tuple = tuple(get_exception(e) for e in self.list_warnings) # Run function and catch exceptions print(f'{" "*self.indent}{message:<{self.width}}', end='', flush=True) LOG.debug('Running function: %s.%s', function.__module__, function.__name__) try: output = function(*args, **kwargs) except w_exceptions as _exception: # Warnings result_msg = self._format_exception_message(_exception) print_warning(result_msg, log=False) f_exception = _exception except e_exceptions as _exception: # Exceptions result_msg = self._format_exception_message(_exception) print_error(result_msg, log=False) f_exception = _exception except Exception as _exception: # Unexpected exceptions if verbose: result_msg = self._format_exception_message(_exception) else: result_msg = self.msg_bad print_error(result_msg, log=False) f_exception = _exception if not catch_all: # Re-raise error as necessary raise else: # Success if output: result_msg = self._format_function_output(output, msg_good) print(result_msg) else: result_msg = msg_good print_success(result_msg, log=False) # Done self._log_result(message, result_msg) return { 'Exception': f_exception, 'Failed': bool(f_exception), 'Message': result_msg, 'Output': output, } # Functions def abort( prompt_msg: str = 'Aborted.', show_prompt_msg: bool = True, return_code: int = 1, ) -> None: """Abort script.""" print_warning(prompt_msg) if show_prompt_msg: sleep(0.5) pause(prompt_msg='Press Enter to exit... ') sys.exit(return_code) def ask(prompt_msg: str) -> bool: """Prompt the user with a Y/N question, returns bool.""" validator = InputYesNoValidator() # Show prompt response = input_text(f'{prompt_msg} [Y/N]: ', validator=validator) if response.upper().startswith('Y'): LOG.info('%sYes', prompt_msg) return True if response.upper().startswith('N'): LOG.info('%sNo', prompt_msg) return False # This shouldn't ever be reached raise ValueError(f'Invalid answer given: {response}') def beep(repeat: int = 1) -> None: """Play system bell with optional repeat.""" while repeat >= 1: # Print bell char without a newline print('\a', end='', flush=True) sleep(0.5) repeat -= 1 def choice(prompt_msg: str, choices: Iterable[str]) -> str: """Choose an option from a provided list, returns str. Choices provided will be converted to uppercase and returned as such. Similar to the commands choice (Windows) and select (Linux). """ LOG.debug('prompt_msg: %s, choices: %s', prompt_msg, choices) choices = [str(c).upper()[:1] for c in choices] prompt_msg = f'{prompt_msg} [{"/".join(choices)}]' # Show prompt response = input_text(prompt_msg, validator=InputChoiceValidator(choices)) # Done LOG.info('%s %s', prompt_msg, response) return response.upper() def fix_prompt(message: str) -> str: """Fix prompt, returns str.""" if not message: message = 'Input text: ' message = str(message) if message[-1:] != ' ': message += ' ' return message @cache def get_exception(name: str) -> Exception: """Get exception by name, returns exception object. [Doctest] >>> t = TryAndPrint() >>> t._get_exception('AttributeError') >>> t._get_exception('CalledProcessError') >>> t._get_exception('GenericError') """ LOG.debug('Getting exception: %s', name) obj = getattr(sys.modules[__name__], name, None) if obj: return obj # Try builtin classes obj = getattr(sys.modules['builtins'], name, None) if obj: return obj # Try all modules for _mod in sys.modules.values(): obj = getattr(_mod, name, None) if obj: break # Check if not found if not obj: raise AttributeError(f'Failed to find exception: {name}') # Done return obj def get_ticket_id() -> str: """Get ticket ID, returns str.""" prompt_msg = 'Please enter ticket ID:' validator = InputTicketIDValidator() # Show prompt ticket_id = input_text(prompt_msg, validator=validator) # Done return ticket_id def input_text( prompt_msg: str = 'Enter text: ', allow_empty: bool = False, validator: Validator | None = None, ) -> str: """Get input from user, returns str.""" prompt_msg = fix_prompt(prompt_msg) # Accept empty responses? if not (allow_empty or validator): validator = InputNotEmptyValidator() # Show prompt result = None while result is None: try: result = prompt(prompt_msg, validator=validator) except KeyboardInterrupt: # Ignore CTRL+c pass # Done return result def major_exception() -> None: """Display traceback, optionally upload detailes, and exit.""" LOG.critical('Major exception encountered', exc_info=True) print_error('Major exception', log=False) print_warning(SUPPORT_MESSAGE) if ENABLED_UPLOAD_DATA: print_warning('Also, please run upload-logs to help debugging!') print(traceback.format_exc()) # Done pause('Press Enter to exit... ') raise SystemExit(1) def pause(prompt_msg: str = 'Press Enter to continue... ') -> None: """Simple pause implementation.""" input_text(prompt_msg, allow_empty=True) def print_colored( strings: Iterable[str] | str, colors: Iterable[str | None] | str, log: bool = False, sep: str = ' ', **kwargs, ) -> None: """Prints strings in the colors specified.""" LOG.debug( 'strings: %s, colors: %s, sep: %s, kwargs: %s', strings, colors, sep, kwargs, ) msg = color_string(strings, colors, sep=sep) print_options = { 'end': kwargs.get('end', '\n'), 'file': kwargs.get('file', sys.stdout), 'flush': kwargs.get('flush', False), } print(msg, **print_options) if log: LOG.info(strip_colors(msg)) def print_error(msg: str, log: bool = True, **kwargs) -> None: """Prints message in RED and log as ERROR.""" if 'file' not in kwargs: # Only set if not specified kwargs['file'] = sys.stderr print_colored(msg, 'RED', **kwargs) if log: LOG.error(msg) def print_info(msg: str, log: bool = True, **kwargs) -> None: """Prints message in BLUE and log as INFO.""" print_colored(msg, 'BLUE', **kwargs) if log: LOG.info(msg) def print_report(report: list[str], indent=None, log: bool = True) -> None: """Print report to screen and optionally to log.""" for line in report: if indent: line = f'{" "*indent}{line}' print(line) if log: LOG.info(strip_colors(line)) def print_standard(msg: str, log: bool = True, **kwargs) -> None: """Prints message and log as INFO.""" print(msg, **kwargs) if log: LOG.info(msg) def print_success(msg: str, log: bool = True, **kwargs) -> None: """Prints message in GREEN and log as INFO.""" print_colored(msg, 'GREEN', **kwargs) if log: LOG.info(msg) def print_warning(msg: str, log: bool = True, **kwargs) -> None: """Prints message in YELLOW and log as WARNING.""" if 'file' not in kwargs: # Only set if not specified kwargs['file'] = sys.stderr print_colored(msg, 'YELLOW', **kwargs) if log: LOG.warning(msg) def set_title(title: str) -> None: """Set window title.""" LOG.debug('title: %s', title) if os.name == 'nt': os.system(f'title {title}') else: print_error('Setting the title is only supported under Windows.') def show_data( message: str, data: Any, color: str | None = None, indent: int | None = None, width: int | None = None, ) -> None: """Display info using default or provided indent and width.""" indent = INDENT if indent is None else indent width = WIDTH if width is None else width print_colored( (f'{" "*indent}{message:<{width}}', data), (None, color if color else None), log=True, sep='', ) if __name__ == '__main__': print("This file is not meant to be called directly.")