Move ANSI color escape sections to their own file

This commit is contained in:
2Shirt 2023-04-02 20:46:54 -07:00
parent 03a143488c
commit 95d7159414
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
20 changed files with 194 additions and 178 deletions

View file

@ -5,7 +5,7 @@ import wk
# Classes
REBOOT_STR = wk.ui.cli.color_string('Reboot', 'YELLOW')
REBOOT_STR = wk.ansi.color_string('Reboot', 'YELLOW')
class MenuEntry():
"""Simple class to allow cleaner code below."""
def __init__(self, name, function=None, selected=True, **kwargs):

View file

@ -17,7 +17,7 @@ def main():
color = 'RED'
elif 'Already' in line:
color = 'YELLOW'
print(wk.ui.cli.color_string(line, color))
print(wk.ansi.color_string(line, color))
if __name__ == '__main__':

View file

@ -15,7 +15,7 @@ def main():
line = f' {line}'
if 'Not mounted' in line:
color = 'YELLOW'
print(wk.ui.cli.color_string(line, color))
print(wk.ansi.color_string(line, color))
if __name__ == '__main__':

View file

@ -3,6 +3,7 @@
from sys import stderr, version_info
from . import ansi
from . import cfg
from . import clone
from . import debug

67
scripts/wk/ansi.py Normal file
View file

