Split wk.std into debug, std, and ui sections

This commit is contained in:
2Shirt 2023-04-01 22:14:03 -07:00
parent 9f66b151af
commit 89fd647792
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
41 changed files with 1312 additions and 1271 deletions

View file

@ -5,7 +5,7 @@ import wk
# Classes # Classes
REBOOT_STR = wk.std.color_string('Reboot', 'YELLOW') REBOOT_STR = wk.ui.cli.color_string('Reboot', 'YELLOW')
class MenuEntry(): class MenuEntry():
"""Simple class to allow cleaner code below.""" """Simple class to allow cleaner code below."""
def __init__(self, name, function=None, selected=True, **kwargs): def __init__(self, name, function=None, selected=True, **kwargs):
@ -87,8 +87,8 @@ if __name__ == '__main__':
try: try:
wk.repairs.win.run_auto_repairs(BASE_MENUS, PRESETS) wk.repairs.win.run_auto_repairs(BASE_MENUS, PRESETS)
except KeyboardInterrupt: except KeyboardInterrupt:
wk.std.abort() wk.ui.cli.abort()
except SystemExit: except SystemExit:
raise raise
except: # noqa: E722 except: # noqa: E722
wk.std.major_exception() wk.ui.cli.major_exception()

View file

@ -154,8 +154,8 @@ if __name__ == '__main__':
try: try:
wk.setup.win.run_auto_setup(BASE_MENUS, PRESETS) wk.setup.win.run_auto_setup(BASE_MENUS, PRESETS)
except KeyboardInterrupt: except KeyboardInterrupt:
wk.std.abort() wk.ui.cli.abort()
except SystemExit: except SystemExit:
raise raise
except: # noqa: E722 except: # noqa: E722
wk.std.major_exception() wk.ui.cli.major_exception()

View file

@ -11,4 +11,4 @@ if __name__ == '__main__':
except SystemExit: except SystemExit:
raise raise
except: # noqa: E722 except: # noqa: E722
wk.std.major_exception() wk.ui.cli.major_exception()

View file

@ -8,8 +8,8 @@ if __name__ == '__main__':
try: try:
wk.kit.build.build_kit() wk.kit.build.build_kit()
except KeyboardInterrupt: except KeyboardInterrupt:
wk.std.abort() wk.ui.cli.abort()
except SystemExit: except SystemExit:
raise raise
except: # noqa: E722 except: # noqa: E722
wk.std.major_exception() wk.ui.cli.major_exception()

View file

@ -12,7 +12,7 @@ if __name__ == '__main__':
docopt(wk.clone.ddrescue.DOCSTRING) docopt(wk.clone.ddrescue.DOCSTRING)
except SystemExit: except SystemExit:
print('') print('')
wk.std.pause('Press Enter to exit...') wk.ui.cli.pause('Press Enter to exit...')
raise raise
try: try:
@ -20,4 +20,4 @@ if __name__ == '__main__':
except SystemExit: except SystemExit:
raise raise
except: # noqa: E722 except: # noqa: E722
wk.std.major_exception() wk.ui.cli.major_exception()

View file

