Use prompt_toolkit for CLI input

This commit is contained in:
2Shirt 2023-04-08 16:26:51 -07:00
parent 13fc64e6ab
commit d302be2d7c
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
4 changed files with 158 additions and 86 deletions

View file

@ -525,8 +525,8 @@ class State():
settings['Needs Format'] = True
offset = 0
user_choice = ui.choice(
['G', 'M', 'S'],
'Format clone using GPT, MBR, or match Source type?',
['G', 'M', 'S'],
)
if user_choice == 'G':
settings['Table Type'] = 'GPT'
@ -563,7 +563,7 @@ class State():
bp_dest = self.destination
self._add_block_pair(part, bp_dest)
def confirm_selections(self, prompt, source_parts=None):
def confirm_selections(self, prompt_msg, source_parts=None):
"""Show selection details and prompt for confirmation."""
report = []
@ -642,7 +642,7 @@ class State():
# Prompt user
ui.clear_screen()
ui.print_report(report)
if not ui.ask(prompt):
if not ui.ask(prompt_msg):
raise std.GenericAbort()
def generate_report(self):
@ -746,7 +746,7 @@ class State():
# Confirmation #1
self.confirm_selections(
prompt='Are these selections correct?',
prompt_msg='Are these selections correct?',
source_parts=source_parts,
)
@ -1449,7 +1449,7 @@ def build_settings_menu(silent=True):
ui.print_standard(
f'Available ddrescue presets: {" / ".join(SETTING_PRESETS)}'
)
preset = ui.choice(SETTING_PRESETS, 'Please select a preset:')
preset = ui.choice('Please select a preset:', SETTING_PRESETS)
# Fix selection
for _p in SETTING_PRESETS:
@ -1788,19 +1788,9 @@ def get_table_type(disk_path):
def get_working_dir(mode, destination, force_local=False):
"""Get working directory using mode and destination, returns path."""
ticket_id = None
ticket_id = ui.get_ticket_id()
working_dir = None
# Set ticket ID
while ticket_id is None:
ticket_id = ui.input_text(
prompt='Please enter ticket ID:',
allow_empty_response=False,
)
ticket_id = ticket_id.replace(' ', '_')
if not re.match(r'^\d+', ticket_id):
ticket_id = None
# Use preferred path if possible
if mode == 'Image':
try:
@ -2262,12 +2252,12 @@ def run_recovery(state, main_menu, settings_menu, dry_run=True):
state.update_progress_pane('Idle')
def select_disk(prompt, skip_disk=None):
def select_disk(prompt_msg, skip_disk=None):
"""Select disk from list, returns Disk()."""
ui.print_info('Scanning disks...')
disks = hw_disk.get_disks()
menu = ui.Menu(
title=ansi.color_string(f'ddrescue TUI: {prompt} Selection', 'GREEN'),
title=ansi.color_string(f'ddrescue TUI: {prompt_msg} Selection', 'GREEN'),
)
menu.disabled_str = 'Already selected'
menu.separator = ' '
@ -2307,7 +2297,7 @@ def select_disk(prompt, skip_disk=None):
return selected_disk
def select_disk_parts(prompt, disk):
def select_disk_parts(prompt_msg, disk):
"""Select disk parts from list, returns list of Disk()."""
title = ansi.color_string('ddrescue TUI: Partition Selection', 'GREEN')
title += f'\n\nDisk: {disk.path} {disk.description}'
@ -2323,7 +2313,7 @@ def select_disk_parts(prompt, disk):
"""Loop over selection menu until at least one partition selected."""
while True:
selection = menu.advanced_select(
f'Please select the parts to {prompt.lower()}: ',
f'Please select the parts to {prompt_msg.lower()}: ',
)
if 'All' in selection:
for option in menu.options.values():
@ -2373,7 +2363,7 @@ def select_disk_parts(prompt, disk):
# Check if whole disk selected
if len(object_list) == len(disk.children):
# NOTE: This is not true if the disk has no partitions
msg = f'Preserve partition table and unused space in {prompt.lower()}?'
msg = f'Preserve partition table and unused space in {prompt_msg.lower()}?'
if ui.ask(msg):
# Replace part list with whole disk obj
object_list = [disk.path]
@ -2387,11 +2377,11 @@ def select_disk_parts(prompt, disk):
return object_list
def select_path(prompt):
def select_path(prompt_msg):
"""Select path, returns pathlib.Path."""
invalid = False
menu = ui.Menu(
title=ansi.color_string(f'ddrescue TUI: {prompt} Path Selection', 'GREEN'),
title=ansi.color_string(f'ddrescue TUI: {prompt_msg} Path Selection', 'GREEN'),
)
menu.separator = ' '
menu.add_action('Quit')
@ -2433,7 +2423,7 @@ def set_mode(docopt_args):
# Ask user if necessary
if not mode:
answer = ui.choice(['C', 'I'], 'Are we cloning or imaging?')
answer = ui.choice('Are we cloning or imaging?', ['C', 'I'])
if answer == 'C':
mode = 'Clone'
else:

View file