@ -0,0 +1,67 @@
"""WizardKit: ANSI control/escape functions"""
# vim: sts=2 sw=2 ts=2
import itertools
import logging
import pathlib
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
COLORS = {
'CLEAR': '\033[0m',
'RED': '\033[31m',
'RED_BLINK': '\033[31;5m',
'ORANGE': '\033[31;1m',
'ORANGE_RED': '\033[1;31;41m',
'GREEN': '\033[32m',
'YELLOW': '\033[33m',
'YELLOW_BLINK': '\033[33;5m',
'BLUE': '\033[34m',
'PURPLE': '\033[35m',
'CYAN': '\033[36m',
}
# Functions
def color_string(strings, colors, sep=' '):
"""Build colored string using ANSI escapes, returns str."""
clear_code = COLORS['CLEAR']
msg = []
# Convert to tuples if necessary
if isinstance(strings, (str, pathlib.Path)):
strings = (strings,)
if isinstance(colors, (str, pathlib.Path)):
colors = (colors,)
# Convert to strings if necessary
try:
iter(strings)
except TypeError:
# Assuming single element passed, convert to string
strings = (str(strings),)
try:
iter(colors)
except TypeError:
# Assuming single element passed, convert to string
colors = (str(colors),)
# Build new string with color escapes added
for string, color in itertools.zip_longest(strings, colors):
color_code = COLORS.get(color, clear_code)
msg.append(f'{color_code}{string}{clear_code}')
# Done
return sep.join(msg)
def strip_colors(string):
"""Strip known ANSI color escapes from string, returns str."""
LOG.debug('string: %s', string)
for color in COLORS.values():
string = string.replace(color, '')
return string
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -20,7 +20,7 @@ from docopt import docopt
import psutil
import pytz
from wk import cfg, debug, exe, io, log, net, std
from wk import ansi, cfg, debug, exe, io, log, net, std
from wk.cfg.ddrescue import (
DDRESCUE_MAP_TEMPLATE,
DDRESCUE_SETTINGS,
@ -91,8 +91,8 @@ REGEX_REMAINING_TIME = re.compile(
LOG = logging.getLogger(__name__)
MENU_ACTIONS = (
'Start',
f'Change settings {ui.color_string("(experts only)", "YELLOW")}',
f'Detect drives {ui.color_string("(experts only)", "YELLOW")}',
f'Change settings {ansi.color_string("(experts only)", "YELLOW")}',
f'Detect drives {ansi.color_string("(experts only)", "YELLOW")}',
'Quit')
MENU_TOGGLES = {
'Auto continue (if recovery % over threshold)': True,
@ -422,7 +422,7 @@ class State():
self.panes['Started'] = tmux.split_window(
lines=cfg.ddrescue.TMUX_SIDE_WIDTH,
target_id=self.panes['Source'],
text=ui.color_string(
text=ansi.color_string(
['Started', time.strftime("%Y-%m-%d %H:%M %Z")],
['BLUE', None],
sep='\n',
@ -568,14 +568,14 @@ class State():
report = []
# Source
report.append(ui.color_string('Source', 'GREEN'))
report.append(ansi.color_string('Source', 'GREEN'))
report.extend(build_object_report(self.source))
report.append(' ')
# Destination
report.append(ui.color_string('Destination', 'GREEN'))
report.append(ansi.color_string('Destination', 'GREEN'))
if self.mode == 'Clone':
report[-1] += ui.color_string(' (ALL DATA WILL BE DELETED)', 'RED')
report[-1] += ansi.color_string(' (ALL DATA WILL BE DELETED)', 'RED')
report.extend(build_object_report(self.destination))
report.append(' ')
@ -583,12 +583,12 @@ class State():
# NOTE: The check for block_pairs is to limit this section
# to the second confirmation
if self.mode == 'Clone' and self.block_pairs:
report.append(ui.color_string('WARNING', 'YELLOW'))
report.append(ansi.color_string('WARNING', 'YELLOW'))
report.append(
'All data will be deleted from the destination listed above.',
)
report.append(
ui.color_string(
ansi.color_string(
['This is irreversible and will lead to', 'DATA LOSS.'],
['YELLOW', 'RED'],
),
@ -607,18 +607,18 @@ class State():
# Map dir
if self.working_dir:
report.append(ui.color_string('Map Save Directory', 'GREEN'))
report.append(ansi.color_string('Map Save Directory', 'GREEN'))
report.append(f'{self.working_dir}/')
report.append(' ')
if not fstype_is_ok(self.working_dir, map_dir=True):
report.append(
ui.color_string(
ansi.color_string(
'Map file(s) are being saved to a non-recommended filesystem.',
'YELLOW',
),
)
report.append(
ui.color_string(
ansi.color_string(
['This is strongly discouraged and may lead to', 'DATA LOSS'],
[None, 'RED'],
),
@ -627,11 +627,11 @@ class State():
# Source part(s) selected
if source_parts:
report.append(ui.color_string('Source Part(s) selected', 'GREEN'))
report.append(ansi.color_string('Source Part(s) selected', 'GREEN'))
if self.source.path.samefile(source_parts[0].path):
report.append('Whole Disk')
else:
report.append(ui.color_string(f'{"NAME":<9} SIZE', 'BLUE'))
report.append(ansi.color_string(f'{"NAME":<9} SIZE', 'BLUE'))
for part in source_parts:
report.append(
f'{part.path.name:<9} '
@ -663,7 +663,7 @@ class State():
error_size = self.get_error_size()
error_size_str = std.bytes_to_string(error_size, decimals=2)
if error_size > 0:
error_size_str = ui.color_string(error_size_str, 'YELLOW')
error_size_str = ansi.color_string(error_size_str, 'YELLOW')
percent = self.get_percent_recovered()
percent = format_status_string(percent, width=0)
report.append(f'Overall rescued: {percent}, error size: {error_size_str}')
@ -675,7 +675,7 @@ class State():
error_size = pair.get_error_size()
error_size_str = std.bytes_to_string(error_size, decimals=2)
if error_size > 0:
error_size_str = ui.color_string(error_size_str, 'YELLOW')
error_size_str = ansi.color_string(error_size_str, 'YELLOW')
pair_size = std.bytes_to_string(pair.size, decimals=2)
percent = pair.get_percent_recovered()
percent = format_status_string(percent, width=0)
@ -1066,10 +1066,10 @@ class State():
width = cfg.ddrescue.TMUX_SIDE_WIDTH
# Status
report.append(ui.color_string(f'{"Status":^{width}}', 'BLUE'))
report.append(ansi.color_string(f'{"Status":^{width}}', 'BLUE'))
if 'NEEDS ATTENTION' in overall_status:
report.append(
ui.color_string(f'{overall_status:^{width}}', 'YELLOW_BLINK'),
ansi.color_string(f'{overall_status:^{width}}', 'YELLOW_BLINK'),
)
else:
report.append(f'{overall_status:^{width}}')
@ -1079,12 +1079,12 @@ class State():
if self.block_pairs:
total_rescued = self.get_rescued_size()
percent = self.get_percent_recovered()
report.append(ui.color_string('Overall Progress', 'BLUE'))
report.append(ansi.color_string('Overall Progress', 'BLUE'))
report.append(
f'Rescued: {format_status_string(percent, width=width-9)}',
)
report.append(
ui.color_string(
ansi.color_string(
[f'{std.bytes_to_string(total_rescued, decimals=2):>{width}}'],
[get_percent_color(percent)],
),
@ -1093,7 +1093,7 @@ class State():
# Block pair progress
for pair in self.block_pairs:
report.append(ui.color_string(pair.source, 'BLUE'))
report.append(ansi.color_string(pair.source, 'BLUE'))
for name, status in pair.status.items():
name = name.title()
report.append(
@ -1105,9 +1105,9 @@ class State():
if overall_status in ('Active', 'NEEDS ATTENTION'):
etoc = get_etoc()
report.append(separator)
report.append(ui.color_string('Estimated Pass Finish', 'BLUE'))
report.append(ansi.color_string('Estimated Pass Finish', 'BLUE'))
if overall_status == 'NEEDS ATTENTION' or etoc == 'N/A':
report.append(ui.color_string('N/A', 'YELLOW'))
report.append(ansi.color_string('N/A', 'YELLOW'))
else:
report.append(etoc)
@ -1168,7 +1168,7 @@ class State():
source_str = _format_string(self.source, width)
tmux.respawn_pane(
self.panes['Source'],
text=ui.color_string(
text=ansi.color_string(
['Source', '' if source_exists else ' (Missing)', '\n', source_str],
['BLUE', 'RED', None, None],
sep='',
@ -1183,7 +1183,7 @@ class State():
percent=50,
vertical=False,
target_id=self.panes['Source'],
text=ui.color_string(
text=ansi.color_string(
['Destination', '' if dest_exists else ' (Missing)', '\n', dest_str],
['BLUE', 'RED', None, None],
sep='',
@ -1197,7 +1197,7 @@ def build_block_pair_report(block_pairs, settings):
report = []
notes = []
if block_pairs:
report.append(ui.color_string('Block Pairs', 'GREEN'))
report.append(ansi.color_string('Block Pairs', 'GREEN'))
else:
# Bail early
return report
@ -1216,7 +1216,7 @@ def build_block_pair_report(block_pairs, settings):
if settings:
if not settings['First Run']:
notes.append(
ui.color_string(
ansi.color_string(
['NOTE:', 'Clone settings loaded from previous run.'],
['BLUE', None],
),
@ -1224,14 +1224,14 @@ def build_block_pair_report(block_pairs, settings):
if settings['Needs Format'] and settings['Table Type']:
msg = f'Destination will be formatted using {settings["Table Type"]}'
notes.append(
ui.color_string(
ansi.color_string(
['NOTE:', msg],
['BLUE', None],
),
)
if any(pair.get_rescued_size() > 0 for pair in block_pairs):
notes.append(
ui.color_string(
ansi.color_string(
['NOTE:', 'Resume data loaded from map file(s).'],
['BLUE', None],
),
@ -1313,12 +1313,12 @@ def build_directory_report(path):
for line in proc.stdout.splitlines():
line = line.replace('\n', '')
if 'FSTYPE' in line:
line = ui.color_string(f'{"PATH":<{width}}{line}', 'BLUE')
line = ansi.color_string(f'{"PATH":<{width}}{line}', 'BLUE')
else:
line = f'{path:<{width}}{line}'
report.append(line)
else:
report.append(ui.color_string('PATH', 'BLUE'))
report.append(ansi.color_string('PATH', 'BLUE'))
report.append(str(path))
# Done
@ -1354,7 +1354,7 @@ def build_disk_report(dev):
# Partition details
report.append(
ui.color_string(
ansi.color_string(
(
f'{"NAME":<{widths["name"]}}'
f'{" " if dev.children else ""}'
@ -1400,7 +1400,7 @@ def build_disk_report(dev):
def build_main_menu():
"""Build main menu, returns wk.ui.cli.Menu."""
menu = ui.Menu(title=ui.color_string('ddrescue TUI: Main Menu', 'GREEN'))
menu = ui.Menu(title=ansi.color_string('ddrescue TUI: Main Menu', 'GREEN'))
menu.separator = ' '
# Add actions, options, etc
@ -1433,9 +1433,9 @@ def build_object_report(obj):
def build_settings_menu(silent=True):
"""Build settings menu, returns wk.ui.cli.Menu."""
title_text = [
ui.color_string('ddrescue TUI: Expert Settings', 'GREEN'),
ansi.color_string('ddrescue TUI: Expert Settings', 'GREEN'),
' ',
ui.color_string(
ansi.color_string(
['These settings can cause', 'MAJOR DAMAGE', 'to drives'],
['YELLOW', 'RED', 'YELLOW'],
),
@ -1580,7 +1580,7 @@ def format_status_string(status, width):
# Add color if necessary
if color:
status_str = ui.color_string(status_str, color)
status_str = ansi.color_string(status_str, color)
# Done
return status_str
@ -1960,7 +1960,7 @@ def main():
# Save results to log
LOG.info('')
for line in state.generate_report():
LOG.info(' %s', ui.strip_colors(line))
LOG.info(' %s', ansi.strip_colors(line))
def mount_raw_image(path):
@ -2084,7 +2084,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z')
with open(f'{state.log_dir}/smart.out', 'w', encoding='utf-8') as _f:
_f.write(
ui.color_string(
ansi.color_string(
['SMART Attributes', f'Updated: {now}\n'],
['BLUE', 'YELLOW'],
sep='\t\t',
@ -2267,7 +2267,7 @@ def select_disk(prompt, skip_disk=None):
ui.print_info('Scanning disks...')
disks = hw_disk.get_disks()
menu = ui.Menu(
title=ui.color_string(f'ddrescue TUI: {prompt} Selection', 'GREEN'),
title=ansi.color_string(f'ddrescue TUI: {prompt} Selection', 'GREEN'),
)
menu.disabled_str = 'Already selected'
menu.separator = ' '
@ -2309,7 +2309,7 @@ def select_disk(prompt, skip_disk=None):
def select_disk_parts(prompt, disk):
"""Select disk parts from list, returns list of Disk()."""
title = ui.color_string('ddrescue TUI: Partition Selection', 'GREEN')
title = ansi.color_string('ddrescue TUI: Partition Selection', 'GREEN')
title += f'\n\nDisk: {disk.path} {disk.description}'
menu = ui.Menu(title)
menu.separator = ' '
@ -2360,7 +2360,7 @@ def select_disk_parts(prompt, disk):
if not menu.options:
menu.add_option(whole_disk_str, {'Selected': True, 'Path': disk.path})
menu.title += '\n\n'
menu.title += ui.color_string(' No partitions detected.', 'YELLOW')
menu.title += ansi.color_string(' No partitions detected.', 'YELLOW')
# Get selection
_select_parts(menu)
@ -2391,7 +2391,7 @@ def select_path(prompt):
"""Select path, returns pathlib.Path."""
invalid = False
menu = ui.Menu(
title=ui.color_string(f'ddrescue TUI: {prompt} Path Selection', 'GREEN'),
title=ansi.color_string(f'ddrescue TUI: {prompt} Path Selection', 'GREEN'),
)
menu.separator = ' '
menu.add_action('Quit')

View file

@ -3,7 +3,7 @@
import logging
from wk.ui import cli as ui
from wk import ansi
# STATIC VARIABLES
@ -52,27 +52,27 @@ def generate_horizontal_graph(rate_list, graph_width=40, oneline=False):
rate_color = 'GREEN'
# Build graph
full_block = ui.color_string((GRAPH_HORIZONTAL[-1],), (rate_color,))
full_block = ansi.color_string((GRAPH_HORIZONTAL[-1],), (rate_color,))
if step >= 24:
graph[0] += ui.color_string((GRAPH_HORIZONTAL[step-24],), (rate_color,))
graph[0] += ansi.color_string((GRAPH_HORIZONTAL[step-24],), (rate_color,))
graph[1] += full_block
graph[2] += full_block
graph[3] += full_block
elif step >= 16:
graph[0] += ' '
graph[1] += ui.color_string((GRAPH_HORIZONTAL[step-16],), (rate_color,))
graph[1] += ansi.color_string((GRAPH_HORIZONTAL[step-16],), (rate_color,))
graph[2] += full_block
graph[3] += full_block
elif step >= 8:
graph[0] += ' '
graph[1] += ' '
graph[2] += ui.color_string((GRAPH_HORIZONTAL[step-8],), (rate_color,))
graph[2] += ansi.color_string((GRAPH_HORIZONTAL[step-8],), (rate_color,))
graph[3] += full_block
else:
graph[0] += ' '
graph[1] += ' '
graph[2] += ' '
graph[3] += ui.color_string((GRAPH_HORIZONTAL[step],), (rate_color,))
graph[3] += ansi.color_string((GRAPH_HORIZONTAL[step],), (rate_color,))
# Done
if oneline:
@ -128,7 +128,7 @@ def vertical_graph_line(percent, rate, scale=32):
color_rate = 'GREEN'
# Build string
line = ui.color_string(
line = ansi.color_string(
strings=(
f'{percent:5.1f}%',
f'{GRAPH_VERTICAL[step]:<4}',

View file

@ -5,7 +5,7 @@ import logging
from subprocess import PIPE, STDOUT
from wk import graph
from wk import ansi, graph
from wk.cfg.hw import (
IO_ALT_TEST_SIZE_FACTOR,
IO_BLOCK_SIZE,
@ -22,7 +22,6 @@ from wk.cfg.hw import (
)
from wk.exe import run_program
from wk.std import PLATFORM
from wk.ui import cli as ui
# STATIC VARIABLES
@ -113,7 +112,7 @@ def check_io_results(test_obj, rate_list, graph_width) -> None:
# Add horizontal graph to report
for line in graph.generate_horizontal_graph(rate_list, graph_width):
if not ui.strip_colors(line).strip():
if not ansi.strip_colors(line).strip():
# Skip empty lines
continue
test_obj.report.append(line)
@ -151,7 +150,7 @@ def run_io_test(test_obj, log_path, test_mode=False) -> None:
LOG.info('Using %s for better performance', dev_path)
offset = 0
read_rates = []
test_obj.report.append(ui.color_string('I/O Benchmark', 'BLUE'))
test_obj.report.append(ansi.color_string('I/O Benchmark', 'BLUE'))
# Get dd values or bail
try:
@ -159,7 +158,7 @@ def run_io_test(test_obj, log_path, test_mode=False) -> None:
except DeviceTooSmallError:
test_obj.set_status('N/A')
test_obj.report.append(
ui.color_string('Disk too small to test', 'YELLOW'),
ansi.color_string('Disk too small to test', 'YELLOW'),
)
return

View file

@ -7,7 +7,7 @@ import subprocess
from typing import TextIO
from wk import exe
from wk import ansi, exe
from wk.cfg.hw import CPU_FAILURE_TEMP
from wk.os.mac import set_fans as macos_set_fans
from wk.std import PLATFORM
@ -93,9 +93,9 @@ def check_mprime_results(test_obj, working_dir) -> None:
for line in passing_lines:
test_obj.report.append(f' {line}')
for line in warning_lines:
test_obj.report.append(ui.color_string(f' {line}', 'YELLOW'))
test_obj.report.append(ansi.color_string(f' {line}', 'YELLOW'))
if not (passing_lines or warning_lines):
test_obj.report.append(ui.color_string(' Unknown result', 'YELLOW'))
test_obj.report.append(ansi.color_string(' Unknown result', 'YELLOW'))
def start_mprime(working_dir, log_path) -> subprocess.Popen:

View file

@ -10,7 +10,7 @@ import time
from docopt import docopt
from wk import cfg, debug, exe, log, std
from wk import ansi, cfg, debug, exe, log, std
from wk.cfg.hw import STATUS_COLORS
from wk.hw import benchmark as hw_benchmark
from wk.hw import cpu as hw_cpu
@ -89,9 +89,9 @@ class State():
self.panes = {}
self.system = None
self.test_groups = []
self.top_text = ui.color_string('Hardware Diagnostics', 'GREEN')
self.top_text = ansi.color_string('Hardware Diagnostics', 'GREEN')
if test_mode:
self.top_text += ui.color_string(' (Test Mode)', 'YELLOW')
self.top_text += ansi.color_string(' (Test Mode)', 'YELLOW')
# Init tmux and start a background process to maintain layout
self.init_tmux()
@ -229,7 +229,7 @@ class State():
self.panes['Started'] = tmux.split_window(
lines=cfg.hw.TMUX_SIDE_WIDTH,
target_id=self.panes['Top'],
text=ui.color_string(
text=ansi.color_string(
['Started', time.strftime("%Y-%m-%d %H:%M %Z")],
['BLUE', None],
sep='\n',
@ -292,7 +292,7 @@ class State():
"""Update 'Started' pane following clock sync."""
tmux.respawn_pane(
pane_id=self.panes['Started'],
text=ui.color_string(
text=ansi.color_string(
['Started', time.strftime("%Y-%m-%d %H:%M %Z")],
['BLUE', None],
sep='\n',
@ -305,9 +305,9 @@ class State():
width = cfg.hw.TMUX_SIDE_WIDTH
for group in self.test_groups:
report.append(ui.color_string(group.name, 'BLUE'))
report.append(ansi.color_string(group.name, 'BLUE'))
for test in group.test_objects:
report.append(ui.color_string(
report.append(ansi.color_string(
[test.label, f'{test.status:>{width-len(test.label)}}'],
[None, STATUS_COLORS.get(test.status, None)],
sep='',
@ -453,7 +453,7 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
sensors.save_average_temps(temp_label='Cooldown', seconds=5)
# Check Prime95 results
test_mprime_obj.report.append(ui.color_string('Prime95', 'BLUE'))
test_mprime_obj.report.append(ansi.color_string('Prime95', 'BLUE'))
hw_cpu.check_mprime_results(
test_obj=test_mprime_obj, working_dir=state.log_dir,
)
@ -493,7 +493,7 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
state.update_progress_pane()
# Check Cooling results
test_cooling_obj.report.append(ui.color_string('Temps', 'BLUE'))
test_cooling_obj.report.append(ansi.color_string('Temps', 'BLUE'))
hw_cpu.check_cooling_results(test_cooling_obj, sensors, run_sysbench)
# Cleanup
@ -566,12 +566,12 @@ def disk_io_benchmark(
# Something went wrong
LOG.error('%s', err)
test.set_status('ERROR')
test.report.append(ui.color_string(' Unknown Error', 'RED'))
test.report.append(ansi.color_string(' Unknown Error', 'RED'))
# Mark test(s) aborted if necessary
if aborted:
test.set_status('Aborted')
test.report.append(ui.color_string(' Aborted', 'YELLOW'))
test.report.append(ansi.color_string(' Aborted', 'YELLOW'))
break
# Update progress after each test
@ -734,7 +734,7 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None:
for test in test_objects:
if not (test.disabled or test.passed or test.failed):
test.set_status('Aborted')
test.report.append(ui.color_string(' Aborted', 'YELLOW'))
test.report.append(ansi.color_string(' Aborted', 'YELLOW'))
# Cleanup
state.update_progress_pane()

View file

@ -11,6 +11,7 @@ import re
from dataclasses import dataclass, field
from typing import Any, Union
from wk import ansi
from wk.cfg.main import KIT_NAME_SHORT
from wk.cfg.python import DATACLASS_DECORATOR_KWARGS
from wk.exe import get_json_from_command, run_program
@ -20,7 +21,6 @@ from wk.hw.smart import (
get_known_disk_attributes,
)
from wk.std import PLATFORM
from wk.ui import cli as ui
# STATIC VARIABLES
@ -74,7 +74,7 @@ class Disk:
def add_note(self, note, color=None) -> None:
"""Add note that will be included in the disk report."""
if color:
note = ui.color_string(note, color)
note = ansi.color_string(note, color)
if note not in self.notes:
self.notes.append(note)
self.notes.sort()
@ -83,7 +83,7 @@ class Disk:
"""Check if note is already present."""
present = False
for note in self.notes:
if note_str == ui.strip_colors(note):
if note_str == ansi.strip_colors(note):
present = True
return present
@ -99,18 +99,18 @@ class Disk:
"""Generate Disk report, returns list."""
report = []
if header:
report.append(ui.color_string(f'Device ({self.path.name})', 'BLUE'))
report.append(ansi.color_string(f'Device ({self.path.name})', 'BLUE'))
report.append(f' {self.description}')
# Attributes
if self.attributes:
if header:
report.append(ui.color_string('Attributes', 'BLUE'))
report.append(ansi.color_string('Attributes', 'BLUE'))
report.extend(generate_attribute_report(self))
# Notes
if self.notes:
report.append(ui.color_string('Notes', 'BLUE'))
report.append(ansi.color_string('Notes', 'BLUE'))
for note in self.notes:
report.append(f' {note}')

View file

@ -9,11 +9,11 @@ import re
from subprocess import CalledProcessError
from typing import Any
from wk import ansi
from wk.cfg.hw import CPU_CRITICAL_TEMP, SMC_IDS, TEMP_COLORS
from wk.exe import run_program, start_thread
from wk.io import non_clobber_path
from wk.std import PLATFORM, sleep
from wk.ui import cli as ui
# STATIC VARIABLES
@ -110,7 +110,7 @@ class Sensors():
# Handle empty reports
if not report:
report = [
ui.color_string('WARNING: No sensors found', 'YELLOW'),
ansi.color_string('WARNING: No sensors found', 'YELLOW'),
'',
'Please monitor temps manually',
]
@ -426,7 +426,7 @@ def get_temp_str(temp, colored=True) -> str:
temp = float(temp)
except (TypeError, ValueError):
# Invalid temp?
return ui.color_string(temp, 'PURPLE')
return ansi.color_string(temp, 'PURPLE')
# Determine color
if colored:
@ -436,7 +436,7 @@ def get_temp_str(temp, colored=True) -> str:
break
# Done
return ui.color_string(f'{"-" if temp < 0 else ""}{temp:2.0f}°C', temp_color)
return ansi.color_string(f'{"-" if temp < 0 else ""}{temp:2.0f}°C', temp_color)

View file

@ -7,6 +7,7 @@ import re
from typing import Any
from wk import ansi
from wk.cfg.hw import (
ATTRIBUTE_COLORS,
KEY_NVME,
@ -19,7 +20,6 @@ from wk.cfg.hw import (
)
from wk.exe import get_json_from_command, run_program
from wk.std import bytes_to_string, sleep
from wk.ui import cli as ui
# STATIC VARIABLES
@ -41,26 +41,26 @@ def build_self_test_report(test_obj, aborted=False) -> None:
For instance if the test was aborted the report should include the
last known progress instead of just "was aborted by host."
"""
report = [ui.color_string('Self-Test', 'BLUE')]
report = [ansi.color_string('Self-Test', 'BLUE')]
test_details = get_smart_self_test_details(test_obj.dev)
test_result = test_details.get('status', {}).get('string', 'Unknown')
# Build report
if test_obj.disabled or test_obj.status == 'Denied':
report.append(ui.color_string(f' {test_obj.status}', 'RED'))
report.append(ansi.color_string(f' {test_obj.status}', 'RED'))
elif test_obj.status == 'N/A' or not test_obj.dev.attributes:
report.append(ui.color_string(f' {test_obj.status}', 'YELLOW'))
report.append(ansi.color_string(f' {test_obj.status}', 'YELLOW'))
elif test_obj.status == 'TestInProgress':
report.append(ui.color_string(' Failed to stop previous test', 'RED'))
report.append(ansi.color_string(' Failed to stop previous test', 'RED'))
test_obj.set_status('Failed')
else:
# Other cases include self-test result string
report.append(f' {test_result.capitalize()}')
if aborted and not (test_obj.passed or test_obj.failed):
report.append(ui.color_string(' Aborted', 'YELLOW'))
report.append(ansi.color_string(' Aborted', 'YELLOW'))
test_obj.set_status('Aborted')
elif test_obj.status == 'TimedOut':
report.append(ui.color_string(' TimedOut', 'YELLOW'))
report.append(ansi.color_string(' TimedOut', 'YELLOW'))
# Done
test_obj.report.extend(report)
@ -137,7 +137,7 @@ def generate_attribute_report(dev, only_failed=False) -> list[str]:
continue
# Build colored string and append to report
line = ui.color_string(
line = ansi.color_string(
[label, get_attribute_value_string(dev, attr), note],
[None, value_color, 'YELLOW'],
)
@ -299,7 +299,7 @@ def run_smart_self_test(test_obj, log_path) -> bool:
finished = False
test_details = get_smart_self_test_details(test_obj.dev)
size_str = bytes_to_string(test_obj.dev.size, use_binary=False)
header_str = ui.color_string(
header_str = ansi.color_string(
['[', test_obj.dev.path.name, ' ', size_str, ']'],
[None, 'BLUE', None, 'CYAN', None],
sep='',

View file

@ -5,6 +5,7 @@ import logging
from subprocess import STDOUT
from wk import ansi
from wk.cfg.hw import (
BADBLOCKS_EXTRA_LARGE_DISK,
BADBLOCKS_LARGE_DISK,
@ -15,7 +16,6 @@ from wk.cfg.hw import (
)
from wk.exe import run_program
from wk.std import PLATFORM, bytes_to_string
from wk.ui import cli as ui
# STATIC VARIABLES
@ -27,7 +27,7 @@ def check_surface_scan_results(test_obj, log_path) -> None:
"""Check results and set test status."""
with open(log_path, 'r', encoding='utf-8') as _f:
for line in _f.readlines():
line = ui.strip_colors(line.strip())
line = ansi.strip_colors(line.strip())
if not line or BADBLOCKS_SKIP_REGEX.match(line):
# Skip
continue
@ -44,10 +44,10 @@ def check_surface_scan_results(test_obj, log_path) -> None:
test_obj.set_status('Passed')
else:
test_obj.failed = True
test_obj.report.append(f' {ui.color_string(line, "YELLOW")}')
test_obj.report.append(f' {ansi.color_string(line, "YELLOW")}')
test_obj.set_status('Failed')
else:
test_obj.report.append(f' {ui.color_string(line, "YELLOW")}')
test_obj.report.append(f' {ansi.color_string(line, "YELLOW")}')
if not (test_obj.passed or test_obj.failed):
test_obj.set_status('Unknown')
@ -61,7 +61,7 @@ def run_scan(test_obj, log_path, test_mode=False) -> None:
# Use "RAW" disks under macOS
dev_path = dev_path.with_name(f'r{dev_path.name}')
LOG.info('Using %s for better performance', dev_path)
test_obj.report.append(ui.color_string('badblocks', 'BLUE'))
test_obj.report.append(ansi.color_string('badblocks', 'BLUE'))
test_obj.set_status('Working')
# Increase block size if necessary
@ -80,7 +80,7 @@ def run_scan(test_obj, log_path, test_mode=False) -> None:
with open(log_path, 'a', encoding='utf-8') as _f:
size_str = bytes_to_string(dev.size, use_binary=False)
_f.write(
ui.color_string(
ansi.color_string(
['[', dev.path.name, ' ', size_str, ']\n'],
[None, 'BLUE', None, 'CYAN', None],
sep='',

View file

@ -8,12 +8,12 @@ import re
from dataclasses import dataclass, field
from typing import Any
from wk import ansi
from wk.cfg.hw import KNOWN_RAM_VENDOR_IDS
from wk.cfg.python import DATACLASS_DECORATOR_KWARGS
from wk.exe import get_json_from_command, run_program
from wk.hw.test import Test
from wk.std import PLATFORM, bytes_to_string, string_to_bytes
from wk.ui import cli as ui
# STATIC VARIABLES
@ -37,11 +37,11 @@ class System:
def generate_report(self) -> list[str]:
"""Generate CPU & RAM report, returns list."""
report = []
report.append(ui.color_string('Device', 'BLUE'))
report.append(ansi.color_string('Device', 'BLUE'))
report.append(f' {self.cpu_description}')
# Include RAM details
report.append(ui.color_string('RAM', 'BLUE'))
report.append(ansi.color_string('RAM', 'BLUE'))
report.append(f' {self.ram_total} ({", ".join(self.ram_dimms)})')
# Tests

View file

@ -7,11 +7,11 @@ import pathlib
import re
import subprocess
from wk import ansi
from wk.cfg.hw import VOLUME_FAILURE_THRESHOLD, VOLUME_WARNING_THRESHOLD
from wk.exe import get_json_from_command, popen_program, run_program
from wk.log import format_log_path
from wk.std import bytes_to_string
from wk.ui import cli as ui
# STATIC VARIABLES
@ -83,20 +83,20 @@ def build_volume_report(device_path=None) -> list:
vol['mountpoint'] = f'Mounted on {vol["mountpoint"]}'
# Name and size
line = ui.color_string(
line = ansi.color_string(
[f'{vol["name"]:<20}', f'{vol["size"]:>9}'],
[None, 'CYAN'],
)
# Mountpoint and type
line = ui.color_string(
line = ansi.color_string(
[line, f'{vol["mountpoint"]:<{m_width}}', f'{vol["fstype"]:<11}'],
[None, None, 'BLUE'],
)
# Used and free
if any([vol['fsused'], vol['fsavail']]):
line = ui.color_string(
line = ansi.color_string(
[line, f'({vol["fsused"]:>9} used, {vol["fsavail"]:>9} free)'],
[None, size_color],
)

View file

@ -17,6 +17,7 @@ except ImportError as err:
if platform.system() == 'Windows':
raise err
from wk import ansi
from wk.borrowed import acpi
from wk.cfg.main import KIT_NAME_FULL
from wk.cfg.windows_builds import (
@ -182,7 +183,7 @@ def check_4k_alignment(show_alert=False):
continue
if int(match.group('offset')) % 4096 != 0:
report.append(
ui.color_string(
ansi.color_string(
f'{match.group("description")}'
f' ({bytes_to_string(match.group("size"), decimals=1)})'
,
@ -198,7 +199,7 @@ def check_4k_alignment(show_alert=False):
if report:
report.insert(
0,
ui.color_string('One or more partitions not 4K aligned', 'YELLOW'),
ansi.color_string('One or more partitions not 4K aligned', 'YELLOW'),
)
return report
@ -250,13 +251,13 @@ def get_installed_antivirus():
state = proc.stdout.split('=')[1]
state = hex(int(state))
if str(state)[3:5] not in ['10', '11']:
report.append(ui.color_string(f'[Disabled] {product}', 'YELLOW'))
report.append(ansi.color_string(f'[Disabled] {product}', 'YELLOW'))
else:
report.append(product)
# Final check
if not report:
report.append(ui.color_string('No products detected', 'RED'))
report.append(ansi.color_string('No products detected', 'RED'))
# Done
return report
@ -363,7 +364,7 @@ def get_volume_usage(use_colors=False):
f' ({bytes_to_string(free, 2):>10} / {bytes_to_string(total, 2):>10})'
)
if use_colors:
display_str = ui.color_string(display_str, color)
display_str = ansi.color_string(display_str, color)
report.append(f'{disk.device} {display_str}')
# Done

View file

@ -11,6 +11,7 @@ import time
from subprocess import CalledProcessError, DEVNULL
from xml.dom.minidom import parse as xml_parse
from wk import ansi
from wk.cfg.main import KIT_NAME_FULL, KIT_NAME_SHORT, WINDOWS_TIME_ZONE
from wk.cfg.repairs import (
AUTO_REPAIR_DELAY_IN_SECONDS,
@ -104,7 +105,7 @@ for error in ('CalledProcessError', 'FileNotFoundError'):
def build_menus(base_menus, title, presets):
"""Build menus, returns dict."""
menus = {}
menus['Main'] = ui.Menu(title=f'{title}\n{ui.color_string("Main Menu", "GREEN")}')
menus['Main'] = ui.Menu(title=f'{title}\n{ansi.color_string("Main Menu", "GREEN")}')
# Main Menu
for entry in base_menus['Actions']:
@ -113,7 +114,7 @@ def build_menus(base_menus, title, presets):
menus['Main'].add_option(group, {'Selected': True})
# Options
menus['Options'] = ui.Menu(title=f'{title}\n{ui.color_string("Options", "GREEN")}')
menus['Options'] = ui.Menu(title=f'{title}\n{ansi.color_string("Options", "GREEN")}')
for entry in base_menus['Options']:
menus['Options'].add_option(entry.name, entry.details)
menus['Options'].add_action('All')
@ -123,7 +124,7 @@ def build_menus(base_menus, title, presets):
# Run groups
for group, entries in base_menus['Groups'].items():
menus[group] = ui.Menu(title=f'{title}\n{ui.color_string(group, "GREEN")}')
menus[group] = ui.Menu(title=f'{title}\n{ansi.color_string(group, "GREEN")}')
menus[group].disabled_str = 'Locked'
for entry in entries:
menus[group].add_option(entry.name, entry.details)
@ -155,7 +156,7 @@ def build_menus(base_menus, title, presets):
)
# Update presets Menu
MENU_PRESETS.title = f'{title}\n{ui.color_string("Load Preset", "GREEN")}'
MENU_PRESETS.title = f'{title}\n{ansi.color_string("Load Preset", "GREEN")}'
MENU_PRESETS.add_option('Default')
for name in presets:
MENU_PRESETS.add_option(name)
@ -380,7 +381,7 @@ def load_settings(menus):
if group == 'Main':
continue
for name in menu.options:
menu.options[name].update(get_entry_settings(group, ui.strip_colors(name)))
menu.options[name].update(get_entry_settings(group, ansi.strip_colors(name)))
def run_auto_repairs(base_menus, presets):
@ -446,7 +447,7 @@ def run_group(group, menu):
"""Run entries in group if appropriate."""
ui.print_info(f' {group}')
for name, details in menu.options.items():
name_str = ui.strip_colors(name)
name_str = ansi.strip_colors(name)
skipped = details.get('Skipped', False)
done = details.get('Done', False)
disabled = details.get('Disabled', False)
@ -501,7 +502,7 @@ def save_selection_settings(menus):
def save_settings(group, name, result=None, **kwargs):
"""Save entry settings in the registry."""
key_path = fr'{AUTO_REPAIR_KEY}\{group}\{ui.strip_colors(name)}'
key_path = fr'{AUTO_REPAIR_KEY}\{group}\{ansi.strip_colors(name)}'
# Get values from TryAndPrint result
if result:
@ -515,7 +516,7 @@ def save_settings(group, name, result=None, **kwargs):
# Write values to registry
for value_name, data in kwargs.items():
value_name = ui.strip_colors(value_name)
value_name = ansi.strip_colors(value_name)
if isinstance(data, bool):
data = 1 if data else 0
if isinstance(data, int):

View file

@ -8,6 +8,7 @@ import os
import re
import sys
from wk import ansi
from wk.cfg.main import KIT_NAME_FULL
from wk.cfg.setup import (
BROWSER_PATHS,
@ -101,7 +102,7 @@ for error in ('CalledProcessError', 'FileNotFoundError'):
def build_menus(base_menus, title, presets):
"""Build menus, returns dict."""
menus = {}
menus['Main'] = ui.Menu(title=f'{title}\n{ui.color_string("Main Menu", "GREEN")}')
menus['Main'] = ui.Menu(title=f'{title}\n{ansi.color_string("Main Menu", "GREEN")}')
# Main Menu
for entry in base_menus['Actions']:
@ -111,7 +112,7 @@ def build_menus(base_menus, title, presets):
# Run groups
for group, entries in base_menus['Groups'].items():
menus[group] = ui.Menu(title=f'{title}\n{ui.color_string(group, "GREEN")}')
menus[group] = ui.Menu(title=f'{title}\n{ansi.color_string(group, "GREEN")}')
for entry in entries:
menus[group].add_option(entry.name, entry.details)
menus[group].add_action('All')
@ -140,7 +141,7 @@ def build_menus(base_menus, title, presets):
)
# Update presets Menu
MENU_PRESETS.title = f'{title}\n{ui.color_string("Load Preset", "GREEN")}'
MENU_PRESETS.title = f'{title}\n{ansi.color_string("Load Preset", "GREEN")}'
MENU_PRESETS.add_option('Default')
for name in presets:
MENU_PRESETS.add_option(name)
@ -176,7 +177,7 @@ def check_os_and_set_menu_title(title):
color = 'RED'
# Done
return f'{title} ({ui.color_string(os_name, color)})'
return f'{title} ({ansi.color_string(os_name, color)})'
def load_preset(menus, presets, title, enable_menu_exit=True):
@ -264,7 +265,7 @@ def run_group(group, menu):
"""Run entries in group if appropriate."""
ui.print_info(f' {group}')
for name, details in menu.options.items():
name_str = ui.strip_colors(name)
name_str = ansi.strip_colors(name)
# Not selected
if not details.get('Selected', False):
@ -893,7 +894,7 @@ def get_storage_status():
"""Get storage status for fixed disks, returns list."""
report = get_volume_usage(use_colors=True)
for disk in get_raw_disks():
report.append(ui.color_string(f'Uninitialized Disk: {disk}', 'RED'))
report.append(ansi.color_string(f'Uninitialized Disk: {disk}', 'RED'))
# Done
return report

View file

@ -1,10 +1,8 @@
"""WizardKit: CLI functions"""
# vim: sts=2 sw=2 ts=2
import itertools
import logging
import os
import pathlib
import platform
import re
import subprocess
@ -19,6 +17,7 @@ except ImportError:
# Assuming Python is < 3.9
from functools import lru_cache as cache
from wk.ansi import color_string, strip_colors
from wk.cfg.main import (
ENABLED_UPLOAD_DATA,
INDENT,
@ -28,19 +27,6 @@ from wk.cfg.main import (
from wk.std import (sleep, GenericWarning)
# STATIC VARIABLES
COLORS = {
'CLEAR': '\033[0m',
'RED': '\033[31m',
'RED_BLINK': '\033[31;5m',
'ORANGE': '\033[31;1m',
'ORANGE_RED': '\033[1;31;41m',
'GREEN': '\033[32m',
'YELLOW': '\033[33m',
'YELLOW_BLINK': '\033[33;5m',
'BLUE': '\033[34m',
'PURPLE': '\033[35m',
'CYAN': '\033[36m',
}
LOG = logging.getLogger(__name__)
PLATFORM = platform.system()
@ -646,38 +632,6 @@ def clear_screen():
print('\033c')
def color_string(strings, colors, sep=' '):
"""Build colored string using ANSI escapes, returns str."""
clear_code = COLORS['CLEAR']
msg = []
# Convert to tuples if necessary
if isinstance(strings, (str, pathlib.Path)):
strings = (strings,)
if isinstance(colors, (str, pathlib.Path)):
colors = (colors,)
# Convert to strings if necessary
try:
iter(strings)
except TypeError:
# Assuming single element passed, convert to string
strings = (str(strings),)
try:
iter(colors)
except TypeError:
# Assuming single element passed, convert to string
colors = (str(colors),)
# Build new string with color escapes added
for string, color in itertools.zip_longest(strings, colors):
color_code = COLORS.get(color, clear_code)
msg.append(f'{color_code}{string}{clear_code}')
# Done
return sep.join(msg)
@cache
def get_exception(name):
"""Get exception by name, returns exception object.
@ -850,13 +804,5 @@ def show_data(message, data, color=None, indent=None, width=None):
)
def strip_colors(string):
"""Strip known ANSI color escapes from string, returns str."""
LOG.debug('string: %s', string)
for color in COLORS.values():
string = string.replace(color, '')
return string
if __name__ == '__main__':
print("This file is not meant to be called directly.")