Adjust ui imports and calls

This commit is contained in:
2Shirt 2023-04-02 20:12:18 -07:00
parent 6efc970374
commit 03a143488c
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
19 changed files with 154 additions and 204 deletions

View file

@ -33,7 +33,7 @@ from wk.hw.smart import (
smart_status_ok,
update_smart_details,
)
from wk.ui import cli as ui # TODO: This is lazy
from wk.ui import cli as ui
from wk.ui import tmux
@ -1446,7 +1446,9 @@ def build_settings_menu(silent=True):
preset = 'Default'
if not silent:
# Ask which preset to use
print(f'Available ddrescue presets: {" / ".join(SETTING_PRESETS)}')
ui.print_standard(
f'Available ddrescue presets: {" / ".join(SETTING_PRESETS)}'
)
preset = ui.choice(SETTING_PRESETS, 'Please select a preset:')
# Fix selection
@ -2161,10 +2163,10 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
warning_message = 'Error(s) encountered, see message above'
state.update_top_panes()
if warning_message:
print(' ')
print(' ')
ui.print_standard(' ')
ui.print_standard(' ')
ui.print_error('DDRESCUE PROCESS HALTED')
print(' ')
ui.print_standard(' ')
ui.print_warning(warning_message)
# Needs attention?
@ -2377,7 +2379,7 @@ def select_disk_parts(prompt, disk):
object_list = [disk.path]
# Convert object_list to hw_disk.Disk() objects
print(' ')
ui.print_standard(' ')
ui.print_info('Getting disk/partition details...')
object_list = [hw_disk.Disk(path) for path in object_list]

View file

