360 lines
9.1 KiB
Python
360 lines
9.1 KiB
Python
"""WizardKit: tmux Functions"""
|
|
# vim: sts=2 sw=2 ts=2
|
|
|
|
import logging
|
|
import pathlib
|
|
|
|
from typing import Any
|
|
|
|
from wk.exe import run_program
|
|
from wk.std import PLATFORM
|
|
|
|
|
|
# STATIC_VARIABLES
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
# Functions
|
|
def capture_pane(pane_id: str | None = None) -> str:
|
|
"""Capture text from current or target pane, returns str."""
|
|
cmd = ['tmux', 'capture-pane', '-p']
|
|
if pane_id:
|
|
cmd.extend(['-t', pane_id])
|
|
|
|
# Capture and return
|
|
proc = run_program(cmd, check=False)
|
|
return proc.stdout.strip()
|
|
|
|
|
|
def clear_pane(pane_id: str | None = None) -> None:
|
|
"""Clear pane buffer for current or target pane."""
|
|
commands = [
|
|
['tmux', 'send-keys', '-R'],
|
|
['tmux', 'clear-history'],
|
|
]
|
|
if pane_id:
|
|
commands = [[*cmd, '-t', pane_id] for cmd in commands]
|
|
|
|
# Clear pane
|
|
for cmd in commands:
|
|
run_program(cmd, check=False)
|
|
|
|
|
|
def fix_layout(layout: dict[str, dict[str, Any]], forced: bool = False) -> None:
|
|
"""Fix pane sizes based on layout."""
|
|
resize_kwargs = []
|
|
|
|
# Bail early
|
|
if not (forced or layout_needs_fixed(layout)):
|
|
# Layout should be fine
|
|
return
|
|
|
|
# Remove closed panes
|
|
for data in layout.values():
|
|
data['Panes'] = [pane for pane in data['Panes'] if poll_pane(pane)]
|
|
|
|
# Calc height for "floating" row
|
|
# NOTE: We start with height +1 to account for the splits (i.e. splits = num rows - 1)
|
|
floating_height = 1 + get_window_size()[1]
|
|
for group in ('Title', 'Info', 'Current', 'Workers'):
|
|
if layout[group]['Panes']:
|
|
group_height = 1 + layout[group].get('height', 0)
|
|
if group == 'Workers':
|
|
group_height *= len(layout[group]['Panes'])
|
|
floating_height -= group_height
|
|
|
|
# Update main panes
|
|
for section, data in layout.items():
|
|
# "Floating" pane(s)
|
|
if 'height' not in data and section in ('Info', 'Current', 'Workers'):
|
|
for pane_id in data['Panes']:
|
|
resize_kwargs.append({'pane_id': pane_id, 'height': floating_height})
|
|
|
|
# Rest of the panes
|
|
if section == 'Workers':
|
|
# Skip for now
|
|
continue
|
|
if 'height' in data:
|
|
for pane_id in data['Panes']:
|
|
resize_kwargs.append({'pane_id': pane_id, 'height': data['height']})
|
|
if 'width' in data:
|
|
for pane_id in data['Panes']:
|
|
resize_kwargs.append({'pane_id': pane_id, 'width': data['width']})
|
|
for kwargs in resize_kwargs:
|
|
try:
|
|
resize_pane(**kwargs)
|
|
except RuntimeError:
|
|
# Assuming pane was closed just before resizing
|
|
pass
|
|
|
|
# Update "group" panes widths
|
|
for group in ('Title', 'Info'):
|
|
num_panes = len(layout[group]['Panes'])
|
|
if num_panes <= 1:
|
|
continue
|
|
width = int( (get_pane_size()[0] - (1 - num_panes)) / num_panes )
|
|
for pane_id in layout[group]['Panes']:
|
|
resize_pane(pane_id, width=width)
|
|
if group == 'Title':
|
|
# (re)fix Started pane
|
|
resize_pane(layout['Started']['Panes'][0], width=layout['Started']['width'])
|
|
|
|
# Bail early
|
|
if not (
|
|
layout['Workers']['Panes']
|
|
and 'height' in layout['Workers']
|
|
and floating_height > 0
|
|
):
|
|
return
|
|
|
|
# Update worker heights
|
|
for worker in reversed(layout['Workers']['Panes']):
|
|
resize_pane(worker, height=layout['Workers']['height'])
|
|
|
|
|
|
def get_pane_size(pane_id: str | None = None) -> tuple[int, int]:
|
|
"""Get current or target pane size, returns tuple."""
|
|
cmd = ['tmux', 'display', '-p']
|
|
if pane_id:
|
|
cmd.extend(['-t', pane_id])
|
|
cmd.append('#{pane_width} #{pane_height}')
|
|
|
|
# Get resolution
|
|
proc = run_program(cmd, check=False)
|
|
width, height = proc.stdout.strip().split()
|
|
width = int(width)
|
|
height = int(height)
|
|
|
|
# Done
|
|
return (width, height)
|
|
|
|
|
|
def get_window_size() -> tuple[int, int]:
|
|
"""Get current window size, returns tuple."""
|
|
cmd = ['tmux', 'display', '-p', '#{window_width} #{window_height}']
|
|
|
|
# Get resolution
|
|
proc = run_program(cmd, check=False)
|
|
width, height = proc.stdout.strip().split()
|
|
width = int(width)
|
|
height = int(height)
|
|
|
|
# Done
|
|
return (width, height)
|
|
|
|
|
|
def kill_all_panes(pane_id: str | None = None) -> None:
|
|
"""Kill all panes except for the current or target pane."""
|
|
cmd = ['tmux', 'kill-pane', '-a']
|
|
if pane_id:
|
|
cmd.extend(['-t', pane_id])
|
|
|
|
# Kill
|
|
run_program(cmd, check=False)
|
|
|
|
|
|
def kill_pane(*pane_ids: str) -> None:
|
|
"""Kill pane(s) by id."""
|
|
cmd = ['tmux', 'kill-pane', '-t']
|
|
|
|
# Iterate over all passed pane IDs
|
|
for pane_id in pane_ids:
|
|
run_program(cmd+[pane_id], check=False)
|
|
|
|
|
|
def layout_needs_fixed(layout: dict[str, dict[str, Any]]) -> bool:
|
|
"""Check if layout needs fixed, returns bool."""
|
|
needs_fixed = False
|
|
|
|
# Check panes
|
|
for data in layout.values():
|
|
if 'height' in data:
|
|
needs_fixed = needs_fixed or any(
|
|
get_pane_size(pane)[1] != data['height'] for pane in data['Panes']
|
|
)
|
|
if 'width' in data:
|
|
needs_fixed = needs_fixed or any(
|
|
get_pane_size(pane)[0] != data['width'] for pane in data['Panes']
|
|
)
|
|
|
|
# TODO: Re-enable?
|
|
## Group panes
|
|
#for group in ('Title', 'Info'):
|
|
# num_panes = len(layout[group]['Panes'])
|
|
# if num_panes <= 1:
|
|
# continue
|
|
# width = int( (get_pane_size()[0] - (1 - num_panes)) / num_panes )
|
|
# for pane in layout[group]['Panes']:
|
|
# needs_fixed = needs_fixed or abs(get_pane_size(pane)[0] - width) > 2
|
|
|
|
# Done
|
|
return needs_fixed
|
|
|
|
|
|
def poll_pane(pane_id: str) -> bool:
|
|
"""Check if pane exists, returns bool."""
|
|
cmd = ['tmux', 'list-panes', '-F', '#D']
|
|
|
|
# Get list of panes
|
|
proc = run_program(cmd, check=False)
|
|
existant_panes = proc.stdout.splitlines()
|
|
|
|
# Check if pane exists
|
|
return pane_id in existant_panes
|
|
|
|
|
|
def prep_action(
|
|
cmd: str | None = None,
|
|
working_dir: pathlib.Path | str | None = None,
|
|
text: str | None = None,
|
|
watch_file: pathlib.Path | str | None = None,
|
|
watch_cmd: str = 'cat',
|
|
) -> list[str]:
|
|
"""Prep action to perform during a tmux call, returns list.
|
|
|
|
This will prep for running a basic command, displaying text on screen,
|
|
or monitoring a file. The last option uses cat by default but can be
|
|
overridden by using the watch_cmd.
|
|
"""
|
|
action_cmd = []
|
|
if working_dir:
|
|
action_cmd.extend(['-c', working_dir])
|
|
|
|
if cmd:
|
|
# Basic command
|
|
action_cmd.append(cmd)
|
|
elif text:
|
|
# Display text
|
|
echo_cmd = ['echo']
|
|
if PLATFORM == 'Linux':
|
|
echo_cmd.append('-e')
|
|
action_cmd.extend([
|
|
'watch',
|
|
'--color',
|
|
'--exec',
|
|
'--no-title',
|
|
'--interval', '1',
|
|
])
|
|
action_cmd.extend(echo_cmd)
|
|
action_cmd.append(text)
|
|
elif watch_file:
|
|
# Monitor file
|
|
prep_file(watch_file)
|
|
if watch_cmd == 'cat':
|
|
action_cmd.extend([
|
|
'watch',
|
|
'--color',
|
|
'--no-title',
|
|
'--interval', '1',
|
|
'cat',
|
|
])
|
|
elif watch_cmd == 'tail':
|
|
action_cmd.extend(['tail', '-q', '-f'])
|
|
action_cmd.append(watch_file)
|
|
else:
|
|
LOG.error('No action specified')
|
|
raise RuntimeError('No action specified')
|
|
|
|
# Done
|
|
return action_cmd
|
|
|
|
|
|
def prep_file(path: pathlib.Path | str) -> None:
|
|
"""Check if file exists and create empty file if not."""
|
|
path = pathlib.Path(path).resolve()
|
|
try:
|
|
path.touch(exist_ok=False)
|
|
except FileExistsError:
|
|
# Leave existing files alone
|
|
pass
|
|
|
|
|
|
def resize_pane(
|
|
pane_id: str | None = None,
|
|
width: int | None = None,
|
|
height: int | None = None,
|
|
) -> None:
|
|
"""Resize current or target pane.
|
|
|
|
NOTE: kwargs is only here to make calling this function easier
|
|
by dropping any extra kwargs passed.
|
|
"""
|
|
cmd = ['tmux', 'resize-pane']
|
|
|
|
# Safety checks
|
|
if not (width or height):
|
|
LOG.error('Neither width nor height specified')
|
|
raise RuntimeError('Neither width nor height specified')
|
|
|
|
# Finish building cmd
|
|
if pane_id:
|
|
cmd.extend(['-t', pane_id])
|
|
if width:
|
|
cmd.extend(['-x', str(width)])
|
|
if height:
|
|
cmd.extend(['-y', str(height)])
|
|
|
|
# Resize
|
|
run_program(cmd, check=False)
|
|
|
|
|
|
def respawn_pane(pane_id: str, **action) -> None:
|
|
"""Respawn pane with action."""
|
|
cmd = ['tmux', 'respawn-pane', '-k', '-t', pane_id]
|
|
cmd.extend(prep_action(**action))
|
|
|
|
# Respawn
|
|
run_program(cmd, check=False)
|
|
|
|
|
|
def split_window(
|
|
lines: int | None = None,
|
|
percent: int | None = None,
|
|
behind: bool = False,
|
|
vertical: bool = False,
|
|
target_id: str | None = None,
|
|
**action) -> str:
|
|
"""Split tmux window, run action, and return pane_id as str."""
|
|
cmd = ['tmux', 'split-window', '-d', '-PF', '#D']
|
|
|
|
# Safety checks
|
|
if not (lines or percent):
|
|
LOG.error('Neither lines nor percent specified')
|
|
raise RuntimeError('Neither lines nor percent specified')
|
|
|
|
# New pane placement
|
|
if behind:
|
|
cmd.append('-b')
|
|
if vertical:
|
|
cmd.append('-v')
|
|
else:
|
|
cmd.append('-h')
|
|
if target_id:
|
|
cmd.extend(['-t', target_id])
|
|
|
|
# New pane size
|
|
if lines:
|
|
cmd.extend(['-l', str(lines)])
|
|
elif percent:
|
|
cmd.extend(['-p', str(percent)])
|
|
|
|
# New pane action
|
|
cmd.extend(prep_action(**action))
|
|
|
|
# Run and return pane_id
|
|
proc = run_program(cmd, check=False)
|
|
return proc.stdout.strip()
|
|
|
|
|
|
def zoom_pane(pane_id: str | None = None) -> None:
|
|
"""Toggle zoom status for current or target pane."""
|
|
cmd = ['tmux', 'resize-pane', '-Z']
|
|
if pane_id:
|
|
cmd.extend(['-t', pane_id])
|
|
|
|
# Toggle
|
|
run_program(cmd, check=False)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
print("This file is not meant to be called directly.")
|