@ -212,7 +212,7 @@ def export_bitlocker_info():
]
# Get filename
file_name = ui.input_text(prompt='Enter filename', allow_empty_response=False)
file_name = ui.input_text(prompt_msg='Enter filename')
file_path = pathlib.Path(f'../../Bitlocker_{file_name}.txt').resolve()
# Save info

View file

@ -10,6 +10,8 @@ import sys
import traceback
from collections import OrderedDict
from prompt_toolkit import prompt
from prompt_toolkit.validation import Validator, ValidationError
try:
from functools import cache
@ -32,6 +34,74 @@ PLATFORM = platform.system()
# Classes
class InputChoiceValidator(Validator):
"""Validate that input is one of the provided choices."""
def __init__(self, choices, allow_empty=False):
self.allow_empty = allow_empty
self.choices = [str(c).upper() for c in choices]
super().__init__()
def validate(self, document):
text = document.text
if not (text or self.allow_empty):
raise ValidationError(
message='This input is required!',
cursor_position=len(text),
)
if text and text.upper() not in self.choices:
raise ValidationError(
message='Invalid selection',
cursor_position=len(text),
)
class InputNotEmptyValidator(Validator):
"""Validate that input is not empty."""
def validate(self, document):
text = document.text
if not text:
raise ValidationError(
message='This input is required!',
cursor_position=len(text),
)
class InputTicketIDValidator(Validator):
"""Validate that input resembles a ticket ID."""
def __init__(self, allow_empty=False):
self.allow_empty = allow_empty
super().__init__()
def validate(self, document):
text = document.text
if not (text or self.allow_empty):
raise ValidationError(
message='This input is required!',
cursor_position=len(text),
)
if text and not re.match(r'^\d', text):
raise ValidationError(
message='Ticket ID should start with a number!',
cursor_position=len(text),
)
class InputYesNoValidator(Validator):
"""Validate that input is a yes or no."""
def __init__(self, allow_empty=False):
self.allow_empty = allow_empty
super().__init__()
def validate(self, document):
text = document.text
if not (text or self.allow_empty):
raise ValidationError(
message='This input is required!',
cursor_position=len(text),
)
if text and not re.match(r'^(y(es|up|)|n(o|ope|))$', text, re.IGNORECASE):
raise ValidationError(
message='Please answer "yes" or "no"',
cursor_position=len(text),
)
class Menu():
"""Object for tracking menu specific data and methods.
@ -250,7 +320,7 @@ class Menu():
# Otherwise deselect
details['Selected'] = status and option in targets
def _user_select(self, prompt):
def _user_select(self, prompt_msg):
"""Show menu and select an entry, returns str."""
menu_text = self._generate_menu_text()
valid_answers = self._get_valid_answers()
@ -260,7 +330,7 @@ class Menu():
clear_screen()
print(menu_text)
sleep(0.01)
answer = input_text(prompt).strip()
answer = input_text(prompt_msg).strip()
if answer.upper() in valid_answers:
break
@ -297,14 +367,14 @@ class Menu():
details['Selected'] = details.get('Selected', False)
self.toggles[name] = details
def advanced_select(self, prompt='Please make a selection: '):
def advanced_select(self, prompt_msg='Please make a selection: '):
"""Display menu and make multiple selections, returns tuple.
NOTE: Menu is displayed until an action entry is selected.
"""
while True:
self._update(single_selection=False)
user_selection = self._user_select(prompt)
user_selection = self._user_select(prompt_msg)
selected_entry = self._resolve_selection(user_selection)
if user_selection.isnumeric():
# Update selection(s)
@ -316,19 +386,19 @@ class Menu():
# Done
return selected_entry
def settings_select(self, prompt='Please make a selection: '):
def settings_select(self, prompt_msg='Please make a selection: '):
"""Display menu and make multiple selections, returns tuple.
NOTE: Menu is displayed until an action entry is selected.
"""
choice_kwargs = {
'prompt_msg': 'Toggle or change value?',
'choices': ['T', 'C'],
'prompt': 'Toggle or change value?',
}
while True:
self._update(single_selection=True, settings_mode=True)
user_selection = self._user_select(prompt)
user_selection = self._user_select(prompt_msg)
selected_entry = self._resolve_selection(user_selection)
if user_selection.isnumeric():
if 'Value' in selected_entry[-1] and choice(**choice_kwargs) == 'C':
@ -344,18 +414,17 @@ class Menu():
# Done
return selected_entry
def simple_select(self, prompt='Please make a selection: ', update=True):
def simple_select(self, prompt_msg='Please make a selection: ', update=True):
"""Display menu and make a single selection, returns tuple."""
if update:
self._update()
user_selection = self._user_select(prompt)
user_selection = self._user_select(prompt_msg)
return self._resolve_selection(user_selection)
def update(self):
"""Update menu with default settings."""
self._update()
class TryAndPrint():
"""Object used to standardize running functions and returning the result.
@ -563,30 +632,28 @@ class TryAndPrint():
# Functions
def abort(prompt='Aborted.', show_prompt=True, return_code=1):
def abort(prompt_msg='Aborted.', show_prompt_msg=True, return_code=1):
"""Abort script."""
print_warning(prompt)
if show_prompt:
print_warning(prompt_msg)
if show_prompt_msg:
sleep(0.5)
pause(prompt='Press Enter to exit... ')
pause(prompt_msg='Press Enter to exit... ')
sys.exit(return_code)
def ask(prompt='Kotaero!'):
def ask(prompt_msg):
"""Prompt the user with a Y/N question, returns bool."""
answer = None
prompt = f'{prompt} [Y/N]: '
validator = InputYesNoValidator()
# Loop until acceptable answer is given
while answer is None:
tmp = input_text(prompt)
if re.search(r'^y(es|up|)$', tmp, re.IGNORECASE):
# Show prompt
response = input_text(f'{prompt_msg} [Y/N]: ', validator=validator)
if response.upper().startswith('Y'):
answer = True
elif re.search(r'^n(o|ope|)$', tmp, re.IGNORECASE):
elif response.upper().startswith('N'):
answer = False
# Done
LOG.info('%s%s', prompt, 'Yes' if answer else 'No')
LOG.info('%s%s', prompt_msg, 'Yes' if answer else 'No')
return answer
@ -599,27 +666,32 @@ def beep(repeat=1):
repeat -= 1
def choice(choices, prompt='答えろ!'):
def choice(prompt_msg, choices):
"""Choose an option from a provided list, returns str.
Choices provided will be converted to uppercase and returned as such.
Similar to the commands choice (Windows) and select (Linux).
"""
LOG.debug('choices: %s, prompt: %s', choices, prompt)
answer = None
LOG.debug('prompt_msg: %s, choices: %s', prompt_msg, choices)
choices = [str(c).upper()[:1] for c in choices]
prompt = f'{prompt} [{"/".join(choices)}]'
regex = f'^({"|".join(choices)})$'
prompt_msg = f'{prompt_msg} [{"/".join(choices)}]'
# Loop until acceptable answer is given
while answer is None:
tmp = input_text(prompt=prompt)
if re.search(regex, tmp, re.IGNORECASE):
answer = tmp.upper()
# Show prompt
response = input_text(prompt_msg, validator=InputChoiceValidator(choices))
# Done
LOG.info('%s %s', prompt, answer)
return answer
LOG.info('%s %s', prompt_msg, response)
return response.upper()
def fix_prompt(message):
"""Fix prompt, returns str."""
if not message:
message = 'Input text: '
message = str(message)
if message[-1:] != ' ':
message += ' '
return message
@cache
@ -659,29 +731,39 @@ def get_exception(name):
return obj
def input_text(prompt='Enter text', allow_empty_response=True):
"""Get text from user, returns string."""
prompt = str(prompt)
response = None
if prompt[-1:] != ' ':
prompt += ' '
print(prompt, end='', flush=True)
def get_ticket_id():
"""Get ticket ID, returns str."""
prompt_msg = 'Please enter ticket ID:'
validator = InputTicketIDValidator()
while response is None:
# Show prompt
ticket_id = input_text(prompt_msg, validator=validator)
# Done
return ticket_id
def input_text(
prompt_msg='Enter text: ', allow_empty=False, validator=None,
) -> str:
"""Get input from user, returns str."""
prompt_msg = fix_prompt(prompt_msg)
# Accept empty responses?
if not (allow_empty or validator):
validator = InputNotEmptyValidator()
# Show prompt
result = None
while result is None:
try:
response = input()
LOG.debug('%s%s', prompt, response)
except EOFError:
# Ignore and try again
LOG.warning('Exception occured', exc_info=True)
print('', flush=True)
if not allow_empty_response:
if response is None or not response.strip():
# The None check here is used to avoid a TypeError if response is None
print(f'\r{prompt}', end='', flush=True)
response = None
result = prompt(prompt_msg, validator=validator)
except KeyboardInterrupt:
# Ignore CTRL+c
pass
return response
# Done
return result
def major_exception():
@ -698,9 +780,9 @@ def major_exception():
raise SystemExit(1)
def pause(prompt='Press Enter to continue... '):
def pause(prompt_msg='Press Enter to continue... '):
"""Simple pause implementation."""
input_text(prompt)
input_text(prompt_msg, allow_empty=True)
def print_colored(strings, colors, log=False, sep=' ', **kwargs):

View file

@ -107,9 +107,9 @@ if ($MyInvocation.InvocationName -ne ".") {
$Url = FindDynamicUrl $DownloadPage $RegEx
DownloadFile -Path $Temp -Name "psutil64.whl" -Url $Url
# Python: pytz, requests, & dependancies
# Python: prompt_toolkit, pytz, requests, & dependancies
$RegEx = "href=.*.py3-none-any.whl"
foreach ($Module in @("chardet", "certifi", "idna", "pytz", "urllib3", "requests")) {
foreach ($Module in @("chardet", "certifi", "idna", "prompt_toolkit", "Pygments", "pytz", "requests", "urllib3", "wcwidth")) {
$DownloadPage = "https://pypi.org/project/$Module/"
$Name = "$Module.whl"
$Url = FindDynamicUrl -SourcePage $DownloadPage -RegEx $RegEx