@ -3,7 +3,7 @@
import logging
from wk.ui.cli import color_string # TODO: This is lazy
from wk.ui import cli as ui
# STATIC VARIABLES
@ -52,27 +52,27 @@ def generate_horizontal_graph(rate_list, graph_width=40, oneline=False):
rate_color = 'GREEN'
# Build graph
full_block = color_string((GRAPH_HORIZONTAL[-1],), (rate_color,))
full_block = ui.color_string((GRAPH_HORIZONTAL[-1],), (rate_color,))
if step >= 24:
graph[0] += color_string((GRAPH_HORIZONTAL[step-24],), (rate_color,))
graph[0] += ui.color_string((GRAPH_HORIZONTAL[step-24],), (rate_color,))
graph[1] += full_block
graph[2] += full_block
graph[3] += full_block
elif step >= 16:
graph[0] += ' '
graph[1] += color_string((GRAPH_HORIZONTAL[step-16],), (rate_color,))
graph[1] += ui.color_string((GRAPH_HORIZONTAL[step-16],), (rate_color,))
graph[2] += full_block
graph[3] += full_block
elif step >= 8:
graph[0] += ' '
graph[1] += ' '
graph[2] += color_string((GRAPH_HORIZONTAL[step-8],), (rate_color,))
graph[2] += ui.color_string((GRAPH_HORIZONTAL[step-8],), (rate_color,))
graph[3] += full_block
else:
graph[0] += ' '
graph[1] += ' '
graph[2] += ' '
graph[3] += color_string((GRAPH_HORIZONTAL[step],), (rate_color,))
graph[3] += ui.color_string((GRAPH_HORIZONTAL[step],), (rate_color,))
# Done
if oneline:
@ -128,7 +128,7 @@ def vertical_graph_line(percent, rate, scale=32):
color_rate = 'GREEN'
# Build string
line = color_string(
line = ui.color_string(
strings=(
f'{percent:5.1f}%',
f'{GRAPH_VERTICAL[step]:<4}',

View file

@ -22,10 +22,7 @@ from wk.cfg.hw import (
)
from wk.exe import run_program
from wk.std import PLATFORM
from wk.ui.cli import ( # TODO: This is lazy
strip_colors,
color_string,
)
from wk.ui import cli as ui
# STATIC VARIABLES
@ -116,7 +113,7 @@ def check_io_results(test_obj, rate_list, graph_width) -> None:
# Add horizontal graph to report
for line in graph.generate_horizontal_graph(rate_list, graph_width):
if not strip_colors(line).strip():
if not ui.strip_colors(line).strip():
# Skip empty lines
continue
test_obj.report.append(line)
@ -154,7 +151,7 @@ def run_io_test(test_obj, log_path, test_mode=False) -> None:
LOG.info('Using %s for better performance', dev_path)
offset = 0
read_rates = []
test_obj.report.append(color_string('I/O Benchmark', 'BLUE'))
test_obj.report.append(ui.color_string('I/O Benchmark', 'BLUE'))
# Get dd values or bail
try:
@ -162,7 +159,7 @@ def run_io_test(test_obj, log_path, test_mode=False) -> None:
except DeviceTooSmallError:
test_obj.set_status('N/A')
test_obj.report.append(
color_string('Disk too small to test', 'YELLOW'),
ui.color_string('Disk too small to test', 'YELLOW'),
)
return

View file

@ -11,12 +11,8 @@ from wk import exe
from wk.cfg.hw import CPU_FAILURE_TEMP
from wk.os.mac import set_fans as macos_set_fans
from wk.std import PLATFORM
from wk.ui.cli import ( # TODO: This is lazy
color_string,
print_error,
print_warning,
)
from wk.ui.tmux import respawn_pane as tmux_respawn_pane
from wk.ui import cli as ui
from wk.ui import tmux
# STATIC VARIABLES
@ -97,9 +93,9 @@ def check_mprime_results(test_obj, working_dir) -> None:
for line in passing_lines:
test_obj.report.append(f' {line}')
for line in warning_lines:
test_obj.report.append(color_string(f' {line}', 'YELLOW'))
test_obj.report.append(ui.color_string(f' {line}', 'YELLOW'))
if not (passing_lines or warning_lines):
test_obj.report.append(color_string(' Unknown result', 'YELLOW'))
test_obj.report.append(ui.color_string(' Unknown result', 'YELLOW'))
def start_mprime(working_dir, log_path) -> subprocess.Popen:
@ -147,7 +143,7 @@ def start_sysbench(sensors, sensors_out, log_path, pane) -> SysbenchType:
)
# Update bottom pane
tmux_respawn_pane(pane, watch_file=log_path, watch_cmd='tail')
tmux.respawn_pane(pane, watch_file=log_path, watch_cmd='tail')
# Start sysbench
filehandle_sysbench = open(
@ -174,9 +170,9 @@ def set_apple_fan_speed(speed) -> None:
except (RuntimeError, ValueError, subprocess.CalledProcessError) as err:
LOG.error('Failed to set fans to %s', speed)
LOG.error('Error: %s', err)
print_error(f'Failed to set fans to {speed}')
ui.print_error(f'Failed to set fans to {speed}')
for line in str(err).splitlines():
print_warning(f' {line.strip()}')
ui.print_warning(f' {line.strip()}')
elif PLATFORM == 'Linux':
cmd = ['apple-fans', speed]
exe.run_program(cmd, check=False)

View file

@ -25,7 +25,7 @@ from wk.hw.network import network_test
from wk.hw.screensavers import screensaver
from wk.hw.test import Test, TestGroup
from wk.ui import cli as ui # TODO: This is lazy
from wk.ui import cli as ui
from wk.ui import tmux

View file

@ -20,7 +20,7 @@ from wk.hw.smart import (
get_known_disk_attributes,
)
from wk.std import PLATFORM
from wk.ui.cli import color_string, strip_colors # TODO: This is lazy
from wk.ui import cli as ui
# STATIC VARIABLES
@ -74,7 +74,7 @@ class Disk:
def add_note(self, note, color=None) -> None:
"""Add note that will be included in the disk report."""
if color:
note = color_string(note, color)
note = ui.color_string(note, color)
if note not in self.notes:
self.notes.append(note)
self.notes.sort()
@ -83,7 +83,7 @@ class Disk:
"""Check if note is already present."""
present = False
for note in self.notes:
if note_str == strip_colors(note):
if note_str == ui.strip_colors(note):
present = True
return present
@ -99,18 +99,18 @@ class Disk:
"""Generate Disk report, returns list."""
report = []
if header:
report.append(color_string(f'Device ({self.path.name})', 'BLUE'))
report.append(ui.color_string(f'Device ({self.path.name})', 'BLUE'))
report.append(f' {self.description}')
# Attributes
if self.attributes:
if header:
report.append(color_string('Attributes', 'BLUE'))
report.append(ui.color_string('Attributes', 'BLUE'))
report.extend(generate_attribute_report(self))
# Notes
if self.notes:
report.append(color_string('Notes', 'BLUE'))
report.append(ui.color_string('Notes', 'BLUE'))
for note in self.notes:
report.append(f' {note}')

View file

@ -5,7 +5,7 @@ import logging
from wk.exe import run_program
from wk.std import PLATFORM
from wk.ui.cli import print_warning # TODO: This is lazy
from wk.ui import cli as ui
# STATIC VARIABLES
@ -18,7 +18,7 @@ def keyboard_test() -> None:
if PLATFORM == 'Linux':
run_xev()
else:
print_warning(f'Not supported under this OS: {PLATFORM}')
ui.print_warning(f'Not supported under this OS: {PLATFORM}')
def run_xev() -> None:

View file

@ -9,12 +9,7 @@ from wk.net import (
show_valid_addresses,
speedtest,
)
from wk.ui.cli import (
# TODO: This is lazy
TryAndPrint,
pause,
print_warning,
)
from wk.ui import cli as ui
# STATIC VARIABLES
@ -25,7 +20,7 @@ LOG = logging.getLogger(__name__)
def network_test() -> None:
"""Run network tests."""
LOG.info('Network Test')
try_and_print = TryAndPrint()
try_and_print = ui.TryAndPrint()
result = try_and_print.run(
message='Network connection...',
function=connected_to_private_network,
@ -35,8 +30,8 @@ def network_test() -> None:
# Bail if not connected
if result['Failed']:
print_warning('Please connect to a network and try again')
pause('Press Enter to return to main menu...')
ui.print_warning('Please connect to a network and try again')
ui.pause('Press Enter to return to main menu...')
return
# Show IP address(es)
@ -52,7 +47,7 @@ def network_test() -> None:
try_and_print.run('Speedtest...', speedtest)
# Done
pause('Press Enter to return to main menu...')
ui.pause('Press Enter to return to main menu...')
if __name__ == '__main__':

View file

@ -13,7 +13,7 @@ from wk.cfg.hw import CPU_CRITICAL_TEMP, SMC_IDS, TEMP_COLORS
from wk.exe import run_program, start_thread
from wk.io import non_clobber_path
from wk.std import PLATFORM, sleep
from wk.ui.cli import color_string # TODO: This is lazy
from wk.ui import cli as ui
# STATIC VARIABLES
@ -110,7 +110,7 @@ class Sensors():
# Handle empty reports
if not report:
report = [
color_string('WARNING: No sensors found', 'YELLOW'),
ui.color_string('WARNING: No sensors found', 'YELLOW'),
'',
'Please monitor temps manually',
]
@ -426,7 +426,7 @@ def get_temp_str(temp, colored=True) -> str:
temp = float(temp)
except (TypeError, ValueError):
# Invalid temp?
return color_string(temp, 'PURPLE')
return ui.color_string(temp, 'PURPLE')
# Determine color
if colored:
@ -436,7 +436,7 @@ def get_temp_str(temp, colored=True) -> str:
break
# Done
return color_string(f'{"-" if temp < 0 else ""}{temp:2.0f}°C', temp_color)
return ui.color_string(f'{"-" if temp < 0 else ""}{temp:2.0f}°C', temp_color)

View file

@ -19,7 +19,7 @@ from wk.cfg.hw import (
)
from wk.exe import get_json_from_command, run_program
from wk.std import bytes_to_string, sleep
from wk.ui.cli import color_string # TODO: This is lazy
from wk.ui import cli as ui
# STATIC VARIABLES
@ -41,26 +41,26 @@ def build_self_test_report(test_obj, aborted=False) -> None:
For instance if the test was aborted the report should include the
last known progress instead of just "was aborted by host."
"""
report = [color_string('Self-Test', 'BLUE')]
report = [ui.color_string('Self-Test', 'BLUE')]
test_details = get_smart_self_test_details(test_obj.dev)
test_result = test_details.get('status', {}).get('string', 'Unknown')
# Build report
if test_obj.disabled or test_obj.status == 'Denied':
report.append(color_string(f' {test_obj.status}', 'RED'))
report.append(ui.color_string(f' {test_obj.status}', 'RED'))
elif test_obj.status == 'N/A' or not test_obj.dev.attributes:
report.append(color_string(f' {test_obj.status}', 'YELLOW'))
report.append(ui.color_string(f' {test_obj.status}', 'YELLOW'))
elif test_obj.status == 'TestInProgress':
report.append(color_string(' Failed to stop previous test', 'RED'))
report.append(ui.color_string(' Failed to stop previous test', 'RED'))
test_obj.set_status('Failed')
else:
# Other cases include self-test result string
report.append(f' {test_result.capitalize()}')
if aborted and not (test_obj.passed or test_obj.failed):
report.append(color_string(' Aborted', 'YELLOW'))
report.append(ui.color_string(' Aborted', 'YELLOW'))
test_obj.set_status('Aborted')
elif test_obj.status == 'TimedOut':
report.append(color_string(' TimedOut', 'YELLOW'))
report.append(ui.color_string(' TimedOut', 'YELLOW'))
# Done
test_obj.report.extend(report)
@ -137,7 +137,7 @@ def generate_attribute_report(dev, only_failed=False) -> list[str]:
continue
# Build colored string and append to report
line = color_string(
line = ui.color_string(
[label, get_attribute_value_string(dev, attr), note],
[None, value_color, 'YELLOW'],
)
@ -299,7 +299,7 @@ def run_smart_self_test(test_obj, log_path) -> bool:
finished = False
test_details = get_smart_self_test_details(test_obj.dev)
size_str = bytes_to_string(test_obj.dev.size, use_binary=False)
header_str = color_string(
header_str = ui.color_string(
['[', test_obj.dev.path.name, ' ', size_str, ']'],
[None, 'BLUE', None, 'CYAN', None],
sep='',

View file

@ -15,7 +15,7 @@ from wk.cfg.hw import (
)
from wk.exe import run_program
from wk.std import PLATFORM, bytes_to_string
from wk.ui.cli import color_string, strip_colors # TODO: This is lazy
from wk.ui import cli as ui
# STATIC VARIABLES
@ -27,7 +27,7 @@ def check_surface_scan_results(test_obj, log_path) -> None:
"""Check results and set test status."""
with open(log_path, 'r', encoding='utf-8') as _f:
for line in _f.readlines():
line = strip_colors(line.strip())
line = ui.strip_colors(line.strip())
if not line or BADBLOCKS_SKIP_REGEX.match(line):
# Skip
continue
@ -44,10 +44,10 @@ def check_surface_scan_results(test_obj, log_path) -> None:
test_obj.set_status('Passed')
else:
test_obj.failed = True
test_obj.report.append(f' {color_string(line, "YELLOW")}')
test_obj.report.append(f' {ui.color_string(line, "YELLOW")}')
test_obj.set_status('Failed')
else:
test_obj.report.append(f' {color_string(line, "YELLOW")}')
test_obj.report.append(f' {ui.color_string(line, "YELLOW")}')
if not (test_obj.passed or test_obj.failed):
test_obj.set_status('Unknown')
@ -61,7 +61,7 @@ def run_scan(test_obj, log_path, test_mode=False) -> None:
# Use "RAW" disks under macOS
dev_path = dev_path.with_name(f'r{dev_path.name}')
LOG.info('Using %s for better performance', dev_path)
test_obj.report.append(color_string('badblocks', 'BLUE'))
test_obj.report.append(ui.color_string('badblocks', 'BLUE'))
test_obj.set_status('Working')
# Increase block size if necessary
@ -80,7 +80,7 @@ def run_scan(test_obj, log_path, test_mode=False) -> None:
with open(log_path, 'a', encoding='utf-8') as _f:
size_str = bytes_to_string(dev.size, use_binary=False)
_f.write(
color_string(
ui.color_string(
['[', dev.path.name, ' ', size_str, ']\n'],
[None, 'BLUE', None, 'CYAN', None],
sep='',

View file

@ -13,7 +13,7 @@ from wk.cfg.python import DATACLASS_DECORATOR_KWARGS
from wk.exe import get_json_from_command, run_program
from wk.hw.test import Test
from wk.std import PLATFORM, bytes_to_string, string_to_bytes
from wk.ui.cli import color_string # TODO: This is lazy
from wk.ui import cli as ui
# STATIC VARIABLES
@ -37,11 +37,11 @@ class System:
def generate_report(self) -> list[str]:
"""Generate CPU & RAM report, returns list."""
report = []
report.append(color_string('Device', 'BLUE'))
report.append(ui.color_string('Device', 'BLUE'))
report.append(f' {self.cpu_description}')
# Include RAM details
report.append(color_string('RAM', 'BLUE'))
report.append(ui.color_string('RAM', 'BLUE'))
report.append(f' {self.ram_total} ({", ".join(self.ram_dimms)})')
# Tests

View file

@ -22,16 +22,7 @@ from wk.kit.tools import (
)
from wk.log import update_log_path
from wk.std import GenericError
from wk.ui.cli import (
# TODO: This is lazy
TryAndPrint,
clear_screen,
pause,
print_info,
print_success,
set_title,
sleep,
)
from wk.ui import cli as ui
# STATIC VARIABLES
@ -246,7 +237,7 @@ def download_libreoffice():
for arch in 32, 64:
out_path = INSTALLERS_DIR.joinpath(f'LibreOffice{arch}.msi')
download_file(out_path, SOURCES[f'LibreOffice{arch}'])
sleep(1)
ui.sleep(1)
def download_neutron():
@ -316,7 +307,7 @@ def download_snappy_driver_installer_origin():
cmd.append('-new_console:n')
cmd.append('-new_console:s33V')
popen_program(cmd, cwd=aria2c.parent)
sleep(1)
ui.sleep(1)
wait_for_procs('aria2c.exe')
else:
run_program(cmd)
@ -460,13 +451,13 @@ def build_kit():
"""Build Kit."""
update_log_path(dest_name='Build Tool', timestamp=True)
title = f'{KIT_NAME_FULL}: Build Tool'
clear_screen()
set_title(title)
print_info(title)
ui.clear_screen()
ui.set_title(title)
ui.print_info(title)
print('')
# Set up TryAndPrint
try_print = TryAndPrint()
try_print = ui.TryAndPrint()
try_print.width = WIDTH
try_print.verbose = True
for error in ('CalledProcessError', 'FileNotFoundError'):
@ -497,15 +488,15 @@ def build_kit():
# Pause
print('', flush=True)
pause('Please review and press Enter to continue...')
ui.pause('Please review and press Enter to continue...')
# Compress .cbin
try_print.run('Compress cbin...', compress_cbin_dirs)
# Generate launcher scripts
print_success('Generating launchers')
ui.print_success('Generating launchers')
for section, launchers in sorted(LAUNCHERS.items()):
print_info(f' {section if section else "(Root)"}')
ui.print_info(f' {section if section else "(Root)"}')
for name, options in sorted(launchers.items()):
try_print.run(
f' {name}...', generate_launcher,
@ -515,7 +506,7 @@ def build_kit():
# Done
print('')
print('Done.')
pause('Press Enter to exit...')
ui.pause('Press Enter to exit...')
if __name__ == '__main__':

View file

@ -23,7 +23,7 @@ from wk.cfg.ufd import (
from wk.exe import get_json_from_command, run_program
from wk.os import linux
from wk.ui import cli as ui # TODO: This is lazy
from wk.ui import cli as ui
# STATIC VARIABLES

View file

@ -11,7 +11,7 @@ from wk.exe import get_json_from_command, run_program
from wk.std import PLATFORM, GenericError
from wk.cfg.net import BACKUP_SERVERS
from wk.ui.cli import show_data # TODO: This is lazy
from wk.ui import cli as ui
# REGEX
@ -200,7 +200,7 @@ def show_valid_addresses():
for family in families:
if REGEX_VALID_IP.search(family.address):
# Valid IP found
show_data(message=dev, data=family.address)
ui.show_data(message=dev, data=family.address)
def speedtest():

View file

@ -11,7 +11,7 @@ from wk.cfg.hw import VOLUME_FAILURE_THRESHOLD, VOLUME_WARNING_THRESHOLD
from wk.exe import get_json_from_command, popen_program, run_program
from wk.log import format_log_path
from wk.std import bytes_to_string
from wk.ui.cli import color_string # TODO: This is lazy
from wk.ui import cli as ui
# STATIC VARIABLES
@ -83,20 +83,20 @@ def build_volume_report(device_path=None) -> list:
vol['mountpoint'] = f'Mounted on {vol["mountpoint"]}'
# Name and size
line = color_string(
line = ui.color_string(
[f'{vol["name"]:<20}', f'{vol["size"]:>9}'],
[None, 'CYAN'],
)
# Mountpoint and type
line = color_string(
line = ui.color_string(
[line, f'{vol["mountpoint"]:<{m_width}}', f'{vol["fstype"]:<11}'],
[None, None, 'BLUE'],
)
# Used and free
if any([vol['fsused'], vol['fsavail']]):
line = color_string(
line = ui.color_string(
[line, f'({vol["fsused"]:>9} used, {vol["fsavail"]:>9} free)'],
[None, size_color],
)

View file

@ -32,7 +32,7 @@ from wk.std import (
bytes_to_string,
sleep,
)
from wk.ui.cli import color_string, input_text # TODO: This is lazy
from wk.ui import cli as ui
# STATIC VARIABLES
@ -182,7 +182,7 @@ def check_4k_alignment(show_alert=False):
continue
if int(match.group('offset')) % 4096 != 0:
report.append(
color_string(
ui.color_string(
f'{match.group("description")}'
f' ({bytes_to_string(match.group("size"), decimals=1)})'
,
@ -198,7 +198,7 @@ def check_4k_alignment(show_alert=False):
if report:
report.insert(
0,
color_string('One or more partitions not 4K aligned', 'YELLOW'),
ui.color_string('One or more partitions not 4K aligned', 'YELLOW'),
)
return report
@ -211,7 +211,7 @@ def export_bitlocker_info():
]
# Get filename
file_name = input_text(prompt='Enter filename', allow_empty_response=False)
file_name = ui.input_text(prompt='Enter filename', allow_empty_response=False)
file_path = pathlib.Path(f'../../Bitlocker_{file_name}.txt').resolve()
# Save info
@ -250,13 +250,13 @@ def get_installed_antivirus():
state = proc.stdout.split('=')[1]
state = hex(int(state))
if str(state)[3:5] not in ['10', '11']:
report.append(color_string(f'[Disabled] {product}', 'YELLOW'))
report.append(ui.color_string(f'[Disabled] {product}', 'YELLOW'))
else:
report.append(product)
# Final check
if not report:
report.append(color_string('No products detected', 'RED'))
report.append(ui.color_string('No products detected', 'RED'))
# Done
return report
@ -363,7 +363,7 @@ def get_volume_usage(use_colors=False):
f' ({bytes_to_string(free, 2):>10} / {bytes_to_string(total, 2):>10})'
)
if use_colors:
display_str = color_string(display_str, color)
display_str = ui.color_string(display_str, color)
report.append(f'{disk.device} {display_str}')
# Done

View file

@ -60,22 +60,7 @@ from wk.std import (
GenericWarning,
sleep,
)
from wk.ui.cli import (
# TODO: This is lazy
Menu,
TryAndPrint,
abort,
ask,
clear_screen,
color_string,
pause,
print_info,
print_standard,
print_warning,
set_title,
show_data,
strip_colors,
)
from wk.ui import cli as ui
# STATIC VARIABLES
@ -90,7 +75,7 @@ GPUPDATE_SUCCESS_STRINGS = (
'User Policy update has completed successfully.',
)
IN_CONEMU = 'ConEmuPID' in os.environ
MENU_PRESETS = Menu()
MENU_PRESETS = ui.Menu()
PROGRAMDATA = os.environ.get('{ALLUSERSPROFILE}', r'C:\ProgramData')
PROGRAMFILES_32 = os.environ.get(
'PROGRAMFILES(X86)', os.environ.get(
@ -108,7 +93,7 @@ WHITELIST = '\n'.join((
fr'{PROGRAMFILES_32}\TeamViewer\tv_x64.exe',
sys.executable,
))
TRY_PRINT = TryAndPrint()
TRY_PRINT = ui.TryAndPrint()
TRY_PRINT.width = WIDTH
TRY_PRINT.verbose = True
for error in ('CalledProcessError', 'FileNotFoundError'):
@ -119,7 +104,7 @@ for error in ('CalledProcessError', 'FileNotFoundError'):
def build_menus(base_menus, title, presets):
"""Build menus, returns dict."""
menus = {}
menus['Main'] = Menu(title=f'{title}\n{color_string("Main Menu", "GREEN")}')
menus['Main'] = ui.Menu(title=f'{title}\n{ui.color_string("Main Menu", "GREEN")}')
# Main Menu
for entry in base_menus['Actions']:
@ -128,7 +113,7 @@ def build_menus(base_menus, title, presets):
menus['Main'].add_option(group, {'Selected': True})
# Options
menus['Options'] = Menu(title=f'{title}\n{color_string("Options", "GREEN")}')
menus['Options'] = ui.Menu(title=f'{title}\n{ui.color_string("Options", "GREEN")}')
for entry in base_menus['Options']:
menus['Options'].add_option(entry.name, entry.details)
menus['Options'].add_action('All')
@ -138,7 +123,7 @@ def build_menus(base_menus, title, presets):
# Run groups
for group, entries in base_menus['Groups'].items():
menus[group] = Menu(title=f'{title}\n{color_string(group, "GREEN")}')
menus[group] = ui.Menu(title=f'{title}\n{ui.color_string(group, "GREEN")}')
menus[group].disabled_str = 'Locked'
for entry in entries:
menus[group].add_option(entry.name, entry.details)
@ -170,7 +155,7 @@ def build_menus(base_menus, title, presets):
)
# Update presets Menu
MENU_PRESETS.title = f'{title}\n{color_string("Load Preset", "GREEN")}'
MENU_PRESETS.title = f'{title}\n{ui.color_string("Load Preset", "GREEN")}'
MENU_PRESETS.add_option('Default')
for name in presets:
MENU_PRESETS.add_option(name)
@ -274,7 +259,7 @@ def init(menus, presets):
# Resume session
load_settings(menus)
print_info('Resuming session, press CTRL+c to cancel')
ui.print_info('Resuming session, press CTRL+c to cancel')
for _x in range(AUTO_REPAIR_DELAY_IN_SECONDS, 0, -1):
print(f' {_x} second{"" if _x==1 else "s"} remaining... \r', end='')
sleep(1)
@ -319,7 +304,7 @@ def init_session(options):
'The timezone is currently set to '
f'{zone}, switch it to {WINDOWS_TIME_ZONE}?'
)
if zone != WINDOWS_TIME_ZONE and ask(msg):
if zone != WINDOWS_TIME_ZONE and ui.ask(msg):
set_timezone(WINDOWS_TIME_ZONE)
# One-time tasks
@ -395,20 +380,20 @@ def load_settings(menus):
if group == 'Main':
continue
for name in menu.options:
menu.options[name].update(get_entry_settings(group, strip_colors(name)))
menu.options[name].update(get_entry_settings(group, ui.strip_colors(name)))
def run_auto_repairs(base_menus, presets):
"""Run Auto Repairs."""
set_log_path()
title = f'{KIT_NAME_FULL}: Auto Repairs'
clear_screen()
set_title(title)
print_info(title)
ui.clear_screen()
ui.set_title(title)
ui.print_info(title)
print('')
# Generate menus
print_standard('Initializing...')
ui.print_standard('Initializing...')
menus = build_menus(base_menus, title, presets)
# Init
@ -426,21 +411,21 @@ def run_auto_repairs(base_menus, presets):
try:
show_main_menu(base_menus, menus, presets, title)
except SystemExit:
if ask('End session?'):
if ui.ask('End session?'):
end_session()
raise
# Start or resume repairs
clear_screen()
print_standard(title)
ui.clear_screen()
ui.print_standard(title)
print('')
save_selection_settings(menus)
print_info('Initializing...')
ui.print_info('Initializing...')
init_run(menus['Options'].options)
save_selection_settings(menus)
if not session_started:
init_session(menus['Options'].options)
print_info('Running repairs')
ui.print_info('Running repairs')
# Run repairs
for group, menu in menus.items():
@ -449,19 +434,19 @@ def run_auto_repairs(base_menus, presets):
try:
run_group(group, menu)
except KeyboardInterrupt:
abort()
ui.abort()
# Done
end_session()
print_info('Done')
pause('Press Enter to exit...')
ui.print_info('Done')
ui.pause('Press Enter to exit...')
def run_group(group, menu):
"""Run entries in group if appropriate."""
print_info(f' {group}')
ui.print_info(f' {group}')
for name, details in menu.options.items():
name_str = strip_colors(name)
name_str = ui.strip_colors(name)
skipped = details.get('Skipped', False)
done = details.get('Done', False)
disabled = details.get('Disabled', False)
@ -475,7 +460,7 @@ def run_group(group, menu):
# Previously skipped
if skipped:
show_data(f'{name_str}...', 'Skipped', 'YELLOW', width=WIDTH)
ui.show_data(f'{name_str}...', 'Skipped', 'YELLOW', width=WIDTH)
continue
# Previously ran
@ -485,7 +470,7 @@ def run_group(group, menu):
color = 'YELLOW'
elif details.get('Failed', False):
color = 'RED'
show_data(
ui.show_data(
f'{name_str}...',
details.get('Message', 'Unknown'), color, width=WIDTH,
)
@ -493,7 +478,7 @@ def run_group(group, menu):
# Not selected
if not selected:
show_data(f'{name_str}...', 'Skipped', 'YELLOW', width=WIDTH)
ui.show_data(f'{name_str}...', 'Skipped', 'YELLOW', width=WIDTH)
save_settings(group, name, skipped=True)
continue
@ -516,7 +501,7 @@ def save_selection_settings(menus):
def save_settings(group, name, result=None, **kwargs):
"""Save entry settings in the registry."""
key_path = fr'{AUTO_REPAIR_KEY}\{group}\{strip_colors(name)}'
key_path = fr'{AUTO_REPAIR_KEY}\{group}\{ui.strip_colors(name)}'
# Get values from TryAndPrint result
if result:
@ -530,7 +515,7 @@ def save_settings(group, name, result=None, **kwargs):
# Write values to registry
for value_name, data in kwargs.items():
value_name = strip_colors(value_name)
value_name = ui.strip_colors(value_name)
if isinstance(data, bool):
data = 1 if data else 0
if isinstance(data, int):
@ -894,7 +879,7 @@ def backup_all_browser_profiles(use_try_print=False):
users = get_path_obj(f'{SYSTEMDRIVE}/Users')
for userprofile in users.iterdir():
if use_try_print:
print_info(f'{" "*6}{userprofile.name}')
ui.print_info(f'{" "*6}{userprofile.name}')
backup_browser_profiles(userprofile, use_try_print)
@ -908,7 +893,7 @@ def backup_browser_chromium(backup_path, browser, search_path, use_try_print):
if output_path.exists():
# Assuming backup was already done
if use_try_print:
show_data(
ui.show_data(
f'{" "*8}{browser} ({item.name})...', 'Backup already exists.',
color='YELLOW', width=WIDTH,
)
@ -936,7 +921,7 @@ def backup_browser_firefox(backup_path, search_path, use_try_print):
if output_path.exists():
# Assuming backup was already done
if use_try_print:
show_data(
ui.show_data(
f'{" "*8}Firefox (All)...', 'Backup already exists.',
color='YELLOW', width=WIDTH,
)
@ -1322,7 +1307,7 @@ def kill_explorer():
def reboot(timeout=10):
"""Reboot the system."""
atexit.unregister(start_explorer)
print_warning(f'Rebooting the system in {timeout} seconds...')
ui.print_warning(f'Rebooting the system in {timeout} seconds...')
sleep(timeout)
cmd = ['shutdown', '-r', '-t', '0']
run_program(cmd, check=False)

View file

@ -62,23 +62,7 @@ from wk.std import (
GenericWarning,
sleep,
)
from wk.ui.cli import (
# TODO: This is lazy
Menu,
TryAndPrint,
abort,
ask,
clear_screen,
color_string,
pause,
print_error,
print_info,
print_standard,
print_warning,
set_title,
show_data,
strip_colors,
)
from wk.ui import cli as ui
# STATIC VARIABLES
@ -94,7 +78,7 @@ KNOWN_ENCODINGS = (
'utf-32-le',
)
IN_CONEMU = 'ConEmuPID' in os.environ
MENU_PRESETS = Menu()
MENU_PRESETS = ui.Menu()
PROGRAMFILES_32 = os.environ.get(
'PROGRAMFILES(X86)', os.environ.get(
'PROGRAMFILES', r'C:\Program Files (x86)',
@ -106,7 +90,7 @@ PROGRAMFILES_64 = os.environ.get(
),
)
SYSTEMDRIVE = os.environ.get('SYSTEMDRIVE', 'C:')
TRY_PRINT = TryAndPrint()
TRY_PRINT = ui.TryAndPrint()
TRY_PRINT.width = WIDTH
TRY_PRINT.verbose = True
for error in ('CalledProcessError', 'FileNotFoundError'):
@ -117,7 +101,7 @@ for error in ('CalledProcessError', 'FileNotFoundError'):
def build_menus(base_menus, title, presets):
"""Build menus, returns dict."""
menus = {}
menus['Main'] = Menu(title=f'{title}\n{color_string("Main Menu", "GREEN")}')
menus['Main'] = ui.Menu(title=f'{title}\n{ui.color_string("Main Menu", "GREEN")}')
# Main Menu
for entry in base_menus['Actions']:
@ -127,7 +111,7 @@ def build_menus(base_menus, title, presets):
# Run groups
for group, entries in base_menus['Groups'].items():
menus[group] = Menu(title=f'{title}\n{color_string(group, "GREEN")}')
menus[group] = ui.Menu(title=f'{title}\n{ui.color_string(group, "GREEN")}')
for entry in entries:
menus[group].add_option(entry.name, entry.details)
menus[group].add_action('All')
@ -156,7 +140,7 @@ def build_menus(base_menus, title, presets):
)
# Update presets Menu
MENU_PRESETS.title = f'{title}\n{color_string("Load Preset", "GREEN")}'
MENU_PRESETS.title = f'{title}\n{ui.color_string("Load Preset", "GREEN")}'
MENU_PRESETS.add_option('Default')
for name in presets:
MENU_PRESETS.add_option(name)
@ -173,26 +157,26 @@ def check_os_and_set_menu_title(title):
"""Check OS version and update title for menus, returns str."""
color = None
os_name = get_os_name(check=False)
print_standard(f'Operating System: {os_name}')
ui.print_standard(f'Operating System: {os_name}')
# Check support status and set color
try:
get_os_name()
except GenericWarning:
# Outdated version
print_warning('OS version is outdated, updating is recommended.')
if not ask('Continue anyway?'):
abort()
ui.print_warning('OS version is outdated, updating is recommended.')
if not ui.ask('Continue anyway?'):
ui.abort()
color = 'YELLOW'
except GenericError:
# Unsupported version
print_error('OS version is unsupported, updating is recommended.')
if not ask('Continue anyway? (NOT RECOMMENDED)'):
abort()
ui.print_error('OS version is unsupported, updating is recommended.')
if not ui.ask('Continue anyway? (NOT RECOMMENDED)'):
ui.abort()
color = 'RED'
# Done
return f'{title} ({color_string(os_name, color)})'
return f'{title} ({ui.color_string(os_name, color)})'
def load_preset(menus, presets, title, enable_menu_exit=True):
@ -218,10 +202,10 @@ def load_preset(menus, presets, title, enable_menu_exit=True):
menu.options[name]['Selected'] = value
# Ask selection question(s)
clear_screen()
print_standard(f'{title}')
ui.clear_screen()
ui.print_standard(f'{title}')
print('')
if selection[0] == 'Default' and ask('Install LibreOffice?'):
if selection[0] == 'Default' and ui.ask('Install LibreOffice?'):
menus['Install Software'].options['LibreOffice']['Selected'] = True
# Re-enable Main Menu action if disabled
@ -238,11 +222,11 @@ def run_auto_setup(base_menus, presets):
"""Run Auto Setup."""
update_log_path(dest_name='Auto Setup', timestamp=True)
title = f'{KIT_NAME_FULL}: Auto Setup'
clear_screen()
set_title(title)
print_info(title)
ui.clear_screen()
ui.set_title(title)
ui.print_info(title)
print('')
print_standard('Initializing...')
ui.print_standard('Initializing...')
# Check OS and update title for menus
title = check_os_and_set_menu_title(title)
@ -257,10 +241,10 @@ def run_auto_setup(base_menus, presets):
show_main_menu(base_menus, menus, presets, title)
# Start setup
clear_screen()
print_standard(title)
ui.clear_screen()
ui.print_standard(title)
print('')
print_info('Running setup')
ui.print_info('Running setup')
# Run setup
for group, menu in menus.items():
@ -269,22 +253,22 @@ def run_auto_setup(base_menus, presets):
try:
run_group(group, menu)
except KeyboardInterrupt:
abort()
ui.abort()
# Done
print_info('Done')
pause('Press Enter to exit...')
ui.print_info('Done')
ui.pause('Press Enter to exit...')
def run_group(group, menu):
"""Run entries in group if appropriate."""
print_info(f' {group}')
ui.print_info(f' {group}')
for name, details in menu.options.items():
name_str = strip_colors(name)
name_str = ui.strip_colors(name)
# Not selected
if not details.get('Selected', False):
show_data(f'{name_str}...', 'Skipped', 'YELLOW', width=WIDTH)
ui.show_data(f'{name_str}...', 'Skipped', 'YELLOW', width=WIDTH)
continue
# Selected
@ -421,7 +405,7 @@ def auto_config_browsers():
'Set default browser...', set_default_browser, msg_good='STARTED',
)
print(prompt, end='', flush=True)
pause('')
ui.pause('')
# Move cursor to beginning of the previous line and clear prompt
print(f'\033[F\r{" "*len(prompt)}\r', end='', flush=True)
@ -807,8 +791,8 @@ def install_software_bundle():
warning = 'NOTE: Press CTRL+c to manually resume if it gets stuck...'
# Start installations and wait for them to finish
print_standard(msg)
print_warning(warning, end='', flush=True)
ui.print_standard(msg)
ui.print_warning(warning, end='', flush=True)
proc = popen_program([installer])
try:
proc.wait()
@ -909,7 +893,7 @@ def get_storage_status():
"""Get storage status for fixed disks, returns list."""
report = get_volume_usage(use_colors=True)
for disk in get_raw_disks():
report.append(color_string(f'Uninitialized Disk: {disk}', 'RED'))
report.append(ui.color_string(f'Uninitialized Disk: {disk}', 'RED'))
# Done
return report