BREAKING Add wk/ui/tui.py

This commit is contained in:
2Shirt 2023-04-09 15:59:34 -07:00
parent f9bcd534d4
commit ba3bf480f7
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
5 changed files with 448 additions and 144 deletions

View file

@ -3,8 +3,6 @@
import re import re
from collections import OrderedDict
# STATIC VARIABLES # STATIC VARIABLES
ATTRIBUTE_COLORS = ( ATTRIBUTE_COLORS = (
@ -161,18 +159,6 @@ THRESH_HDD_AVG_LOW = 65 * 1024**2
THRESH_SSD_MIN = 90 * 1024**2 THRESH_SSD_MIN = 90 * 1024**2
THRESH_SSD_AVG_HIGH = 135 * 1024**2 THRESH_SSD_AVG_HIGH = 135 * 1024**2
THRESH_SSD_AVG_LOW = 100 * 1024**2 THRESH_SSD_AVG_LOW = 100 * 1024**2
TMUX_SIDE_WIDTH = 20
TMUX_LAYOUT = OrderedDict({
'Top': {'height': 2, 'Check': True},
'Started': {'width': TMUX_SIDE_WIDTH, 'Check': True},
'Progress': {'width': TMUX_SIDE_WIDTH, 'Check': True},
# Testing panes
'Temps': {'height': 1000, 'Check': False},
'Prime95': {'height': 11, 'Check': False},
'SMART': {'height': 4, 'Check': True},
'badblocks': {'height': 5, 'Check': True},
'I/O Benchmark': {'height': 1000, 'Check': False},
})
# VOLUME THRESHOLDS in percent # VOLUME THRESHOLDS in percent
VOLUME_WARNING_THRESHOLD = 70 VOLUME_WARNING_THRESHOLD = 70
VOLUME_FAILURE_THRESHOLD = 85 VOLUME_FAILURE_THRESHOLD = 85

View file

@ -25,8 +25,8 @@ from wk.hw.network import network_test
from wk.hw.screensavers import screensaver from wk.hw.screensavers import screensaver
from wk.hw.test import Test, TestGroup from wk.hw.test import Test, TestGroup
from wk.ui import cli as ui from wk.ui import tui as ui
from wk.ui import ansi, tmux from wk.ui import ansi, cli, tmux
# STATIC VARIABLES # STATIC VARIABLES
@ -84,18 +84,15 @@ class State():
"""Object for tracking hardware diagnostic data.""" """Object for tracking hardware diagnostic data."""
def __init__(self, test_mode=False): def __init__(self, test_mode=False):
self.disks = [] self.disks = []
self.layout = cfg.hw.TMUX_LAYOUT.copy()
self.log_dir = None self.log_dir = None
self.panes = {} self.panes = {}
self.progress_file = None
self.system = None self.system = None
self.test_groups = [] self.test_groups = []
self.top_text = ansi.color_string('Hardware Diagnostics', 'GREEN') self.title_text = ansi.color_string('Hardware Diagnostics', 'GREEN')
if test_mode: if test_mode:
self.top_text += ansi.color_string(' (Test Mode)', 'YELLOW') self.title_text += ansi.color_string(' (Test Mode)', 'YELLOW')
self.ui = ui.TUI(self.title_text)
# Init tmux and start a background process to maintain layout
self.init_tmux()
exe.start_thread(self.fix_tmux_layout_loop)
def abort_testing(self) -> None: def abort_testing(self) -> None:
"""Set unfinished tests as aborted and cleanup tmux panes.""" """Set unfinished tests as aborted and cleanup tmux panes."""
@ -125,23 +122,6 @@ class State():
dev.disable_disk_tests() dev.disable_disk_tests()
break break
def fix_tmux_layout(self, forced=True) -> None:
"""Fix tmux layout based on cfg.hw.TMUX_LAYOUT."""
try:
tmux.fix_layout(self.panes, self.layout, forced=forced)
except RuntimeError:
# Assuming self.panes changed while running
pass
def fix_tmux_layout_loop(self) -> None:
"""Fix tmux layout on a loop.
NOTE: This should be called as a thread.
"""
while True:
self.fix_tmux_layout(forced=False)
std.sleep(1)
def init_diags(self, menu) -> None: def init_diags(self, menu) -> None:
"""Initialize diagnostic pass.""" """Initialize diagnostic pass."""
@ -163,15 +143,13 @@ class State():
keep_history=False, keep_history=False,
timestamp=False, timestamp=False,
) )
ui.clear_screen() cli.clear_screen()
ui.print_info('Initializing...') cli.print_info('Initializing...')
# Progress Pane # Progress Pane
self.update_progress_pane() self.progress_file = pathlib.Path(f'{self.log_dir}/progress.out')
tmux.respawn_pane( self.update_progress_file()
pane_id=self.panes['Progress'], self.ui.set_progress_file(self.progress_file)
watch_file=f'{self.log_dir}/progress.out',
)
# Add HW Objects # Add HW Objects
self.system = hw_system.System() self.system = hw_system.System()
@ -222,7 +200,7 @@ class State():
behind=True, behind=True,
lines=2, lines=2,
vertical=True, vertical=True,
text=f'{self.top_text}\nMain Menu', text=f'{self.title_text}\nMain Menu',
) )
# Started # Started
@ -299,16 +277,15 @@ class State():
), ),
) )
def update_progress_pane(self) -> None: def update_progress_file(self) -> None:
"""Update progress pane.""" """Update progress file."""
report = [] report = []
width = cfg.hw.TMUX_SIDE_WIDTH
for group in self.test_groups: for group in self.test_groups:
report.append(ansi.color_string(group.name, 'BLUE')) report.append(ansi.color_string(group.name, 'BLUE'))
for test in group.test_objects: for test in group.test_objects:
report.append(ansi.color_string( report.append(ansi.color_string(
[test.label, f'{test.status:>{width-len(test.label)}}'], [test.label, f'{test.status:>{self.ui.side_width-len(test.label)}}'],
[None, STATUS_COLORS.get(test.status, None)], [None, STATUS_COLORS.get(test.status, None)],
sep='', sep='',
)) ))
@ -317,19 +294,17 @@ class State():
report.append(' ') report.append(' ')
# Write to progress file # Write to progress file
out_path = pathlib.Path(f'{self.log_dir}/progress.out') self.progress_file.write_text('\n'.join(report), encoding='utf-8')
with open(out_path, 'w', encoding='utf-8') as _f:
_f.write('\n'.join(report))
def update_top_pane(self, text) -> None: def update_top_pane(self, text) -> None:
"""Update top pane with text.""" """Update top pane with text."""
tmux.respawn_pane(self.panes['Top'], text=f'{self.top_text}\n{text}') tmux.respawn_pane(self.panes['Top'], text=f'{self.title_text}\n{text}')
# Functions # Functions
def build_menu(cli_mode=False, quick_mode=False) -> ui.Menu: def build_menu(cli_mode=False, quick_mode=False) -> cli.Menu:
"""Build main menu, returns wk.ui.cli.Menu.""" """Build main menu, returns wk.ui.cli.Menu."""
menu = ui.Menu(title=None) menu = cli.Menu(title=None)
# Add actions, options, etc # Add actions, options, etc
for action in MENU_ACTIONS: for action in MENU_ACTIONS:
@ -407,7 +382,7 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
) )
# Create monitor and worker panes # Create monitor and worker panes
state.update_progress_pane() state.update_progress_file()
state.panes['Prime95'] = tmux.split_window( state.panes['Prime95'] = tmux.split_window(
lines=10, vertical=True, watch_file=prime_log, watch_cmd='tail') lines=10, vertical=True, watch_file=prime_log, watch_cmd='tail')
if PLATFORM == 'Darwin': if PLATFORM == 'Darwin':
@ -421,11 +396,11 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
state.layout['Current'] = {'height': 3, 'Check': True} state.layout['Current'] = {'height': 3, 'Check': True}
# Get idle temps # Get idle temps
ui.print_standard('Saving idle temps...') cli.print_standard('Saving idle temps...')
sensors.save_average_temps(temp_label='Idle', seconds=5) sensors.save_average_temps(temp_label='Idle', seconds=5)
# Stress CPU # Stress CPU
ui.print_info('Running stress test') cli.print_info('Running stress test')
hw_cpu.set_apple_fan_speed('max') hw_cpu.set_apple_fan_speed('max')
proc_mprime = hw_cpu.start_mprime(state.log_dir, prime_log) proc_mprime = hw_cpu.start_mprime(state.log_dir, prime_log)
@ -443,13 +418,13 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
if sensors.cpu_reached_critical_temp() or aborted: if sensors.cpu_reached_critical_temp() or aborted:
test_cooling_obj.set_status('Aborted') test_cooling_obj.set_status('Aborted')
test_mprime_obj.set_status('Aborted') test_mprime_obj.set_status('Aborted')
state.update_progress_pane() state.update_progress_file()
# Get cooldown temp # Get cooldown temp
ui.clear_screen() cli.clear_screen()
ui.print_standard('Letting CPU cooldown...') cli.print_standard('Letting CPU cooldown...')
std.sleep(5) std.sleep(5)
ui.print_standard('Saving cooldown temps...') cli.print_standard('Saving cooldown temps...')
sensors.save_average_temps(temp_label='Cooldown', seconds=5) sensors.save_average_temps(temp_label='Cooldown', seconds=5)
# Check Prime95 results # Check Prime95 results
@ -464,10 +439,10 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
) )
if run_sysbench: if run_sysbench:
LOG.info('CPU Test (Sysbench)') LOG.info('CPU Test (Sysbench)')
ui.print_standard('Letting CPU cooldown more...') cli.print_standard('Letting CPU cooldown more...')
std.sleep(30) std.sleep(30)
ui.clear_screen() cli.clear_screen()
ui.print_info('Running alternate stress test') cli.print_info('Running alternate stress test')
print('') print('')
proc_sysbench, filehandle_sysbench = hw_cpu.start_sysbench( proc_sysbench, filehandle_sysbench = hw_cpu.start_sysbench(
sensors, sensors,
@ -490,14 +465,14 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
if sensors.cpu_reached_critical_temp() or aborted: if sensors.cpu_reached_critical_temp() or aborted:
test_cooling_obj.set_status('Aborted') test_cooling_obj.set_status('Aborted')
test_mprime_obj.set_status('Aborted') test_mprime_obj.set_status('Aborted')
state.update_progress_pane() state.update_progress_file()
# Check Cooling results # Check Cooling results
test_cooling_obj.report.append(ansi.color_string('Temps', 'BLUE')) test_cooling_obj.report.append(ansi.color_string('Temps', 'BLUE'))
hw_cpu.check_cooling_results(test_cooling_obj, sensors, run_sysbench) hw_cpu.check_cooling_results(test_cooling_obj, sensors, run_sysbench)
# Cleanup # Cleanup
state.update_progress_pane() state.update_progress_file()
sensors.stop_background_monitor() sensors.stop_background_monitor()
state.panes.pop('Current', None) state.panes.pop('Current', None)
tmux.kill_pane(state.panes.pop('Prime95', None)) tmux.kill_pane(state.panes.pop('Prime95', None))
@ -519,7 +494,7 @@ def disk_attribute_check(state, test_objects, test_mode=False) -> None:
continue continue
# Done # Done
state.update_progress_pane() state.update_progress_file()
def disk_io_benchmark( def disk_io_benchmark(
@ -548,8 +523,8 @@ def disk_io_benchmark(
continue continue
# Start benchmark # Start benchmark
ui.clear_screen() cli.clear_screen()
ui.print_report(test.dev.generate_report()) cli.print_report(test.dev.generate_report())
test.set_status('Working') test.set_status('Working')
test_log = f'{state.log_dir}/{test.dev.path.name}_benchmark.out' test_log = f'{state.log_dir}/{test.dev.path.name}_benchmark.out'
tmux.respawn_pane( tmux.respawn_pane(
@ -557,7 +532,7 @@ def disk_io_benchmark(
watch_cmd='tail', watch_cmd='tail',
watch_file=test_log, watch_file=test_log,
) )
state.update_progress_pane() state.update_progress_file()
try: try:
hw_benchmark.run_io_test(test, test_log, test_mode=test_mode) hw_benchmark.run_io_test(test, test_log, test_mode=test_mode)
except KeyboardInterrupt: except KeyboardInterrupt:
@ -575,10 +550,10 @@ def disk_io_benchmark(
break break
# Update progress after each test # Update progress after each test
state.update_progress_pane() state.update_progress_file()
# Cleanup # Cleanup
state.update_progress_pane() state.update_progress_file()
tmux.kill_pane(state.panes.pop('I/O Benchmark', None)) tmux.kill_pane(state.panes.pop('I/O Benchmark', None))
# Done # Done
@ -597,7 +572,7 @@ def disk_self_test(state, test_objects, test_mode=False) -> None:
state.update_top_pane( state.update_top_pane(
f'Disk self-test{"s" if len(test_objects) > 1 else ""}', f'Disk self-test{"s" if len(test_objects) > 1 else ""}',
) )
ui.print_info(f'Starting self-test{"s" if len(test_objects) > 1 else ""}') cli.print_info(f'Starting self-test{"s" if len(test_objects) > 1 else ""}')
show_failed_attributes(state) show_failed_attributes(state)
for test in reversed(test_objects): for test in reversed(test_objects):
if test.disabled: if test.disabled:
@ -616,7 +591,7 @@ def disk_self_test(state, test_objects, test_mode=False) -> None:
) )
# Wait for all tests to complete # Wait for all tests to complete
state.update_progress_pane() state.update_progress_file()
try: try:
while True: while True:
if any(t.is_alive() for t in threads): if any(t.is_alive() for t in threads):
@ -631,7 +606,7 @@ def disk_self_test(state, test_objects, test_mode=False) -> None:
hw_smart.build_self_test_report(test, aborted=True) hw_smart.build_self_test_report(test, aborted=True)
# Cleanup # Cleanup
state.update_progress_pane() state.update_progress_file()
for pane in state.panes['SMART']: for pane in state.panes['SMART']:
tmux.kill_pane(pane) tmux.kill_pane(pane)
state.panes.pop('SMART', None) state.panes.pop('SMART', None)
@ -694,7 +669,7 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None:
state.update_top_pane( state.update_top_pane(
f'Disk Surface Scan{"s" if len(test_objects) > 1 else ""}', f'Disk Surface Scan{"s" if len(test_objects) > 1 else ""}',
) )
ui.print_info( cli.print_info(
f'Starting disk surface scan{"s" if len(test_objects) > 1 else ""}', f'Starting disk surface scan{"s" if len(test_objects) > 1 else ""}',
) )
show_failed_attributes(state) show_failed_attributes(state)
@ -723,7 +698,7 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None:
try: try:
while True: while True:
if any(t.is_alive() for t in threads): if any(t.is_alive() for t in threads):
state.update_progress_pane() state.update_progress_file()
std.sleep(5) std.sleep(5)
else: else:
break break
@ -737,7 +712,7 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None:
test.report.append(ansi.color_string(' Aborted', 'YELLOW')) test.report.append(ansi.color_string(' Aborted', 'YELLOW'))
# Cleanup # Cleanup
state.update_progress_pane() state.update_progress_file()
for pane in state.panes['badblocks']: for pane in state.panes['badblocks']:
tmux.kill_pane(pane) tmux.kill_pane(pane)
state.panes.pop('badblocks', None) state.panes.pop('badblocks', None)
@ -788,9 +763,9 @@ def main() -> None:
try: try:
action() action()
except KeyboardInterrupt: except KeyboardInterrupt:
ui.print_warning('Aborted.') cli.print_warning('Aborted.')
ui.print_standard('') cli.print_standard('')
ui.pause('Press Enter to return to main menu...') cli.pause('Press Enter to return to main menu...')
if 'Clock Sync' in selection: if 'Clock Sync' in selection:
state.update_clock() state.update_clock()
@ -855,8 +830,8 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None:
# Just return if no tests were selected # Just return if no tests were selected
if not state.test_groups: if not state.test_groups:
ui.print_warning('No tests selected?') cli.print_warning('No tests selected?')
ui.pause() cli.pause()
return return
# Run tests # Run tests
@ -867,13 +842,13 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None:
args = [group.test_objects] args = [group.test_objects]
if group.name == 'Disk I/O Benchmark': if group.name == 'Disk I/O Benchmark':
args.append(menu.toggles['Skip USB Benchmarks']['Selected']) args.append(menu.toggles['Skip USB Benchmarks']['Selected'])
ui.clear_screen() cli.clear_screen()
try: try:
function(state, *args, test_mode=test_mode) function(state, *args, test_mode=test_mode)
except (KeyboardInterrupt, std.GenericAbort): except (KeyboardInterrupt, std.GenericAbort):
aborted = True aborted = True
state.abort_testing() state.abort_testing()
state.update_progress_pane() state.update_progress_file()
break break
else: else:
# Run safety checks after disk tests # Run safety checks after disk tests
@ -894,25 +869,25 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None:
state.save_debug_reports() state.save_debug_reports()
atexit.unregister(state.save_debug_reports) atexit.unregister(state.save_debug_reports)
if quick_mode: if quick_mode:
ui.pause('Press Enter to exit...') cli.pause('Press Enter to exit...')
else: else:
ui.pause('Press Enter to return to main menu...') cli.pause('Press Enter to return to main menu...')
def show_failed_attributes(state) -> None: def show_failed_attributes(state) -> None:
"""Show failed attributes for all disks.""" """Show failed attributes for all disks."""
for dev in state.disks: for dev in state.disks:
ui.print_colored([dev.name, dev.description], ['CYAN', None]) cli.print_colored([dev.name, dev.description], ['CYAN', None])
ui.print_report( cli.print_report(
hw_smart.generate_attribute_report(dev, only_failed=True), hw_smart.generate_attribute_report(dev, only_failed=True),
) )
ui.print_standard('') cli.print_standard('')
def show_results(state) -> None: def show_results(state) -> None:
"""Show test results by device.""" """Show test results by device."""
std.sleep(0.5) std.sleep(0.5)
ui.clear_screen() cli.clear_screen()
state.update_top_pane('Results') state.update_top_pane('Results')
# CPU Tests # CPU Tests
@ -920,22 +895,22 @@ def show_results(state) -> None:
group.name for group in state.test_groups if 'CPU' in group.name group.name for group in state.test_groups if 'CPU' in group.name
] ]
if cpu_tests_enabled: if cpu_tests_enabled:
ui.print_success('CPU:') cli.print_success('CPU:')
ui.print_report(state.system.generate_report()) cli.print_report(state.system.generate_report())
ui.print_standard(' ') cli.print_standard(' ')
# Disk Tests # Disk Tests
disk_tests_enabled = [ disk_tests_enabled = [
group.name for group in state.test_groups if 'Disk' in group.name group.name for group in state.test_groups if 'Disk' in group.name
] ]
if disk_tests_enabled: if disk_tests_enabled:
ui.print_success(f'Disk{"s" if len(state.disks) > 1 else ""}:') cli.print_success(f'Disk{"s" if len(state.disks) > 1 else ""}:')
for disk in state.disks: for disk in state.disks:
ui.print_report(disk.generate_report()) cli.print_report(disk.generate_report())
ui.print_standard(' ') cli.print_standard(' ')
if not state.disks: if not state.disks:
ui.print_warning('No devices') cli.print_warning('No devices')
ui.print_standard(' ') cli.print_standard(' ')
def sync_clock() -> None: def sync_clock() -> None:

View file

@ -3,3 +3,4 @@
from . import ansi from . import ansi
from . import cli from . import cli
from . import tmux from . import tmux
from . import tui

View file

@ -34,30 +34,76 @@ def clear_pane(pane_id=None):
run_program(cmd, check=False) run_program(cmd, check=False)
def fix_layout(panes, layout, forced=False): def fix_layout(layout, forced=False):
"""Fix pane sizes based on layout.""" """Fix pane sizes based on layout."""
if not (forced or layout_needs_fixed(panes, layout)): resize_kwargs = []
# Bail early
if not (forced or layout_needs_fixed(layout)):
# Layout should be fine # Layout should be fine
return return
# Update panes # Remove closed panes
for name, data in layout.items(): for data in layout.values():
# Skip missing panes data['Panes'] = [pane for pane in data['Panes'] if poll_pane(pane)]
if name not in panes:
# Update main panes
resize_pane(height=999) # Set active pane too large and then adjust down
for section, data in layout.items():
if section == 'Workers':
# Skip for now
continue continue
# Resize pane(s) if 'height' in data:
pane_list = panes[name] for pane_id in data['Panes']:
if isinstance(pane_list, str): resize_kwargs.append({'pane_id': pane_id, 'height': data['height']})
pane_list = [pane_list] if 'width' in data:
for pane_id in pane_list: for pane_id in data['Panes']:
if name == 'Current': resize_kwargs.append({'pane_id': pane_id, 'width': data['width']})
pane_id = None for kwargs in resize_kwargs:
try: try:
resize_pane(pane_id, **data) resize_pane(**kwargs)
except RuntimeError: except RuntimeError:
# Assuming pane was closed just before resizing # Assuming pane was closed just before resizing
pass pass
# Update "group" panes widths
for group in ('Title', 'Info'):
num_panes = len(layout[group]['Panes'])
if num_panes <= 1:
continue
width = int( (get_pane_size()[0] - (1 - num_panes)) / num_panes )
for pane_id in layout[group]['Panes']:
resize_pane(pane_id, width=width)
if group == 'Title':
# (re)fix Started pane
#TODO: REstore: resize_pane(layout['Started']['Panes'][0], width=TMUX_SIDE_WIDTH)
resize_pane(layout['Started']['Panes'][0], width=21)
# Bail early
if not (layout['Workers']['Panes'] and layout['Workers']['height']):
return
# Update worker heights
worker_height = layout['Workers']['height']
workers = layout['Workers']['Panes'].copy()
num_workers = len(workers)
avail_height = sum(get_pane_size(pane)[1] for pane in workers)
avail_height += get_pane_size()[1] # Current pane
# Check if window is too small
if avail_height < (worker_height*num_workers) + 3:
# Just leave things as-is
return
# Resize current pane
resize_pane(height=avail_height-(worker_height*num_workers))
# Resize bottom pane
resize_pane(workers.pop(0), height=worker_height)
# Resize the rest of the panes by adjusting the ones above them
while len(workers) > 1:
next_height = sum(get_pane_size(pane)[1] for pane in workers[:2])
next_height -= worker_height
resize_pane(workers[1], height=next_height)
workers.pop(0)
def get_pane_size(pane_id=None): def get_pane_size(pane_id=None):
@ -96,34 +142,30 @@ def kill_pane(*pane_ids):
run_program(cmd+[pane_id], check=False) run_program(cmd+[pane_id], check=False)
def layout_needs_fixed(panes, layout): def layout_needs_fixed(layout):
"""Check if layout needs fixed, returns bool.""" """Check if layout needs fixed, returns bool."""
needs_fixed = False needs_fixed = False
# Check panes # Check panes
for name, data in layout.items(): for data in layout.values():
# Skip unpredictably sized panes if 'height' in data:
if not data.get('Check', False): needs_fixed = needs_fixed or any(
continue get_pane_size(pane)[1] != data['height'] for pane in data['Panes']
)
if 'width' in data:
needs_fixed = needs_fixed or any(
get_pane_size(pane)[0] != data['width'] for pane in data['Panes']
)
# Skip missing panes # TODO: Re-enable?
if name not in panes: ## Group panes
continue #for group in ('Title', 'Info'):
# num_panes = len(layout[group]['Panes'])
# Check pane size(s) # if num_panes <= 1:
pane_list = panes[name] # continue
if isinstance(pane_list, str): # width = int( (get_pane_size()[0] - (1 - num_panes)) / num_panes )
pane_list = [pane_list] # for pane in layout[group]['Panes']:
for pane_id in pane_list: # needs_fixed = needs_fixed or abs(get_pane_size(pane)[0] - width) > 2
try:
width, height = get_pane_size(pane_id)
except ValueError:
# Pane may have disappeared during this loop
continue
if data.get('width', False) and data['width'] != width:
needs_fixed = True
if data.get('height', False) and data['height'] != height:
needs_fixed = True
# Done # Done
return needs_fixed return needs_fixed

300
scripts/wk/ui/tui.py Normal file
View file

@ -0,0 +1,300 @@
"""WizardKit: TUI functions"""
# vim: sts=2 sw=2 ts=2
import logging
import time
from copy import deepcopy
from wk.exe import start_thread
from wk.std import sleep
from wk.ui import ansi, tmux
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
TMUX_SIDE_WIDTH = 21
TMUX_TITLE_HEIGHT = 2
TMUX_LAYOUT = {
'Current': {'Panes': [None]},
'Title': {'Panes': [], 'height': TMUX_TITLE_HEIGHT},
'Started': {'Panes': [], 'width': TMUX_SIDE_WIDTH},
'Progress': {'Panes': [], 'width': TMUX_SIDE_WIDTH},
'Info': {'Panes': []},
'Workers': {'Panes': []},
}
# Classes
class TUI():
"""Object for tracking TUI elements."""
def __init__(self, title_text=None) -> None:
self.layout = deepcopy(TMUX_LAYOUT)
self.side_width = TMUX_SIDE_WIDTH
self.title_text = title_text if title_text else 'Title Text'
self.title_text_line2 = ''
self.title_colors = ['BLUE', None]
# Init tmux and start a background process to maintain layout
self.init_tmux()
start_thread(self.fix_layout_loop)
def add_info_pane(self, height, **tmux_args) -> None:
"""Add info pane."""
tmux_args.update({
'behind': True,
'lines': height,
'target_id': None,
'vertical': True,
})
if self.layout['Info']['Panes']:
tmux_args.update({
'behind': False,
'percent': 50,
'target_id': self.layout['Info']['Panes'][-1],
'vertical': False,
})
tmux_args.pop('lines')
# Update layout
self.layout['Info']['height'] = height
# Add pane
self.layout['Info']['Panes'].append(tmux.split_window(**tmux_args))
def add_title_pane(self, text) -> None:
"""Add additional pane to title row."""
tmux_args = {
'behind': True,
'lines': TMUX_TITLE_HEIGHT,
'target_id': None,
'text': text,
'vertical': True,
}
if self.layout['Title']['Panes']:
tmux_args.update({
'behind': False,
'percent': 50,
'target_id': self.layout['Title']['Panes'][-1],
'vertical': False,
})
tmux_args.pop('lines')
# Add pane
self.layout['Title']['Panes'].append(tmux.split_window(**tmux_args))
def add_worker_pane(self, height, **tmux_split_args) -> None:
"""Add worker pane."""
self.layout['Workers']['height'] = height
self.layout['Workers']['Panes'].append(tmux.split_window(
vertical=True,
lines=height,
**tmux_split_args,
))
def fix_layout(self, forced=True) -> None:
"""Fix tmux layout based on self.layout."""
try:
fix_layout(self.layout, forced=forced)
except RuntimeError:
# Assuming self.panes changed while running
pass
def fix_layout_loop(self) -> None:
"""Fix layout on a loop.
NOTE: This should be called as a thread.
"""
while True:
self.fix_layout(forced=False)
sleep(1)
def init_tmux(self) -> None:
"""Initialize tmux layout."""
tmux.kill_all_panes()
self.layout.clear()
self.layout.update(deepcopy(TMUX_LAYOUT))
# Title
self.layout['Title']['Panes'].append(tmux.split_window(
behind=True,
lines=2,
vertical=True,
text=ansi.color_string(
[self.title_text, self.title_text_line2],
self.title_colors,
sep = '\n',
),
))
# Started
self.layout['Started']['Panes'].append(tmux.split_window(
lines=TMUX_SIDE_WIDTH,
target_id=self.layout['Title']['Panes'][0],
text=ansi.color_string(
['Started', time.strftime("%Y-%m-%d %H:%M %Z")],
['BLUE', None],
sep='\n',
),
))
# Progress
self.layout['Progress']['Panes'].append(tmux.split_window(
lines=TMUX_SIDE_WIDTH,
text=' ',
))
def remove_all_info_panes(self) -> None:
"""Remove all info panes and update layout."""
self.layout['Info'].pop('height', None)
panes = self.layout['Info']['Panes'].copy()
self.layout['Info']['Panes'].clear()
tmux.kill_pane(*panes)
def remove_all_worker_panes(self) -> None:
"""Remove all worker panes and update layout."""
self.layout['Workers'].pop('height', None)
panes = self.layout['Workers']['Panes'].copy()
self.layout['Workers']['Panes'].clear()
tmux.kill_pane(*panes)
def set_progress_file(self, progress_file) -> None:
"""Set the file to use for the progresse pane."""
tmux.respawn_pane(
pane_id=self.layout['Progress']['Panes'][0],
watch_file=progress_file,
)
def set_title(self, line1, line2=None, colors=None) -> None:
"""Set title text."""
self.title_text = line1
self.title_text_line2 = line2 if line2 else ''
if colors:
self.title_colors = colors
# Update pane (if present)
if self.layout['Title']['Panes']:
tmux.respawn_pane(
pane_id=self.layout['Title']['Panes'][0],
text=ansi.color_string(
[self.title_text, self.title_text_line2],
self.title_colors,
sep = '\n',
),
)
# Functions
def fix_layout(layout, forced=False):
"""Fix pane sizes based on layout."""
resize_kwargs = []
# Bail early
if not (forced or layout_needs_fixed(layout)):
# Layout should be fine
return
# Remove closed panes
for data in layout.values():
data['Panes'] = [pane for pane in data['Panes'] if tmux.poll_pane(pane)]
# Update main panes
tmux.resize_pane(height=999) # Set active pane too large and then adjust down
for section, data in layout.items():
if section == 'Workers':
# Skip for now
continue
if 'height' in data:
for pane_id in data['Panes']:
resize_kwargs.append({'pane_id': pane_id, 'height': data['height']})
if 'width' in data:
for pane_id in data['Panes']:
resize_kwargs.append({'pane_id': pane_id, 'width': data['width']})
for kwargs in resize_kwargs:
try:
tmux.resize_pane(**kwargs)
except RuntimeError:
# Assuming pane was closed just before resizing
pass
# Update "group" panes widths
for group in ('Title', 'Info'):
num_panes = len(layout[group]['Panes'])
if num_panes <= 1:
continue
width = int( (tmux.get_pane_size()[0] - (1 - num_panes)) / num_panes )
for pane_id in layout[group]['Panes']:
tmux.resize_pane(pane_id, width=width)
if group == 'Title':
# (re)fix Started pane
tmux.resize_pane(layout['Started']['Panes'][0], width=TMUX_SIDE_WIDTH)
# Bail early
if not (layout['Workers']['Panes'] and layout['Workers']['height']):
return
# Update worker heights
worker_height = layout['Workers']['height']
workers = layout['Workers']['Panes'].copy()
num_workers = len(workers)
avail_height = sum(tmux.get_pane_size(pane)[1] for pane in workers)
avail_height += tmux.get_pane_size()[1] # Current pane
# Check if window is too small
if avail_height < (worker_height*num_workers) + 3:
# Just leave things as-is
return
# Resize current pane
tmux.resize_pane(height=avail_height-(worker_height*num_workers))
# Resize bottom pane
tmux.resize_pane(workers.pop(0), height=worker_height)
# Resize the rest of the panes by adjusting the ones above them
while len(workers) > 1:
next_height = sum(tmux.get_pane_size(pane)[1] for pane in workers[:2])
next_height -= worker_height
tmux.resize_pane(workers[1], height=next_height)
workers.pop(0)
def layout_needs_fixed(layout):
"""Check if layout needs fixed, returns bool."""
needs_fixed = False
# Check panes
for data in layout.values():
if 'height' in data:
needs_fixed = needs_fixed or any(
tmux.get_pane_size(pane)[1] != data['height'] for pane in data['Panes']
)
if 'width' in data:
needs_fixed = needs_fixed or any(
tmux.get_pane_size(pane)[0] != data['width'] for pane in data['Panes']
)
# TODO: Re-enable?
## Group panes
#for group in ('Title', 'Info'):
# num_panes = len(layout[group]['Panes'])
# if num_panes <= 1:
# continue
# width = int( (tmux.get_pane_size()[0] - (1 - num_panes)) / num_panes )
# for pane in layout[group]['Panes']:
# needs_fixed = needs_fixed or abs(tmux.get_pane_size(pane)[0] - width) > 2
# Done
return needs_fixed
def hmm():
"""Hmm?"""
def test():
ui = TUI()
ui.add_info_pane(height=10, text='Info One')
ui.add_info_pane(height=10, text='Info Two')
ui.add_info_pane(height=10, text='Info Three')
ui.add_worker_pane(height=3, text='Work One')
ui.add_worker_pane(height=3, text='Work Two')
ui.add_worker_pane(height=3, text='Work Three')
return ui
if __name__ == '__main__':
print("This file is not meant to be called directly.")