From ba3bf480f7964e033278e981ace84b1037b8d8d6 Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Sun, 9 Apr 2023 15:59:34 -0700 Subject: [PATCH] BREAKING Add wk/ui/tui.py --- scripts/wk/cfg/hw.py | 14 -- scripts/wk/hw/diags.py | 153 ++++++++----------- scripts/wk/ui/__init__.py | 1 + scripts/wk/ui/tmux.py | 124 ++++++++++------ scripts/wk/ui/tui.py | 300 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 448 insertions(+), 144 deletions(-) create mode 100644 scripts/wk/ui/tui.py diff --git a/scripts/wk/cfg/hw.py b/scripts/wk/cfg/hw.py index 4ebadb84..2ebfd724 100644 --- a/scripts/wk/cfg/hw.py +++ b/scripts/wk/cfg/hw.py @@ -3,8 +3,6 @@ import re -from collections import OrderedDict - # STATIC VARIABLES ATTRIBUTE_COLORS = ( @@ -161,18 +159,6 @@ THRESH_HDD_AVG_LOW = 65 * 1024**2 THRESH_SSD_MIN = 90 * 1024**2 THRESH_SSD_AVG_HIGH = 135 * 1024**2 THRESH_SSD_AVG_LOW = 100 * 1024**2 -TMUX_SIDE_WIDTH = 20 -TMUX_LAYOUT = OrderedDict({ - 'Top': {'height': 2, 'Check': True}, - 'Started': {'width': TMUX_SIDE_WIDTH, 'Check': True}, - 'Progress': {'width': TMUX_SIDE_WIDTH, 'Check': True}, - # Testing panes - 'Temps': {'height': 1000, 'Check': False}, - 'Prime95': {'height': 11, 'Check': False}, - 'SMART': {'height': 4, 'Check': True}, - 'badblocks': {'height': 5, 'Check': True}, - 'I/O Benchmark': {'height': 1000, 'Check': False}, - }) # VOLUME THRESHOLDS in percent VOLUME_WARNING_THRESHOLD = 70 VOLUME_FAILURE_THRESHOLD = 85 diff --git a/scripts/wk/hw/diags.py b/scripts/wk/hw/diags.py index dc214205..60e42abe 100644 --- a/scripts/wk/hw/diags.py +++ b/scripts/wk/hw/diags.py @@ -25,8 +25,8 @@ from wk.hw.network import network_test from wk.hw.screensavers import screensaver from wk.hw.test import Test, TestGroup -from wk.ui import cli as ui -from wk.ui import ansi, tmux +from wk.ui import tui as ui +from wk.ui import ansi, cli, tmux # STATIC VARIABLES @@ -84,18 +84,15 @@ class State(): """Object for tracking hardware diagnostic data.""" def __init__(self, test_mode=False): self.disks = [] - self.layout = cfg.hw.TMUX_LAYOUT.copy() self.log_dir = None self.panes = {} + self.progress_file = None self.system = None self.test_groups = [] - self.top_text = ansi.color_string('Hardware Diagnostics', 'GREEN') + self.title_text = ansi.color_string('Hardware Diagnostics', 'GREEN') if test_mode: - self.top_text += ansi.color_string(' (Test Mode)', 'YELLOW') - - # Init tmux and start a background process to maintain layout - self.init_tmux() - exe.start_thread(self.fix_tmux_layout_loop) + self.title_text += ansi.color_string(' (Test Mode)', 'YELLOW') + self.ui = ui.TUI(self.title_text) def abort_testing(self) -> None: """Set unfinished tests as aborted and cleanup tmux panes.""" @@ -125,23 +122,6 @@ class State(): dev.disable_disk_tests() break - def fix_tmux_layout(self, forced=True) -> None: - """Fix tmux layout based on cfg.hw.TMUX_LAYOUT.""" - try: - tmux.fix_layout(self.panes, self.layout, forced=forced) - except RuntimeError: - # Assuming self.panes changed while running - pass - - def fix_tmux_layout_loop(self) -> None: - """Fix tmux layout on a loop. - - NOTE: This should be called as a thread. - """ - while True: - self.fix_tmux_layout(forced=False) - std.sleep(1) - def init_diags(self, menu) -> None: """Initialize diagnostic pass.""" @@ -163,15 +143,13 @@ class State(): keep_history=False, timestamp=False, ) - ui.clear_screen() - ui.print_info('Initializing...') + cli.clear_screen() + cli.print_info('Initializing...') # Progress Pane - self.update_progress_pane() - tmux.respawn_pane( - pane_id=self.panes['Progress'], - watch_file=f'{self.log_dir}/progress.out', - ) + self.progress_file = pathlib.Path(f'{self.log_dir}/progress.out') + self.update_progress_file() + self.ui.set_progress_file(self.progress_file) # Add HW Objects self.system = hw_system.System() @@ -222,7 +200,7 @@ class State(): behind=True, lines=2, vertical=True, - text=f'{self.top_text}\nMain Menu', + text=f'{self.title_text}\nMain Menu', ) # Started @@ -299,16 +277,15 @@ class State(): ), ) - def update_progress_pane(self) -> None: - """Update progress pane.""" + def update_progress_file(self) -> None: + """Update progress file.""" report = [] - width = cfg.hw.TMUX_SIDE_WIDTH for group in self.test_groups: report.append(ansi.color_string(group.name, 'BLUE')) for test in group.test_objects: report.append(ansi.color_string( - [test.label, f'{test.status:>{width-len(test.label)}}'], + [test.label, f'{test.status:>{self.ui.side_width-len(test.label)}}'], [None, STATUS_COLORS.get(test.status, None)], sep='', )) @@ -317,19 +294,17 @@ class State(): report.append(' ') # Write to progress file - out_path = pathlib.Path(f'{self.log_dir}/progress.out') - with open(out_path, 'w', encoding='utf-8') as _f: - _f.write('\n'.join(report)) + self.progress_file.write_text('\n'.join(report), encoding='utf-8') def update_top_pane(self, text) -> None: """Update top pane with text.""" - tmux.respawn_pane(self.panes['Top'], text=f'{self.top_text}\n{text}') + tmux.respawn_pane(self.panes['Top'], text=f'{self.title_text}\n{text}') # Functions -def build_menu(cli_mode=False, quick_mode=False) -> ui.Menu: +def build_menu(cli_mode=False, quick_mode=False) -> cli.Menu: """Build main menu, returns wk.ui.cli.Menu.""" - menu = ui.Menu(title=None) + menu = cli.Menu(title=None) # Add actions, options, etc for action in MENU_ACTIONS: @@ -407,7 +382,7 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None: ) # Create monitor and worker panes - state.update_progress_pane() + state.update_progress_file() state.panes['Prime95'] = tmux.split_window( lines=10, vertical=True, watch_file=prime_log, watch_cmd='tail') if PLATFORM == 'Darwin': @@ -421,11 +396,11 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None: state.layout['Current'] = {'height': 3, 'Check': True} # Get idle temps - ui.print_standard('Saving idle temps...') + cli.print_standard('Saving idle temps...') sensors.save_average_temps(temp_label='Idle', seconds=5) # Stress CPU - ui.print_info('Running stress test') + cli.print_info('Running stress test') hw_cpu.set_apple_fan_speed('max') proc_mprime = hw_cpu.start_mprime(state.log_dir, prime_log) @@ -443,13 +418,13 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None: if sensors.cpu_reached_critical_temp() or aborted: test_cooling_obj.set_status('Aborted') test_mprime_obj.set_status('Aborted') - state.update_progress_pane() + state.update_progress_file() # Get cooldown temp - ui.clear_screen() - ui.print_standard('Letting CPU cooldown...') + cli.clear_screen() + cli.print_standard('Letting CPU cooldown...') std.sleep(5) - ui.print_standard('Saving cooldown temps...') + cli.print_standard('Saving cooldown temps...') sensors.save_average_temps(temp_label='Cooldown', seconds=5) # Check Prime95 results @@ -464,10 +439,10 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None: ) if run_sysbench: LOG.info('CPU Test (Sysbench)') - ui.print_standard('Letting CPU cooldown more...') + cli.print_standard('Letting CPU cooldown more...') std.sleep(30) - ui.clear_screen() - ui.print_info('Running alternate stress test') + cli.clear_screen() + cli.print_info('Running alternate stress test') print('') proc_sysbench, filehandle_sysbench = hw_cpu.start_sysbench( sensors, @@ -490,14 +465,14 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None: if sensors.cpu_reached_critical_temp() or aborted: test_cooling_obj.set_status('Aborted') test_mprime_obj.set_status('Aborted') - state.update_progress_pane() + state.update_progress_file() # Check Cooling results test_cooling_obj.report.append(ansi.color_string('Temps', 'BLUE')) hw_cpu.check_cooling_results(test_cooling_obj, sensors, run_sysbench) # Cleanup - state.update_progress_pane() + state.update_progress_file() sensors.stop_background_monitor() state.panes.pop('Current', None) tmux.kill_pane(state.panes.pop('Prime95', None)) @@ -519,7 +494,7 @@ def disk_attribute_check(state, test_objects, test_mode=False) -> None: continue # Done - state.update_progress_pane() + state.update_progress_file() def disk_io_benchmark( @@ -548,8 +523,8 @@ def disk_io_benchmark( continue # Start benchmark - ui.clear_screen() - ui.print_report(test.dev.generate_report()) + cli.clear_screen() + cli.print_report(test.dev.generate_report()) test.set_status('Working') test_log = f'{state.log_dir}/{test.dev.path.name}_benchmark.out' tmux.respawn_pane( @@ -557,7 +532,7 @@ def disk_io_benchmark( watch_cmd='tail', watch_file=test_log, ) - state.update_progress_pane() + state.update_progress_file() try: hw_benchmark.run_io_test(test, test_log, test_mode=test_mode) except KeyboardInterrupt: @@ -575,10 +550,10 @@ def disk_io_benchmark( break # Update progress after each test - state.update_progress_pane() + state.update_progress_file() # Cleanup - state.update_progress_pane() + state.update_progress_file() tmux.kill_pane(state.panes.pop('I/O Benchmark', None)) # Done @@ -597,7 +572,7 @@ def disk_self_test(state, test_objects, test_mode=False) -> None: state.update_top_pane( f'Disk self-test{"s" if len(test_objects) > 1 else ""}', ) - ui.print_info(f'Starting self-test{"s" if len(test_objects) > 1 else ""}') + cli.print_info(f'Starting self-test{"s" if len(test_objects) > 1 else ""}') show_failed_attributes(state) for test in reversed(test_objects): if test.disabled: @@ -616,7 +591,7 @@ def disk_self_test(state, test_objects, test_mode=False) -> None: ) # Wait for all tests to complete - state.update_progress_pane() + state.update_progress_file() try: while True: if any(t.is_alive() for t in threads): @@ -631,7 +606,7 @@ def disk_self_test(state, test_objects, test_mode=False) -> None: hw_smart.build_self_test_report(test, aborted=True) # Cleanup - state.update_progress_pane() + state.update_progress_file() for pane in state.panes['SMART']: tmux.kill_pane(pane) state.panes.pop('SMART', None) @@ -694,7 +669,7 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None: state.update_top_pane( f'Disk Surface Scan{"s" if len(test_objects) > 1 else ""}', ) - ui.print_info( + cli.print_info( f'Starting disk surface scan{"s" if len(test_objects) > 1 else ""}', ) show_failed_attributes(state) @@ -723,7 +698,7 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None: try: while True: if any(t.is_alive() for t in threads): - state.update_progress_pane() + state.update_progress_file() std.sleep(5) else: break @@ -737,7 +712,7 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None: test.report.append(ansi.color_string(' Aborted', 'YELLOW')) # Cleanup - state.update_progress_pane() + state.update_progress_file() for pane in state.panes['badblocks']: tmux.kill_pane(pane) state.panes.pop('badblocks', None) @@ -788,9 +763,9 @@ def main() -> None: try: action() except KeyboardInterrupt: - ui.print_warning('Aborted.') - ui.print_standard('') - ui.pause('Press Enter to return to main menu...') + cli.print_warning('Aborted.') + cli.print_standard('') + cli.pause('Press Enter to return to main menu...') if 'Clock Sync' in selection: state.update_clock() @@ -855,8 +830,8 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None: # Just return if no tests were selected if not state.test_groups: - ui.print_warning('No tests selected?') - ui.pause() + cli.print_warning('No tests selected?') + cli.pause() return # Run tests @@ -867,13 +842,13 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None: args = [group.test_objects] if group.name == 'Disk I/O Benchmark': args.append(menu.toggles['Skip USB Benchmarks']['Selected']) - ui.clear_screen() + cli.clear_screen() try: function(state, *args, test_mode=test_mode) except (KeyboardInterrupt, std.GenericAbort): aborted = True state.abort_testing() - state.update_progress_pane() + state.update_progress_file() break else: # Run safety checks after disk tests @@ -894,25 +869,25 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None: state.save_debug_reports() atexit.unregister(state.save_debug_reports) if quick_mode: - ui.pause('Press Enter to exit...') + cli.pause('Press Enter to exit...') else: - ui.pause('Press Enter to return to main menu...') + cli.pause('Press Enter to return to main menu...') def show_failed_attributes(state) -> None: """Show failed attributes for all disks.""" for dev in state.disks: - ui.print_colored([dev.name, dev.description], ['CYAN', None]) - ui.print_report( + cli.print_colored([dev.name, dev.description], ['CYAN', None]) + cli.print_report( hw_smart.generate_attribute_report(dev, only_failed=True), ) - ui.print_standard('') + cli.print_standard('') def show_results(state) -> None: """Show test results by device.""" std.sleep(0.5) - ui.clear_screen() + cli.clear_screen() state.update_top_pane('Results') # CPU Tests @@ -920,22 +895,22 @@ def show_results(state) -> None: group.name for group in state.test_groups if 'CPU' in group.name ] if cpu_tests_enabled: - ui.print_success('CPU:') - ui.print_report(state.system.generate_report()) - ui.print_standard(' ') + cli.print_success('CPU:') + cli.print_report(state.system.generate_report()) + cli.print_standard(' ') # Disk Tests disk_tests_enabled = [ group.name for group in state.test_groups if 'Disk' in group.name ] if disk_tests_enabled: - ui.print_success(f'Disk{"s" if len(state.disks) > 1 else ""}:') + cli.print_success(f'Disk{"s" if len(state.disks) > 1 else ""}:') for disk in state.disks: - ui.print_report(disk.generate_report()) - ui.print_standard(' ') + cli.print_report(disk.generate_report()) + cli.print_standard(' ') if not state.disks: - ui.print_warning('No devices') - ui.print_standard(' ') + cli.print_warning('No devices') + cli.print_standard(' ') def sync_clock() -> None: diff --git a/scripts/wk/ui/__init__.py b/scripts/wk/ui/__init__.py index b93cb848..75a335fc 100644 --- a/scripts/wk/ui/__init__.py +++ b/scripts/wk/ui/__init__.py @@ -3,3 +3,4 @@ from . import ansi from . import cli from . import tmux +from . import tui diff --git a/scripts/wk/ui/tmux.py b/scripts/wk/ui/tmux.py index 49fb3241..d473f81f 100644 --- a/scripts/wk/ui/tmux.py +++ b/scripts/wk/ui/tmux.py @@ -34,30 +34,76 @@ def clear_pane(pane_id=None): run_program(cmd, check=False) -def fix_layout(panes, layout, forced=False): +def fix_layout(layout, forced=False): """Fix pane sizes based on layout.""" - if not (forced or layout_needs_fixed(panes, layout)): + resize_kwargs = [] + + # Bail early + if not (forced or layout_needs_fixed(layout)): # Layout should be fine return - # Update panes - for name, data in layout.items(): - # Skip missing panes - if name not in panes: + # Remove closed panes + for data in layout.values(): + data['Panes'] = [pane for pane in data['Panes'] if poll_pane(pane)] + + # Update main panes + resize_pane(height=999) # Set active pane too large and then adjust down + for section, data in layout.items(): + if section == 'Workers': + # Skip for now continue - # Resize pane(s) - pane_list = panes[name] - if isinstance(pane_list, str): - pane_list = [pane_list] - for pane_id in pane_list: - if name == 'Current': - pane_id = None - try: - resize_pane(pane_id, **data) - except RuntimeError: - # Assuming pane was closed just before resizing - pass + if 'height' in data: + for pane_id in data['Panes']: + resize_kwargs.append({'pane_id': pane_id, 'height': data['height']}) + if 'width' in data: + for pane_id in data['Panes']: + resize_kwargs.append({'pane_id': pane_id, 'width': data['width']}) + for kwargs in resize_kwargs: + try: + resize_pane(**kwargs) + except RuntimeError: + # Assuming pane was closed just before resizing + pass + + # Update "group" panes widths + for group in ('Title', 'Info'): + num_panes = len(layout[group]['Panes']) + if num_panes <= 1: + continue + width = int( (get_pane_size()[0] - (1 - num_panes)) / num_panes ) + for pane_id in layout[group]['Panes']: + resize_pane(pane_id, width=width) + if group == 'Title': + # (re)fix Started pane + #TODO: REstore: resize_pane(layout['Started']['Panes'][0], width=TMUX_SIDE_WIDTH) + resize_pane(layout['Started']['Panes'][0], width=21) + + # Bail early + if not (layout['Workers']['Panes'] and layout['Workers']['height']): + return + + # Update worker heights + worker_height = layout['Workers']['height'] + workers = layout['Workers']['Panes'].copy() + num_workers = len(workers) + avail_height = sum(get_pane_size(pane)[1] for pane in workers) + avail_height += get_pane_size()[1] # Current pane + # Check if window is too small + if avail_height < (worker_height*num_workers) + 3: + # Just leave things as-is + return + # Resize current pane + resize_pane(height=avail_height-(worker_height*num_workers)) + # Resize bottom pane + resize_pane(workers.pop(0), height=worker_height) + # Resize the rest of the panes by adjusting the ones above them + while len(workers) > 1: + next_height = sum(get_pane_size(pane)[1] for pane in workers[:2]) + next_height -= worker_height + resize_pane(workers[1], height=next_height) + workers.pop(0) def get_pane_size(pane_id=None): @@ -96,34 +142,30 @@ def kill_pane(*pane_ids): run_program(cmd+[pane_id], check=False) -def layout_needs_fixed(panes, layout): +def layout_needs_fixed(layout): """Check if layout needs fixed, returns bool.""" needs_fixed = False # Check panes - for name, data in layout.items(): - # Skip unpredictably sized panes - if not data.get('Check', False): - continue + for data in layout.values(): + if 'height' in data: + needs_fixed = needs_fixed or any( + get_pane_size(pane)[1] != data['height'] for pane in data['Panes'] + ) + if 'width' in data: + needs_fixed = needs_fixed or any( + get_pane_size(pane)[0] != data['width'] for pane in data['Panes'] + ) - # Skip missing panes - if name not in panes: - continue - - # Check pane size(s) - pane_list = panes[name] - if isinstance(pane_list, str): - pane_list = [pane_list] - for pane_id in pane_list: - try: - width, height = get_pane_size(pane_id) - except ValueError: - # Pane may have disappeared during this loop - continue - if data.get('width', False) and data['width'] != width: - needs_fixed = True - if data.get('height', False) and data['height'] != height: - needs_fixed = True + # TODO: Re-enable? + ## Group panes + #for group in ('Title', 'Info'): + # num_panes = len(layout[group]['Panes']) + # if num_panes <= 1: + # continue + # width = int( (get_pane_size()[0] - (1 - num_panes)) / num_panes ) + # for pane in layout[group]['Panes']: + # needs_fixed = needs_fixed or abs(get_pane_size(pane)[0] - width) > 2 # Done return needs_fixed diff --git a/scripts/wk/ui/tui.py b/scripts/wk/ui/tui.py new file mode 100644 index 00000000..681f0a62 --- /dev/null +++ b/scripts/wk/ui/tui.py @@ -0,0 +1,300 @@ +"""WizardKit: TUI functions""" +# vim: sts=2 sw=2 ts=2 + +import logging +import time + +from copy import deepcopy + +from wk.exe import start_thread +from wk.std import sleep +from wk.ui import ansi, tmux + +# STATIC VARIABLES +LOG = logging.getLogger(__name__) +TMUX_SIDE_WIDTH = 21 +TMUX_TITLE_HEIGHT = 2 +TMUX_LAYOUT = { + 'Current': {'Panes': [None]}, + 'Title': {'Panes': [], 'height': TMUX_TITLE_HEIGHT}, + 'Started': {'Panes': [], 'width': TMUX_SIDE_WIDTH}, + 'Progress': {'Panes': [], 'width': TMUX_SIDE_WIDTH}, + 'Info': {'Panes': []}, + 'Workers': {'Panes': []}, + } + + +# Classes +class TUI(): + """Object for tracking TUI elements.""" + def __init__(self, title_text=None) -> None: + self.layout = deepcopy(TMUX_LAYOUT) + self.side_width = TMUX_SIDE_WIDTH + self.title_text = title_text if title_text else 'Title Text' + self.title_text_line2 = '' + self.title_colors = ['BLUE', None] + + # Init tmux and start a background process to maintain layout + self.init_tmux() + start_thread(self.fix_layout_loop) + + def add_info_pane(self, height, **tmux_args) -> None: + """Add info pane.""" + tmux_args.update({ + 'behind': True, + 'lines': height, + 'target_id': None, + 'vertical': True, + }) + if self.layout['Info']['Panes']: + tmux_args.update({ + 'behind': False, + 'percent': 50, + 'target_id': self.layout['Info']['Panes'][-1], + 'vertical': False, + }) + tmux_args.pop('lines') + + # Update layout + self.layout['Info']['height'] = height + + # Add pane + self.layout['Info']['Panes'].append(tmux.split_window(**tmux_args)) + + def add_title_pane(self, text) -> None: + """Add additional pane to title row.""" + tmux_args = { + 'behind': True, + 'lines': TMUX_TITLE_HEIGHT, + 'target_id': None, + 'text': text, + 'vertical': True, + } + if self.layout['Title']['Panes']: + tmux_args.update({ + 'behind': False, + 'percent': 50, + 'target_id': self.layout['Title']['Panes'][-1], + 'vertical': False, + }) + tmux_args.pop('lines') + + # Add pane + self.layout['Title']['Panes'].append(tmux.split_window(**tmux_args)) + + def add_worker_pane(self, height, **tmux_split_args) -> None: + """Add worker pane.""" + self.layout['Workers']['height'] = height + self.layout['Workers']['Panes'].append(tmux.split_window( + vertical=True, + lines=height, + **tmux_split_args, + )) + + def fix_layout(self, forced=True) -> None: + """Fix tmux layout based on self.layout.""" + try: + fix_layout(self.layout, forced=forced) + except RuntimeError: + # Assuming self.panes changed while running + pass + + def fix_layout_loop(self) -> None: + """Fix layout on a loop. + + NOTE: This should be called as a thread. + """ + while True: + self.fix_layout(forced=False) + sleep(1) + + def init_tmux(self) -> None: + """Initialize tmux layout.""" + tmux.kill_all_panes() + self.layout.clear() + self.layout.update(deepcopy(TMUX_LAYOUT)) + + # Title + self.layout['Title']['Panes'].append(tmux.split_window( + behind=True, + lines=2, + vertical=True, + text=ansi.color_string( + [self.title_text, self.title_text_line2], + self.title_colors, + sep = '\n', + ), + )) + + # Started + self.layout['Started']['Panes'].append(tmux.split_window( + lines=TMUX_SIDE_WIDTH, + target_id=self.layout['Title']['Panes'][0], + text=ansi.color_string( + ['Started', time.strftime("%Y-%m-%d %H:%M %Z")], + ['BLUE', None], + sep='\n', + ), + )) + + # Progress + self.layout['Progress']['Panes'].append(tmux.split_window( + lines=TMUX_SIDE_WIDTH, + text=' ', + )) + + def remove_all_info_panes(self) -> None: + """Remove all info panes and update layout.""" + self.layout['Info'].pop('height', None) + panes = self.layout['Info']['Panes'].copy() + self.layout['Info']['Panes'].clear() + tmux.kill_pane(*panes) + + def remove_all_worker_panes(self) -> None: + """Remove all worker panes and update layout.""" + self.layout['Workers'].pop('height', None) + panes = self.layout['Workers']['Panes'].copy() + self.layout['Workers']['Panes'].clear() + tmux.kill_pane(*panes) + + def set_progress_file(self, progress_file) -> None: + """Set the file to use for the progresse pane.""" + tmux.respawn_pane( + pane_id=self.layout['Progress']['Panes'][0], + watch_file=progress_file, + ) + + def set_title(self, line1, line2=None, colors=None) -> None: + """Set title text.""" + self.title_text = line1 + self.title_text_line2 = line2 if line2 else '' + if colors: + self.title_colors = colors + + # Update pane (if present) + if self.layout['Title']['Panes']: + tmux.respawn_pane( + pane_id=self.layout['Title']['Panes'][0], + text=ansi.color_string( + [self.title_text, self.title_text_line2], + self.title_colors, + sep = '\n', + ), + ) + +# Functions +def fix_layout(layout, forced=False): + """Fix pane sizes based on layout.""" + resize_kwargs = [] + + # Bail early + if not (forced or layout_needs_fixed(layout)): + # Layout should be fine + return + + # Remove closed panes + for data in layout.values(): + data['Panes'] = [pane for pane in data['Panes'] if tmux.poll_pane(pane)] + + # Update main panes + tmux.resize_pane(height=999) # Set active pane too large and then adjust down + for section, data in layout.items(): + if section == 'Workers': + # Skip for now + continue + + if 'height' in data: + for pane_id in data['Panes']: + resize_kwargs.append({'pane_id': pane_id, 'height': data['height']}) + if 'width' in data: + for pane_id in data['Panes']: + resize_kwargs.append({'pane_id': pane_id, 'width': data['width']}) + for kwargs in resize_kwargs: + try: + tmux.resize_pane(**kwargs) + except RuntimeError: + # Assuming pane was closed just before resizing + pass + + # Update "group" panes widths + for group in ('Title', 'Info'): + num_panes = len(layout[group]['Panes']) + if num_panes <= 1: + continue + width = int( (tmux.get_pane_size()[0] - (1 - num_panes)) / num_panes ) + for pane_id in layout[group]['Panes']: + tmux.resize_pane(pane_id, width=width) + if group == 'Title': + # (re)fix Started pane + tmux.resize_pane(layout['Started']['Panes'][0], width=TMUX_SIDE_WIDTH) + + # Bail early + if not (layout['Workers']['Panes'] and layout['Workers']['height']): + return + + # Update worker heights + worker_height = layout['Workers']['height'] + workers = layout['Workers']['Panes'].copy() + num_workers = len(workers) + avail_height = sum(tmux.get_pane_size(pane)[1] for pane in workers) + avail_height += tmux.get_pane_size()[1] # Current pane + # Check if window is too small + if avail_height < (worker_height*num_workers) + 3: + # Just leave things as-is + return + # Resize current pane + tmux.resize_pane(height=avail_height-(worker_height*num_workers)) + # Resize bottom pane + tmux.resize_pane(workers.pop(0), height=worker_height) + # Resize the rest of the panes by adjusting the ones above them + while len(workers) > 1: + next_height = sum(tmux.get_pane_size(pane)[1] for pane in workers[:2]) + next_height -= worker_height + tmux.resize_pane(workers[1], height=next_height) + workers.pop(0) + +def layout_needs_fixed(layout): + """Check if layout needs fixed, returns bool.""" + needs_fixed = False + + # Check panes + for data in layout.values(): + if 'height' in data: + needs_fixed = needs_fixed or any( + tmux.get_pane_size(pane)[1] != data['height'] for pane in data['Panes'] + ) + if 'width' in data: + needs_fixed = needs_fixed or any( + tmux.get_pane_size(pane)[0] != data['width'] for pane in data['Panes'] + ) + + # TODO: Re-enable? + ## Group panes + #for group in ('Title', 'Info'): + # num_panes = len(layout[group]['Panes']) + # if num_panes <= 1: + # continue + # width = int( (tmux.get_pane_size()[0] - (1 - num_panes)) / num_panes ) + # for pane in layout[group]['Panes']: + # needs_fixed = needs_fixed or abs(tmux.get_pane_size(pane)[0] - width) > 2 + + # Done + return needs_fixed + +def hmm(): + """Hmm?""" + + +def test(): + ui = TUI() + ui.add_info_pane(height=10, text='Info One') + ui.add_info_pane(height=10, text='Info Two') + ui.add_info_pane(height=10, text='Info Three') + ui.add_worker_pane(height=3, text='Work One') + ui.add_worker_pane(height=3, text='Work Two') + ui.add_worker_pane(height=3, text='Work Three') + return ui + + +if __name__ == '__main__': + print("This file is not meant to be called directly.")