WizardKit/scripts/wk/osticket.py

431 lines
11 KiB
Python

"""WizardKit: osTicket Functions"""
# vim: sts=2 sw=2 ts=2
import atexit
import logging
import pathlib
import time
import mariadb
from wk.cfg.hw import TESTSTATION_FILE
from wk.cfg.osticket import SQL, STAFF
from wk.ui import ansi, cli
# STATIC_VARIABLES
LOG = logging.getLogger(__name__)
FLAG_CODES = {
'Pass': 1,
'Fail': 2,
}
FLAG_CPU = 'zTemps'
FLAG_DISK = 'zHDTune'
FLAG_MAX_TEMP = 'zMaxTemp'
RESPONSE_COLOR_CODES = {
'Normal': '0',
'Contact': '1',
'Diags': '2',
'Diags FAIL': '3',
'Money': '4',
}
TABLE_RESPONSE = 'ost_ticket_response'
TABLE_TICKET = 'ost_ticket'
# Classes
class osTicket():
"""Class to track osTicket data and functions."""
def __init__(self):
self.db_connection = None
self.db_cursor = None
self.disabled: bool = False
self.errors: bool = False
self.note: str = ''
self.ticket_id: int | None = None
self.ticket_name: str | None = None
self.ticket_subject: str | None = None
# Ensure connection is closed atexit
atexit.register(self._disconnect)
def _connect(self, silent=True):
"""Establish connection to osTicket."""
if self.disabled:
return
# Connect to database
try:
self.db_connection = mariadb.connect(
host=SQL['Host'],
port=SQL['Port'],
database=SQL['DB'],
user=SQL['User'],
password=SQL['Pass'],
connect_timeout=5,
)
self.db_cursor = self.db_connection.cursor()
except mariadb.Error:
# Assuming network issue or bad creds
pass
# Raise exception if necessary
if self.db_cursor is None:
LOG.error('Failed to connect to osTicket database')
if silent:
# Don't raise exceptions, just disable ost
self.disabled = True
self.errors = True
else:
raise RuntimeError('Failed to connect to osTicket database')
def _disconnect(self):
"""Close osTicket connection."""
for db_obj in (self.db_cursor, self.db_connection):
if db_obj:
try:
db_obj.close()
except mariadb.Error:
# Ignore errors since vars will be reset below
pass
# Reset db objects
self.db_cursor = None
self.db_connection = None
def _get_flag(self, flag_name):
"""Get flag for self.ticket_id from osTicket, returns str."""
flag_value = None
self._verify_ticket_id()
# Build SQL cmd
sql_cmd = (
f"SELECT `{flag_name}` FROM `{SQL['DB']}`.`{TABLE_TICKET}` "
f"WHERE `{TABLE_TICKET}`.`ticket_id` = {self.ticket_id};"
)
# Run SQL cmd
try:
self.db_cursor.execute(sql_cmd)
for s in self.db_cursor:
flag_value = s[0]
except mariadb.Error as err_msg:
cli.print_error(err_msg)
self.errors = True
# Done
return str(flag_value)
def _get_ticket_field(self, ticket_id, field_name):
"""Get field for ticket_id from osTicket, returns str."""
field_data = None
# Build SQL cmd
sql_cmd = (
f"SELECT {field_name} FROM `{SQL['DB']}`.`{TABLE_TICKET}` "
f"WHERE `{TABLE_TICKET}`.`ticket_id` = {ticket_id};"
)
# Lookup data
# NOTE: If multiple entries are found it will return the last
try:
self.db_cursor.execute(sql_cmd)
for result in self.db_cursor:
field_data = result[0]
except mariadb.Error as err_msg:
# Show error and return None
cli.print_error(err_msg)
# Done
return field_data
def _set_flag(self, flag_name, flag_value):
"""Set flag_name to flag_value for ticket_id in osTicket.
NOTE: This will overwrite any existing value.
"""
self._verify_ticket_id()
sql_cmd = (
f"UPDATE `{SQL['DB']}`.`{TABLE_TICKET}` "
f"SET `{flag_name}` = '{flag_value}' "
f"WHERE `{TABLE_TICKET}`.`ticket_id` = {self.ticket_id};"
)
# Run SQL cmd
try:
self.db_cursor.execute(sql_cmd)
except mariadb.Error as err_msg:
cli.print_error(err_msg)
self.errors = True
def _verify_ticket_id(self):
"""Verify that ticket_id has been set."""
if not self.ticket_id:
LOG.error('Ticket ID not set')
raise RuntimeError('Ticket ID not set')
def add_note(self, prompt: str = 'Add note') -> list[str]:
"""Add note to be included in osTicket replies."""
lines = []
if not prompt:
prompt = 'Please enter any additional information for this ticket'
# Instructions
cli.print_standard(prompt)
cli.print_info(' (End note with an empty line)')
cli.print_standard(' ')
# Get note
while True:
text = cli.input_text('> ', allow_empty=True)
if not text:
break
lines.append(text.strip())
# Save note
if lines:
self.note = lines[0]
for line in lines[1:]:
self.note += f'\n...{line}'
else:
self.note = ''
# Done
return lines
def init(self):
"""Revert to defaults."""
self._disconnect()
self.disabled = False
self.errors = False
self.note = None
self.ticket_id = None
self.ticket_name = None
def post_response(self, response, color='Normal'):
"""Post a reply to a ticket in osTicket."""
cur_date_time = time.strftime('%Y-%m-%d %H:%M:%S')
lines = []
test_station = get_test_station_name()
self._connect(silent=True)
self._verify_ticket_id()
# Bail if disabled
if self.disabled:
return
# Format response
if test_station:
lines.append(f'[Test-Station: {test_station}]')
lines.append(f'[Report for ticket #{self.ticket_id} {self.ticket_name}]')
if self.note:
lines.append(f'[Note] {self.note}\n')
lines.append(str(response))
response = '\n'.join(lines)
response = ansi.strip_colors(response)
response = response.replace("`", "'").replace("'", "\\'")
# Build SQL cmd
sql_cmd = (
f"INSERT INTO `{SQL['DB']}`.`{TABLE_RESPONSE}` "
f"(ticket_id, staff_id, staff_name, response, created, code) "
f"VALUES ("
f" '{self.ticket_id}',"
f" '{STAFF['ID']}',"
f" '{STAFF['Name']}',"
f" '{response}',"
f" '{cur_date_time}',"
f" '{RESPONSE_COLOR_CODES.get(color, 'Normal')}'"
f");"
)
# Run SQL cmd
try:
self.db_cursor.execute(sql_cmd)
except mariadb.Error:
self.errors = True
# Update ticket last repsonse field
sql_cmd = (
f"UPDATE `{SQL['DB']}`.`{TABLE_TICKET}`"
f"SET `lastresponse` = '{cur_date_time}' "
f"WHERE `{TABLE_TICKET}`.`ticket_id` = {self.ticket_id};"
)
try:
self.db_cursor.execute(sql_cmd)
except mariadb.Error:
self.errors = True
# Done
self._disconnect()
def select_ticket(self):
"""Set ticket number and name from osTicket DB."""
cli.print_standard('Connecting to osTicket...')
# Bail if disabled
if self.disabled:
return
# Connect
while True:
try:
self._connect(silent=False)
except (mariadb.Error, RuntimeError):
cli.print_warning('Failed to connect to osTicket')
if not cli.ask('Try again?'):
cli.print_standard('Integration disabled for this session')
self.disabled = True
return
else:
# Connection successful
break
# Main loop
while self.ticket_id is None:
cli.print_standard(' ')
_id = cli.input_text(
'Enter ticket number (or leave blank to disable): ', allow_empty=True,
)
_id = _id.strip()
# Nothing entered
if not _id:
print(' ')
if cli.ask('Disable osTicket integration for this session?'):
self.disabled = True
break
# Invalid ID entered
if not _id.isnumeric():
continue
# Valid ID entered, lookup name
_name = self._get_ticket_field(_id, 'name')
# Invalid ticket selected
if _name is None:
cli.print_error(f'Ticket #{_id} not found')
continue
# Valid ticket selected, lookup subject
_subject = self._get_ticket_field(_id, 'subject')
# Verify selection
cli.print_colored(
['You have selected ticket', f'#{_id}', _name],
[None, 'BLUE', None],
)
cli.print_colored(f' {_subject}', 'CYAN')
cli.print_standard(' ')
if cli.ask('Is this correct?'):
self.ticket_id = _id
self.ticket_name = _name
self.ticket_subject = _subject
# Done
self._disconnect()
def set_cpu_max_temp(self, temp):
"""Set CPU max temp in osTicket for ticket_id.
NOTE: This will not replace a higher temp value.
"""
LOG.info('Setting max CPU temp to %s', temp)
self._connect(silent=True)
# Bail if disabled
if self.disabled:
return
# Compare to current temp
current_temp = self._get_flag(FLAG_MAX_TEMP)
if str(current_temp).isnumeric() and int(current_temp) > temp:
cli.print_warning('Not replacing higher temp in osTicket')
self._disconnect()
return
# Update temp
self._set_flag(FLAG_MAX_TEMP, int(temp))
# Done
self._disconnect()
def set_flag_failed(self, flag_name):
"""Set flag as failed in osTicket for ticket_id."""
LOG.warning('Setting osTicket %s checkbox to FAILED', flag_name)
real_flag_name = FLAG_CPU if flag_name == 'CPU' else FLAG_DISK
self._connect(silent=True)
# Bail if disabled
if self.disabled:
return
# Set flag
self._set_flag(real_flag_name, FLAG_CODES['Fail'])
# Done
self._disconnect()
def set_flag_passed(self, flag_name):
"""Set flag as passed in osTicket for ticket_id.
NOTE: This will not overwrite a failed status.
"""
real_flag_name = FLAG_CPU if flag_name == 'CPU' else FLAG_DISK
self._connect(silent=True)
# Bail if disabled
if self.disabled:
return
# Bail if flag checkbox set as FAILED
if self._get_flag(real_flag_name) == str(FLAG_CODES['Fail']):
cli.print_warning(
f'Not replacing osTicket {flag_name} checkbox FAILED value',
)
self._disconnect()
return
# Current value != to FAILED, updating checkbox
LOG.info('Setting osTicket %s checkbox to PASSED', flag_name)
self._set_flag(real_flag_name, FLAG_CODES['Pass'])
# Done
self._disconnect()
# Functions
def get_test_station_name():
"""Get test station name, returns str."""
hostname_file = pathlib.Path(TESTSTATION_FILE)
# Bail early
if not hostname_file.exists():
return ''
if not hostname_file.read_text(encoding='utf-8'):
return ''
# Done
return hostname_file.read_text(encoding='utf-8').splitlines()[0]
def pad_with_dots(text, pad_right=False):
"""Replace space padding with dots, returns str.
NOTE: This is a dumb hack to better align text in osTicket.
"""
text = str(text)
text = text.replace(' ', '..')
if '.' in text:
if pad_right:
text = f'{text}.'
else:
text = f'.{text}'
# Done
return text
if __name__ == '__main__':
print("This file is not meant to be called directly.")