@ -8,7 +8,7 @@ python.exe -i embedded_python_env.py
import wk import wk
wk.std.print_colored( wk.ui.cli.print_colored(
(wk.cfg.main.KIT_NAME_FULL, ': ', 'Debug Console'), (wk.cfg.main.KIT_NAME_FULL, ': ', 'Debug Console'),
('GREEN', None, 'YELLOW'), ('GREEN', None, 'YELLOW'),
sep='', sep='',

View file

@ -12,7 +12,7 @@ if __name__ == '__main__':
docopt(wk.hw.diags.DOCSTRING) docopt(wk.hw.diags.DOCSTRING)
except SystemExit: except SystemExit:
print('') print('')
wk.std.pause('Press Enter to exit...') wk.ui.cli.pause('Press Enter to exit...')
raise raise
try: try:
@ -20,4 +20,4 @@ if __name__ == '__main__':
except SystemExit: except SystemExit:
raise raise
except: # noqa: E722 except: # noqa: E722
wk.std.major_exception() wk.ui.cli.major_exception()

View file

@ -11,11 +11,11 @@ def main():
"""Show sensor data on screen.""" """Show sensor data on screen."""
sensors = wk.hw.sensors.Sensors() sensors = wk.hw.sensors.Sensors()
if platform.system() == 'Darwin': if platform.system() == 'Darwin':
wk.std.clear_screen() wk.ui.cli.clear_screen()
while True: while True:
print('\033[100A', end='') print('\033[100A', end='')
sensors.update_sensor_data() sensors.update_sensor_data()
wk.std.print_report(sensors.generate_report('Current', 'Max')) wk.ui.cli.print_report(sensors.generate_report('Current', 'Max'))
wk.std.sleep(1) wk.std.sleep(1)
elif platform.system() == 'Linux': elif platform.system() == 'Linux':
proc = wk.exe.run_program(cmd=['mktemp']) proc = wk.exe.run_program(cmd=['mktemp'])
@ -43,4 +43,4 @@ if __name__ == '__main__':
except SystemExit: except SystemExit:
raise raise
except: # noqa: E722 except: # noqa: E722
wk.std.major_exception() wk.ui.cli.major_exception()

View file

@ -22,10 +22,10 @@ SDIO_REMOTE_PATH = wk.io.get_path_obj(
# Functions # Functions
def try_again(): def try_again():
"""Ask to try again or quit.""" """Ask to try again or quit."""
if wk.std.ask(' Try again?'): if wk.ui.cli.ask(' Try again?'):
return True return True
if not wk.std.ask(' Use local version?'): if not wk.ui.cli.ask(' Use local version?'):
wk.std.abort() wk.ui.cli.abort()
return False return False
@ -47,7 +47,7 @@ def use_network_sdio():
except KeyboardInterrupt: except KeyboardInterrupt:
break break
except MOUNT_EXCEPTIONS as err: except MOUNT_EXCEPTIONS as err:
wk.std.print_error(f' {err}') wk.ui.cli.print_error(f' {err}')
if not try_again(): if not try_again():
break break
else: else:
@ -57,7 +57,7 @@ def use_network_sdio():
break break
# Failed to mount # Failed to mount
wk.std.print_error(' Failed to mount server') wk.ui.cli.print_error(' Failed to mount server')
if not try_again(): if not try_again():
break break
@ -66,7 +66,7 @@ def use_network_sdio():
if __name__ == '__main__': if __name__ == '__main__':
wk.std.set_title( wk.ui.cli.set_title(
f'{wk.cfg.main.KIT_NAME_FULL}: Snappy Driver Installer Origin Launcher', f'{wk.cfg.main.KIT_NAME_FULL}: Snappy Driver Installer Origin Launcher',
) )
log_dir = wk.log.format_log_path(tool=True).parent log_dir = wk.log.format_log_path(tool=True).parent
@ -76,7 +76,7 @@ if __name__ == '__main__':
try: try:
USE_NETWORK = use_network_sdio() USE_NETWORK = use_network_sdio()
except KeyboardInterrupt: except KeyboardInterrupt:
wk.std.abort() wk.ui.cli.abort()
# Run SDIO # Run SDIO
EXE_PATH = SDIO_LOCAL_PATH EXE_PATH = SDIO_LOCAL_PATH

View file

@ -10,32 +10,32 @@ import wk
# Functions # Functions
def main(): def main():
"""Mount all volumes and show results.""" """Mount all volumes and show results."""
wk.std.print_standard(f'{wk.cfg.main.KIT_NAME_FULL}: Volume mount tool') wk.ui.cli.print_standard(f'{wk.cfg.main.KIT_NAME_FULL}: Volume mount tool')
wk.std.print_standard(' ') wk.ui.cli.print_standard(' ')
# Mount volumes and get report # Mount volumes and get report
wk.std.print_standard('Mounting volumes...') wk.ui.cli.print_standard('Mounting volumes...')
wk.os.linux.mount_volumes() wk.os.linux.mount_volumes()
report = wk.os.linux.build_volume_report() report = wk.os.linux.build_volume_report()
# Show results # Show results
wk.std.print_info('Results') wk.ui.cli.print_info('Results')
wk.std.print_report(report) wk.ui.cli.print_report(report)
# GUI mode # GUI mode
if 'gui' in sys.argv: if 'gui' in sys.argv:
wk.std.pause('Press Enter to exit...') wk.ui.cli.pause('Press Enter to exit...')
wk.exe.popen_program(['nohup', 'thunar', '/media']) wk.exe.popen_program(['nohup', 'thunar', '/media'])
if __name__ == '__main__': if __name__ == '__main__':
if wk.std.PLATFORM != 'Linux': if wk.std.PLATFORM != 'Linux':
os_name = wk.std.PLATFORM.replace('Darwin', 'macOS') os_name = wk.std.PLATFORM.replace('Darwin', 'macOS')
wk.std.print_error(f'This script is not supported under {os_name}.') wk.ui.cli.print_error(f'This script is not supported under {os_name}.')
wk.std.abort() wk.ui.cli.abort()
try: try:
main() main()
except SystemExit: except SystemExit:
raise raise
except: # noqa: E722 except: # noqa: E722
wk.std.major_exception() wk.ui.cli.major_exception()

View file

@ -8,7 +8,7 @@ import wk
# Functions # Functions
def main(): def main():
"""Attempt to mount backup shares and print report.""" """Attempt to mount backup shares and print report."""
wk.std.print_info('Mounting Backup Shares') wk.ui.cli.print_info('Mounting Backup Shares')
report = wk.net.mount_backup_shares() report = wk.net.mount_backup_shares()
for line in report: for line in report:
color = 'GREEN' color = 'GREEN'
@ -17,7 +17,7 @@ def main():
color = 'RED' color = 'RED'
elif 'Already' in line: elif 'Already' in line:
color = 'YELLOW' color = 'YELLOW'
print(wk.std.color_string(line, color)) print(wk.ui.cli.color_string(line, color))
if __name__ == '__main__': if __name__ == '__main__':
@ -26,4 +26,4 @@ if __name__ == '__main__':
except SystemExit: except SystemExit:
raise raise
except: # noqa: E722 except: # noqa: E722
wk.std.major_exception() wk.ui.cli.major_exception()

View file

@ -46,13 +46,13 @@ def scan_file(file_path, search):
if __name__ == '__main__': if __name__ == '__main__':
try: try:
# Prep # Prep
wk.std.clear_screen() wk.ui.cli.clear_screen()
terms = [re.sub(r'\s+', r'\s*', t) for t in sys.argv[1:]] terms = [re.sub(r'\s+', r'\s*', t) for t in sys.argv[1:]]
search = '({})'.format('|'.join(terms)) search = '({})'.format('|'.join(terms))
if len(sys.argv) == 1: if len(sys.argv) == 1:
# Print usage # Print usage
wk.std.print_standard(USAGE) wk.ui.cli.print_standard(USAGE)
else: else:
matches = [] matches = []
for entry in scan_for_docs(SCANDIR): for entry in scan_for_docs(SCANDIR):
@ -60,20 +60,20 @@ if __name__ == '__main__':
# Strip None values (i.e. non-matching entries) # Strip None values (i.e. non-matching entries)
matches = [m for m in matches if m] matches = [m for m in matches if m]
if matches: if matches:
wk.std.print_success('Found {} {}:'.format( wk.ui.cli.print_success('Found {} {}:'.format(
len(matches), len(matches),
'Matches' if len(matches) > 1 else 'Match')) 'Matches' if len(matches) > 1 else 'Match'))
for match in matches: for match in matches:
wk.std.print_standard(match) wk.ui.cli.print_standard(match)
else: else:
wk.std.print_error('No matches found.') wk.ui.cli.print_error('No matches found.')
# Done # Done
wk.std.print_standard('\nDone.') wk.ui.cli.print_standard('\nDone.')
#pause("Press Enter to exit...") #pause("Press Enter to exit...")
except SystemExit: except SystemExit:
raise raise
except: # noqa: E722 except: # noqa: E722
wk.std.major_exception() wk.ui.cli.major_exception()
# vim: sts=2 sw=2 ts=2 # vim: sts=2 sw=2 ts=2

View file

@ -8,6 +8,7 @@
"wk/os/__init__.py" = ["F401"] "wk/os/__init__.py" = ["F401"]
"wk/repairs/__init__.py" = ["F401"] "wk/repairs/__init__.py" = ["F401"]
"wk/setup/__init__.py" = ["F401"] "wk/setup/__init__.py" = ["F401"]
"wk/ui/__init__.py" = ["F401"]
# Long lines # Long lines
"wk/borrowed/acpi.py" = ["E501", "F841"] "wk/borrowed/acpi.py" = ["E501", "F841"]

View file

@ -8,14 +8,14 @@ import wk
# Functions # Functions
def main(): def main():
"""Attempt to mount backup shares and print report.""" """Attempt to mount backup shares and print report."""
wk.std.print_info('Unmounting Backup Shares') wk.ui.cli.print_info('Unmounting Backup Shares')
report = wk.net.unmount_backup_shares() report = wk.net.unmount_backup_shares()
for line in report: for line in report:
color = 'GREEN' color = 'GREEN'
line = f' {line}' line = f' {line}'
if 'Not mounted' in line: if 'Not mounted' in line:
color = 'YELLOW' color = 'YELLOW'
print(wk.std.color_string(line, color)) print(wk.ui.cli.color_string(line, color))
if __name__ == '__main__': if __name__ == '__main__':
@ -24,4 +24,4 @@ if __name__ == '__main__':
except SystemExit: except SystemExit:
raise raise
except: # noqa: E722 except: # noqa: E722
wk.std.major_exception() wk.ui.cli.major_exception()

View file

@ -28,21 +28,21 @@ if PLATFORM not in ('macOS', 'Linux'):
def main(): def main():
"""Upload logs for review.""" """Upload logs for review."""
lines = [] lines = []
try_and_print = wk.std.TryAndPrint() try_and_print = wk.ui.cli.TryAndPrint()
# Set log # Set log
wk.log.update_log_path(dest_name='Upload-Logs', timestamp=True) wk.log.update_log_path(dest_name='Upload-Logs', timestamp=True)
# Instructions # Instructions
wk.std.print_success(f'{wk.cfg.main.KIT_NAME_FULL}: Upload Logs') wk.ui.cli.print_success(f'{wk.cfg.main.KIT_NAME_FULL}: Upload Logs')
wk.std.print_standard('') wk.ui.cli.print_standard('')
wk.std.print_standard('Please state the reason for the review.') wk.ui.cli.print_standard('Please state the reason for the review.')
wk.std.print_info(' End note with an empty line.') wk.ui.cli.print_info(' End note with an empty line.')
wk.std.print_standard('') wk.ui.cli.print_standard('')
# Get reason note # Get reason note
while True: while True:
text = wk.std.input_text('> ') text = wk.ui.cli.input_text('> ')
if not text: if not text:
lines.append('') lines.append('')
break break

View file

@ -1,7 +1,7 @@
"""WizardKit: wk module init""" """WizardKit: wk module init"""
# vim: sts=2 sw=2 ts=2 # vim: sts=2 sw=2 ts=2
from sys import version_info as version from sys import stderr, version_info
from . import cfg from . import cfg
from . import clone from . import clone
@ -18,20 +18,22 @@ from . import repairs
from . import setup from . import setup
from . import std from . import std
from . import tmux from . import tmux
from . import ui
# Check env # Check env
if version < (3, 7): if version_info < (3, 7):
# Unsupported # Unsupported
raise RuntimeError( raise RuntimeError(
f'This package is unsupported on Python {version.major}.{version.minor}' 'This package is unsupported on Python '
f'{version_info.major}.{version_info.minor}'
) )
# Init # Init
try: try:
log.start() log.start()
except UserWarning as err: except UserWarning as err:
std.print_warning(err) print(err, file=stderr)
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -33,6 +33,7 @@ from wk.hw.smart import (
smart_status_ok, smart_status_ok,
update_smart_details, update_smart_details,
) )
from wk.ui import cli as ui # TODO: This is lazy
# STATIC VARIABLES # STATIC VARIABLES
@ -89,8 +90,8 @@ REGEX_REMAINING_TIME = re.compile(
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
MENU_ACTIONS = ( MENU_ACTIONS = (
'Start', 'Start',
f'Change settings {std.color_string("(experts only)", "YELLOW")}', f'Change settings {ui.color_string("(experts only)", "YELLOW")}',
f'Detect drives {std.color_string("(experts only)", "YELLOW")}', f'Detect drives {ui.color_string("(experts only)", "YELLOW")}',
'Quit') 'Quit')
MENU_TOGGLES = { MENU_TOGGLES = {
'Auto continue (if recovery % over threshold)': True, 'Auto continue (if recovery % over threshold)': True,
@ -296,7 +297,7 @@ class BlockPair():
# Check destination size if cloning # Check destination size if cloning
if not self.destination.is_file() and dest_size < self.size: if not self.destination.is_file() and dest_size < self.size:
std.print_error(f'Invalid destination: {self.destination}') ui.print_error(f'Invalid destination: {self.destination}')
raise std.GenericAbort() raise std.GenericAbort()
def set_initial_status(self): def set_initial_status(self):
@ -420,7 +421,7 @@ class State():
self.panes['Started'] = tmux.split_window( self.panes['Started'] = tmux.split_window(
lines=cfg.ddrescue.TMUX_SIDE_WIDTH, lines=cfg.ddrescue.TMUX_SIDE_WIDTH,
target_id=self.panes['Source'], target_id=self.panes['Source'],
text=std.color_string( text=ui.color_string(
['Started', time.strftime("%Y-%m-%d %H:%M %Z")], ['Started', time.strftime("%Y-%m-%d %H:%M %Z")],
['BLUE', None], ['BLUE', None],
sep='\n', sep='\n',
@ -442,7 +443,7 @@ class State():
settings = json.loads(_f.read()) settings = json.loads(_f.read())
except (OSError, json.JSONDecodeError) as err: except (OSError, json.JSONDecodeError) as err:
LOG.error('Failed to load clone settings') LOG.error('Failed to load clone settings')
std.print_error('Invalid clone settings detected.') ui.print_error('Invalid clone settings detected.')
raise std.GenericAbort() from err raise std.GenericAbort() from err
# Check settings # Check settings
@ -454,10 +455,10 @@ class State():
bail = False bail = False
for key in ('model', 'serial'): for key in ('model', 'serial'):
if settings['Source'][key] != getattr(self.source, key): if settings['Source'][key] != getattr(self.source, key):
std.print_error(f"Clone settings don't match source {key}") ui.print_error(f"Clone settings don't match source {key}")
bail = True bail = True
if settings['Destination'][key] != getattr(self.destination, key): if settings['Destination'][key] != getattr(self.destination, key):
std.print_error(f"Clone settings don't match destination {key}") ui.print_error(f"Clone settings don't match destination {key}")
bail = True bail = True
if bail: if bail:
raise std.GenericAbort() raise std.GenericAbort()
@ -488,7 +489,7 @@ class State():
with open(settings_file, 'w', encoding='utf-8') as _f: with open(settings_file, 'w', encoding='utf-8') as _f:
json.dump(settings, _f) json.dump(settings, _f)
except OSError as err: except OSError as err:
std.print_error('Failed to save clone settings') ui.print_error('Failed to save clone settings')
raise std.GenericAbort() from err raise std.GenericAbort() from err
def add_clone_block_pairs(self): def add_clone_block_pairs(self):
@ -522,7 +523,7 @@ class State():
# New run, use new settings file # New run, use new settings file
settings['Needs Format'] = True settings['Needs Format'] = True
offset = 0 offset = 0
user_choice = std.choice( user_choice = ui.choice(
['G', 'M', 'S'], ['G', 'M', 'S'],
'Format clone using GPT, MBR, or match Source type?', 'Format clone using GPT, MBR, or match Source type?',
) )
@ -533,7 +534,7 @@ class State():
else: else:
# Match source type # Match source type
settings['Table Type'] = get_table_type(self.source.path) settings['Table Type'] = get_table_type(self.source.path)
if std.ask('Create an empty Windows boot partition on the clone?'): if ui.ask('Create an empty Windows boot partition on the clone?'):
settings['Create Boot Partition'] = True settings['Create Boot Partition'] = True
offset = 2 if settings['Table Type'] == 'GPT' else 1 offset = 2 if settings['Table Type'] == 'GPT' else 1
@ -566,14 +567,14 @@ class State():
report = [] report = []
# Source # Source
report.append(std.color_string('Source', 'GREEN')) report.append(ui.color_string('Source', 'GREEN'))
report.extend(build_object_report(self.source)) report.extend(build_object_report(self.source))
report.append(' ') report.append(' ')
# Destination # Destination
report.append(std.color_string('Destination', 'GREEN')) report.append(ui.color_string('Destination', 'GREEN'))
if self.mode == 'Clone': if self.mode == 'Clone':
report[-1] += std.color_string(' (ALL DATA WILL BE DELETED)', 'RED') report[-1] += ui.color_string(' (ALL DATA WILL BE DELETED)', 'RED')
report.extend(build_object_report(self.destination)) report.extend(build_object_report(self.destination))
report.append(' ') report.append(' ')
@ -581,12 +582,12 @@ class State():
# NOTE: The check for block_pairs is to limit this section # NOTE: The check for block_pairs is to limit this section
# to the second confirmation # to the second confirmation
if self.mode == 'Clone' and self.block_pairs: if self.mode == 'Clone' and self.block_pairs:
report.append(std.color_string('WARNING', 'YELLOW')) report.append(ui.color_string('WARNING', 'YELLOW'))
report.append( report.append(
'All data will be deleted from the destination listed above.', 'All data will be deleted from the destination listed above.',
) )
report.append( report.append(
std.color_string( ui.color_string(
['This is irreversible and will lead to', 'DATA LOSS.'], ['This is irreversible and will lead to', 'DATA LOSS.'],
['YELLOW', 'RED'], ['YELLOW', 'RED'],
), ),
@ -605,18 +606,18 @@ class State():
# Map dir # Map dir
if self.working_dir: if self.working_dir:
report.append(std.color_string('Map Save Directory', 'GREEN')) report.append(ui.color_string('Map Save Directory', 'GREEN'))
report.append(f'{self.working_dir}/') report.append(f'{self.working_dir}/')
report.append(' ') report.append(' ')
if not fstype_is_ok(self.working_dir, map_dir=True): if not fstype_is_ok(self.working_dir, map_dir=True):
report.append( report.append(
std.color_string( ui.color_string(
'Map file(s) are being saved to a non-recommended filesystem.', 'Map file(s) are being saved to a non-recommended filesystem.',
'YELLOW', 'YELLOW',
), ),
) )
report.append( report.append(
std.color_string( ui.color_string(
['This is strongly discouraged and may lead to', 'DATA LOSS'], ['This is strongly discouraged and may lead to', 'DATA LOSS'],
[None, 'RED'], [None, 'RED'],
), ),
@ -625,11 +626,11 @@ class State():
# Source part(s) selected # Source part(s) selected
if source_parts: if source_parts:
report.append(std.color_string('Source Part(s) selected', 'GREEN')) report.append(ui.color_string('Source Part(s) selected', 'GREEN'))
if self.source.path.samefile(source_parts[0].path): if self.source.path.samefile(source_parts[0].path):
report.append('Whole Disk') report.append('Whole Disk')
else: else:
report.append(std.color_string(f'{"NAME":<9} SIZE', 'BLUE')) report.append(ui.color_string(f'{"NAME":<9} SIZE', 'BLUE'))
for part in source_parts: for part in source_parts:
report.append( report.append(
f'{part.path.name:<9} ' f'{part.path.name:<9} '
@ -638,9 +639,9 @@ class State():
report.append(' ') report.append(' ')
# Prompt user # Prompt user
std.clear_screen() ui.clear_screen()
std.print_report(report) ui.print_report(report)
if not std.ask(prompt): if not ui.ask(prompt):
raise std.GenericAbort() raise std.GenericAbort()
def generate_report(self): def generate_report(self):
@ -661,7 +662,7 @@ class State():
error_size = self.get_error_size() error_size = self.get_error_size()
error_size_str = std.bytes_to_string(error_size, decimals=2) error_size_str = std.bytes_to_string(error_size, decimals=2)
if error_size > 0: if error_size > 0:
error_size_str = std.color_string(error_size_str, 'YELLOW') error_size_str = ui.color_string(error_size_str, 'YELLOW')
percent = self.get_percent_recovered() percent = self.get_percent_recovered()
percent = format_status_string(percent, width=0) percent = format_status_string(percent, width=0)
report.append(f'Overall rescued: {percent}, error size: {error_size_str}') report.append(f'Overall rescued: {percent}, error size: {error_size_str}')
@ -673,7 +674,7 @@ class State():
error_size = pair.get_error_size() error_size = pair.get_error_size()
error_size_str = std.bytes_to_string(error_size, decimals=2) error_size_str = std.bytes_to_string(error_size, decimals=2)
if error_size > 0: if error_size > 0:
error_size_str = std.color_string(error_size_str, 'YELLOW') error_size_str = ui.color_string(error_size_str, 'YELLOW')
pair_size = std.bytes_to_string(pair.size, decimals=2) pair_size = std.bytes_to_string(pair.size, decimals=2)
percent = pair.get_percent_recovered() percent = pair.get_percent_recovered()
percent = format_status_string(percent, width=0) percent = format_status_string(percent, width=0)
@ -704,7 +705,7 @@ class State():
def init_recovery(self, docopt_args): def init_recovery(self, docopt_args):
"""Select source/dest and set env.""" """Select source/dest and set env."""
std.clear_screen() ui.clear_screen()
source_parts = [] source_parts = []
# Set log # Set log
@ -795,8 +796,8 @@ class State():
try: try:
exe.run_program(cmd) exe.run_program(cmd)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
std.print_error('Failed to unmount source and/or destination') ui.print_error('Failed to unmount source and/or destination')
std.abort() ui.abort()
# Prep destination # Prep destination
if self.mode == 'Clone': if self.mode == 'Clone':
@ -928,7 +929,7 @@ class State():
check=False, check=False,
) )
if proc.returncode != 0: if proc.returncode != 0:
std.print_error('Error(s) encoundtered while formatting destination') ui.print_error('Error(s) encoundtered while formatting destination')
raise std.GenericAbort() raise std.GenericAbort()
# Update settings # Update settings
@ -969,13 +970,13 @@ class State():
# Check for critical errors # Check for critical errors
if not smart_status_ok(self.destination): if not smart_status_ok(self.destination):
std.print_error( ui.print_error(
f'Critical error(s) detected for: {self.destination.path}', f'Critical error(s) detected for: {self.destination.path}',
) )
# Check for minor errors # Check for minor errors
if not check_attributes(self.destination, only_blocking=False): if not check_attributes(self.destination, only_blocking=False):
std.print_warning( ui.print_warning(
f'Attribute error(s) detected for: {self.destination.path}', f'Attribute error(s) detected for: {self.destination.path}',
) )
@ -1026,7 +1027,7 @@ class State():
destination_size *= 1.05 destination_size *= 1.05
error_msg = 'Not enough free space on the destination' error_msg = 'Not enough free space on the destination'
if required_size > destination_size: if required_size > destination_size:
std.print_error(error_msg) ui.print_error(error_msg)
raise std.GenericAbort() raise std.GenericAbort()
def save_debug_reports(self): def save_debug_reports(self):
@ -1037,7 +1038,7 @@ class State():
debug_dir.mkdir() debug_dir.mkdir()
# State (self) # State (self)
std.save_pickles({'state': self}, debug_dir) debug.save_pickles({'state': self}, debug_dir)
with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f: with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f:
_f.write('[Debug report]\n') _f.write('[Debug report]\n')
_f.write('\n'.join(debug.generate_object_report(self))) _f.write('\n'.join(debug.generate_object_report(self)))
@ -1064,10 +1065,10 @@ class State():
width = cfg.ddrescue.TMUX_SIDE_WIDTH width = cfg.ddrescue.TMUX_SIDE_WIDTH
# Status # Status
report.append(std.color_string(f'{"Status":^{width}}', 'BLUE')) report.append(ui.color_string(f'{"Status":^{width}}', 'BLUE'))
if 'NEEDS ATTENTION' in overall_status: if 'NEEDS ATTENTION' in overall_status:
report.append( report.append(
std.color_string(f'{overall_status:^{width}}', 'YELLOW_BLINK'), ui.color_string(f'{overall_status:^{width}}', 'YELLOW_BLINK'),
) )
else: else:
report.append(f'{overall_status:^{width}}') report.append(f'{overall_status:^{width}}')
@ -1077,12 +1078,12 @@ class State():
if self.block_pairs: if self.block_pairs:
total_rescued = self.get_rescued_size() total_rescued = self.get_rescued_size()
percent = self.get_percent_recovered() percent = self.get_percent_recovered()
report.append(std.color_string('Overall Progress', 'BLUE')) report.append(ui.color_string('Overall Progress', 'BLUE'))
report.append( report.append(
f'Rescued: {format_status_string(percent, width=width-9)}', f'Rescued: {format_status_string(percent, width=width-9)}',
) )
report.append( report.append(
std.color_string( ui.color_string(
[f'{std.bytes_to_string(total_rescued, decimals=2):>{width}}'], [f'{std.bytes_to_string(total_rescued, decimals=2):>{width}}'],
[get_percent_color(percent)], [get_percent_color(percent)],
), ),
@ -1091,7 +1092,7 @@ class State():
# Block pair progress # Block pair progress
for pair in self.block_pairs: for pair in self.block_pairs:
report.append(std.color_string(pair.source, 'BLUE')) report.append(ui.color_string(pair.source, 'BLUE'))
for name, status in pair.status.items(): for name, status in pair.status.items():
name = name.title() name = name.title()
report.append( report.append(
@ -1103,9 +1104,9 @@ class State():
if overall_status in ('Active', 'NEEDS ATTENTION'): if overall_status in ('Active', 'NEEDS ATTENTION'):
etoc = get_etoc() etoc = get_etoc()
report.append(separator) report.append(separator)
report.append(std.color_string('Estimated Pass Finish', 'BLUE')) report.append(ui.color_string('Estimated Pass Finish', 'BLUE'))
if overall_status == 'NEEDS ATTENTION' or etoc == 'N/A': if overall_status == 'NEEDS ATTENTION' or etoc == 'N/A':
report.append(std.color_string('N/A', 'YELLOW')) report.append(ui.color_string('N/A', 'YELLOW'))
else: else:
report.append(etoc) report.append(etoc)
@ -1166,7 +1167,7 @@ class State():
source_str = _format_string(self.source, width) source_str = _format_string(self.source, width)
tmux.respawn_pane( tmux.respawn_pane(
self.panes['Source'], self.panes['Source'],
text=std.color_string( text=ui.color_string(
['Source', '' if source_exists else ' (Missing)', '\n', source_str], ['Source', '' if source_exists else ' (Missing)', '\n', source_str],
['BLUE', 'RED', None, None], ['BLUE', 'RED', None, None],
sep='', sep='',
@ -1181,7 +1182,7 @@ class State():
percent=50, percent=50,
vertical=False, vertical=False,
target_id=self.panes['Source'], target_id=self.panes['Source'],
text=std.color_string( text=ui.color_string(
['Destination', '' if dest_exists else ' (Missing)', '\n', dest_str], ['Destination', '' if dest_exists else ' (Missing)', '\n', dest_str],
['BLUE', 'RED', None, None], ['BLUE', 'RED', None, None],
sep='', sep='',
@ -1195,7 +1196,7 @@ def build_block_pair_report(block_pairs, settings):
report = [] report = []
notes = [] notes = []
if block_pairs: if block_pairs:
report.append(std.color_string('Block Pairs', 'GREEN')) report.append(ui.color_string('Block Pairs', 'GREEN'))
else: else:
# Bail early # Bail early
return report return report
@ -1214,7 +1215,7 @@ def build_block_pair_report(block_pairs, settings):
if settings: if settings:
if not settings['First Run']: if not settings['First Run']:
notes.append( notes.append(
std.color_string( ui.color_string(
['NOTE:', 'Clone settings loaded from previous run.'], ['NOTE:', 'Clone settings loaded from previous run.'],
['BLUE', None], ['BLUE', None],
), ),
@ -1222,14 +1223,14 @@ def build_block_pair_report(block_pairs, settings):
if settings['Needs Format'] and settings['Table Type']: if settings['Needs Format'] and settings['Table Type']:
msg = f'Destination will be formatted using {settings["Table Type"]}' msg = f'Destination will be formatted using {settings["Table Type"]}'
notes.append( notes.append(
std.color_string( ui.color_string(
['NOTE:', msg], ['NOTE:', msg],
['BLUE', None], ['BLUE', None],
), ),
) )
if any(pair.get_rescued_size() > 0 for pair in block_pairs): if any(pair.get_rescued_size() > 0 for pair in block_pairs):
notes.append( notes.append(
std.color_string( ui.color_string(
['NOTE:', 'Resume data loaded from map file(s).'], ['NOTE:', 'Resume data loaded from map file(s).'],
['BLUE', None], ['BLUE', None],
), ),
@ -1311,12 +1312,12 @@ def build_directory_report(path):
for line in proc.stdout.splitlines(): for line in proc.stdout.splitlines():
line = line.replace('\n', '') line = line.replace('\n', '')
if 'FSTYPE' in line: if 'FSTYPE' in line:
line = std.color_string(f'{"PATH":<{width}}{line}', 'BLUE') line = ui.color_string(f'{"PATH":<{width}}{line}', 'BLUE')
else: else:
line = f'{path:<{width}}{line}' line = f'{path:<{width}}{line}'
report.append(line) report.append(line)
else: else:
report.append(std.color_string('PATH', 'BLUE')) report.append(ui.color_string('PATH', 'BLUE'))
report.append(str(path)) report.append(str(path))
# Done # Done
@ -1352,7 +1353,7 @@ def build_disk_report(dev):
# Partition details # Partition details
report.append( report.append(
std.color_string( ui.color_string(
( (
f'{"NAME":<{widths["name"]}}' f'{"NAME":<{widths["name"]}}'
f'{" " if dev.children else ""}' f'{" " if dev.children else ""}'
@ -1397,8 +1398,8 @@ def build_disk_report(dev):
def build_main_menu(): def build_main_menu():
"""Build main menu, returns wk.std.Menu.""" """Build main menu, returns wk.ui.cli.Menu."""
menu = std.Menu(title=std.color_string('ddrescue TUI: Main Menu', 'GREEN')) menu = ui.Menu(title=ui.color_string('ddrescue TUI: Main Menu', 'GREEN'))
menu.separator = ' ' menu.separator = ' '
# Add actions, options, etc # Add actions, options, etc
@ -1429,23 +1430,23 @@ def build_object_report(obj):
def build_settings_menu(silent=True): def build_settings_menu(silent=True):
"""Build settings menu, returns wk.std.Menu.""" """Build settings menu, returns wk.ui.cli.Menu."""
title_text = [ title_text = [
std.color_string('ddrescue TUI: Expert Settings', 'GREEN'), ui.color_string('ddrescue TUI: Expert Settings', 'GREEN'),
' ', ' ',
std.color_string( ui.color_string(
['These settings can cause', 'MAJOR DAMAGE', 'to drives'], ['These settings can cause', 'MAJOR DAMAGE', 'to drives'],
['YELLOW', 'RED', 'YELLOW'], ['YELLOW', 'RED', 'YELLOW'],
), ),
'Please read the manual before making changes', 'Please read the manual before making changes',
] ]
menu = std.Menu(title='\n'.join(title_text)) menu = ui.Menu(title='\n'.join(title_text))
menu.separator = ' ' menu.separator = ' '
preset = 'Default' preset = 'Default'
if not silent: if not silent:
# Ask which preset to use # Ask which preset to use
print(f'Available ddrescue presets: {" / ".join(SETTING_PRESETS)}') print(f'Available ddrescue presets: {" / ".join(SETTING_PRESETS)}')
preset = std.choice(SETTING_PRESETS, 'Please select a preset:') preset = ui.choice(SETTING_PRESETS, 'Please select a preset:')
# Fix selection # Fix selection
for _p in SETTING_PRESETS: for _p in SETTING_PRESETS:
@ -1494,7 +1495,7 @@ def build_sfdisk_partition_line(table_type, dev_path, size, details):
# Safety Check # Safety Check
if not dest_type: if not dest_type:
std.print_error(f'Failed to determine partition type for: {dev_path}') ui.print_error(f'Failed to determine partition type for: {dev_path}')
raise std.GenericAbort() raise std.GenericAbort()
# Add extra details # Add extra details
@ -1576,7 +1577,7 @@ def format_status_string(status, width):
# Add color if necessary # Add color if necessary
if color: if color:
status_str = std.color_string(status_str, color) status_str = ui.color_string(status_str, color)
# Done # Done
return status_str return status_str
@ -1698,8 +1699,8 @@ def get_object(path):
# Child/Parent check # Child/Parent check
if obj.parent: if obj.parent:
std.print_warning(f'"{obj.path}" is a child device') ui.print_warning(f'"{obj.path}" is a child device')
if std.ask(f'Use parent device "{obj.parent}" instead?'): if ui.ask(f'Use parent device "{obj.parent}" instead?'):
obj = hw_disk.Disk(obj.parent) obj = hw_disk.Disk(obj.parent)
elif path.is_dir(): elif path.is_dir():
obj = path obj = path
@ -1710,7 +1711,7 @@ def get_object(path):
# Abort if obj not set # Abort if obj not set
if not obj: if not obj:
std.print_error(f'Invalid source/dest path: {path}') ui.print_error(f'Invalid source/dest path: {path}')
raise std.GenericAbort() raise std.GenericAbort()
# Done # Done
@ -1775,7 +1776,7 @@ def get_table_type(disk_path):
# Check type # Check type
if table_type not in ('GPT', 'MBR'): if table_type not in ('GPT', 'MBR'):
std.print_error(f'Unsupported partition table type: {table_type}') ui.print_error(f'Unsupported partition table type: {table_type}')
raise std.GenericAbort() raise std.GenericAbort()
# Done # Done
@ -1789,7 +1790,7 @@ def get_working_dir(mode, destination, force_local=False):
# Set ticket ID # Set ticket ID
while ticket_id is None: while ticket_id is None:
ticket_id = std.input_text( ticket_id = ui.input_text(
prompt='Please enter ticket ID:', prompt='Please enter ticket ID:',
allow_empty_response=False, allow_empty_response=False,
) )
@ -1802,12 +1803,12 @@ def get_working_dir(mode, destination, force_local=False):
try: try:
path = pathlib.Path(destination).resolve() path = pathlib.Path(destination).resolve()
except TypeError as err: except TypeError as err:
std.print_error(f'Invalid destination: {destination}') ui.print_error(f'Invalid destination: {destination}')
raise std.GenericAbort() from err raise std.GenericAbort() from err
if path.exists() and fstype_is_ok(path, map_dir=False): if path.exists() and fstype_is_ok(path, map_dir=False):
working_dir = path working_dir = path
elif mode == 'Clone' and not force_local: elif mode == 'Clone' and not force_local:
std.print_info('Mounting backup shares...') ui.print_info('Mounting backup shares...')
net.mount_backup_shares(read_write=True) net.mount_backup_shares(read_write=True)
for server in cfg.net.BACKUP_SERVERS: for server in cfg.net.BACKUP_SERVERS:
path = pathlib.Path( path = pathlib.Path(
@ -1851,11 +1852,11 @@ def is_missing_source_or_destination(state):
if hasattr(item, 'path'): if hasattr(item, 'path'):
if not item.path.exists(): if not item.path.exists():
missing = True missing = True
std.print_error(f'{name} disappeared') ui.print_error(f'{name} disappeared')
elif hasattr(item, 'exists'): elif hasattr(item, 'exists'):
if not item.exists(): if not item.exists():
missing = True missing = True
std.print_error(f'{name} disappeared') ui.print_error(f'{name} disappeared')
else: else:
LOG.error('Unknown %s type: %s', name, item) LOG.error('Unknown %s type: %s', name, item)
@ -1887,7 +1888,7 @@ def source_or_destination_changed(state):
# Done # Done
if changed: if changed:
std.print_error('Source and/or Destination changed') ui.print_error('Source and/or Destination changed')
return changed return changed
@ -1910,7 +1911,7 @@ def main():
state.init_recovery(args) state.init_recovery(args)
except (FileNotFoundError, std.GenericAbort): except (FileNotFoundError, std.GenericAbort):
is_missing_source_or_destination(state) is_missing_source_or_destination(state)
std.abort() ui.abort()
# Show menu # Show menu
while True: while True:
@ -1928,18 +1929,18 @@ def main():
# Detect drives # Detect drives
if 'Detect drives' in selection[0]: if 'Detect drives' in selection[0]:
std.clear_screen() ui.clear_screen()
std.print_warning(DETECT_DRIVES_NOTICE) ui.print_warning(DETECT_DRIVES_NOTICE)
if std.ask('Are you sure you proceed?'): if ui.ask('Are you sure you proceed?'):
std.print_standard('Forcing controllers to rescan for devices...') ui.print_standard('Forcing controllers to rescan for devices...')
cmd = 'echo "- - -" | sudo tee /sys/class/scsi_host/host*/scan' cmd = 'echo "- - -" | sudo tee /sys/class/scsi_host/host*/scan'
exe.run_program(cmd, check=False, shell=True) exe.run_program(cmd, check=False, shell=True)
if source_or_destination_changed(state): if source_or_destination_changed(state):
std.abort() ui.abort()
# Start recovery # Start recovery
if 'Start' in selection: if 'Start' in selection:
std.clear_screen() ui.clear_screen()
run_recovery(state, main_menu, settings_menu, dry_run=args['--dry-run']) run_recovery(state, main_menu, settings_menu, dry_run=args['--dry-run'])
# Quit # Quit
@ -1949,14 +1950,14 @@ def main():
break break
# Recovey < 100% # Recovey < 100%
std.print_warning('Recovery is less than 100%') ui.print_warning('Recovery is less than 100%')
if std.ask('Are you sure you want to quit?'): if ui.ask('Are you sure you want to quit?'):
break break
# Save results to log # Save results to log
LOG.info('') LOG.info('')
for line in state.generate_report(): for line in state.generate_report():
LOG.info(' %s', std.strip_colors(line)) LOG.info(' %s', ui.strip_colors(line))
def mount_raw_image(path): def mount_raw_image(path):
@ -1970,7 +1971,7 @@ def mount_raw_image(path):
# Check # Check
if not loopback_path: if not loopback_path:
std.print_error(f'Failed to mount image: {path}') ui.print_error(f'Failed to mount image: {path}')
# Register unmount atexit # Register unmount atexit
atexit.register(unmount_loopback_device, loopback_path) atexit.register(unmount_loopback_device, loopback_path)
@ -2037,7 +2038,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
cmd = build_ddrescue_cmd(block_pair, pass_name, settings) cmd = build_ddrescue_cmd(block_pair, pass_name, settings)
poweroff_source_after_idle = True poweroff_source_after_idle = True
state.update_progress_pane('Active') state.update_progress_pane('Active')
std.clear_screen() ui.clear_screen()
warning_message = '' warning_message = ''
def _poweroff_source_drive(idle_minutes): def _poweroff_source_drive(idle_minutes):
@ -2056,8 +2057,8 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
return return
if i % 600 == 0 and i > 0: if i % 600 == 0 and i > 0:
if i == 600: if i == 600:
std.print_standard(' ', flush=True) ui.print_standard(' ', flush=True)
std.print_warning( ui.print_warning(
f'Powering off source in {int((idle_minutes*60-i)/60)} minutes...', f'Powering off source in {int((idle_minutes*60-i)/60)} minutes...',
) )
std.sleep(5) std.sleep(5)
@ -2067,10 +2068,10 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
cmd = ['sudo', 'hdparm', '-Y', source_dev] cmd = ['sudo', 'hdparm', '-Y', source_dev]
proc = exe.run_program(cmd, check=False) proc = exe.run_program(cmd, check=False)
if proc.returncode: if proc.returncode:
std.print_error(f'Failed to poweroff source {source_dev}') ui.print_error(f'Failed to poweroff source {source_dev}')
else: else:
std.print_warning(f'Powered off source {source_dev}') ui.print_warning(f'Powered off source {source_dev}')
std.print_standard( ui.print_standard(
'Press Enter to return to main menu...', end='', flush=True, 'Press Enter to return to main menu...', end='', flush=True,
) )
@ -2080,7 +2081,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z') now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z')
with open(f'{state.log_dir}/smart.out', 'w', encoding='utf-8') as _f: with open(f'{state.log_dir}/smart.out', 'w', encoding='utf-8') as _f:
_f.write( _f.write(
std.color_string( ui.color_string(
['SMART Attributes', f'Updated: {now}\n'], ['SMART Attributes', f'Updated: {now}\n'],
['BLUE', 'YELLOW'], ['BLUE', 'YELLOW'],
sep='\t\t', sep='\t\t',
@ -2113,7 +2114,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
if warning_message: if warning_message:
# Error detected on destination, stop recovery # Error detected on destination, stop recovery
exe.stop_process(proc) exe.stop_process(proc)
std.print_error(warning_message) ui.print_error(warning_message)
break break
if _i % 60 == 0: if _i % 60 == 0:
@ -2161,17 +2162,17 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
if warning_message: if warning_message:
print(' ') print(' ')
print(' ') print(' ')
std.print_error('DDRESCUE PROCESS HALTED') ui.print_error('DDRESCUE PROCESS HALTED')
print(' ') print(' ')
std.print_warning(warning_message) ui.print_warning(warning_message)
# Needs attention? # Needs attention?
if str(proc.poll()) != '0': if str(proc.poll()) != '0':
state.update_progress_pane('NEEDS ATTENTION') state.update_progress_pane('NEEDS ATTENTION')
std.pause('Press Enter to return to main menu...') ui.pause('Press Enter to return to main menu...')
# Stop source poweroff countdown # Stop source poweroff countdown
std.print_standard('Stopping device poweroff countdown...', flush=True) ui.print_standard('Stopping device poweroff countdown...', flush=True)
poweroff_source_after_idle = False poweroff_source_after_idle = False
poweroff_thread.join() poweroff_thread.join()
@ -2187,12 +2188,12 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True):
# Bail early # Bail early
if is_missing_source_or_destination(state): if is_missing_source_or_destination(state):
std.print_standard('') ui.print_standard('')
std.pause('Press Enter to return to main menu...') ui.pause('Press Enter to return to main menu...')
return return
if source_or_destination_changed(state): if source_or_destination_changed(state):
std.print_standard('') ui.print_standard('')
std.abort() ui.abort()
# Get settings # Get settings
for name, details in main_menu.toggles.items(): for name, details in main_menu.toggles.items():
@ -2248,9 +2249,9 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True):
# Show warning if nothing was done # Show warning if nothing was done
if not attempted_recovery: if not attempted_recovery:
std.print_warning('No actions performed') ui.print_warning('No actions performed')
std.print_standard(' ') ui.print_standard(' ')
std.pause('Press Enter to return to main menu...') ui.pause('Press Enter to return to main menu...')
# Done # Done
state.save_debug_reports() state.save_debug_reports()
@ -2260,10 +2261,10 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True):
def select_disk(prompt, skip_disk=None): def select_disk(prompt, skip_disk=None):
"""Select disk from list, returns Disk().""" """Select disk from list, returns Disk()."""
std.print_info('Scanning disks...') ui.print_info('Scanning disks...')
disks = hw_disk.get_disks() disks = hw_disk.get_disks()
menu = std.Menu( menu = ui.Menu(
title=std.color_string(f'ddrescue TUI: {prompt} Selection', 'GREEN'), title=ui.color_string(f'ddrescue TUI: {prompt} Selection', 'GREEN'),
) )
menu.disabled_str = 'Already selected' menu.disabled_str = 'Already selected'
menu.separator = ' ' menu.separator = ' '
@ -2305,9 +2306,9 @@ def select_disk(prompt, skip_disk=None):
def select_disk_parts(prompt, disk): def select_disk_parts(prompt, disk):
"""Select disk parts from list, returns list of Disk().""" """Select disk parts from list, returns list of Disk()."""
title = std.color_string('ddrescue TUI: Partition Selection', 'GREEN') title = ui.color_string('ddrescue TUI: Partition Selection', 'GREEN')
title += f'\n\nDisk: {disk.path} {disk.description}' title += f'\n\nDisk: {disk.path} {disk.description}'
menu = std.Menu(title) menu = ui.Menu(title)
menu.separator = ' ' menu.separator = ' '
menu.add_action('All') menu.add_action('All')
menu.add_action('None') menu.add_action('None')
@ -2356,7 +2357,7 @@ def select_disk_parts(prompt, disk):
if not menu.options: if not menu.options:
menu.add_option(whole_disk_str, {'Selected': True, 'Path': disk.path}) menu.add_option(whole_disk_str, {'Selected': True, 'Path': disk.path})
menu.title += '\n\n' menu.title += '\n\n'
menu.title += std.color_string(' No partitions detected.', 'YELLOW') menu.title += ui.color_string(' No partitions detected.', 'YELLOW')
# Get selection # Get selection
_select_parts(menu) _select_parts(menu)
@ -2370,13 +2371,13 @@ def select_disk_parts(prompt, disk):
if len(object_list) == len(disk.children): if len(object_list) == len(disk.children):
# NOTE: This is not true if the disk has no partitions # NOTE: This is not true if the disk has no partitions
msg = f'Preserve partition table and unused space in {prompt.lower()}?' msg = f'Preserve partition table and unused space in {prompt.lower()}?'
if std.ask(msg): if ui.ask(msg):
# Replace part list with whole disk obj # Replace part list with whole disk obj
object_list = [disk.path] object_list = [disk.path]
# Convert object_list to hw_disk.Disk() objects # Convert object_list to hw_disk.Disk() objects
print(' ') print(' ')
std.print_info('Getting disk/partition details...') ui.print_info('Getting disk/partition details...')
object_list = [hw_disk.Disk(path) for path in object_list] object_list = [hw_disk.Disk(path) for path in object_list]
# Done # Done
@ -2386,8 +2387,8 @@ def select_disk_parts(prompt, disk):
def select_path(prompt): def select_path(prompt):
"""Select path, returns pathlib.Path.""" """Select path, returns pathlib.Path."""
invalid = False invalid = False
menu = std.Menu( menu = ui.Menu(
title=std.color_string(f'ddrescue TUI: {prompt} Path Selection', 'GREEN'), title=ui.color_string(f'ddrescue TUI: {prompt} Path Selection', 'GREEN'),
) )
menu.separator = ' ' menu.separator = ' '
menu.add_action('Quit') menu.add_action('Quit')
@ -2400,7 +2401,7 @@ def select_path(prompt):
if 'Current directory' in selection: if 'Current directory' in selection:
path = os.getcwd() path = os.getcwd()
elif 'Enter manually' in selection: elif 'Enter manually' in selection:
path = std.input_text('Please enter path: ') path = ui.input_text('Please enter path: ')
elif 'Quit' in selection: elif 'Quit' in selection:
raise std.GenericAbort() raise std.GenericAbort()
@ -2410,7 +2411,7 @@ def select_path(prompt):
except TypeError: except TypeError:
invalid = True invalid = True
if invalid or not path.is_dir(): if invalid or not path.is_dir():
std.print_error(f'Invalid path: {path}') ui.print_error(f'Invalid path: {path}')
raise std.GenericAbort() raise std.GenericAbort()
# Done # Done
@ -2429,7 +2430,7 @@ def set_mode(docopt_args):
# Ask user if necessary # Ask user if necessary
if not mode: if not mode:
answer = std.choice(['C', 'I'], 'Are we cloning or imaging?') answer = ui.choice(['C', 'I'], 'Are we cloning or imaging?')
if answer == 'C': if answer == 'C':
mode = 'Clone' mode = 'Clone'
else: else:

View file

@ -1,6 +1,21 @@
"""WizardKit: Debug Functions""" """WizardKit: Debug Functions"""
# vim: sts=2 sw=2 ts=2 # vim: sts=2 sw=2 ts=2
import inspect
import logging
import lzma
import os
import pickle
import platform
import re
import socket
import sys
import time
import requests
from wk.cfg.net import CRASH_SERVER
from wk.log import get_log_filepath, get_root_logger_path
# Classes # Classes
class Debug(): class Debug():
@ -10,11 +25,55 @@ class Debug():
# STATIC VARIABLES # STATIC VARIABLES
LOG = logging.getLogger(__name__)
DEBUG_CLASS = Debug() DEBUG_CLASS = Debug()
METHOD_TYPE = type(DEBUG_CLASS.method) METHOD_TYPE = type(DEBUG_CLASS.method)
# Functions # Functions
def generate_debug_report():
"""Generate debug report, returns str."""
platform_function_list = (
'architecture',
'machine',
'platform',
'python_version',
)
report = []
# Logging data
log_path = get_log_filepath()
if log_path:
report.append('------ Start Log -------')
report.append('')
with open(log_path, 'r', encoding='utf-8') as log_file:
report.extend(log_file.read().splitlines())
report.append('')
report.append('------- End Log --------')
# System
report.append('--- Start debug info ---')
report.append('')
report.append('[System]')
report.append(f' {"FQDN":<24} {socket.getfqdn()}')
for func in platform_function_list:
func_name = func.replace('_', ' ').capitalize()
func_result = getattr(platform, func)()
report.append(f' {func_name:<24} {func_result}')
report.append(f' {"Python sys.argv":<24} {sys.argv}')
report.append('')
# Environment
report.append('[Environment Variables]')
for key, value in sorted(os.environ.items()):
report.append(f' {key:<24} {value}')
report.append('')
# Done
report.append('---- End debug info ----')
return '\n'.join(report)
def generate_object_report(obj, indent=0): def generate_object_report(obj, indent=0):
"""Generate debug report for obj, returns list.""" """Generate debug report for obj, returns list."""
report = [] report = []
@ -46,5 +105,67 @@ def generate_object_report(obj, indent=0):
return report return report
def save_pickles(obj_dict, out_path=None):
"""Save dict of objects using pickle."""
LOG.info('Saving pickles')
# Set path
if not out_path:
out_path = get_root_logger_path()
out_path = out_path.parent.joinpath('../debug').resolve()
# Save pickles
try:
for name, obj in obj_dict.copy().items():
if name.startswith('__') or inspect.ismodule(obj):
continue
with open(f'{out_path}/{name}.pickle', 'wb') as _f:
pickle.dump(obj, _f, protocol=pickle.HIGHEST_PROTOCOL)
except Exception:
LOG.error('Failed to save all the pickles', exc_info=True)
def upload_debug_report(report, compress=True, reason='DEBUG'):
"""Upload debug report to CRASH_SERVER as specified in wk.cfg.main."""
LOG.info('Uploading debug report to %s', CRASH_SERVER.get('Name', '?'))
headers = CRASH_SERVER.get('Headers', {'X-Requested-With': 'XMLHttpRequest'})
if compress:
headers['Content-Type'] = 'application/octet-stream'
# Check if the required server details are available
if not all(CRASH_SERVER.get(key, False) for key in ('Name', 'Url', 'User')):
msg = 'Server details missing, aborting upload.'
print(msg)
raise RuntimeError(msg)
# Set filename (based on the logging config if possible)
filename = 'Unknown'
log_path = get_log_filepath()
if log_path:
# Strip everything but the prefix
filename = re.sub(r'^(.*)_(\d{4}-\d{2}-\d{2}.*)', r'\1', log_path.name)
filename = f'{filename}_{reason}_{time.strftime("%Y-%m-%d_%H%M%S%z")}.log'
LOG.debug('filename: %s', filename)
# Compress report
if compress:
filename += '.xz'
xz_report = lzma.compress(report.encode('utf8'))
# Upload report
url = f'{CRASH_SERVER["Url"]}/{filename}'
response = requests.put(
url,
auth=(CRASH_SERVER['User'], CRASH_SERVER.get('Pass', '')),
data=xz_report if compress else report,
headers=headers,
timeout=60,
)
# Check response
if not response.ok:
raise RuntimeError('Failed to upload report')
if __name__ == '__main__': if __name__ == '__main__':
print("This file is not meant to be called directly.") print("This file is not meant to be called directly.")

View file

@ -3,7 +3,7 @@
import logging import logging
from wk.std import color_string from wk.ui.cli import color_string # TODO: This is lazy
# STATIC VARIABLES # STATIC VARIABLES

View file

@ -21,8 +21,8 @@ from wk.cfg.hw import (
THRESH_SSD_MIN, THRESH_SSD_MIN,
) )
from wk.exe import run_program from wk.exe import run_program
from wk.std import ( from wk.std import PLATFORM
PLATFORM, from wk.ui.cli import ( # TODO: This is lazy
strip_colors, strip_colors,
color_string, color_string,
) )

View file

@ -10,13 +10,13 @@ from typing import TextIO
from wk import exe from wk import exe
from wk.cfg.hw import CPU_FAILURE_TEMP from wk.cfg.hw import CPU_FAILURE_TEMP
from wk.os.mac import set_fans as macos_set_fans from wk.os.mac import set_fans as macos_set_fans
from wk.std import ( from wk.std import PLATFORM
PLATFORM, from wk.tmux import respawn_pane as tmux_respawn_pane
from wk.ui.cli import ( # TODO: This is lazy
color_string, color_string,
print_error, print_error,
print_warning, print_warning,
) )
from wk.tmux import respawn_pane as tmux_respawn_pane
# STATIC VARIABLES # STATIC VARIABLES

View file

@ -25,6 +25,8 @@ from wk.hw.network import network_test
from wk.hw.screensavers import screensaver from wk.hw.screensavers import screensaver
from wk.hw.test import Test, TestGroup from wk.hw.test import Test, TestGroup
from wk.ui import cli as ui # TODO: This is lazy
# STATIC VARIABLES # STATIC VARIABLES
DOCSTRING = f'''{cfg.main.KIT_NAME_FULL}: Hardware Diagnostics DOCSTRING = f'''{cfg.main.KIT_NAME_FULL}: Hardware Diagnostics
@ -86,9 +88,9 @@ class State():
self.panes = {} self.panes = {}
self.system = None self.system = None
self.test_groups = [] self.test_groups = []
self.top_text = std.color_string('Hardware Diagnostics', 'GREEN') self.top_text = ui.color_string('Hardware Diagnostics', 'GREEN')
if test_mode: if test_mode:
self.top_text += std.color_string(' (Test Mode)', 'YELLOW') self.top_text += ui.color_string(' (Test Mode)', 'YELLOW')
# Init tmux and start a background process to maintain layout # Init tmux and start a background process to maintain layout
self.init_tmux() self.init_tmux()
@ -160,8 +162,8 @@ class State():
keep_history=False, keep_history=False,
timestamp=False, timestamp=False,
) )
std.clear_screen() ui.clear_screen()
std.print_info('Initializing...') ui.print_info('Initializing...')
# Progress Pane # Progress Pane
self.update_progress_pane() self.update_progress_pane()
@ -226,7 +228,7 @@ class State():
self.panes['Started'] = tmux.split_window( self.panes['Started'] = tmux.split_window(
lines=cfg.hw.TMUX_SIDE_WIDTH, lines=cfg.hw.TMUX_SIDE_WIDTH,
target_id=self.panes['Top'], target_id=self.panes['Top'],
text=std.color_string( text=ui.color_string(
['Started', time.strftime("%Y-%m-%d %H:%M %Z")], ['Started', time.strftime("%Y-%m-%d %H:%M %Z")],
['BLUE', None], ['BLUE', None],
sep='\n', sep='\n',
@ -247,7 +249,7 @@ class State():
debug_dir.mkdir() debug_dir.mkdir()
# State (self) # State (self)
std.save_pickles({'state': self}, debug_dir) debug.save_pickles({'state': self}, debug_dir)
with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f: with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f:
_f.write('\n'.join(debug.generate_object_report(self))) _f.write('\n'.join(debug.generate_object_report(self)))
@ -289,7 +291,7 @@ class State():
"""Update 'Started' pane following clock sync.""" """Update 'Started' pane following clock sync."""
tmux.respawn_pane( tmux.respawn_pane(
pane_id=self.panes['Started'], pane_id=self.panes['Started'],
text=std.color_string( text=ui.color_string(
['Started', time.strftime("%Y-%m-%d %H:%M %Z")], ['Started', time.strftime("%Y-%m-%d %H:%M %Z")],
['BLUE', None], ['BLUE', None],
sep='\n', sep='\n',
@ -302,9 +304,9 @@ class State():
width = cfg.hw.TMUX_SIDE_WIDTH width = cfg.hw.TMUX_SIDE_WIDTH
for group in self.test_groups: for group in self.test_groups:
report.append(std.color_string(group.name, 'BLUE')) report.append(ui.color_string(group.name, 'BLUE'))
for test in group.test_objects: for test in group.test_objects:
report.append(std.color_string( report.append(ui.color_string(
[test.label, f'{test.status:>{width-len(test.label)}}'], [test.label, f'{test.status:>{width-len(test.label)}}'],
[None, STATUS_COLORS.get(test.status, None)], [None, STATUS_COLORS.get(test.status, None)],
sep='', sep='',
@ -324,9 +326,9 @@ class State():
# Functions # Functions
def build_menu(cli_mode=False, quick_mode=False) -> std.Menu: def build_menu(cli_mode=False, quick_mode=False) -> ui.Menu:
"""Build main menu, returns wk.std.Menu.""" """Build main menu, returns wk.ui.cli.Menu."""
menu = std.Menu(title=None) menu = ui.Menu(title=None)
# Add actions, options, etc # Add actions, options, etc
for action in MENU_ACTIONS: for action in MENU_ACTIONS:
@ -418,11 +420,11 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
state.layout['Current'] = {'height': 3, 'Check': True} state.layout['Current'] = {'height': 3, 'Check': True}
# Get idle temps # Get idle temps
std.print_standard('Saving idle temps...') ui.print_standard('Saving idle temps...')
sensors.save_average_temps(temp_label='Idle', seconds=5) sensors.save_average_temps(temp_label='Idle', seconds=5)
# Stress CPU # Stress CPU
std.print_info('Running stress test') ui.print_info('Running stress test')
hw_cpu.set_apple_fan_speed('max') hw_cpu.set_apple_fan_speed('max')
proc_mprime = hw_cpu.start_mprime(state.log_dir, prime_log) proc_mprime = hw_cpu.start_mprime(state.log_dir, prime_log)
@ -443,14 +445,14 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
state.update_progress_pane() state.update_progress_pane()
# Get cooldown temp # Get cooldown temp
std.clear_screen() ui.clear_screen()
std.print_standard('Letting CPU cooldown...') ui.print_standard('Letting CPU cooldown...')
std.sleep(5) std.sleep(5)
std.print_standard('Saving cooldown temps...') ui.print_standard('Saving cooldown temps...')
sensors.save_average_temps(temp_label='Cooldown', seconds=5) sensors.save_average_temps(temp_label='Cooldown', seconds=5)
# Check Prime95 results # Check Prime95 results
test_mprime_obj.report.append(std.color_string('Prime95', 'BLUE')) test_mprime_obj.report.append(ui.color_string('Prime95', 'BLUE'))
hw_cpu.check_mprime_results( hw_cpu.check_mprime_results(
test_obj=test_mprime_obj, working_dir=state.log_dir, test_obj=test_mprime_obj, working_dir=state.log_dir,
) )
@ -461,10 +463,10 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
) )
if run_sysbench: if run_sysbench:
LOG.info('CPU Test (Sysbench)') LOG.info('CPU Test (Sysbench)')
std.print_standard('Letting CPU cooldown more...') ui.print_standard('Letting CPU cooldown more...')
std.sleep(30) std.sleep(30)
std.clear_screen() ui.clear_screen()
std.print_info('Running alternate stress test') ui.print_info('Running alternate stress test')
print('') print('')
proc_sysbench, filehandle_sysbench = hw_cpu.start_sysbench( proc_sysbench, filehandle_sysbench = hw_cpu.start_sysbench(
sensors, sensors,
@ -490,7 +492,7 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
state.update_progress_pane() state.update_progress_pane()
# Check Cooling results # Check Cooling results
test_cooling_obj.report.append(std.color_string('Temps', 'BLUE')) test_cooling_obj.report.append(ui.color_string('Temps', 'BLUE'))
hw_cpu.check_cooling_results(test_cooling_obj, sensors, run_sysbench) hw_cpu.check_cooling_results(test_cooling_obj, sensors, run_sysbench)
# Cleanup # Cleanup
@ -545,8 +547,8 @@ def disk_io_benchmark(
continue continue
# Start benchmark # Start benchmark
std.clear_screen() ui.clear_screen()
std.print_report(test.dev.generate_report()) ui.print_report(test.dev.generate_report())
test.set_status('Working') test.set_status('Working')
test_log = f'{state.log_dir}/{test.dev.path.name}_benchmark.out' test_log = f'{state.log_dir}/{test.dev.path.name}_benchmark.out'
tmux.respawn_pane( tmux.respawn_pane(
@ -563,12 +565,12 @@ def disk_io_benchmark(
# Something went wrong # Something went wrong
LOG.error('%s', err) LOG.error('%s', err)
test.set_status('ERROR') test.set_status('ERROR')
test.report.append(std.color_string(' Unknown Error', 'RED')) test.report.append(ui.color_string(' Unknown Error', 'RED'))
# Mark test(s) aborted if necessary # Mark test(s) aborted if necessary
if aborted: if aborted:
test.set_status('Aborted') test.set_status('Aborted')
test.report.append(std.color_string(' Aborted', 'YELLOW')) test.report.append(ui.color_string(' Aborted', 'YELLOW'))
break break
# Update progress after each test # Update progress after each test
@ -594,7 +596,7 @@ def disk_self_test(state, test_objects, test_mode=False) -> None:
state.update_top_pane( state.update_top_pane(
f'Disk self-test{"s" if len(test_objects) > 1 else ""}', f'Disk self-test{"s" if len(test_objects) > 1 else ""}',
) )
std.print_info(f'Starting self-test{"s" if len(test_objects) > 1 else ""}') ui.print_info(f'Starting self-test{"s" if len(test_objects) > 1 else ""}')
show_failed_attributes(state) show_failed_attributes(state)
for test in reversed(test_objects): for test in reversed(test_objects):
if test.disabled: if test.disabled:
@ -691,7 +693,7 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None:
state.update_top_pane( state.update_top_pane(
f'Disk Surface Scan{"s" if len(test_objects) > 1 else ""}', f'Disk Surface Scan{"s" if len(test_objects) > 1 else ""}',
) )
std.print_info( ui.print_info(
f'Starting disk surface scan{"s" if len(test_objects) > 1 else ""}', f'Starting disk surface scan{"s" if len(test_objects) > 1 else ""}',
) )
show_failed_attributes(state) show_failed_attributes(state)
@ -731,7 +733,7 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None:
for test in test_objects: for test in test_objects:
if not (test.disabled or test.passed or test.failed): if not (test.disabled or test.passed or test.failed):
test.set_status('Aborted') test.set_status('Aborted')
test.report.append(std.color_string(' Aborted', 'YELLOW')) test.report.append(ui.color_string(' Aborted', 'YELLOW'))
# Cleanup # Cleanup
state.update_progress_pane() state.update_progress_pane()
@ -785,9 +787,9 @@ def main() -> None:
try: try:
action() action()
except KeyboardInterrupt: except KeyboardInterrupt:
std.print_warning('Aborted.') ui.print_warning('Aborted.')
std.print_standard('') ui.print_standard('')
std.pause('Press Enter to return to main menu...') ui.pause('Press Enter to return to main menu...')
if 'Clock Sync' in selection: if 'Clock Sync' in selection:
state.update_clock() state.update_clock()
@ -852,8 +854,8 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None:
# Just return if no tests were selected # Just return if no tests were selected
if not state.test_groups: if not state.test_groups:
std.print_warning('No tests selected?') ui.print_warning('No tests selected?')
std.pause() ui.pause()
return return
# Run tests # Run tests
@ -864,7 +866,7 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None:
args = [group.test_objects] args = [group.test_objects]
if group.name == 'Disk I/O Benchmark': if group.name == 'Disk I/O Benchmark':
args.append(menu.toggles['Skip USB Benchmarks']['Selected']) args.append(menu.toggles['Skip USB Benchmarks']['Selected'])
std.clear_screen() ui.clear_screen()
try: try:
function(state, *args, test_mode=test_mode) function(state, *args, test_mode=test_mode)
except (KeyboardInterrupt, std.GenericAbort): except (KeyboardInterrupt, std.GenericAbort):
@ -891,25 +893,25 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None:
state.save_debug_reports() state.save_debug_reports()
atexit.unregister(state.save_debug_reports) atexit.unregister(state.save_debug_reports)
if quick_mode: if quick_mode:
std.pause('Press Enter to exit...') ui.pause('Press Enter to exit...')
else: else:
std.pause('Press Enter to return to main menu...') ui.pause('Press Enter to return to main menu...')
def show_failed_attributes(state) -> None: def show_failed_attributes(state) -> None:
"""Show failed attributes for all disks.""" """Show failed attributes for all disks."""
for dev in state.disks: for dev in state.disks:
std.print_colored([dev.name, dev.description], ['CYAN', None]) ui.print_colored([dev.name, dev.description], ['CYAN', None])
std.print_report( ui.print_report(
hw_smart.generate_attribute_report(dev, only_failed=True), hw_smart.generate_attribute_report(dev, only_failed=True),
) )
std.print_standard('') ui.print_standard('')
def show_results(state) -> None: def show_results(state) -> None:
"""Show test results by device.""" """Show test results by device."""
std.sleep(0.5) std.sleep(0.5)
std.clear_screen() ui.clear_screen()
state.update_top_pane('Results') state.update_top_pane('Results')
# CPU Tests # CPU Tests
@ -917,22 +919,22 @@ def show_results(state) -> None:
group.name for group in state.test_groups if 'CPU' in group.name group.name for group in state.test_groups if 'CPU' in group.name
] ]
if cpu_tests_enabled: if cpu_tests_enabled:
std.print_success('CPU:') ui.print_success('CPU:')
std.print_report(state.system.generate_report()) ui.print_report(state.system.generate_report())
std.print_standard(' ') ui.print_standard(' ')
# Disk Tests # Disk Tests
disk_tests_enabled = [ disk_tests_enabled = [
group.name for group in state.test_groups if 'Disk' in group.name group.name for group in state.test_groups if 'Disk' in group.name
] ]
if disk_tests_enabled: if disk_tests_enabled:
std.print_success(f'Disk{"s" if len(state.disks) > 1 else ""}:') ui.print_success(f'Disk{"s" if len(state.disks) > 1 else ""}:')
for disk in state.disks: for disk in state.disks:
std.print_report(disk.generate_report()) ui.print_report(disk.generate_report())
std.print_standard(' ') ui.print_standard(' ')
if not state.disks: if not state.disks:
std.print_warning('No devices') ui.print_warning('No devices')
std.print_standard(' ') ui.print_standard(' ')
def sync_clock() -> None: def sync_clock() -> None:

View file

@ -19,7 +19,8 @@ from wk.hw.smart import (
generate_attribute_report, generate_attribute_report,
get_known_disk_attributes, get_known_disk_attributes,
) )
from wk.std import PLATFORM, color_string, strip_colors from wk.std import PLATFORM
from wk.ui.cli import color_string, strip_colors # TODO: This is lazy
# STATIC VARIABLES # STATIC VARIABLES

View file

@ -4,7 +4,8 @@
import logging import logging
from wk.exe import run_program from wk.exe import run_program
from wk.std import PLATFORM, print_warning from wk.std import PLATFORM
from wk.ui.cli import print_warning # TODO: This is lazy
# STATIC VARIABLES # STATIC VARIABLES

View file

@ -9,7 +9,8 @@ from wk.net import (
show_valid_addresses, show_valid_addresses,
speedtest, speedtest,
) )
from wk.std import ( from wk.ui.cli import (
# TODO: This is lazy
TryAndPrint, TryAndPrint,
pause, pause,
print_warning, print_warning,

View file

@ -12,7 +12,8 @@ from typing import Any
from wk.cfg.hw import CPU_CRITICAL_TEMP, SMC_IDS, TEMP_COLORS from wk.cfg.hw import CPU_CRITICAL_TEMP, SMC_IDS, TEMP_COLORS
from wk.exe import run_program, start_thread from wk.exe import run_program, start_thread
from wk.io import non_clobber_path from wk.io import non_clobber_path
from wk.std import PLATFORM, color_string, sleep from wk.std import PLATFORM, sleep
from wk.ui.cli import color_string # TODO: This is lazy
# STATIC VARIABLES # STATIC VARIABLES

View file

@ -18,7 +18,8 @@ from wk.cfg.hw import (
SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS, SMART_SELF_TEST_START_TIMEOUT_IN_SECONDS,
) )
from wk.exe import get_json_from_command, run_program from wk.exe import get_json_from_command, run_program
from wk.std import bytes_to_string, color_string, sleep from wk.std import bytes_to_string, sleep
from wk.ui.cli import color_string # TODO: This is lazy
# STATIC VARIABLES # STATIC VARIABLES

View file

@ -14,12 +14,8 @@ from wk.cfg.hw import (
TEST_MODE_BADBLOCKS_LIMIT, TEST_MODE_BADBLOCKS_LIMIT,
) )
from wk.exe import run_program from wk.exe import run_program
from wk.std import ( from wk.std import PLATFORM, bytes_to_string
PLATFORM, from wk.ui.cli import color_string, strip_colors # TODO: This is lazy
bytes_to_string,
color_string,
strip_colors,
)
# STATIC VARIABLES # STATIC VARIABLES

View file

@ -12,12 +12,8 @@ from wk.cfg.hw import KNOWN_RAM_VENDOR_IDS
from wk.cfg.python import DATACLASS_DECORATOR_KWARGS from wk.cfg.python import DATACLASS_DECORATOR_KWARGS
from wk.exe import get_json_from_command, run_program from wk.exe import get_json_from_command, run_program
from wk.hw.test import Test from wk.hw.test import Test
from wk.std import ( from wk.std import PLATFORM, bytes_to_string, string_to_bytes
PLATFORM, from wk.ui.cli import color_string # TODO: This is lazy
bytes_to_string,
color_string,
string_to_bytes,
)
# STATIC VARIABLES # STATIC VARIABLES

View file

@ -21,8 +21,9 @@ from wk.kit.tools import (
get_tool_path, get_tool_path,
) )
from wk.log import update_log_path from wk.log import update_log_path
from wk.std import ( from wk.std import GenericError
GenericError, from wk.ui.cli import (
# TODO: This is lazy
TryAndPrint, TryAndPrint,
clear_screen, clear_screen,
pause, pause,

View file

@ -10,7 +10,7 @@ from subprocess import CalledProcessError
from collections import OrderedDict from collections import OrderedDict
from docopt import docopt from docopt import docopt
from wk import io, log, std from wk import io, log
from wk.cfg.main import KIT_NAME_FULL, KIT_NAME_SHORT from wk.cfg.main import KIT_NAME_FULL, KIT_NAME_SHORT
from wk.cfg.ufd import ( from wk.cfg.ufd import (
BOOT_ENTRIES, BOOT_ENTRIES,
@ -23,6 +23,8 @@ from wk.cfg.ufd import (
from wk.exe import get_json_from_command, run_program from wk.exe import get_json_from_command, run_program
from wk.os import linux from wk.os import linux
from wk.ui import cli as ui # TODO: This is lazy
# STATIC VARIABLES # STATIC VARIABLES
DOCSTRING = '''WizardKit: Build UFD DOCSTRING = '''WizardKit: Build UFD
@ -93,10 +95,10 @@ def build_ufd():
if args['--debug']: if args['--debug']:
log.enable_debug_mode() log.enable_debug_mode()
if args['--update'] and args['EXTRA_IMAGES']: if args['--update'] and args['EXTRA_IMAGES']:
std.print_warning('Extra images are ignored when updating') ui.print_warning('Extra images are ignored when updating')
args['EXTRA_IMAGES'] = [] args['EXTRA_IMAGES'] = []
log.update_log_path(dest_name='build-ufd', timestamp=True) log.update_log_path(dest_name='build-ufd', timestamp=True)
try_print = std.TryAndPrint() try_print = ui.TryAndPrint()
try_print.add_error('FileNotFoundError') try_print.add_error('FileNotFoundError')
try_print.catch_all = False try_print.catch_all = False
try_print.indent = 2 try_print.indent = 2
@ -104,9 +106,9 @@ def build_ufd():
try_print.width = 64 try_print.width = 64
# Show header # Show header
std.print_success(KIT_NAME_FULL) ui.print_success(KIT_NAME_FULL)
std.print_warning('UFD Build Tool') ui.print_warning('UFD Build Tool')
std.print_warning(' ') ui.print_warning(' ')
# Verify selections # Verify selections
ufd_dev = verify_ufd(args['--ufd-device']) ufd_dev = verify_ufd(args['--ufd-device'])
@ -118,7 +120,7 @@ def build_ufd():
# Prep UFD # Prep UFD
if not args['--update']: if not args['--update']:
std.print_info('Prep UFD') ui.print_info('Prep UFD')
try_print.run( try_print.run(
message='Zeroing first 64MiB...', message='Zeroing first 64MiB...',
function=zero_device, function=zero_device,
@ -170,8 +172,8 @@ def build_ufd():
) )
# Copy sources # Copy sources
std.print_standard(' ') ui.print_standard(' ')
std.print_info('Copy Sources') ui.print_info('Copy Sources')
try_print.run( try_print.run(
'Copying Memtest86...', io.recursive_copy, 'Copying Memtest86...', io.recursive_copy,
'/usr/share/memtest86-efi/', '/mnt/UFD/EFI/Memtest86/', overwrite=True, '/usr/share/memtest86-efi/', '/mnt/UFD/EFI/Memtest86/', overwrite=True,
@ -187,8 +189,8 @@ def build_ufd():
# Apply extra images # Apply extra images
if not args['--update']: if not args['--update']:
std.print_standard(' ') ui.print_standard(' ')
std.print_info('Apply Extra Images') ui.print_info('Apply Extra Images')
for part_num, image_path in enumerate(extra_images): for part_num, image_path in enumerate(extra_images):
try_print.run( try_print.run(
message=f'Applying {image_path.name}...', message=f'Applying {image_path.name}...',
@ -203,8 +205,8 @@ def build_ufd():
_f.write('\n'.join([image.name for image in extra_images])) _f.write('\n'.join([image.name for image in extra_images]))
# Update boot entries # Update boot entries
std.print_standard(' ') ui.print_standard(' ')
std.print_info('Boot Setup') ui.print_info('Boot Setup')
try_print.run( try_print.run(
message='Updating boot entries...', message='Updating boot entries...',
function=update_boot_entries, function=update_boot_entries,
@ -235,8 +237,8 @@ def build_ufd():
) )
# Hide items # Hide items
std.print_standard(' ') ui.print_standard(' ')
std.print_info('Final Touches') ui.print_info('Final Touches')
try_print.run( try_print.run(
message='Hiding items...', message='Hiding items...',
function=hide_items, function=hide_items,
@ -245,30 +247,30 @@ def build_ufd():
) )
# Done # Done
std.print_standard('\nDone.') ui.print_standard('\nDone.')
if not args['--force']: if not args['--force']:
std.pause('Press Enter to exit...') ui.pause('Press Enter to exit...')
def confirm_selections(update=False): def confirm_selections(update=False):
"""Ask tech to confirm selections, twice if necessary.""" """Ask tech to confirm selections, twice if necessary."""
if not std.ask('Is the above information correct?'): if not ui.ask('Is the above information correct?'):
std.abort() ui.abort()
# Safety check # Safety check
if not update: if not update:
std.print_standard(' ') ui.print_standard(' ')
std.print_warning('SAFETY CHECK') ui.print_warning('SAFETY CHECK')
std.print_standard( ui.print_standard(
'All data will be DELETED from the disk and partition(s) listed above.') 'All data will be DELETED from the disk and partition(s) listed above.')
std.print_colored( ui.print_colored(
['This is irreversible and will lead to', 'DATA LOSS'], ['This is irreversible and will lead to', 'DATA LOSS'],
[None, 'RED'], [None, 'RED'],
) )
if not std.ask('Asking again to confirm, is this correct?'): if not ui.ask('Asking again to confirm, is this correct?'):
std.abort() ui.abort()
std.print_standard(' ') ui.print_standard(' ')
def copy_source(source, items, overwrite=False): def copy_source(source, items, overwrite=False):
@ -481,12 +483,12 @@ def show_selections(args, sources, ufd_dev, ufd_sources, extra_images):
"""Show selections including non-specified options.""" """Show selections including non-specified options."""
# Sources # Sources
std.print_info('Sources') ui.print_info('Sources')
for label in ufd_sources.keys(): for label in ufd_sources.keys():
if label in sources: if label in sources:
std.print_standard(f' {label+":":<18} {sources[label]}') ui.print_standard(f' {label+":":<18} {sources[label]}')
else: else:
std.print_colored( ui.print_colored(
[f' {label+":":<18}', 'Not Specified'], [f' {label+":":<18}', 'Not Specified'],
[None, 'YELLOW'], [None, 'YELLOW'],
) )
@ -498,15 +500,15 @@ def show_selections(args, sources, ufd_dev, ufd_sources, extra_images):
print(f' {" ":<18} {image}') print(f' {" ":<18} {image}')
# Destination # Destination
std.print_standard(' ') ui.print_standard(' ')
std.print_info('Destination') ui.print_info('Destination')
cmd = [ cmd = [
'lsblk', '--nodeps', '--noheadings', '--paths', 'lsblk', '--nodeps', '--noheadings', '--paths',
'--output', 'NAME,FSTYPE,TRAN,SIZE,VENDOR,MODEL,SERIAL', '--output', 'NAME,FSTYPE,TRAN,SIZE,VENDOR,MODEL,SERIAL',
ufd_dev, ufd_dev,
] ]
proc = run_program(cmd, check=False) proc = run_program(cmd, check=False)
std.print_standard(proc.stdout.strip()) ui.print_standard(proc.stdout.strip())
cmd = [ cmd = [
'lsblk', '--noheadings', '--paths', 'lsblk', '--noheadings', '--paths',
'--output', 'NAME,SIZE,FSTYPE,LABEL,MOUNTPOINT', '--output', 'NAME,SIZE,FSTYPE,LABEL,MOUNTPOINT',
@ -514,14 +516,14 @@ def show_selections(args, sources, ufd_dev, ufd_sources, extra_images):
] ]
proc = run_program(cmd, check=False) proc = run_program(cmd, check=False)
for line in proc.stdout.splitlines()[1:]: for line in proc.stdout.splitlines()[1:]:
std.print_standard(line) ui.print_standard(line)
# Notes # Notes
if args['--update']: if args['--update']:
std.print_warning('Updating kit in-place') ui.print_warning('Updating kit in-place')
elif args['--use-mbr']: elif args['--use-mbr']:
std.print_warning('Formatting using legacy MBR') ui.print_warning('Formatting using legacy MBR')
std.print_standard(' ') ui.print_standard(' ')
def update_boot_entries(ufd_dev, images=None): def update_boot_entries(ufd_dev, images=None):
@ -621,11 +623,11 @@ def verify_sources(args, ufd_sources):
try: try:
s_path_obj = io.case_insensitive_path(s_path) s_path_obj = io.case_insensitive_path(s_path)
except FileNotFoundError: except FileNotFoundError:
std.print_error(f'ERROR: {label} not found: {s_path}') ui.print_error(f'ERROR: {label} not found: {s_path}')
std.abort() ui.abort()
if not is_valid_path(s_path_obj, data['Type']): if not is_valid_path(s_path_obj, data['Type']):
std.print_error(f'ERROR: Invalid {label} source: {s_path}') ui.print_error(f'ERROR: Invalid {label} source: {s_path}')
std.abort() ui.abort()
sources[label] = s_path_obj sources[label] = s_path_obj
return sources return sources
@ -638,12 +640,12 @@ def verify_ufd(dev_path):
try: try:
ufd_dev = io.case_insensitive_path(dev_path) ufd_dev = io.case_insensitive_path(dev_path)
except FileNotFoundError: except FileNotFoundError:
std.print_error(f'ERROR: UFD device not found: {dev_path}') ui.print_error(f'ERROR: UFD device not found: {dev_path}')
std.abort() ui.abort()
if not is_valid_path(ufd_dev, 'UFD'): if not is_valid_path(ufd_dev, 'UFD'):
std.print_error(f'ERROR: Invalid UFD device: {ufd_dev}') ui.print_error(f'ERROR: Invalid UFD device: {ufd_dev}')
std.abort() ui.abort()
return ufd_dev return ufd_dev

View file

@ -61,6 +61,24 @@ def format_log_path(
return log_path return log_path
def get_log_filepath():
"""Get the log filepath from the root logger, returns pathlib.Path obj.
NOTE: This will use the first handler baseFilename it finds (if any).
"""
log_filepath = None
root_logger = logging.getLogger()
# Check handlers
for handler in root_logger.handlers:
if hasattr(handler, 'baseFilename'):
log_filepath = pathlib.Path(handler.baseFilename).resolve()
break
# Done
return log_filepath
def get_root_logger_path(): def get_root_logger_path():
"""Get path to log file from root logger, returns pathlib.Path obj.""" """Get path to log file from root logger, returns pathlib.Path obj."""
log_path = None log_path = None

View file

@ -8,9 +8,10 @@ import re
import psutil import psutil
from wk.exe import get_json_from_command, run_program from wk.exe import get_json_from_command, run_program
from wk.std import PLATFORM, GenericError, show_data from wk.std import PLATFORM, GenericError
from wk.cfg.net import BACKUP_SERVERS from wk.cfg.net import BACKUP_SERVERS
from wk.ui.cli import show_data # TODO: This is lazy
# REGEX # REGEX

View file

@ -10,7 +10,8 @@ import subprocess
from wk.cfg.hw import VOLUME_FAILURE_THRESHOLD, VOLUME_WARNING_THRESHOLD from wk.cfg.hw import VOLUME_FAILURE_THRESHOLD, VOLUME_WARNING_THRESHOLD
from wk.exe import get_json_from_command, popen_program, run_program from wk.exe import get_json_from_command, popen_program, run_program
from wk.log import format_log_path from wk.log import format_log_path
from wk.std import bytes_to_string, color_string from wk.std import bytes_to_string
from wk.ui.cli import color_string # TODO: This is lazy
# STATIC VARIABLES # STATIC VARIABLES

View file

@ -30,10 +30,9 @@ from wk.std import (
GenericError, GenericError,
GenericWarning, GenericWarning,
bytes_to_string, bytes_to_string,
color_string,
input_text,
sleep, sleep,
) )
from wk.ui.cli import color_string, input_text # TODO: This is lazy
# STATIC VARIABLES # STATIC VARIABLES

View file

@ -58,6 +58,10 @@ from wk.os.win import (
from wk.std import ( from wk.std import (
GenericError, GenericError,
GenericWarning, GenericWarning,
sleep,
)
from wk.ui.cli import (
# TODO: This is lazy
Menu, Menu,
TryAndPrint, TryAndPrint,
abort, abort,
@ -70,7 +74,6 @@ from wk.std import (
print_warning, print_warning,
set_title, set_title,
show_data, show_data,
sleep,
strip_colors, strip_colors,
) )

View file

@ -60,6 +60,10 @@ from wk.repairs.win import (
from wk.std import ( from wk.std import (
GenericError, GenericError,
GenericWarning, GenericWarning,
sleep,
)
from wk.ui.cli import (
# TODO: This is lazy
Menu, Menu,
TryAndPrint, TryAndPrint,
abort, abort,
@ -73,7 +77,6 @@ from wk.std import (
print_warning, print_warning,
set_title, set_title,
show_data, show_data,
sleep,
strip_colors, strip_colors,
) )

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,3 @@
"""WizardKit: ui module init"""
from . import cli

862
scripts/wk/ui/cli.py Normal file
View file

@ -0,0 +1,862 @@
"""WizardKit: CLI functions"""
# vim: sts=2 sw=2 ts=2
import itertools
import logging
import os
import pathlib
import platform
import re
import subprocess
import sys
import traceback
from collections import OrderedDict
try:
from functools import cache
except ImportError:
# Assuming Python is < 3.9
from functools import lru_cache as cache
from wk.cfg.main import (
ENABLED_UPLOAD_DATA,
INDENT,
SUPPORT_MESSAGE,
WIDTH,
)
from wk.std import (sleep, GenericWarning)
# STATIC VARIABLES
COLORS = {
'CLEAR': '\033[0m',
'RED': '\033[31m',
'RED_BLINK': '\033[31;5m',
'ORANGE': '\033[31;1m',
'ORANGE_RED': '\033[1;31;41m',
'GREEN': '\033[32m',
'YELLOW': '\033[33m',
'YELLOW_BLINK': '\033[33;5m',
'BLUE': '\033[34m',
'PURPLE': '\033[35m',
'CYAN': '\033[36m',
}
LOG = logging.getLogger(__name__)
PLATFORM = platform.system()
# Classes
class Menu():
"""Object for tracking menu specific data and methods.
Menu items are added to an OrderedDict so the order is preserved.
ASSUMPTIONS:
1. All entry names are unique.
2. All action entry names start with different letters.
"""
def __init__(self, title='[Untitled Menu]'):
self.actions = OrderedDict()
self.options = OrderedDict()
self.sets = OrderedDict()
self.toggles = OrderedDict()
self.disabled_str = 'Disabled'
self.separator = ''
self.title = title
def _generate_menu_text(self):
"""Generate menu text, returns str."""
separator_string = self._get_separator_string()
menu_lines = [self.title, separator_string] if self.title else []
# Sets & toggles
for section in (self.sets, self.toggles):
for details in section.values():
if details.get('Hidden', False):
continue
if details.get('Separator', False):
menu_lines.append(separator_string)
menu_lines.append(details['Display Name'])
if self.sets or self.toggles:
menu_lines.append(separator_string)
# Options
for details in self.options.values():
if details.get('Hidden', False):
continue
if details.get('Separator', False):
menu_lines.append(separator_string)
menu_lines.append(details['Display Name'])
if self.options:
menu_lines.append(separator_string)
# Actions
for details in self.actions.values():
if details.get('Hidden', False):
continue
if details.get('Separator', False):
menu_lines.append(separator_string)
menu_lines.append(details['Display Name'])
# Show menu
menu_lines.append('')
menu_lines = [str(line) for line in menu_lines]
return '\n'.join(menu_lines)
def _get_display_name(
self, name, details,
index=None, no_checkboxes=True, setting_item=False):
"""Format display name based on details and args, returns str."""
disabled = details.get('Disabled', False)
if setting_item and not details['Selected']:
# Display item in YELLOW
disabled = True
checkmark = '*'
if 'DISPLAY' in os.environ or PLATFORM == 'Darwin':
checkmark = ''
display_name = f'{index if index else name[:1].upper()}: '
if not (index and index >= 10):
display_name = f' {display_name}'
if setting_item and 'Value' in details:
name = f'{name} = {details["Value"]}'
# Add enabled status if necessary
if not no_checkboxes:
display_name += f'[{checkmark if details["Selected"] else " "}] '
# Add name
if disabled:
display_name += color_string(f'{name} ({self.disabled_str})', 'YELLOW')
else:
display_name += name
# Done
return display_name
def _get_separator_string(self):
"""Format separator length based on name lengths, returns str."""
separator_length = 0
# Check title line(s)
if self.title:
for line in self.title.split('\n'):
separator_length = max(separator_length, len(strip_colors(line)))
# Loop over all item names
for section in (self.actions, self.options, self.sets, self.toggles):
for details in section.values():
if details.get('Hidden', False):
# Skip hidden lines
continue
line = strip_colors(details['Display Name'])
separator_length = max(separator_length, len(line))
separator_length += 1
# Done
return self.separator * separator_length
def _get_valid_answers(self):
"""Get valid answers based on menu items, returns list."""
valid_answers = []
# Numbered items
index = 0
for section in (self.sets, self.toggles, self.options):
for details in section.values():
if details.get('Hidden', False):
# Don't increment index or add to valid_answers
continue
index += 1
if not details.get('Disabled', False):
valid_answers.append(str(index))
# Action items
for name, details in self.actions.items():
if not details.get('Disabled', False):
valid_answers.append(name[:1].upper())
# Done
return valid_answers
def _resolve_selection(self, selection):
"""Get menu item based on user selection, returns tuple."""
offset = 1
resolved_selection = None
if selection.isnumeric():
# Enumerate over numbered entries
entries = [
*self.sets.items(),
*self.toggles.items(),
*self.options.items(),
]
for _i, details in enumerate(entries):
if details[1].get('Hidden', False):
offset -= 1
elif str(_i+offset) == selection:
resolved_selection = (details)
break
else:
# Just check actions
for action, details in self.actions.items():
if action.lower().startswith(selection.lower()):
resolved_selection = (action, details)
break
# Done
return resolved_selection
def _update(self, single_selection=True, settings_mode=False):
"""Update menu items in preparation for printing to screen."""
index = 0
# Fix selection status for sets
for set_details in self.sets.values():
set_selected = True
set_targets = set_details['Targets']
for option, option_details in self.options.items():
if option in set_targets and not option_details['Selected']:
set_selected = False
elif option not in set_targets and option_details['Selected']:
set_selected = False
set_details['Selected'] = set_selected
# Numbered sections
for section in (self.sets, self.toggles, self.options):
for name, details in section.items():
if details.get('Hidden', False):
# Skip hidden lines and don't increment index
continue
index += 1
details['Display Name'] = self._get_display_name(
name,
details,
index=index,
no_checkboxes=single_selection,
setting_item=settings_mode,
)
# Actions
for name, details in self.actions.items():
details['Display Name'] = self._get_display_name(
name,
details,
no_checkboxes=True,
)
def _update_entry_selection_status(self, entry, toggle=True, status=None):
"""Update entry selection status either directly or by toggling."""
if entry in self.sets:
# Update targets not the set itself
new_status = not self.sets[entry]['Selected'] if toggle else status
targets = self.sets[entry]['Targets']
self._update_set_selection_status(targets, new_status)
for section in (self.toggles, self.options, self.actions):
if entry in section:
if toggle:
section[entry]['Selected'] = not section[entry]['Selected']
else:
section[entry]['Selected'] = status
def _update_set_selection_status(self, targets, status):
"""Select or deselect options based on targets and status."""
for option, details in self.options.items():
# If (new) status is True and this option is a target then select
# Otherwise deselect
details['Selected'] = status and option in targets
def _user_select(self, prompt):
"""Show menu and select an entry, returns str."""
menu_text = self._generate_menu_text()
valid_answers = self._get_valid_answers()
# Menu loop
while True:
clear_screen()
print(menu_text)
sleep(0.01)
answer = input_text(prompt).strip()
if answer.upper() in valid_answers:
break
# Done
return answer
def add_action(self, name, details=None):
"""Add action to menu."""
details = details if details else {}
details['Selected'] = details.get('Selected', False)
self.actions[name] = details
def add_option(self, name, details=None):
"""Add option to menu."""
details = details if details else {}
details['Selected'] = details.get('Selected', False)
self.options[name] = details
def add_set(self, name, details=None):
"""Add set to menu."""
details = details if details else {}
details['Selected'] = details.get('Selected', False)
# Safety check
if 'Targets' not in details:
raise KeyError('Menu set has no targets')
# Add set
self.sets[name] = details
def add_toggle(self, name, details=None):
"""Add toggle to menu."""
details = details if details else {}
details['Selected'] = details.get('Selected', False)
self.toggles[name] = details
def advanced_select(self, prompt='Please make a selection: '):
"""Display menu and make multiple selections, returns tuple.
NOTE: Menu is displayed until an action entry is selected.
"""
while True:
self._update(single_selection=False)
user_selection = self._user_select(prompt)
selected_entry = self._resolve_selection(user_selection)
if user_selection.isnumeric():
# Update selection(s)
self._update_entry_selection_status(selected_entry[0])
else:
# Action selected
break
# Done
return selected_entry
def settings_select(self, prompt='Please make a selection: '):
"""Display menu and make multiple selections, returns tuple.
NOTE: Menu is displayed until an action entry is selected.
"""
choice_kwargs = {
'choices': ['T', 'C'],
'prompt': 'Toggle or change value?',
}
while True:
self._update(single_selection=True, settings_mode=True)
user_selection = self._user_select(prompt)
selected_entry = self._resolve_selection(user_selection)
if user_selection.isnumeric():
if 'Value' in selected_entry[-1] and choice(**choice_kwargs) == 'C':
# Change
selected_entry[-1]['Value'] = input_text('Enter new value: ')
else:
# Toggle
self._update_entry_selection_status(selected_entry[0])
else:
# Action selected
break
# Done
return selected_entry
def simple_select(self, prompt='Please make a selection: ', update=True):
"""Display menu and make a single selection, returns tuple."""
if update:
self._update()
user_selection = self._user_select(prompt)
return self._resolve_selection(user_selection)
def update(self):
"""Update menu with default settings."""
self._update()
class TryAndPrint():
"""Object used to standardize running functions and returning the result.
The errors and warning attributes are used to allow fine-tuned results
based on exception names.
"""
def __init__(self, msg_bad='FAILED', msg_good='SUCCESS'):
self.catch_all = True
self.indent = INDENT
self.list_errors = ['GenericError']
self.list_warnings = ['GenericWarning']
self.msg_bad = msg_bad
self.msg_good = msg_good
self.verbose = False
self.width = WIDTH
def _format_exception_message(self, _exception):
"""Format using the exception's args or name, returns str."""
LOG.debug(
'Formatting exception: %s, %s',
_exception.__class__.__name__,
_exception,
)
message = ''
# Format message string from _exception
try:
if isinstance(_exception, subprocess.CalledProcessError):
message = _exception.stderr
if not isinstance(message, str):
message = message.decode('utf-8')
message = message.strip()
elif isinstance(_exception, ZeroDivisionError):
# Skip and just use exception name below
pass
else:
message = str(_exception)
except Exception:
# Just use the exception name instead
pass
# Prepend exception name
if _exception.__class__.__name__ not in ('GenericError', 'GenericWarning'):
try:
message = f'{_exception.__class__.__name__}: {message}'
except Exception:
message = f'UNKNOWN ERROR: {message}'
# Fix multi-line messages
if '\n' in message:
try:
lines = [
f'{" "*(self.indent+self.width)}{line.strip()}'
for line in message.splitlines() if line.strip()
]
lines[0] = lines[0].strip()
message = '\n'.join(lines)
except Exception:
pass
# Done
return message
def _format_function_output(self, output, msg_good):
"""Format function output for use in try_and_print(), returns str."""
LOG.debug('Formatting output: %s', output)
if not output:
raise GenericWarning('No output')
# Ensure we're working with a list
if isinstance(output, subprocess.CompletedProcess):
stdout = output.stdout
if not isinstance(stdout, str):
stdout = stdout.decode('utf8')
output = stdout.strip().splitlines()
if not output:
# Going to treat these as successes (for now)
LOG.warning('Program output was empty, assuming good result.')
return color_string(msg_good, 'GREEN')
else:
try:
output = list(output)
except TypeError:
output = [output]
# Safety check
if not output:
# Going to ignore empty function output for now
LOG.error('Output is empty')
return 'UNKNOWN'
# Build result_msg
result_msg = f'{output.pop(0)}'
if output:
output = [f'{" "*(self.indent+self.width)}{line}' for line in output]
result_msg += '\n' + '\n'.join(output)
# Done
return result_msg
def _log_result(self, message, result_msg):
"""Log result text without color formatting."""
log_text = f'{" "*self.indent}{message:<{self.width}}{result_msg}'
for line in log_text.splitlines():
line = strip_colors(line)
LOG.info(line)
def add_error(self, exception_name):
"""Add exception name to error list."""
if exception_name not in self.list_errors:
self.list_errors.append(exception_name)
def add_warning(self, exception_name):
"""Add exception name to warning list."""
if exception_name not in self.list_warnings:
self.list_warnings.append(exception_name)
def run(
self, message, function, *args,
catch_all=None, msg_good=None, verbose=None, **kwargs):
"""Run a function and print the results, returns results as dict.
If catch_all is True then (nearly) all exceptions will be caught.
Otherwise if an exception occurs that wasn't specified it will be
re-raised.
If the function returns data it will be used instead of msg_good,
msg_bad, or exception text.
The output should be a list or a subprocess.CompletedProcess object.
If msg_good is passed it will override self.msg_good for this call.
If verbose is True then exception names or messages will be used for
the result message. Otherwise it will simply be set to result_bad.
If catch_all and/or verbose are passed it will override
self.catch_all and/or self.verbose for this call.
args and kwargs are passed to the function.
"""
LOG.debug('function: %s.%s', function.__module__, function.__name__)
LOG.debug('args: %s', args)
LOG.debug('kwargs: %s', kwargs)
LOG.debug(
'catch_all: %s, msg_good: %s, verbose: %s',
catch_all,
msg_good,
verbose,
)
f_exception = None
catch_all = catch_all if catch_all is not None else self.catch_all
msg_good = msg_good if msg_good is not None else self.msg_good
output = None
result_msg = 'UNKNOWN'
verbose = verbose if verbose is not None else self.verbose
# Build exception tuples
e_exceptions = tuple(get_exception(e) for e in self.list_errors)
w_exceptions = tuple(get_exception(e) for e in self.list_warnings)
# Run function and catch exceptions
print(f'{" "*self.indent}{message:<{self.width}}', end='', flush=True)
LOG.debug('Running function: %s.%s', function.__module__, function.__name__)
try:
output = function(*args, **kwargs)
except w_exceptions as _exception:
# Warnings
result_msg = self._format_exception_message(_exception)
print_warning(result_msg, log=False)
f_exception = _exception
except e_exceptions as _exception:
# Exceptions
result_msg = self._format_exception_message(_exception)
print_error(result_msg, log=False)
f_exception = _exception
except Exception as _exception:
# Unexpected exceptions
if verbose:
result_msg = self._format_exception_message(_exception)
else:
result_msg = self.msg_bad
print_error(result_msg, log=False)
f_exception = _exception
if not catch_all:
# Re-raise error as necessary
raise
else:
# Success
if output:
result_msg = self._format_function_output(output, msg_good)
print(result_msg)
else:
result_msg = msg_good
print_success(result_msg, log=False)
# Done
self._log_result(message, result_msg)
return {
'Exception': f_exception,
'Failed': bool(f_exception),
'Message': result_msg,
'Output': output,
}
# Functions
def abort(prompt='Aborted.', show_prompt=True, return_code=1):
"""Abort script."""
print_warning(prompt)
if show_prompt:
sleep(0.5)
pause(prompt='Press Enter to exit... ')
sys.exit(return_code)
def ask(prompt='Kotaero!'):
"""Prompt the user with a Y/N question, returns bool."""
answer = None
prompt = f'{prompt} [Y/N]: '
# Loop until acceptable answer is given
while answer is None:
tmp = input_text(prompt)
if re.search(r'^y(es|up|)$', tmp, re.IGNORECASE):
answer = True
elif re.search(r'^n(o|ope|)$', tmp, re.IGNORECASE):
answer = False
# Done
LOG.info('%s%s', prompt, 'Yes' if answer else 'No')
return answer
def beep(repeat=1):
"""Play system bell with optional repeat."""
while repeat >= 1:
# Print bell char without a newline
print('\a', end='', flush=True)
sleep(0.5)
repeat -= 1
def choice(choices, prompt='答えろ!'):
"""Choose an option from a provided list, returns str.
Choices provided will be converted to uppercase and returned as such.
Similar to the commands choice (Windows) and select (Linux).
"""
LOG.debug('choices: %s, prompt: %s', choices, prompt)
answer = None
choices = [str(c).upper()[:1] for c in choices]
prompt = f'{prompt} [{"/".join(choices)}]'
regex = f'^({"|".join(choices)})$'
# Loop until acceptable answer is given
while answer is None:
tmp = input_text(prompt=prompt)
if re.search(regex, tmp, re.IGNORECASE):
answer = tmp.upper()
# Done
LOG.info('%s %s', prompt, answer)
return answer
def clear_screen():
"""Simple wrapper for clear/cls."""
cmd = 'cls' if os.name == 'nt' else 'clear'
proc = subprocess.run(cmd, check=False, shell=True, stderr=subprocess.PIPE)
# Workaround for live macOS env
if proc.returncode != 0:
print('\033c')
def color_string(strings, colors, sep=' '):
"""Build colored string using ANSI escapes, returns str."""
clear_code = COLORS['CLEAR']
msg = []
# Convert to tuples if necessary
if isinstance(strings, (str, pathlib.Path)):
strings = (strings,)
if isinstance(colors, (str, pathlib.Path)):
colors = (colors,)
# Convert to strings if necessary
try:
iter(strings)
except TypeError:
# Assuming single element passed, convert to string
strings = (str(strings),)
try:
iter(colors)
except TypeError:
# Assuming single element passed, convert to string
colors = (str(colors),)
# Build new string with color escapes added
for string, color in itertools.zip_longest(strings, colors):
color_code = COLORS.get(color, clear_code)
msg.append(f'{color_code}{string}{clear_code}')
# Done
return sep.join(msg)
@cache
def get_exception(name):
"""Get exception by name, returns exception object.
[Doctest]
>>> t = TryAndPrint()
>>> t._get_exception('AttributeError')
<class 'AttributeError'>
>>> t._get_exception('CalledProcessError')
<class 'subprocess.CalledProcessError'>
>>> t._get_exception('GenericError')
<class 'wk.std.GenericError'>
"""
LOG.debug('Getting exception: %s', name)
obj = getattr(sys.modules[__name__], name, None)
if obj:
return obj
# Try builtin classes
obj = getattr(sys.modules['builtins'], name, None)
if obj:
return obj
# Try all modules
for _mod in sys.modules.values():
obj = getattr(_mod, name, None)
if obj:
break
# Check if not found
if not obj:
raise AttributeError(f'Failed to find exception: {name}')
# Done
return obj
def input_text(prompt='Enter text', allow_empty_response=True):
"""Get text from user, returns string."""
prompt = str(prompt)
response = None
if prompt[-1:] != ' ':
prompt += ' '
print(prompt, end='', flush=True)
while response is None:
try:
response = input()
LOG.debug('%s%s', prompt, response)
except EOFError:
# Ignore and try again
LOG.warning('Exception occured', exc_info=True)
print('', flush=True)
if not allow_empty_response:
if response is None or not response.strip():
# The None check here is used to avoid a TypeError if response is None
print(f'\r{prompt}', end='', flush=True)
response = None
return response
def major_exception():
"""Display traceback, optionally upload detailes, and exit."""
LOG.critical('Major exception encountered', exc_info=True)
print_error('Major exception', log=False)
print_warning(SUPPORT_MESSAGE)
if ENABLED_UPLOAD_DATA:
print_warning('Also, please run upload-logs to help debugging!')
print(traceback.format_exc())
# Done
pause('Press Enter to exit... ')
raise SystemExit(1)
def pause(prompt='Press Enter to continue... '):
"""Simple pause implementation."""
input_text(prompt)
def print_colored(strings, colors, log=False, sep=' ', **kwargs):
"""Prints strings in the colors specified."""
LOG.debug(
'strings: %s, colors: %s, sep: %s, kwargs: %s',
strings, colors, sep, kwargs,
)
msg = color_string(strings, colors, sep=sep)
print_options = {
'end': kwargs.get('end', '\n'),
'file': kwargs.get('file', sys.stdout),
'flush': kwargs.get('flush', False),
}
print(msg, **print_options)
if log:
LOG.info(strip_colors(msg))
def print_error(msg, log=True, **kwargs):
"""Prints message in RED and log as ERROR."""
if 'file' not in kwargs:
# Only set if not specified
kwargs['file'] = sys.stderr
print_colored(msg, 'RED', **kwargs)
if log:
LOG.error(msg)
def print_info(msg, log=True, **kwargs):
"""Prints message in BLUE and log as INFO."""
print_colored(msg, 'BLUE', **kwargs)
if log:
LOG.info(msg)
def print_report(report, indent=None, log=True):
"""Print report to screen and optionally to log."""
for line in report:
if indent:
line = f'{" "*indent}{line}'
print(line)
if log:
LOG.info(strip_colors(line))
def print_standard(msg, log=True, **kwargs):
"""Prints message and log as INFO."""
print(msg, **kwargs)
if log:
LOG.info(msg)
def print_success(msg, log=True, **kwargs):
"""Prints message in GREEN and log as INFO."""
print_colored(msg, 'GREEN', **kwargs)
if log:
LOG.info(msg)
def print_warning(msg, log=True, **kwargs):
"""Prints message in YELLOW and log as WARNING."""
if 'file' not in kwargs:
# Only set if not specified
kwargs['file'] = sys.stderr
print_colored(msg, 'YELLOW', **kwargs)
if log:
LOG.warning(msg)
def set_title(title):
"""Set window title."""
LOG.debug('title: %s', title)
if os.name == 'nt':
os.system(f'title {title}')
else:
print_error('Setting the title is only supported under Windows.')
def show_data(message, data, color=None, indent=None, width=None):
"""Display info using default or provided indent and width."""
colors = (None, color if color else None)
indent = INDENT if indent is None else indent
width = WIDTH if width is None else width
print_colored(
(f'{" "*indent}{message:<{width}}', data),
colors,
log=True,
sep='',
)
def strip_colors(string):
"""Strip known ANSI color escapes from string, returns str."""
LOG.debug('string: %s', string)
for color in COLORS.values():
string = string.replace(color, '')
return string
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -17,7 +17,7 @@ OPTIONS = {
def get_debug_prefix() -> str: def get_debug_prefix() -> str:
"""Ask what we're debugging, returns log dir prefix.""" """Ask what we're debugging, returns log dir prefix."""
menu = wk.std.Menu(title=f'{wk.cfg.main.KIT_NAME_FULL}: Debugging Menu\n') menu = wk.ui.cli.Menu(title=f'{wk.cfg.main.KIT_NAME_FULL}: Debugging Menu\n')
for name, prefix in OPTIONS.items(): for name, prefix in OPTIONS.items():
menu.add_option(name, {'Prefix': prefix}) menu.add_option(name, {'Prefix': prefix})
selection = menu.simple_select() selection = menu.simple_select()
@ -38,14 +38,14 @@ def get_debug_path() -> pathlib.Path:
# Safety check # Safety check
if not debug_paths: if not debug_paths:
wk.std.abort('No logs found, aborting.') wk.ui.cli.abort('No logs found, aborting.')
# Use latest option # Use latest option
if wk.std.ask('Use latest session?'): if wk.ui.cli.ask('Use latest session?'):
return debug_paths[-1] return debug_paths[-1]
# Select from list # Select from list
menu = wk.std.Menu(title=f'{wk.cfg.main.KIT_NAME_FULL}: Debugging Menu\n') menu = wk.ui.cli.Menu(title=f'{wk.cfg.main.KIT_NAME_FULL}: Debugging Menu\n')
for item in debug_paths: for item in debug_paths:
menu.add_option(item.parent.name, {'Path': item}) menu.add_option(item.parent.name, {'Path': item})
selection = menu.simple_select() selection = menu.simple_select()