Refactor hardware diagnostics to use new TUI

This commit is contained in:
2Shirt 2023-05-27 19:47:26 -07:00
parent 4c76e59238
commit cb012423bb
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
4 changed files with 158 additions and 183 deletions

View file

@ -11,7 +11,7 @@ from wk import exe
from wk.cfg.hw import CPU_FAILURE_TEMP from wk.cfg.hw import CPU_FAILURE_TEMP
from wk.os.mac import set_fans as macos_set_fans from wk.os.mac import set_fans as macos_set_fans
from wk.std import PLATFORM from wk.std import PLATFORM
from wk.ui import ansi, tmux from wk.ui import ansi
# STATIC VARIABLES # STATIC VARIABLES
@ -111,7 +111,7 @@ def start_mprime(working_dir, log_path) -> subprocess.Popen:
stdin=proc_mprime.stdout, stdin=proc_mprime.stdout,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
) )
proc_mprime.stdout.close() proc_mprime.stdout.close() # type: ignore[reportOptionalMemberAccess]
save_nsbr = exe.NonBlockingStreamReader(proc_grep.stdout) save_nsbr = exe.NonBlockingStreamReader(proc_grep.stdout)
exe.start_thread( exe.start_thread(
save_nsbr.save_to_file, save_nsbr.save_to_file,
@ -122,7 +122,7 @@ def start_mprime(working_dir, log_path) -> subprocess.Popen:
return proc_mprime return proc_mprime
def start_sysbench(sensors, sensors_out, log_path, pane) -> SysbenchType: def start_sysbench(sensors, sensors_out, log_path) -> SysbenchType:
"""Start sysbench, returns tuple with Popen object and file handle.""" """Start sysbench, returns tuple with Popen object and file handle."""
set_apple_fan_speed('max') set_apple_fan_speed('max')
sysbench_cmd = [ sysbench_cmd = [
@ -141,9 +141,6 @@ def start_sysbench(sensors, sensors_out, log_path, pane) -> SysbenchType:
thermal_action=('killall', 'sysbench', '-INT'), thermal_action=('killall', 'sysbench', '-INT'),
) )
# Update bottom pane
tmux.respawn_pane(pane, watch_file=log_path, watch_cmd='tail')
# Start sysbench # Start sysbench
filehandle_sysbench = open( filehandle_sysbench = open(
log_path, 'a', encoding='utf-8', log_path, 'a', encoding='utf-8',

View file

@ -25,8 +25,7 @@ from wk.hw.network import network_test
from wk.hw.screensavers import screensaver from wk.hw.screensavers import screensaver
from wk.hw.test import Test, TestGroup from wk.hw.test import Test, TestGroup
from wk.ui import tui as ui from wk.ui import ansi, cli, tui
from wk.ui import ansi, cli, tmux
# STATIC VARIABLES # STATIC VARIABLES
@ -85,33 +84,24 @@ class State():
def __init__(self, test_mode=False): def __init__(self, test_mode=False):
self.disks = [] self.disks = []
self.log_dir = None self.log_dir = None
self.panes = {}
self.progress_file = None self.progress_file = None
self.system = None self.system = None
self.test_groups = [] self.test_groups = []
self.title_text = ansi.color_string('Hardware Diagnostics', 'GREEN') self.title_text = ansi.color_string('Hardware Diagnostics', 'GREEN')
if test_mode: if test_mode:
self.title_text += ansi.color_string(' (Test Mode)', 'YELLOW') self.title_text += ansi.color_string(' (Test Mode)', 'YELLOW')
self.ui = ui.TUI(self.title_text) self.ui = tui.TUI(f'{self.title_text}\nMain Menu')
def abort_testing(self) -> None: def abort_testing(self) -> None:
"""Set unfinished tests as aborted and cleanup tmux panes.""" """Set unfinished tests as aborted and cleanup panes."""
for group in self.test_groups: for group in self.test_groups:
for test in group.test_objects: for test in group.test_objects:
if test.status in ('Pending', 'Working'): if test.status in ('Pending', 'Working'):
test.set_status('Aborted') test.set_status('Aborted')
# Cleanup tmux # Cleanup panes
self.panes.pop('Current', None) self.ui.remove_all_info_panes()
for key, pane_ids in self.panes.copy().items(): self.ui.remove_all_worker_panes()
if key in ('Top', 'Started', 'Progress'):
continue
if isinstance(pane_ids, str):
tmux.kill_pane(self.panes.pop(key))
else:
for _id in pane_ids:
tmux.kill_pane(_id)
self.panes.pop(key)
def disk_safety_checks(self) -> None: def disk_safety_checks(self) -> None:
"""Check for mid-run SMART failures and failed test(s).""" """Check for mid-run SMART failures and failed test(s)."""
@ -127,8 +117,6 @@ class State():
# Reset objects # Reset objects
self.disks.clear() self.disks.clear()
self.layout.clear()
self.layout.update(cfg.hw.TMUX_LAYOUT)
self.test_groups.clear() self.test_groups.clear()
# Set log # Set log
@ -191,35 +179,6 @@ class State():
test_group.test_objects.append(test_obj) test_group.test_objects.append(test_obj)
self.test_groups.append(test_group) self.test_groups.append(test_group)
def init_tmux(self) -> None:
"""Initialize tmux layout."""
tmux.kill_all_panes()
# Top
self.panes['Top'] = tmux.split_window(
behind=True,
lines=2,
vertical=True,
text=f'{self.title_text}\nMain Menu',
)
# Started
self.panes['Started'] = tmux.split_window(
lines=cfg.hw.TMUX_SIDE_WIDTH,
target_id=self.panes['Top'],
text=ansi.color_string(
['Started', time.strftime("%Y-%m-%d %H:%M %Z")],
['BLUE', None],
sep='\n',
),
)
# Progress
self.panes['Progress'] = tmux.split_window(
lines=cfg.hw.TMUX_SIDE_WIDTH,
text=' ',
)
def save_debug_reports(self) -> None: def save_debug_reports(self) -> None:
"""Save debug reports to disk.""" """Save debug reports to disk."""
LOG.info('Saving debug reports') LOG.info('Saving debug reports')
@ -266,17 +225,6 @@ class State():
_f.write(f'\n{test.name}:\n') _f.write(f'\n{test.name}:\n')
_f.write('\n'.join(debug.generate_object_report(test, indent=1))) _f.write('\n'.join(debug.generate_object_report(test, indent=1)))
def update_clock(self) -> None:
"""Update 'Started' pane following clock sync."""
tmux.respawn_pane(
pane_id=self.panes['Started'],
text=ansi.color_string(
['Started', time.strftime("%Y-%m-%d %H:%M %Z")],
['BLUE', None],
sep='\n',
),
)
def update_progress_file(self) -> None: def update_progress_file(self) -> None:
"""Update progress file.""" """Update progress file."""
report = [] report = []
@ -296,9 +244,9 @@ class State():
# Write to progress file # Write to progress file
self.progress_file.write_text('\n'.join(report), encoding='utf-8') self.progress_file.write_text('\n'.join(report), encoding='utf-8')
def update_top_pane(self, text) -> None: def update_title_text(self, text) -> None:
"""Update top pane with text.""" """Update top pane with text."""
tmux.respawn_pane(self.panes['Top'], text=f'{self.title_text}\n{text}') self.ui.set_title(self.title_text, text)
# Functions # Functions
@ -370,7 +318,7 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
return return
# Prep # Prep
state.update_top_pane(test_mprime_obj.dev.cpu_description) state.update_title_text(test_mprime_obj.dev.cpu_description)
test_cooling_obj.set_status('Working') test_cooling_obj.set_status('Working')
test_mprime_obj.set_status('Working') test_mprime_obj.set_status('Working')
@ -383,17 +331,16 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
# Create monitor and worker panes # Create monitor and worker panes
state.update_progress_file() state.update_progress_file()
state.panes['Prime95'] = tmux.split_window( state.ui.add_worker_pane(lines=10, watch_cmd='tail', watch_file=prime_log)
lines=10, vertical=True, watch_file=prime_log, watch_cmd='tail')
if PLATFORM == 'Darwin': if PLATFORM == 'Darwin':
state.panes['Temps'] = tmux.split_window( state.ui.add_info_pane(
behind=True, percent=80, vertical=True, cmd='./hw-sensors') percent=80, cmd='./hw-sensors', update_layout=False,
)
elif PLATFORM == 'Linux': elif PLATFORM == 'Linux':
state.panes['Temps'] = tmux.split_window( state.ui.add_info_pane(
behind=True, percent=80, vertical=True, watch_file=sensors_out) percent=80, watch_file=sensors_out, update_layout=False,
tmux.resize_pane(height=3) )
state.panes['Current'] = '' state.ui.set_current_pane_height(3)
state.layout['Current'] = {'height': 3, 'Check': True}
# Get idle temps # Get idle temps
cli.print_standard('Saving idle temps...') cli.print_standard('Saving idle temps...')
@ -421,7 +368,7 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
state.update_progress_file() state.update_progress_file()
# Get cooldown temp # Get cooldown temp
cli.clear_screen() state.ui.clear_current_pane()
cli.print_standard('Letting CPU cooldown...') cli.print_standard('Letting CPU cooldown...')
std.sleep(5) std.sleep(5)
cli.print_standard('Saving cooldown temps...') cli.print_standard('Saving cooldown temps...')
@ -440,15 +387,18 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
if run_sysbench: if run_sysbench:
LOG.info('CPU Test (Sysbench)') LOG.info('CPU Test (Sysbench)')
cli.print_standard('Letting CPU cooldown more...') cli.print_standard('Letting CPU cooldown more...')
std.sleep(30) std.sleep(10)
cli.clear_screen() state.ui.clear_current_pane()
cli.print_info('Running alternate stress test') cli.print_info('Running alternate stress test')
print('') print('')
sysbench_log = prime_log.with_name('sysbench.log')
sysbench_log.touch()
state.ui.remove_all_worker_panes()
state.ui.add_worker_pane(lines=10, watch_cmd='tail', watch_file=sysbench_log)
proc_sysbench, filehandle_sysbench = hw_cpu.start_sysbench( proc_sysbench, filehandle_sysbench = hw_cpu.start_sysbench(
sensors, sensors,
sensors_out, sensors_out,
log_path=prime_log.with_name('sysbench.log'), log_path=sysbench_log,
pane=state.panes['Prime95'],
) )
try: try:
print_countdown(proc=proc_sysbench, seconds=test_minutes*60) print_countdown(proc=proc_sysbench, seconds=test_minutes*60)
@ -474,9 +424,9 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
# Cleanup # Cleanup
state.update_progress_file() state.update_progress_file()
sensors.stop_background_monitor() sensors.stop_background_monitor()
state.panes.pop('Current', None) state.ui.clear_current_pane_height()
tmux.kill_pane(state.panes.pop('Prime95', None)) state.ui.remove_all_info_panes()
tmux.kill_pane(state.panes.pop('Temps', None)) state.ui.remove_all_worker_panes()
# Done # Done
if aborted: if aborted:
@ -504,14 +454,10 @@ def disk_io_benchmark(
aborted = False aborted = False
# Run benchmarks # Run benchmarks
state.update_top_pane( state.update_title_text(
f'Disk I/O Benchmark{"s" if len(test_objects) > 1 else ""}', f'Disk I/O Benchmark{"s" if len(test_objects) > 1 else ""}',
) )
state.panes['I/O Benchmark'] = tmux.split_window( state.ui.set_current_pane_height(10)
percent=50,
vertical=True,
text=' ',
)
for test in test_objects: for test in test_objects:
if test.disabled: if test.disabled:
# Skip # Skip
@ -523,12 +469,14 @@ def disk_io_benchmark(
continue continue
# Start benchmark # Start benchmark
cli.clear_screen() state.ui.clear_current_pane()
cli.print_report(test.dev.generate_report()) cli.print_report(test.dev.generate_report())
test.set_status('Working') test.set_status('Working')
test_log = f'{state.log_dir}/{test.dev.path.name}_benchmark.out' test_log = f'{state.log_dir}/{test.dev.path.name}_benchmark.out'
tmux.respawn_pane( state.ui.remove_all_worker_panes()
state.panes['I/O Benchmark'], state.ui.add_worker_pane(
percent=50,
update_layout=False,
watch_cmd='tail', watch_cmd='tail',
watch_file=test_log, watch_file=test_log,
) )
@ -554,7 +502,8 @@ def disk_io_benchmark(
# Cleanup # Cleanup
state.update_progress_file() state.update_progress_file()
tmux.kill_pane(state.panes.pop('I/O Benchmark', None)) state.ui.clear_current_pane_height()
state.ui.remove_all_worker_panes()
# Done # Done
if aborted: if aborted:
@ -566,10 +515,9 @@ def disk_self_test(state, test_objects, test_mode=False) -> None:
LOG.info('Disk Self-Test(s)') LOG.info('Disk Self-Test(s)')
aborted = False aborted = False
threads = [] threads = []
state.panes['SMART'] = []
# Run self-tests # Run self-tests
state.update_top_pane( state.update_title_text(
f'Disk self-test{"s" if len(test_objects) > 1 else ""}', f'Disk self-test{"s" if len(test_objects) > 1 else ""}',
) )
cli.print_info(f'Starting self-test{"s" if len(test_objects) > 1 else ""}') cli.print_info(f'Starting self-test{"s" if len(test_objects) > 1 else ""}')
@ -586,9 +534,7 @@ def disk_self_test(state, test_objects, test_mode=False) -> None:
# Show progress # Show progress
if threads[-1].is_alive(): if threads[-1].is_alive():
state.panes['SMART'].append( state.ui.add_worker_pane(lines=4, watch_cmd='tail', watch_file=test_log)
tmux.split_window(lines=4, vertical=True, watch_file=test_log),
)
# Wait for all tests to complete # Wait for all tests to complete
state.update_progress_file() state.update_progress_file()
@ -607,9 +553,7 @@ def disk_self_test(state, test_objects, test_mode=False) -> None:
# Cleanup # Cleanup
state.update_progress_file() state.update_progress_file()
for pane in state.panes['SMART']: state.ui.remove_all_worker_panes()
tmux.kill_pane(pane)
state.panes.pop('SMART', None)
# Done # Done
if aborted: if aborted:
@ -663,10 +607,9 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None:
LOG.info('Disk Surface Scan (badblocks)') LOG.info('Disk Surface Scan (badblocks)')
aborted = False aborted = False
threads = [] threads = []
state.panes['badblocks'] = []
# Update panes # Update panes
state.update_top_pane( state.update_title_text(
f'Disk Surface Scan{"s" if len(test_objects) > 1 else ""}', f'Disk Surface Scan{"s" if len(test_objects) > 1 else ""}',
) )
cli.print_info( cli.print_info(
@ -685,14 +628,7 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None:
# Show progress # Show progress
if threads[-1].is_alive(): if threads[-1].is_alive():
state.panes['badblocks'].append( state.ui.add_worker_pane(lines=5, watch_cmd='tail', watch_file=test_log)
tmux.split_window(
lines=5,
vertical=True,
watch_cmd='tail',
watch_file=test_log,
),
)
# Wait for all tests to complete # Wait for all tests to complete
try: try:
@ -713,9 +649,7 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None:
# Cleanup # Cleanup
state.update_progress_file() state.update_progress_file()
for pane in state.panes['badblocks']: state.ui.remove_all_worker_panes()
tmux.kill_pane(pane)
state.panes.pop('badblocks', None)
# Done # Done
if aborted: if aborted:
@ -733,7 +667,6 @@ def main() -> None:
raise RuntimeError('tmux session not found') raise RuntimeError('tmux session not found')
# Init # Init
atexit.register(tmux.kill_all_panes)
menu = build_menu(cli_mode=args['--cli'], quick_mode=args['--quick']) menu = build_menu(cli_mode=args['--cli'], quick_mode=args['--quick'])
state = State(test_mode=args['--test-mode']) state = State(test_mode=args['--test-mode'])
@ -759,7 +692,7 @@ def main() -> None:
# Run simple test # Run simple test
if action: if action:
state.update_top_pane(selection[0]) state.update_title_text(selection[0])
try: try:
action() action()
except KeyboardInterrupt: except KeyboardInterrupt:
@ -767,7 +700,7 @@ def main() -> None:
cli.print_standard('') cli.print_standard('')
cli.pause('Press Enter to return to main menu...') cli.pause('Press Enter to return to main menu...')
if 'Clock Sync' in selection: if 'Clock Sync' in selection:
state.update_clock() state.ui.update_clock()
# Secrets # Secrets
if 'Matrix' in selection: if 'Matrix' in selection:
@ -791,7 +724,7 @@ def main() -> None:
run_diags(state, menu, quick_mode=False, test_mode=args['--test-mode']) run_diags(state, menu, quick_mode=False, test_mode=args['--test-mode'])
# Reset top pane # Reset top pane
state.update_top_pane('Main Menu') state.update_title_text('Main Menu')
def print_countdown(proc, seconds) -> None: def print_countdown(proc, seconds) -> None:
@ -842,7 +775,7 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None:
args = [group.test_objects] args = [group.test_objects]
if group.name == 'Disk I/O Benchmark': if group.name == 'Disk I/O Benchmark':
args.append(menu.toggles['Skip USB Benchmarks']['Selected']) args.append(menu.toggles['Skip USB Benchmarks']['Selected'])
cli.clear_screen() state.ui.clear_current_pane()
try: try:
function(state, *args, test_mode=test_mode) function(state, *args, test_mode=test_mode)
except (KeyboardInterrupt, std.GenericAbort): except (KeyboardInterrupt, std.GenericAbort):
@ -887,8 +820,8 @@ def show_failed_attributes(state) -> None:
def show_results(state) -> None: def show_results(state) -> None:
"""Show test results by device.""" """Show test results by device."""
std.sleep(0.5) std.sleep(0.5)
cli.clear_screen() state.ui.clear_current_pane()
state.update_top_pane('Results') state.update_title_text('Results')
# CPU Tests # CPU Tests
cpu_tests_enabled = [ cpu_tests_enabled = [

View file

@ -51,13 +51,27 @@ def fix_layout(layout, forced=False):
for data in layout.values(): for data in layout.values():
data['Panes'] = [pane for pane in data['Panes'] if poll_pane(pane)] data['Panes'] = [pane for pane in data['Panes'] if poll_pane(pane)]
# Calc height for "floating" row
# NOTE: We start with height +1 to account for the splits (i.e. splits = num rows - 1)
floating_height = 1 + get_window_size()[1]
for group in ('Title', 'Info', 'Current', 'Workers'):
if layout[group]['Panes']:
group_height = 1 + layout[group].get('height', 0)
if group == 'Workers':
group_height *= len(layout[group]['Panes'])
floating_height -= group_height
# Update main panes # Update main panes
resize_pane(height=999) # Set active pane too large and then adjust down
for section, data in layout.items(): for section, data in layout.items():
# "Floating" pane(s)
if 'height' not in data and section in ('Info', 'Current', 'Workers'):
for pane_id in data['Panes']:
resize_kwargs.append({'pane_id': pane_id, 'height': floating_height})
# Rest of the panes
if section == 'Workers': if section == 'Workers':
# Skip for now # Skip for now
continue continue
if 'height' in data: if 'height' in data:
for pane_id in data['Panes']: for pane_id in data['Panes']:
resize_kwargs.append({'pane_id': pane_id, 'height': data['height']}) resize_kwargs.append({'pane_id': pane_id, 'height': data['height']})
@ -81,33 +95,19 @@ def fix_layout(layout, forced=False):
resize_pane(pane_id, width=width) resize_pane(pane_id, width=width)
if group == 'Title': if group == 'Title':
# (re)fix Started pane # (re)fix Started pane
#TODO: REstore: resize_pane(layout['Started']['Panes'][0], width=TMUX_SIDE_WIDTH) resize_pane(layout['Started']['Panes'][0], width=layout['Started']['width'])
resize_pane(layout['Started']['Panes'][0], width=21)
# Bail early # Bail early
if not (layout['Workers']['Panes'] and layout['Workers']['height']): if not (
layout['Workers']['Panes']
and 'height' in layout['Workers']
and floating_height > 0
):
return return
# Update worker heights # Update worker heights
worker_height = layout['Workers']['height'] for worker in reversed(layout['Workers']['Panes']):
workers = layout['Workers']['Panes'].copy() resize_pane(worker, height=layout['Workers']['height'])
num_workers = len(workers)
avail_height = sum(get_pane_size(pane)[1] for pane in workers)
avail_height += get_pane_size()[1] # Current pane
# Check if window is too small
if avail_height < (worker_height*num_workers) + 3:
# Just leave things as-is
return
# Resize current pane
resize_pane(height=avail_height-(worker_height*num_workers))
# Resize bottom pane
resize_pane(workers.pop(0), height=worker_height)
# Resize the rest of the panes by adjusting the ones above them
while len(workers) > 1:
next_height = sum(get_pane_size(pane)[1] for pane in workers[:2])
next_height -= worker_height
resize_pane(workers[1], height=next_height)
workers.pop(0)
def get_pane_size(pane_id=None): def get_pane_size(pane_id=None):
@ -262,7 +262,7 @@ def prep_file(path):
pass pass
def resize_pane(pane_id=None, width=None, height=None, **kwargs): def resize_pane(pane_id=None, width=None, height=None):
"""Resize current or target pane. """Resize current or target pane.
NOTE: kwargs is only here to make calling this function easier NOTE: kwargs is only here to make calling this function easier

View file

@ -6,6 +6,7 @@ import logging
import time import time
from copy import deepcopy from copy import deepcopy
from os import environ
from wk.exe import start_thread from wk.exe import start_thread
from wk.std import sleep from wk.std import sleep
@ -15,13 +16,13 @@ from wk.ui import ansi, tmux
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
TMUX_SIDE_WIDTH = 21 TMUX_SIDE_WIDTH = 21
TMUX_TITLE_HEIGHT = 2 TMUX_TITLE_HEIGHT = 2
TMUX_LAYOUT = { TMUX_LAYOUT = { # NOTE: This needs to be in order from top to bottom
'Current': {'Panes': [None]},
'Title': {'Panes': [], 'height': TMUX_TITLE_HEIGHT}, 'Title': {'Panes': [], 'height': TMUX_TITLE_HEIGHT},
'Info': {'Panes': []},
'Current': {'Panes': [environ.get('TMUX_PANE', None)]},
'Workers': {'Panes': []},
'Started': {'Panes': [], 'width': TMUX_SIDE_WIDTH}, 'Started': {'Panes': [], 'width': TMUX_SIDE_WIDTH},
'Progress': {'Panes': [], 'width': TMUX_SIDE_WIDTH}, 'Progress': {'Panes': [], 'width': TMUX_SIDE_WIDTH},
'Info': {'Panes': []},
'Workers': {'Panes': []},
} }
@ -42,11 +43,22 @@ class TUI():
# Close all panes at exit # Close all panes at exit
atexit.register(tmux.kill_all_panes) atexit.register(tmux.kill_all_panes)
def add_info_pane(self, height, **tmux_args) -> None: def add_info_pane(
self, lines=None, percent=None, update_layout=True, **tmux_args,
) -> None:
"""Add info pane.""" """Add info pane."""
if not (lines or percent):
# Bail early
raise RuntimeError('Neither lines nor percent specified.')
# Calculate lines if needed
if not lines:
lines = int(tmux.get_pane_size()[1] * (percent/100))
# Set tmux split args
tmux_args.update({ tmux_args.update({
'behind': True, 'behind': True,
'lines': height, 'lines': lines,
'target_id': None, 'target_id': None,
'vertical': True, 'vertical': True,
}) })
@ -60,7 +72,8 @@ class TUI():
tmux_args.pop('lines') tmux_args.pop('lines')
# Update layout # Update layout
self.layout['Info']['height'] = height if update_layout:
self.layout['Info']['height'] = lines
# Add pane # Add pane
self.layout['Info']['Panes'].append(tmux.split_window(**tmux_args)) self.layout['Info']['Panes'].append(tmux.split_window(**tmux_args))
@ -91,19 +104,48 @@ class TUI():
# Add pane # Add pane
self.layout['Title']['Panes'].append(tmux.split_window(**tmux_args)) self.layout['Title']['Panes'].append(tmux.split_window(**tmux_args))
def add_worker_pane(self, height, **tmux_split_args) -> None: def add_worker_pane(
self, lines=None, percent=None, update_layout=True, **tmux_args,
) -> None:
"""Add worker pane.""" """Add worker pane."""
self.layout['Workers']['height'] = height height = lines
self.layout['Workers']['Panes'].append(tmux.split_window(
vertical=True, # Bail early
lines=height, if not (lines or percent):
**tmux_split_args, raise RuntimeError('Neither lines nor percent specified.')
))
# Calculate height if needed
if not height:
height = int(tmux.get_pane_size()[1] * (percent/100))
# Set tmux split args
tmux_args.update({
'behind': False,
'lines': lines,
'percent': percent,
'target_id': None,
'vertical': True,
})
# Update layout
if update_layout:
self.layout['Workers']['height'] = height
# Add pane
self.layout['Workers']['Panes'].append(tmux.split_window(**tmux_args))
def clear_current_pane(self) -> None:
"""Clear screen and history for current pane."""
tmux.clear_pane()
def clear_current_pane_height(self) -> None:
"""Clear current pane height and update layout."""
self.layout['Current'].pop('height', None)
def fix_layout(self, forced=True) -> None: def fix_layout(self, forced=True) -> None:
"""Fix tmux layout based on self.layout.""" """Fix tmux layout based on self.layout."""
try: try:
fix_layout(self.layout, forced=forced) tmux.fix_layout(self.layout, forced=forced)
except RuntimeError: except RuntimeError:
# Assuming self.panes changed while running # Assuming self.panes changed while running
pass pass
@ -166,6 +208,11 @@ class TUI():
self.layout['Workers']['Panes'].clear() self.layout['Workers']['Panes'].clear()
tmux.kill_pane(*panes) tmux.kill_pane(*panes)
def set_current_pane_height(self, height) -> None:
"""Set current pane height and update layout."""
self.layout['Current']['height'] = height
tmux.resize_pane(height=height)
def set_progress_file(self, progress_file) -> None: def set_progress_file(self, progress_file) -> None:
"""Set the file to use for the progresse pane.""" """Set the file to use for the progresse pane."""
tmux.respawn_pane( tmux.respawn_pane(
@ -191,6 +238,17 @@ class TUI():
), ),
) )
def update_clock(self) -> None:
"""Update 'Started' pane following clock sync."""
tmux.respawn_pane(
pane_id=self.layout['Started']['Panes'][0],
text=ansi.color_string(
['Started', time.strftime("%Y-%m-%d %H:%M %Z")],
['BLUE', None],
sep='\n',
),
)
# Functions # Functions
def fix_layout(layout, forced=False): def fix_layout(layout, forced=False):
@ -207,7 +265,6 @@ def fix_layout(layout, forced=False):
data['Panes'] = [pane for pane in data['Panes'] if tmux.poll_pane(pane)] data['Panes'] = [pane for pane in data['Panes'] if tmux.poll_pane(pane)]
# Update main panes # Update main panes
tmux.resize_pane(height=999) # Set active pane too large and then adjust down
for section, data in layout.items(): for section, data in layout.items():
if section == 'Workers': if section == 'Workers':
# Skip for now # Skip for now
@ -278,31 +335,19 @@ def layout_needs_fixed(layout):
tmux.get_pane_size(pane)[0] != data['width'] for pane in data['Panes'] tmux.get_pane_size(pane)[0] != data['width'] for pane in data['Panes']
) )
# TODO: Re-enable?
## Group panes
#for group in ('Title', 'Info'):
# num_panes = len(layout[group]['Panes'])
# if num_panes <= 1:
# continue
# width = int( (tmux.get_pane_size()[0] - (1 - num_panes)) / num_panes )
# for pane in layout[group]['Panes']:
# needs_fixed = needs_fixed or abs(tmux.get_pane_size(pane)[0] - width) > 2
# Done # Done
return needs_fixed return needs_fixed
def hmm():
"""Hmm?"""
def test(): def test():
"""TODO: Deleteme"""
ui = TUI() ui = TUI()
ui.add_info_pane(height=10, text='Info One') ui.add_info_pane(lines=10, text='Info One')
ui.add_info_pane(height=10, text='Info Two') ui.add_info_pane(lines=10, text='Info Two')
ui.add_info_pane(height=10, text='Info Three') ui.add_info_pane(lines=10, text='Info Three')
ui.add_worker_pane(height=3, text='Work One') ui.add_worker_pane(lines=3, text='Work One')
ui.add_worker_pane(height=3, text='Work Two') ui.add_worker_pane(lines=3, text='Work Two')
ui.add_worker_pane(height=3, text='Work Three') ui.add_worker_pane(lines=3, text='Work Three')
ui.fix_layout()
return ui return ui