diff --git a/scripts/auto_repairs.py b/scripts/auto_repairs.py index 0615f115..b68f5e0b 100644 --- a/scripts/auto_repairs.py +++ b/scripts/auto_repairs.py @@ -5,7 +5,7 @@ import wk # Classes -REBOOT_STR = wk.ui.cli.color_string('Reboot', 'YELLOW') +REBOOT_STR = wk.ansi.color_string('Reboot', 'YELLOW') class MenuEntry(): """Simple class to allow cleaner code below.""" def __init__(self, name, function=None, selected=True, **kwargs): diff --git a/scripts/mount-backup-shares b/scripts/mount-backup-shares index bf528771..973b76e2 100755 --- a/scripts/mount-backup-shares +++ b/scripts/mount-backup-shares @@ -17,7 +17,7 @@ def main(): color = 'RED' elif 'Already' in line: color = 'YELLOW' - print(wk.ui.cli.color_string(line, color)) + print(wk.ansi.color_string(line, color)) if __name__ == '__main__': diff --git a/scripts/unmount-backup-shares b/scripts/unmount-backup-shares index 57e7e880..81188020 100755 --- a/scripts/unmount-backup-shares +++ b/scripts/unmount-backup-shares @@ -15,7 +15,7 @@ def main(): line = f' {line}' if 'Not mounted' in line: color = 'YELLOW' - print(wk.ui.cli.color_string(line, color)) + print(wk.ansi.color_string(line, color)) if __name__ == '__main__': diff --git a/scripts/wk/__init__.py b/scripts/wk/__init__.py index afb3e8d5..2a77ed94 100644 --- a/scripts/wk/__init__.py +++ b/scripts/wk/__init__.py @@ -3,6 +3,7 @@ from sys import stderr, version_info +from . import ansi from . import cfg from . import clone from . import debug diff --git a/scripts/wk/ansi.py b/scripts/wk/ansi.py new file mode 100644 index 00000000..2acf515d --- /dev/null +++ b/scripts/wk/ansi.py @@ -0,0 +1,67 @@ +"""WizardKit: ANSI control/escape functions""" +# vim: sts=2 sw=2 ts=2 + +import itertools +import logging +import pathlib + +# STATIC VARIABLES +LOG = logging.getLogger(__name__) +COLORS = { + 'CLEAR': '\033[0m', + 'RED': '\033[31m', + 'RED_BLINK': '\033[31;5m', + 'ORANGE': '\033[31;1m', + 'ORANGE_RED': '\033[1;31;41m', + 'GREEN': '\033[32m', + 'YELLOW': '\033[33m', + 'YELLOW_BLINK': '\033[33;5m', + 'BLUE': '\033[34m', + 'PURPLE': '\033[35m', + 'CYAN': '\033[36m', + } + + +# Functions +def color_string(strings, colors, sep=' '): + """Build colored string using ANSI escapes, returns str.""" + clear_code = COLORS['CLEAR'] + msg = [] + + # Convert to tuples if necessary + if isinstance(strings, (str, pathlib.Path)): + strings = (strings,) + if isinstance(colors, (str, pathlib.Path)): + colors = (colors,) + + # Convert to strings if necessary + try: + iter(strings) + except TypeError: + # Assuming single element passed, convert to string + strings = (str(strings),) + try: + iter(colors) + except TypeError: + # Assuming single element passed, convert to string + colors = (str(colors),) + + # Build new string with color escapes added + for string, color in itertools.zip_longest(strings, colors): + color_code = COLORS.get(color, clear_code) + msg.append(f'{color_code}{string}{clear_code}') + + # Done + return sep.join(msg) + + +def strip_colors(string): + """Strip known ANSI color escapes from string, returns str.""" + LOG.debug('string: %s', string) + for color in COLORS.values(): + string = string.replace(color, '') + return string + + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/scripts/wk/clone/ddrescue.py b/scripts/wk/clone/ddrescue.py index 1499d296..672c3e25 100644 --- a/scripts/wk/clone/ddrescue.py +++ b/scripts/wk/clone/ddrescue.py @@ -20,7 +20,7 @@ from docopt import docopt import psutil import pytz -from wk import cfg, debug, exe, io, log, net, std +from wk import ansi, cfg, debug, exe, io, log, net, std from wk.cfg.ddrescue import ( DDRESCUE_MAP_TEMPLATE, DDRESCUE_SETTINGS, @@ -91,8 +91,8 @@ REGEX_REMAINING_TIME = re.compile( LOG = logging.getLogger(__name__) MENU_ACTIONS = ( 'Start', - f'Change settings {ui.color_string("(experts only)", "YELLOW")}', - f'Detect drives {ui.color_string("(experts only)", "YELLOW")}', + f'Change settings {ansi.color_string("(experts only)", "YELLOW")}', + f'Detect drives {ansi.color_string("(experts only)", "YELLOW")}', 'Quit') MENU_TOGGLES = { 'Auto continue (if recovery % over threshold)': True, @@ -422,7 +422,7 @@ class State(): self.panes['Started'] = tmux.split_window( lines=cfg.ddrescue.TMUX_SIDE_WIDTH, target_id=self.panes['Source'], - text=ui.color_string( + text=ansi.color_string( ['Started', time.strftime("%Y-%m-%d %H:%M %Z")], ['BLUE', None], sep='\n', @@ -568,14 +568,14 @@ class State(): report = [] # Source - report.append(ui.color_string('Source', 'GREEN')) + report.append(ansi.color_string('Source', 'GREEN')) report.extend(build_object_report(self.source)) report.append(' ') # Destination - report.append(ui.color_string('Destination', 'GREEN')) + report.append(ansi.color_string('Destination', 'GREEN')) if self.mode == 'Clone': - report[-1] += ui.color_string(' (ALL DATA WILL BE DELETED)', 'RED') + report[-1] += ansi.color_string(' (ALL DATA WILL BE DELETED)', 'RED') report.extend(build_object_report(self.destination)) report.append(' ') @@ -583,12 +583,12 @@ class State(): # NOTE: The check for block_pairs is to limit this section # to the second confirmation if self.mode == 'Clone' and self.block_pairs: - report.append(ui.color_string('WARNING', 'YELLOW')) + report.append(ansi.color_string('WARNING', 'YELLOW')) report.append( 'All data will be deleted from the destination listed above.', ) report.append( - ui.color_string( + ansi.color_string( ['This is irreversible and will lead to', 'DATA LOSS.'], ['YELLOW', 'RED'], ), @@ -607,18 +607,18 @@ class State(): # Map dir if self.working_dir: - report.append(ui.color_string('Map Save Directory', 'GREEN')) + report.append(ansi.color_string('Map Save Directory', 'GREEN')) report.append(f'{self.working_dir}/') report.append(' ') if not fstype_is_ok(self.working_dir, map_dir=True): report.append( - ui.color_string( + ansi.color_string( 'Map file(s) are being saved to a non-recommended filesystem.', 'YELLOW', ), ) report.append( - ui.color_string( + ansi.color_string( ['This is strongly discouraged and may lead to', 'DATA LOSS'], [None, 'RED'], ), @@ -627,11 +627,11 @@ class State(): # Source part(s) selected if source_parts: - report.append(ui.color_string('Source Part(s) selected', 'GREEN')) + report.append(ansi.color_string('Source Part(s) selected', 'GREEN')) if self.source.path.samefile(source_parts[0].path): report.append('Whole Disk') else: - report.append(ui.color_string(f'{"NAME":<9} SIZE', 'BLUE')) + report.append(ansi.color_string(f'{"NAME":<9} SIZE', 'BLUE')) for part in source_parts: report.append( f'{part.path.name:<9} ' @@ -663,7 +663,7 @@ class State(): error_size = self.get_error_size() error_size_str = std.bytes_to_string(error_size, decimals=2) if error_size > 0: - error_size_str = ui.color_string(error_size_str, 'YELLOW') + error_size_str = ansi.color_string(error_size_str, 'YELLOW') percent = self.get_percent_recovered() percent = format_status_string(percent, width=0) report.append(f'Overall rescued: {percent}, error size: {error_size_str}') @@ -675,7 +675,7 @@ class State(): error_size = pair.get_error_size() error_size_str = std.bytes_to_string(error_size, decimals=2) if error_size > 0: - error_size_str = ui.color_string(error_size_str, 'YELLOW') + error_size_str = ansi.color_string(error_size_str, 'YELLOW') pair_size = std.bytes_to_string(pair.size, decimals=2) percent = pair.get_percent_recovered() percent = format_status_string(percent, width=0) @@ -1066,10 +1066,10 @@ class State(): width = cfg.ddrescue.TMUX_SIDE_WIDTH # Status - report.append(ui.color_string(f'{"Status":^{width}}', 'BLUE')) + report.append(ansi.color_string(f'{"Status":^{width}}', 'BLUE')) if 'NEEDS ATTENTION' in overall_status: report.append( - ui.color_string(f'{overall_status:^{width}}', 'YELLOW_BLINK'), + ansi.color_string(f'{overall_status:^{width}}', 'YELLOW_BLINK'), ) else: report.append(f'{overall_status:^{width}}') @@ -1079,12 +1079,12 @@ class State(): if self.block_pairs: total_rescued = self.get_rescued_size() percent = self.get_percent_recovered() - report.append(ui.color_string('Overall Progress', 'BLUE')) + report.append(ansi.color_string('Overall Progress', 'BLUE')) report.append( f'Rescued: {format_status_string(percent, width=width-9)}', ) report.append( - ui.color_string( + ansi.color_string( [f'{std.bytes_to_string(total_rescued, decimals=2):>{width}}'], [get_percent_color(percent)], ), @@ -1093,7 +1093,7 @@ class State(): # Block pair progress for pair in self.block_pairs: - report.append(ui.color_string(pair.source, 'BLUE')) + report.append(ansi.color_string(pair.source, 'BLUE')) for name, status in pair.status.items(): name = name.title() report.append( @@ -1105,9 +1105,9 @@ class State(): if overall_status in ('Active', 'NEEDS ATTENTION'): etoc = get_etoc() report.append(separator) - report.append(ui.color_string('Estimated Pass Finish', 'BLUE')) + report.append(ansi.color_string('Estimated Pass Finish', 'BLUE')) if overall_status == 'NEEDS ATTENTION' or etoc == 'N/A': - report.append(ui.color_string('N/A', 'YELLOW')) + report.append(ansi.color_string('N/A', 'YELLOW')) else: report.append(etoc) @@ -1168,7 +1168,7 @@ class State(): source_str = _format_string(self.source, width) tmux.respawn_pane( self.panes['Source'], - text=ui.color_string( + text=ansi.color_string( ['Source', '' if source_exists else ' (Missing)', '\n', source_str], ['BLUE', 'RED', None, None], sep='', @@ -1183,7 +1183,7 @@ class State(): percent=50, vertical=False, target_id=self.panes['Source'], - text=ui.color_string( + text=ansi.color_string( ['Destination', '' if dest_exists else ' (Missing)', '\n', dest_str], ['BLUE', 'RED', None, None], sep='', @@ -1197,7 +1197,7 @@ def build_block_pair_report(block_pairs, settings): report = [] notes = [] if block_pairs: - report.append(ui.color_string('Block Pairs', 'GREEN')) + report.append(ansi.color_string('Block Pairs', 'GREEN')) else: # Bail early return report @@ -1216,7 +1216,7 @@ def build_block_pair_report(block_pairs, settings): if settings: if not settings['First Run']: notes.append( - ui.color_string( + ansi.color_string( ['NOTE:', 'Clone settings loaded from previous run.'], ['BLUE', None], ), @@ -1224,14 +1224,14 @@ def build_block_pair_report(block_pairs, settings): if settings['Needs Format'] and settings['Table Type']: msg = f'Destination will be formatted using {settings["Table Type"]}' notes.append( - ui.color_string( + ansi.color_string( ['NOTE:', msg], ['BLUE', None], ), ) if any(pair.get_rescued_size() > 0 for pair in block_pairs): notes.append( - ui.color_string( + ansi.color_string( ['NOTE:', 'Resume data loaded from map file(s).'], ['BLUE', None], ), @@ -1313,12 +1313,12 @@ def build_directory_report(path): for line in proc.stdout.splitlines(): line = line.replace('\n', '') if 'FSTYPE' in line: - line = ui.color_string(f'{"PATH":<{width}}{line}', 'BLUE') + line = ansi.color_string(f'{"PATH":<{width}}{line}', 'BLUE') else: line = f'{path:<{width}}{line}' report.append(line) else: - report.append(ui.color_string('PATH', 'BLUE')) + report.append(ansi.color_string('PATH', 'BLUE')) report.append(str(path)) # Done @@ -1354,7 +1354,7 @@ def build_disk_report(dev): # Partition details report.append( - ui.color_string( + ansi.color_string( ( f'{"NAME":<{widths["name"]}}' f'{" " if dev.children else ""}' @@ -1400,7 +1400,7 @@ def build_disk_report(dev): def build_main_menu(): """Build main menu, returns wk.ui.cli.Menu.""" - menu = ui.Menu(title=ui.color_string('ddrescue TUI: Main Menu', 'GREEN')) + menu = ui.Menu(title=ansi.color_string('ddrescue TUI: Main Menu', 'GREEN')) menu.separator = ' ' # Add actions, options, etc @@ -1433,9 +1433,9 @@ def build_object_report(obj): def build_settings_menu(silent=True): """Build settings menu, returns wk.ui.cli.Menu.""" title_text = [ - ui.color_string('ddrescue TUI: Expert Settings', 'GREEN'), + ansi.color_string('ddrescue TUI: Expert Settings', 'GREEN'), ' ', - ui.color_string( + ansi.color_string( ['These settings can cause', 'MAJOR DAMAGE', 'to drives'], ['YELLOW', 'RED', 'YELLOW'], ), @@ -1580,7 +1580,7 @@ def format_status_string(status, width): # Add color if necessary if color: - status_str = ui.color_string(status_str, color) + status_str = ansi.color_string(status_str, color) # Done return status_str @@ -1960,7 +1960,7 @@ def main(): # Save results to log LOG.info('') for line in state.generate_report(): - LOG.info(' %s', ui.strip_colors(line)) + LOG.info(' %s', ansi.strip_colors(line)) def mount_raw_image(path): @@ -2084,7 +2084,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True): now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z') with open(f'{state.log_dir}/smart.out', 'w', encoding='utf-8') as _f: _f.write( - ui.color_string( + ansi.color_string( ['SMART Attributes', f'Updated: {now}\n'], ['BLUE', 'YELLOW'], sep='\t\t', @@ -2267,7 +2267,7 @@ def select_disk(prompt, skip_disk=None): ui.print_info('Scanning disks...') disks = hw_disk.get_disks() menu = ui.Menu( - title=ui.color_string(f'ddrescue TUI: {prompt} Selection', 'GREEN'), + title=ansi.color_string(f'ddrescue TUI: {prompt} Selection', 'GREEN'), ) menu.disabled_str = 'Already selected' menu.separator = ' ' @@ -2309,7 +2309,7 @@ def select_disk(prompt, skip_disk=None): def select_disk_parts(prompt, disk): """Select disk parts from list, returns list of Disk().""" - title = ui.color_string('ddrescue TUI: Partition Selection', 'GREEN') + title = ansi.color_string('ddrescue TUI: Partition Selection', 'GREEN') title += f'\n\nDisk: {disk.path} {disk.description}' menu = ui.Menu(title) menu.separator = ' ' @@ -2360,7 +2360,7 @@ def select_disk_parts(prompt, disk): if not menu.options: menu.add_option(whole_disk_str, {'Selected': True, 'Path': disk.path}) menu.title += '\n\n' - menu.title += ui.color_string(' No partitions detected.', 'YELLOW') + menu.title += ansi.color_string(' No partitions detected.', 'YELLOW') # Get selection _select_parts(menu) @@ -2391,7 +2391,7 @@ def select_path(prompt): """Select path, returns pathlib.Path.""" invalid = False menu = ui.Menu( - title=ui.color_string(f'ddrescue TUI: {prompt} Path Selection', 'GREEN'), + title=ansi.color_string(f'ddrescue TUI: {prompt} Path Selection', 'GREEN'), ) menu.separator = ' ' menu.add_action('Quit') diff --git a/scripts/wk/graph.py b/scripts/wk/graph.py index 0de162d5..70e26cfb 100644 --- a/scripts/wk/graph.py +++ b/scripts/wk/graph.py @@ -3,7 +3,7 @@ import logging -from wk.ui import cli as ui +from wk import ansi # STATIC VARIABLES @@ -52,27 +52,27 @@ def generate_horizontal_graph(rate_list, graph_width=40, oneline=False): rate_color = 'GREEN' # Build graph - full_block = ui.color_string((GRAPH_HORIZONTAL[-1],), (rate_color,)) + full_block = ansi.color_string((GRAPH_HORIZONTAL[-1],), (rate_color,)) if step >= 24: - graph[0] += ui.color_string((GRAPH_HORIZONTAL[step-24],), (rate_color,)) + graph[0] += ansi.color_string((GRAPH_HORIZONTAL[step-24],), (rate_color,)) graph[1] += full_block graph[2] += full_block graph[3] += full_block elif step >= 16: graph[0] += ' ' - graph[1] += ui.color_string((GRAPH_HORIZONTAL[step-16],), (rate_color,)) + graph[1] += ansi.color_string((GRAPH_HORIZONTAL[step-16],), (rate_color,)) graph[2] += full_block graph[3] += full_block elif step >= 8: graph[0] += ' ' graph[1] += ' ' - graph[2] += ui.color_string((GRAPH_HORIZONTAL[step-8],), (rate_color,)) + graph[2] += ansi.color_string((GRAPH_HORIZONTAL[step-8],), (rate_color,)) graph[3] += full_block else: graph[0] += ' ' graph[1] += ' ' graph[2] += ' ' - graph[3] += ui.color_string((GRAPH_HORIZONTAL[step],), (rate_color,)) + graph[3] += ansi.color_string((GRAPH_HORIZONTAL[step],), (rate_color,)) # Done if oneline: @@ -128,7 +128,7 @@ def vertical_graph_line(percent, rate, scale=32): color_rate = 'GREEN' # Build string - line = ui.color_string( + line = ansi.color_string( strings=( f'{percent:5.1f}%', f'{GRAPH_VERTICAL[step]:<4}', diff --git a/scripts/wk/hw/benchmark.py b/scripts/wk/hw/benchmark.py index 58d3ee83..5a33ad88 100644 --- a/scripts/wk/hw/benchmark.py +++ b/scripts/wk/hw/benchmark.py @@ -5,7 +5,7 @@ import logging from subprocess import PIPE, STDOUT -from wk import graph +from wk import ansi, graph from wk.cfg.hw import ( IO_ALT_TEST_SIZE_FACTOR, IO_BLOCK_SIZE, @@ -22,7 +22,6 @@ from wk.cfg.hw import ( ) from wk.exe import run_program from wk.std import PLATFORM -from wk.ui import cli as ui # STATIC VARIABLES @@ -113,7 +112,7 @@ def check_io_results(test_obj, rate_list, graph_width) -> None: # Add horizontal graph to report for line in graph.generate_horizontal_graph(rate_list, graph_width): - if not ui.strip_colors(line).strip(): + if not ansi.strip_colors(line).strip(): # Skip empty lines continue test_obj.report.append(line) @@ -151,7 +150,7 @@ def run_io_test(test_obj, log_path, test_mode=False) -> None: LOG.info('Using %s for better performance', dev_path) offset = 0 read_rates = [] - test_obj.report.append(ui.color_string('I/O Benchmark', 'BLUE')) + test_obj.report.append(ansi.color_string('I/O Benchmark', 'BLUE')) # Get dd values or bail try: @@ -159,7 +158,7 @@ def run_io_test(test_obj, log_path, test_mode=False) -> None: except DeviceTooSmallError: test_obj.set_status('N/A') test_obj.report.append( - ui.color_string('Disk too small to test', 'YELLOW'), + ansi.color_string('Disk too small to test', 'YELLOW'), ) return diff --git a/scripts/wk/hw/cpu.py b/scripts/wk/hw/cpu.py index 9a741ecd..af8b0c46 100644 --- a/scripts/wk/hw/cpu.py +++ b/scripts/wk/hw/cpu.py @@ -7,7 +7,7 @@ import subprocess from typing import TextIO -from wk import exe +from wk import ansi, exe from wk.cfg.hw import CPU_FAILURE_TEMP from wk.os.mac import set_fans as macos_set_fans from wk.std import PLATFORM @@ -93,9 +93,9 @@ def check_mprime_results(test_obj, working_dir) -> None: for line in passing_lines: test_obj.report.append(f' {line}') for line in warning_lines: - test_obj.report.append(ui.color_string(f' {line}', 'YELLOW')) + test_obj.report.append(ansi.color_string(f' {line}', 'YELLOW')) if not (passing_lines or warning_lines): - test_obj.report.append(ui.color_string(' Unknown result', 'YELLOW')) + test_obj.report.append(ansi.color_string(' Unknown result', 'YELLOW')) def start_mprime(working_dir, log_path) -> subprocess.Popen: diff --git a/scripts/wk/hw/diags.py b/scripts/wk/hw/diags.py index 72c594e4..0ad9206c 100644 --- a/scripts/wk/hw/diags.py +++ b/scripts/wk/hw/diags.py @@ -10,7 +10,7 @@ import time from docopt import docopt -from wk import cfg, debug, exe, log, std +from wk import ansi, cfg, debug, exe, log, std from wk.cfg.hw import STATUS_COLORS from wk.hw import benchmark as hw_benchmark from wk.hw import cpu as hw_cpu @@ -89,9 +89,9 @@ class State(): self.panes = {} self.system = None self.test_groups = [] - self.top_text = ui.color_string('Hardware Diagnostics', 'GREEN') + self.top_text = ansi.color_string('Hardware Diagnostics', 'GREEN') if test_mode: - self.top_text += ui.color_string(' (Test Mode)', 'YELLOW') + self.top_text += ansi.color_string(' (Test Mode)', 'YELLOW') # Init tmux and start a background process to maintain layout self.init_tmux() @@ -229,7 +229,7 @@ class State(): self.panes['Started'] = tmux.split_window( lines=cfg.hw.TMUX_SIDE_WIDTH, target_id=self.panes['Top'], - text=ui.color_string( + text=ansi.color_string( ['Started', time.strftime("%Y-%m-%d %H:%M %Z")], ['BLUE', None], sep='\n', @@ -292,7 +292,7 @@ class State(): """Update 'Started' pane following clock sync.""" tmux.respawn_pane( pane_id=self.panes['Started'], - text=ui.color_string( + text=ansi.color_string( ['Started', time.strftime("%Y-%m-%d %H:%M %Z")], ['BLUE', None], sep='\n', @@ -305,9 +305,9 @@ class State(): width = cfg.hw.TMUX_SIDE_WIDTH for group in self.test_groups: - report.append(ui.color_string(group.name, 'BLUE')) + report.append(ansi.color_string(group.name, 'BLUE')) for test in group.test_objects: - report.append(ui.color_string( + report.append(ansi.color_string( [test.label, f'{test.status:>{width-len(test.label)}}'], [None, STATUS_COLORS.get(test.status, None)], sep='', @@ -453,7 +453,7 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None: sensors.save_average_temps(temp_label='Cooldown', seconds=5) # Check Prime95 results - test_mprime_obj.report.append(ui.color_string('Prime95', 'BLUE')) + test_mprime_obj.report.append(ansi.color_string('Prime95', 'BLUE')) hw_cpu.check_mprime_results( test_obj=test_mprime_obj, working_dir=state.log_dir, ) @@ -493,7 +493,7 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None: state.update_progress_pane() # Check Cooling results - test_cooling_obj.report.append(ui.color_string('Temps', 'BLUE')) + test_cooling_obj.report.append(ansi.color_string('Temps', 'BLUE')) hw_cpu.check_cooling_results(test_cooling_obj, sensors, run_sysbench) # Cleanup @@ -566,12 +566,12 @@ def disk_io_benchmark( # Something went wrong LOG.error('%s', err) test.set_status('ERROR') - test.report.append(ui.color_string(' Unknown Error', 'RED')) + test.report.append(ansi.color_string(' Unknown Error', 'RED')) # Mark test(s) aborted if necessary if aborted: test.set_status('Aborted') - test.report.append(ui.color_string(' Aborted', 'YELLOW')) + test.report.append(ansi.color_string(' Aborted', 'YELLOW')) break # Update progress after each test @@ -734,7 +734,7 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None: for test in test_objects: if not (test.disabled or test.passed or test.failed): test.set_status('Aborted') - test.report.append(ui.color_string(' Aborted', 'YELLOW')) + test.report.append(ansi.color_string(' Aborted', 'YELLOW')) # Cleanup state.update_progress_pane() diff --git a/scripts/wk/hw/disk.py b/scripts/wk/hw/disk.py index 26a3d0c5..bae3e305 100644 --- a/scripts/wk/hw/disk.py +++ b/scripts/wk/hw/disk.py @@ -11,6 +11,7 @@ import re from dataclasses import dataclass, field from typing import Any, Union +from wk import ansi from wk.cfg.main import KIT_NAME_SHORT from wk.cfg.python import DATACLASS_DECORATOR_KWARGS from wk.exe import get_json_from_command, run_program @@ -20,7 +21,6 @@ from wk.hw.smart import ( get_known_disk_attributes, ) from wk.std import PLATFORM -from wk.ui import cli as ui # STATIC VARIABLES @@ -74,7 +74,7 @@ class Disk: def add_note(self, note, color=None) -> None: """Add note that will be included in the disk report.""" if color: - note = ui.color_string(note, color) + note = ansi.color_string(note, color) if note not in self.notes: self.notes.append(note) self.notes.sort() @@ -83,7 +83,7 @@ class Disk: """Check if note is already present.""" present = False for note in self.notes: - if note_str == ui.strip_colors(note): + if note_str == ansi.strip_colors(note): present = True return present @@ -99,18 +99,18 @@ class Disk: """Generate Disk report, returns list.""" report = [] if header: - report.append(ui.color_string(f'Device ({self.path.name})', 'BLUE')) + report.append(ansi.color_string(f'Device ({self.path.name})', 'BLUE')) report.append(f' {self.description}') # Attributes if self.attributes: if header: - report.append(ui.color_string('Attributes', 'BLUE')) + report.append(ansi.color_string('Attributes', 'BLUE')) report.extend(generate_attribute_report(self)) # Notes if self.notes: - report.append(ui.color_string('Notes', 'BLUE')) + report.append(ansi.color_string('Notes', 'BLUE')) for note in self.notes: report.append(f' {note}') diff --git a/scripts/wk/hw/sensors.py b/scripts/wk/hw/sensors.py index 858b6553..f05a628d 100644 --- a/scripts/wk/hw/sensors.py +++ b/scripts/wk/hw/sensors.py @@ -9,11 +9,11 @@ import re from subprocess import CalledProcessError from typing import Any +from wk import ansi from wk.cfg.hw import CPU_CRITICAL_TEMP, SMC_IDS, TEMP_COLORS from wk.exe import run_program, start_thread from wk.io import non_clobber_path from wk.std import PLATFORM, sleep -from wk.ui import cli as ui # STATIC VARIABLES @@ -110,7 +110,7 @@ class Sensors(): # Handle empty reports if not report: report = [ - ui.color_string('WARNING: No sensors found', 'YELLOW'), + ansi.color_string('WARNING: No sensors found', 'YELLOW'), '', 'Please monitor temps manually', ] @@ -426,7 +426,7 @@ def get_temp_str(temp, colored=True) -> str: temp = float(temp) except (TypeError, ValueError): # Invalid temp? - return ui.color_string(temp, 'PURPLE') + return ansi.color_string(temp, 'PURPLE') # Determine color if colored: @@ -436,7 +436,7 @@ def get_temp_str(temp, colored=True) -> str: break # Done - return ui.color_string(f'{"-" if temp < 0 else ""}{temp:2.0f}°C', temp_color) + return ansi.color_string(f'{"-" if temp < 0 else ""}{temp:2.0f}°C', temp_color) diff --git a/scripts/wk/hw/smart.py b/scripts/wk/hw/smart.py index c777f400..88c016ab 100644 --- a/scripts/wk/hw/smart.py +++ b/scripts/wk/hw/smart.py @@ -7,6 +7,7 @@ import re from typing import Any +from wk import ansi from wk.cfg.hw import ( ATTRIBUTE_COLORS, KEY_NVME, @@ -19,7 +20,6 @@ from wk.cfg.hw import ( ) from wk.exe import get_json_from_command, run_program from wk.std import bytes_to_string, sleep -from wk.ui import cli as ui # STATIC VARIABLES @@ -41,26 +41,26 @@ def build_self_test_report(test_obj, aborted=False) -> None: For instance if the test was aborted the report should include the last known progress instead of just "was aborted by host." """ - report = [ui.color_string('Self-Test', 'BLUE')] + report = [ansi.color_string('Self-Test', 'BLUE')] test_details = get_smart_self_test_details(test_obj.dev) test_result = test_details.get('status', {}).get('string', 'Unknown') # Build report if test_obj.disabled or test_obj.status == 'Denied': - report.append(ui.color_string(f' {test_obj.status}', 'RED')) + report.append(ansi.color_string(f' {test_obj.status}', 'RED')) elif test_obj.status == 'N/A' or not test_obj.dev.attributes: - report.append(ui.color_string(f' {test_obj.status}', 'YELLOW')) + report.append(ansi.color_string(f' {test_obj.status}', 'YELLOW')) elif test_obj.status == 'TestInProgress': - report.append(ui.color_string(' Failed to stop previous test', 'RED')) + report.append(ansi.color_string(' Failed to stop previous test', 'RED')) test_obj.set_status('Failed') else: # Other cases include self-test result string report.append(f' {test_result.capitalize()}') if aborted and not (test_obj.passed or test_obj.failed): - report.append(ui.color_string(' Aborted', 'YELLOW')) + report.append(ansi.color_string(' Aborted', 'YELLOW')) test_obj.set_status('Aborted') elif test_obj.status == 'TimedOut': - report.append(ui.color_string(' TimedOut', 'YELLOW')) + report.append(ansi.color_string(' TimedOut', 'YELLOW')) # Done test_obj.report.extend(report) @@ -137,7 +137,7 @@ def generate_attribute_report(dev, only_failed=False) -> list[str]: continue # Build colored string and append to report - line = ui.color_string( + line = ansi.color_string( [label, get_attribute_value_string(dev, attr), note], [None, value_color, 'YELLOW'], ) @@ -299,7 +299,7 @@ def run_smart_self_test(test_obj, log_path) -> bool: finished = False test_details = get_smart_self_test_details(test_obj.dev) size_str = bytes_to_string(test_obj.dev.size, use_binary=False) - header_str = ui.color_string( + header_str = ansi.color_string( ['[', test_obj.dev.path.name, ' ', size_str, ']'], [None, 'BLUE', None, 'CYAN', None], sep='', diff --git a/scripts/wk/hw/surface_scan.py b/scripts/wk/hw/surface_scan.py index 9a542d30..7f4ed243 100644 --- a/scripts/wk/hw/surface_scan.py +++ b/scripts/wk/hw/surface_scan.py @@ -5,6 +5,7 @@ import logging from subprocess import STDOUT +from wk import ansi from wk.cfg.hw import ( BADBLOCKS_EXTRA_LARGE_DISK, BADBLOCKS_LARGE_DISK, @@ -15,7 +16,6 @@ from wk.cfg.hw import ( ) from wk.exe import run_program from wk.std import PLATFORM, bytes_to_string -from wk.ui import cli as ui # STATIC VARIABLES @@ -27,7 +27,7 @@ def check_surface_scan_results(test_obj, log_path) -> None: """Check results and set test status.""" with open(log_path, 'r', encoding='utf-8') as _f: for line in _f.readlines(): - line = ui.strip_colors(line.strip()) + line = ansi.strip_colors(line.strip()) if not line or BADBLOCKS_SKIP_REGEX.match(line): # Skip continue @@ -44,10 +44,10 @@ def check_surface_scan_results(test_obj, log_path) -> None: test_obj.set_status('Passed') else: test_obj.failed = True - test_obj.report.append(f' {ui.color_string(line, "YELLOW")}') + test_obj.report.append(f' {ansi.color_string(line, "YELLOW")}') test_obj.set_status('Failed') else: - test_obj.report.append(f' {ui.color_string(line, "YELLOW")}') + test_obj.report.append(f' {ansi.color_string(line, "YELLOW")}') if not (test_obj.passed or test_obj.failed): test_obj.set_status('Unknown') @@ -61,7 +61,7 @@ def run_scan(test_obj, log_path, test_mode=False) -> None: # Use "RAW" disks under macOS dev_path = dev_path.with_name(f'r{dev_path.name}') LOG.info('Using %s for better performance', dev_path) - test_obj.report.append(ui.color_string('badblocks', 'BLUE')) + test_obj.report.append(ansi.color_string('badblocks', 'BLUE')) test_obj.set_status('Working') # Increase block size if necessary @@ -80,7 +80,7 @@ def run_scan(test_obj, log_path, test_mode=False) -> None: with open(log_path, 'a', encoding='utf-8') as _f: size_str = bytes_to_string(dev.size, use_binary=False) _f.write( - ui.color_string( + ansi.color_string( ['[', dev.path.name, ' ', size_str, ']\n'], [None, 'BLUE', None, 'CYAN', None], sep='', diff --git a/scripts/wk/hw/system.py b/scripts/wk/hw/system.py index b82f9433..949adb38 100644 --- a/scripts/wk/hw/system.py +++ b/scripts/wk/hw/system.py @@ -8,12 +8,12 @@ import re from dataclasses import dataclass, field from typing import Any +from wk import ansi from wk.cfg.hw import KNOWN_RAM_VENDOR_IDS from wk.cfg.python import DATACLASS_DECORATOR_KWARGS from wk.exe import get_json_from_command, run_program from wk.hw.test import Test from wk.std import PLATFORM, bytes_to_string, string_to_bytes -from wk.ui import cli as ui # STATIC VARIABLES @@ -37,11 +37,11 @@ class System: def generate_report(self) -> list[str]: """Generate CPU & RAM report, returns list.""" report = [] - report.append(ui.color_string('Device', 'BLUE')) + report.append(ansi.color_string('Device', 'BLUE')) report.append(f' {self.cpu_description}') # Include RAM details - report.append(ui.color_string('RAM', 'BLUE')) + report.append(ansi.color_string('RAM', 'BLUE')) report.append(f' {self.ram_total} ({", ".join(self.ram_dimms)})') # Tests diff --git a/scripts/wk/os/linux.py b/scripts/wk/os/linux.py index 991dd454..a4c82002 100644 --- a/scripts/wk/os/linux.py +++ b/scripts/wk/os/linux.py @@ -7,11 +7,11 @@ import pathlib import re import subprocess +from wk import ansi from wk.cfg.hw import VOLUME_FAILURE_THRESHOLD, VOLUME_WARNING_THRESHOLD from wk.exe import get_json_from_command, popen_program, run_program from wk.log import format_log_path from wk.std import bytes_to_string -from wk.ui import cli as ui # STATIC VARIABLES @@ -83,20 +83,20 @@ def build_volume_report(device_path=None) -> list: vol['mountpoint'] = f'Mounted on {vol["mountpoint"]}' # Name and size - line = ui.color_string( + line = ansi.color_string( [f'{vol["name"]:<20}', f'{vol["size"]:>9}'], [None, 'CYAN'], ) # Mountpoint and type - line = ui.color_string( + line = ansi.color_string( [line, f'{vol["mountpoint"]:<{m_width}}', f'{vol["fstype"]:<11}'], [None, None, 'BLUE'], ) # Used and free if any([vol['fsused'], vol['fsavail']]): - line = ui.color_string( + line = ansi.color_string( [line, f'({vol["fsused"]:>9} used, {vol["fsavail"]:>9} free)'], [None, size_color], ) diff --git a/scripts/wk/os/win.py b/scripts/wk/os/win.py index 852f6065..3b2b9d93 100644 --- a/scripts/wk/os/win.py +++ b/scripts/wk/os/win.py @@ -17,6 +17,7 @@ except ImportError as err: if platform.system() == 'Windows': raise err +from wk import ansi from wk.borrowed import acpi from wk.cfg.main import KIT_NAME_FULL from wk.cfg.windows_builds import ( @@ -182,7 +183,7 @@ def check_4k_alignment(show_alert=False): continue if int(match.group('offset')) % 4096 != 0: report.append( - ui.color_string( + ansi.color_string( f'{match.group("description")}' f' ({bytes_to_string(match.group("size"), decimals=1)})' , @@ -198,7 +199,7 @@ def check_4k_alignment(show_alert=False): if report: report.insert( 0, - ui.color_string('One or more partitions not 4K aligned', 'YELLOW'), + ansi.color_string('One or more partitions not 4K aligned', 'YELLOW'), ) return report @@ -250,13 +251,13 @@ def get_installed_antivirus(): state = proc.stdout.split('=')[1] state = hex(int(state)) if str(state)[3:5] not in ['10', '11']: - report.append(ui.color_string(f'[Disabled] {product}', 'YELLOW')) + report.append(ansi.color_string(f'[Disabled] {product}', 'YELLOW')) else: report.append(product) # Final check if not report: - report.append(ui.color_string('No products detected', 'RED')) + report.append(ansi.color_string('No products detected', 'RED')) # Done return report @@ -363,7 +364,7 @@ def get_volume_usage(use_colors=False): f' ({bytes_to_string(free, 2):>10} / {bytes_to_string(total, 2):>10})' ) if use_colors: - display_str = ui.color_string(display_str, color) + display_str = ansi.color_string(display_str, color) report.append(f'{disk.device} {display_str}') # Done diff --git a/scripts/wk/repairs/win.py b/scripts/wk/repairs/win.py index d94f6e3d..5ae419ed 100644 --- a/scripts/wk/repairs/win.py +++ b/scripts/wk/repairs/win.py @@ -11,6 +11,7 @@ import time from subprocess import CalledProcessError, DEVNULL from xml.dom.minidom import parse as xml_parse +from wk import ansi from wk.cfg.main import KIT_NAME_FULL, KIT_NAME_SHORT, WINDOWS_TIME_ZONE from wk.cfg.repairs import ( AUTO_REPAIR_DELAY_IN_SECONDS, @@ -104,7 +105,7 @@ for error in ('CalledProcessError', 'FileNotFoundError'): def build_menus(base_menus, title, presets): """Build menus, returns dict.""" menus = {} - menus['Main'] = ui.Menu(title=f'{title}\n{ui.color_string("Main Menu", "GREEN")}') + menus['Main'] = ui.Menu(title=f'{title}\n{ansi.color_string("Main Menu", "GREEN")}') # Main Menu for entry in base_menus['Actions']: @@ -113,7 +114,7 @@ def build_menus(base_menus, title, presets): menus['Main'].add_option(group, {'Selected': True}) # Options - menus['Options'] = ui.Menu(title=f'{title}\n{ui.color_string("Options", "GREEN")}') + menus['Options'] = ui.Menu(title=f'{title}\n{ansi.color_string("Options", "GREEN")}') for entry in base_menus['Options']: menus['Options'].add_option(entry.name, entry.details) menus['Options'].add_action('All') @@ -123,7 +124,7 @@ def build_menus(base_menus, title, presets): # Run groups for group, entries in base_menus['Groups'].items(): - menus[group] = ui.Menu(title=f'{title}\n{ui.color_string(group, "GREEN")}') + menus[group] = ui.Menu(title=f'{title}\n{ansi.color_string(group, "GREEN")}') menus[group].disabled_str = 'Locked' for entry in entries: menus[group].add_option(entry.name, entry.details) @@ -155,7 +156,7 @@ def build_menus(base_menus, title, presets): ) # Update presets Menu - MENU_PRESETS.title = f'{title}\n{ui.color_string("Load Preset", "GREEN")}' + MENU_PRESETS.title = f'{title}\n{ansi.color_string("Load Preset", "GREEN")}' MENU_PRESETS.add_option('Default') for name in presets: MENU_PRESETS.add_option(name) @@ -380,7 +381,7 @@ def load_settings(menus): if group == 'Main': continue for name in menu.options: - menu.options[name].update(get_entry_settings(group, ui.strip_colors(name))) + menu.options[name].update(get_entry_settings(group, ansi.strip_colors(name))) def run_auto_repairs(base_menus, presets): @@ -446,7 +447,7 @@ def run_group(group, menu): """Run entries in group if appropriate.""" ui.print_info(f' {group}') for name, details in menu.options.items(): - name_str = ui.strip_colors(name) + name_str = ansi.strip_colors(name) skipped = details.get('Skipped', False) done = details.get('Done', False) disabled = details.get('Disabled', False) @@ -501,7 +502,7 @@ def save_selection_settings(menus): def save_settings(group, name, result=None, **kwargs): """Save entry settings in the registry.""" - key_path = fr'{AUTO_REPAIR_KEY}\{group}\{ui.strip_colors(name)}' + key_path = fr'{AUTO_REPAIR_KEY}\{group}\{ansi.strip_colors(name)}' # Get values from TryAndPrint result if result: @@ -515,7 +516,7 @@ def save_settings(group, name, result=None, **kwargs): # Write values to registry for value_name, data in kwargs.items(): - value_name = ui.strip_colors(value_name) + value_name = ansi.strip_colors(value_name) if isinstance(data, bool): data = 1 if data else 0 if isinstance(data, int): diff --git a/scripts/wk/setup/win.py b/scripts/wk/setup/win.py index 22d8224f..96157af3 100644 --- a/scripts/wk/setup/win.py +++ b/scripts/wk/setup/win.py @@ -8,6 +8,7 @@ import os import re import sys +from wk import ansi from wk.cfg.main import KIT_NAME_FULL from wk.cfg.setup import ( BROWSER_PATHS, @@ -101,7 +102,7 @@ for error in ('CalledProcessError', 'FileNotFoundError'): def build_menus(base_menus, title, presets): """Build menus, returns dict.""" menus = {} - menus['Main'] = ui.Menu(title=f'{title}\n{ui.color_string("Main Menu", "GREEN")}') + menus['Main'] = ui.Menu(title=f'{title}\n{ansi.color_string("Main Menu", "GREEN")}') # Main Menu for entry in base_menus['Actions']: @@ -111,7 +112,7 @@ def build_menus(base_menus, title, presets): # Run groups for group, entries in base_menus['Groups'].items(): - menus[group] = ui.Menu(title=f'{title}\n{ui.color_string(group, "GREEN")}') + menus[group] = ui.Menu(title=f'{title}\n{ansi.color_string(group, "GREEN")}') for entry in entries: menus[group].add_option(entry.name, entry.details) menus[group].add_action('All') @@ -140,7 +141,7 @@ def build_menus(base_menus, title, presets): ) # Update presets Menu - MENU_PRESETS.title = f'{title}\n{ui.color_string("Load Preset", "GREEN")}' + MENU_PRESETS.title = f'{title}\n{ansi.color_string("Load Preset", "GREEN")}' MENU_PRESETS.add_option('Default') for name in presets: MENU_PRESETS.add_option(name) @@ -176,7 +177,7 @@ def check_os_and_set_menu_title(title): color = 'RED' # Done - return f'{title} ({ui.color_string(os_name, color)})' + return f'{title} ({ansi.color_string(os_name, color)})' def load_preset(menus, presets, title, enable_menu_exit=True): @@ -264,7 +265,7 @@ def run_group(group, menu): """Run entries in group if appropriate.""" ui.print_info(f' {group}') for name, details in menu.options.items(): - name_str = ui.strip_colors(name) + name_str = ansi.strip_colors(name) # Not selected if not details.get('Selected', False): @@ -893,7 +894,7 @@ def get_storage_status(): """Get storage status for fixed disks, returns list.""" report = get_volume_usage(use_colors=True) for disk in get_raw_disks(): - report.append(ui.color_string(f'Uninitialized Disk: {disk}', 'RED')) + report.append(ansi.color_string(f'Uninitialized Disk: {disk}', 'RED')) # Done return report diff --git a/scripts/wk/ui/cli.py b/scripts/wk/ui/cli.py index ecd8965a..6aa5a128 100644 --- a/scripts/wk/ui/cli.py +++ b/scripts/wk/ui/cli.py @@ -1,10 +1,8 @@ """WizardKit: CLI functions""" # vim: sts=2 sw=2 ts=2 -import itertools import logging import os -import pathlib import platform import re import subprocess @@ -19,6 +17,7 @@ except ImportError: # Assuming Python is < 3.9 from functools import lru_cache as cache +from wk.ansi import color_string, strip_colors from wk.cfg.main import ( ENABLED_UPLOAD_DATA, INDENT, @@ -28,19 +27,6 @@ from wk.cfg.main import ( from wk.std import (sleep, GenericWarning) # STATIC VARIABLES -COLORS = { - 'CLEAR': '\033[0m', - 'RED': '\033[31m', - 'RED_BLINK': '\033[31;5m', - 'ORANGE': '\033[31;1m', - 'ORANGE_RED': '\033[1;31;41m', - 'GREEN': '\033[32m', - 'YELLOW': '\033[33m', - 'YELLOW_BLINK': '\033[33;5m', - 'BLUE': '\033[34m', - 'PURPLE': '\033[35m', - 'CYAN': '\033[36m', - } LOG = logging.getLogger(__name__) PLATFORM = platform.system() @@ -646,38 +632,6 @@ def clear_screen(): print('\033c') -def color_string(strings, colors, sep=' '): - """Build colored string using ANSI escapes, returns str.""" - clear_code = COLORS['CLEAR'] - msg = [] - - # Convert to tuples if necessary - if isinstance(strings, (str, pathlib.Path)): - strings = (strings,) - if isinstance(colors, (str, pathlib.Path)): - colors = (colors,) - - # Convert to strings if necessary - try: - iter(strings) - except TypeError: - # Assuming single element passed, convert to string - strings = (str(strings),) - try: - iter(colors) - except TypeError: - # Assuming single element passed, convert to string - colors = (str(colors),) - - # Build new string with color escapes added - for string, color in itertools.zip_longest(strings, colors): - color_code = COLORS.get(color, clear_code) - msg.append(f'{color_code}{string}{clear_code}') - - # Done - return sep.join(msg) - - @cache def get_exception(name): """Get exception by name, returns exception object. @@ -850,13 +804,5 @@ def show_data(message, data, color=None, indent=None, width=None): ) -def strip_colors(string): - """Strip known ANSI color escapes from string, returns str.""" - LOG.debug('string: %s', string) - for color in COLORS.values(): - string = string.replace(color, '') - return string - - if __name__ == '__main__': print("This file is not meant to be called directly.")