From 89f62562f06e2e292a1f81534e4f0c7c6b94e560 Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Wed, 2 Oct 2019 22:58:24 -0700 Subject: [PATCH] Removed reference sections from wk.prev --- scripts/wk.prev/functions/common.py | 520 --------------------------- scripts/wk.prev/functions/network.py | 53 --- 2 files changed, 573 deletions(-) delete mode 100644 scripts/wk.prev/functions/network.py diff --git a/scripts/wk.prev/functions/common.py b/scripts/wk.prev/functions/common.py index 689cc85f..f2e019d3 100644 --- a/scripts/wk.prev/functions/common.py +++ b/scripts/wk.prev/functions/common.py @@ -99,103 +99,6 @@ class WindowsUnsupportedError(Exception): # General functions -def abort(show_prompt=True): - """Abort script.""" - print_warning('Aborted.') - if show_prompt: - sleep(1) - pause(prompt='Press Enter to exit... ') - exit_script(1) - - -def ask(prompt='Kotaero!'): - """Prompt the user with a Y/N question, returns bool.""" - answer = None - prompt = '{} [Y/N]: '.format(prompt) - while answer is None: - tmp = input(prompt) - if re.search(r'^y(es|)$', tmp, re.IGNORECASE): - answer = True - elif re.search(r'^n(o|ope|)$', tmp, re.IGNORECASE): - answer = False - message = '{prompt}{answer_text}'.format( - prompt = prompt, - answer_text = 'Yes' if answer else 'No') - print_log(message=message) - return answer - - -def beep(repeat=1): - """Play system bell with optional repeat.""" - for i in range(repeat): - # Print bell char - print('\a') - sleep(0.5) - - -def choice(choices, prompt='Kotaero!'): - """Prompt the user with a choice question, returns str.""" - answer = None - choices = [str(c) for c in choices] - choices_short = {c[:1].upper(): c for c in choices} - prompt = '{} [{}]: '.format(prompt, '/'.join(choices)) - regex = '^({}|{})$'.format( - '|'.join([c[:1] for c in choices]), - '|'.join(choices)) - - # Get user's choice - while answer is None: - tmp = input(prompt) - if re.search(regex, tmp, re.IGNORECASE): - answer = tmp - - # Log result - message = '{prompt}{answer_text}'.format( - prompt = prompt, - answer_text = 'Yes' if answer else 'No') - print_log(message=message) - - # Fix answer formatting to match provided values - answer = choices_short[answer[:1].upper()] - - # Done - return answer - - -def clear_screen(): - """Simple wrapper for cls/clear.""" - if psutil.WINDOWS: - os.system('cls') - else: - os.system('clear') - - -def convert_to_bytes(size): - """Convert human-readable size str to bytes and return an int.""" - size = str(size) - tmp = re.search(r'(\d+\.?\d*)\s+([PTGMKB])B?', size.upper()) - if tmp: - size = float(tmp.group(1)) - units = tmp.group(2) - if units == 'P': - size *= 1024 ** 5 - if units == 'T': - size *= 1024 ** 4 - elif units == 'G': - size *= 1024 ** 3 - elif units == 'M': - size *= 1024 ** 2 - elif units == 'K': - size *= 1024 ** 1 - elif units == 'B': - size *= 1024 ** 0 - size = int(size) - else: - return -1 - - return size - - def exit_script(return_value=0): """Exits the script after some cleanup and opens the log (if set).""" # Remove dirs (if empty) @@ -260,16 +163,6 @@ def get_process(name=None): return proc -def get_simple_string(prompt='Enter string'): - """Get string from user (restricted character set), returns str.""" - simple_string = None - while simple_string is None: - _input = input('{}: '.format(prompt)) - if re.match(r"^(\w|-| |\.|')+$", _input, re.ASCII): - simple_string = _input.strip() - return simple_string - - def get_ticket_number(): """Get TicketNumber from user, save in LogDir, and return as str.""" if not ENABLED_TICKET_NUMBERS: @@ -287,50 +180,6 @@ def get_ticket_number(): return ticket_number -def human_readable_size(size, decimals=0): - """Convert size from bytes to a human-readable format, returns str.""" - # Prep string formatting - width = 3+decimals - if decimals > 0: - width += 1 - - # Convert size to int - try: - size = int(size) - except ValueError: - size = convert_to_bytes(size) - except TypeError: - size = -1 - - # Verify we have a valid size - if size < 0: - return '{size:>{width}} b'.format(size='???', width=width) - - # Convert to sensible units - if size >= 1024 ** 5: - size /= 1024 ** 5 - units = 'PB' - elif size >= 1024 ** 4: - size /= 1024 ** 4 - units = 'TB' - elif size >= 1024 ** 3: - size /= 1024 ** 3 - units = 'GB' - elif size >= 1024 ** 2: - size /= 1024 ** 2 - units = 'MB' - elif size >= 1024 ** 1: - size /= 1024 ** 1 - units = 'KB' - else: - size /= 1024 ** 0 - units = ' B' - - # Return - return '{size:>{width}.{decimals}f} {units}'.format( - size=size, width=width, decimals=decimals, units=units) - - def kill_process(name): """Kill any running caffeine.exe processes.""" for proc in psutil.process_iter(): @@ -338,243 +187,6 @@ def kill_process(name): proc.kill() -def major_exception(): - """Display traceback and exit""" - print_error('Major exception') - print_warning(SUPPORT_MESSAGE) - print(traceback.format_exc()) - print_log(traceback.format_exc()) - try: - upload_crash_details() - except GenericAbort: - # User declined upload - print_warning('Upload: Aborted') - sleep(10) - except GenericError: - # No log file or uploading disabled - sleep(10) - except: - print_error('Upload: NS') - sleep(10) - else: - print_success('Upload: CS') - pause('Press Enter to exit...') - exit_script(1) - - -def menu_select( - title='[Untitled Menu]', - prompt='Please make a selection', secret_actions=[], secret_exit=False, - main_entries=[], action_entries=[], disabled_label='DISABLED', - spacer=''): - """Display options in a menu and return selected option as a str.""" - # Bail early - if not main_entries and not action_entries: - raise Exception("MenuError: No items given") - - # Set title - if 'Title' in global_vars: - title = '{}\n\n{}'.format(global_vars['Title'], title) - - # Build menu - menu_splash = '{}\n{}\n'.format(title, spacer) - width = len(str(len(main_entries))) - valid_answers = [] - if secret_exit: - valid_answers.append('Q') - if secret_actions: - valid_answers.extend(secret_actions) - - # Add main entries - for i in range(len(main_entries)): - entry = main_entries[i] - # Add Spacer - if ('CRLF' in entry): - menu_splash += '{}\n'.format(spacer) - entry_str = '{number:>{width}}: {name}'.format( - number = i+1, - width = width, - name = entry.get('Display Name', entry['Name'])) - if entry.get('Disabled', False): - entry_str = '{YELLOW}{entry_str} ({disabled}){CLEAR}'.format( - entry_str = entry_str, - disabled = disabled_label, - **COLORS) - else: - valid_answers.append(str(i+1)) - menu_splash += '{}\n'.format(entry_str) - menu_splash += '{}\n'.format(spacer) - - # Add action entries - for entry in action_entries: - # Add Spacer - if ('CRLF' in entry): - menu_splash += '{}\n'.format(spacer) - valid_answers.append(entry['Letter']) - menu_splash += '{letter:>{width}}: {name}\n'.format( - letter = entry['Letter'].upper(), - width = len(str(len(action_entries))), - name = entry['Name']) - - answer = '' - - while (answer.upper() not in valid_answers): - clear_screen() - print(menu_splash) - answer = input('{}: '.format(prompt)) - - return answer.upper() - - -def non_clobber_rename(full_path): - """Append suffix to path, if necessary, to avoid clobbering path""" - new_path = full_path - _i = 1; - while os.path.exists(new_path): - new_path = '{path}_{i}'.format(i=_i, path=full_path) - _i += 1 - - return new_path - - -def pause(prompt='Press Enter to continue... '): - """Simple pause implementation.""" - if prompt[-1] != ' ': - prompt += ' ' - input(prompt) - - -def ping(addr='google.com'): - """Attempt to ping addr.""" - cmd = [ - 'ping', - '-n' if psutil.WINDOWS else '-c', - '2', - addr] - run_program(cmd) - - -def popen_program(cmd, pipe=False, minimized=False, shell=False, **kwargs): - """Run program and return a subprocess.Popen object.""" - cmd_kwargs = {'args': cmd, 'shell': shell} - for kw in ('encoding', 'errors'): - if kw in kwargs: - cmd_kwargs[kw] = kwargs[kw] - - if minimized: - startupinfo = subprocess.STARTUPINFO() - startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW - startupinfo.wShowWindow = 6 - cmd_kwargs['startupinfo'] = startupinfo - - if pipe: - cmd_kwargs.update({ - 'stdout': subprocess.PIPE, - 'stderr': subprocess.PIPE, - }) - - if 'cwd' in kwargs: - cmd_kwargs['cwd'] = kwargs['cwd'] - - return subprocess.Popen(**cmd_kwargs) - - -def print_error(*args, **kwargs): - """Prints message to screen in RED.""" - print_standard(*args, color=COLORS['RED'], **kwargs) - - -def print_info(*args, **kwargs): - """Prints message to screen in BLUE.""" - print_standard(*args, color=COLORS['BLUE'], **kwargs) - - -def print_standard(message='Generic info', - color=None, end='\n', timestamp=True, **kwargs): - """Prints message to screen and log (if set).""" - display_message = message - if color: - display_message = color + message + COLORS['CLEAR'] - # **COLORS is used below to support non-"standard" color printing - print(display_message.format(**COLORS), end=end, **kwargs) - print_log(message, end, timestamp) - - -def print_success(*args, **kwargs): - """Prints message to screen in GREEN.""" - print_standard(*args, color=COLORS['GREEN'], **kwargs) - - -def print_warning(*args, **kwargs): - """Prints message to screen in YELLOW.""" - print_standard(*args, color=COLORS['YELLOW'], **kwargs) - - -def print_log(message='', end='\n', timestamp=True): - """Writes message to a log if LogFile is set.""" - time_str = time.strftime("%Y-%m-%d %H%M%z: ") if timestamp else '' - if 'LogFile' in global_vars and global_vars['LogFile']: - with open(global_vars['LogFile'], 'a', encoding='utf-8') as f: - for line in message.splitlines(): - f.write('{timestamp}{line}{end}'.format( - timestamp = time_str, - line = line, - end = end)) - - -def run_program(cmd, check=True, pipe=True, shell=False, **kwargs): - """Run program and return a subprocess.CompletedProcess object.""" - cmd = [c for c in cmd if c] - if shell: - cmd = ' '.join(cmd) - - cmd_kwargs = {'args': cmd, 'check': check, 'shell': shell} - for kw in ('encoding', 'errors'): - if kw in kwargs: - cmd_kwargs[kw] = kwargs[kw] - - if pipe: - cmd_kwargs.update({ - 'stdout': subprocess.PIPE, - 'stderr': subprocess.PIPE, - }) - - if 'cwd' in kwargs: - cmd_kwargs['cwd'] = kwargs['cwd'] - - return subprocess.run(**cmd_kwargs) - - -def set_title(title='[Some Title]'): - """Set title. - - Used for window title and menu titles.""" - global_vars['Title'] = title - os.system('title {}'.format(title)) - - -def show_data( - message='[Some message]', data='[Some data]', - indent=8, width=32, - info=False, warning=False, error=False): - """Display info with formatting.""" - message = '{indent}{message:<{width}}{data}'.format( - indent=' '*indent, width=width, message=message, data=data) - if error: - print_error(message) - elif warning: - print_warning(message) - elif info: - print_info(message) - else: - print_standard(message) - - -def sleep(seconds=2): - """Wait for a while.""" - time.sleep(seconds) - - def stay_awake(): """Prevent the system from sleeping or hibernating.""" # DISABLED due to VCR2008 dependency @@ -592,121 +204,6 @@ def stay_awake(): print_warning('Please set the power setting to High Performance.') -def strip_colors(s): - """Remove all ASCII color escapes from string, returns str.""" - for c in COLORS.values(): - s = s.replace(c, '') - return s - - -def get_exception(s): - """Get exception by name, returns Exception object.""" - try: - obj = getattr(sys.modules[__name__], s) - except AttributeError: - # Try builtin classes - obj = getattr(sys.modules['builtins'], s) - return obj - - -def try_and_print(message='Trying...', - function=None, cs='CS', ns='NS', other_results={}, - catch_all=True, print_return=False, silent_function=True, - indent=8, width=32, *args, **kwargs): - """Run function, print if successful or not, and return dict. - - other_results is in the form of - { - 'Warning': {'ExceptionClassName': 'Result Message'}, - 'Error': {'ExceptionClassName': 'Result Message'} - } - The the ExceptionClassNames will be excepted conditions - and the result string will be printed in the correct color. - catch_all=False will re-raise unspecified exceptions.""" - err = None - out = None - w_exceptions = other_results.get('Warning', {}).keys() - w_exceptions = tuple(get_exception(e) for e in w_exceptions) - e_exceptions = other_results.get('Error', {}).keys() - e_exceptions = tuple(get_exception(e) for e in e_exceptions) - w_results = other_results.get('Warning', {}) - e_results = other_results.get('Error', {}) - - # Run function and catch errors - print_standard('{indent}{message:<{width}}'.format( - indent=' '*indent, message=message, width=width), end='', flush=True) - try: - out = function(*args, **kwargs) - if print_return: - str_list = out - if isinstance(out, subprocess.CompletedProcess): - str_list = out.stdout.decode().strip().splitlines() - print_standard(str_list[0].strip(), timestamp=False) - for item in str_list[1:]: - print_standard('{indent}{item}'.format( - indent=' '*(indent+width), item=item.strip())) - elif silent_function: - print_success(cs, timestamp=False) - except w_exceptions as e: - _result = w_results.get(e.__class__.__name__, 'Warning') - print_warning(_result, timestamp=False) - err = e - except e_exceptions as e: - _result = e_results.get(e.__class__.__name__, 'Error') - print_error(_result, timestamp=False) - err = e - except Exception: - print_error(ns, timestamp=False) - err = traceback.format_exc() - - # Return or raise? - if err and not catch_all: - raise - else: - return {'CS': not bool(err), 'Error': err, 'Out': out} - - -def upload_crash_details(): - """Upload log and runtime data to the CRASH_SERVER. - - Intended for uploading to a public Nextcloud share.""" - if not ENABLED_UPLOAD_DATA: - raise GenericError - - import requests - if 'LogFile' in global_vars and global_vars['LogFile']: - if ask('Upload crash details to {}?'.format(CRASH_SERVER['Name'])): - with open(global_vars['LogFile']) as f: - data = '{}\n'.format(f.read()) - data += '#############################\n' - data += 'Runtime Details:\n\n' - data += 'sys.argv: {}\n\n'.format(sys.argv) - try: - data += generate_global_vars_report() - except Exception: - data += 'global_vars: {}\n'.format(global_vars) - filename = global_vars.get('LogFile', 'Unknown') - filename = re.sub(r'.*(\\|/)', '', filename) - filename += '.txt' - url = '{}/Crash_{}__{}'.format( - CRASH_SERVER['Url'], - global_vars.get('Date-Time', 'Unknown Date-Time'), - filename) - r = requests.put( - url, data=data, - headers={'X-Requested-With': 'XMLHttpRequest'}, - auth=(CRASH_SERVER['User'], CRASH_SERVER['Pass'])) - # Raise exception if upload NS - if not r.ok: - raise Exception - else: - # User said no - raise GenericAbort - else: - # No LogFile defined (or invalid LogFile) - raise GenericError - - def wait_for_process(name, poll_rate=3): """Wait for process by name.""" running = True @@ -931,23 +428,6 @@ def set_linux_vars(): } -def set_log_file(log_name): - """Sets global var LogFile and creates path as needed.""" - if psutil.LINUX: - folder_path = global_vars['LogDir'] - else: - folder_path = '{}{}{}'.format( - global_vars['LogDir'], - os.sep, - KIT_NAME_FULL) - log_file = '{}{}{}'.format( - folder_path, - os.sep, - log_name) - os.makedirs(folder_path, exist_ok=True) - global_vars['LogFile'] = log_file - - if __name__ == '__main__': print("This file is not meant to be called directly.") diff --git a/scripts/wk.prev/functions/network.py b/scripts/wk.prev/functions/network.py deleted file mode 100644 index 5b5d4f52..00000000 --- a/scripts/wk.prev/functions/network.py +++ /dev/null @@ -1,53 +0,0 @@ -# Wizard Kit: Functions - Network - -import os -import shutil -import sys - -from functions.common import * - - -# REGEX -REGEX_VALID_IP = re.compile( - r'(10.\d+.\d+.\d+' - r'|172.(1[6-9]|2\d|3[0-1])' - r'|192.168.\d+.\d+)', - re.IGNORECASE) - - -def is_connected(): - """Check for a valid private IP.""" - devs = psutil.net_if_addrs() - for dev in devs.values(): - for family in dev: - if REGEX_VALID_IP.search(family.address): - # Valid IP found - return True - # Else - return False - - -def show_valid_addresses(): - """Show all valid private IP addresses assigned to the system.""" - devs = psutil.net_if_addrs() - for dev, families in sorted(devs.items()): - for family in families: - if REGEX_VALID_IP.search(family.address): - # Valid IP found - show_data(message=dev, data=family.address) - - -def speedtest(): - """Run a network speedtest using speedtest-cli.""" - result = run_program(['speedtest-cli', '--simple']) - output = [line.strip() for line in result.stdout.decode().splitlines() - if line.strip()] - output = [line.split() for line in output] - output = [(a, float(b), c) for a, b, c in output] - return ['{:10}{:6.2f} {}'.format(*line) for line in output] - - -if __name__ == '__main__': - print("This file is not meant to be called directly.") - -# vim: sts=2 sw=2 ts=2