418 lines
11 KiB
Python
418 lines
11 KiB
Python
"""WizardKit: osTicket Functions"""
|
|
# vim: sts=2 sw=2 ts=2
|
|
|
|
import atexit
|
|
import logging
|
|
import socket
|
|
import time
|
|
|
|
import mysql.connector as mariadb
|
|
|
|
from wk import std
|
|
from wk.cfg.osticket import SQL, STAFF, TEST_STATIONS
|
|
|
|
|
|
# STATIC_VARIABLES
LOG = logging.getLogger(__name__)

# Values written to a ticket's flag columns (see set_flag_passed/failed)
FLAG_CODES = {'Pass': 1, 'Fail': 2}

# Column names in TABLE_TICKET used as flags
FLAG_CPU = 'zTemps'
FLAG_DISK = 'zHDTune'
FLAG_MAX_TEMP = 'zMaxTemp'

# Values stored in the `code` column of TABLE_RESPONSE by post_response()
RESPONSE_COLOR_CODES = {
  'Normal': '0',
  'Contact': '1',
  'Diags': '2',
  'Diags FAIL': '3',
  }

# Table names inside the SQL['DB'] database
TABLE_RESPONSE = 'ost_ticket_response'
TABLE_TICKET = 'ost_ticket'
|
|
|
|
|
|
# Classes
|
|
class osTicket(): # pylint: disable=invalid-name
  """Class to track osTicket data and functions.

  Holds the selected ticket, an optional note and the DB connection objects.
  The public methods open a connection on demand and close it again before
  returning.

  Attributes:
    db_connection, db_cursor: Active DB objects, or None when disconnected.
    disabled: True once the integration was disabled (by the user or by a
      failed silent connect); public methods then return without action.
    errors: Set to True by several methods when connecting or running a
      statement fails.
    note: Optional text that post_response() includes in each reply.
    ticket_id, ticket_name: Ticket chosen in select_ticket() (ID kept as
      the str that was entered).

  NOTE(review): No method calls commit() after INSERT/UPDATE statements;
                presumably the server/tables apply writes without it.
                Confirm against the deployed osTicket database.
  """
  def __init__(self):
    self.db_connection = None
    self.db_cursor = None
    self.disabled = False
    self.errors = False
    self.note = None
    self.ticket_id = None
    self.ticket_name = None

    # Ensure connection is closed atexit
    atexit.register(self._disconnect)

  def _connect(self, silent=True):
    """Establish connection to osTicket.

    Up to three attempts are made; only InterfaceError is retried (after a
    two second pause), any other connector error aborts immediately.

    If the connection can't be established and silent is True the
    integration is disabled and self.errors is set, otherwise a
    RuntimeError is raised.
    """
    if self.disabled:
      return

    # Connect to database
    for _ in range(3):
      try:
        self.db_connection = mariadb.connect(
          host=SQL['Host'],
          port=SQL['Port'],
          database=SQL['DB'],
          user=SQL['User'],
          password=SQL['Pass'],
          )
        self.db_cursor = self.db_connection.cursor()
      except mariadb.errors.InterfaceError:
        # Network issue? try again
        std.sleep(2)
      except mariadb.errors.Error:
        # Bad creds or other SQL error, bail
        break
      else:
        # Connection established
        break

    # Raise exception if necessary
    if self.db_cursor is None:
      LOG.error('Failed to connect to osTicket database')
      if silent:
        # Don't raise exceptions, just disable ost
        self.disabled = True
        self.errors = True
      else:
        raise RuntimeError('Failed to connect to osTicket database')

  def _disconnect(self):
    """Close osTicket connection.

    Safe to call when already disconnected; also registered with atexit.
    """
    for db_obj in (self.db_cursor, self.db_connection):
      if db_obj:
        try:
          db_obj.close()
        except mariadb.errors.Error:
          # Ignore errors since vars will be reset below
          pass

    # Reset db objects
    self.db_cursor = None
    self.db_connection = None

  def _get_flag(self, flag_name):
    """Get flag for self.ticket_id from osTicket, returns str.

    The value is passed through str(), so 'None' is returned if no row was
    found or the query failed (self.errors is set in the latter case).

    Raises RuntimeError if no ticket has been selected.
    """
    flag_value = None
    self._verify_ticket_id()

    # Build SQL cmd
    # NOTE: Identifiers can't be bound as parameters; flag_name is expected
    #       to be one of the FLAG_* constants.
    sql_cmd = (
      f"SELECT `{flag_name}` FROM `{SQL['DB']}`.`{TABLE_TICKET}` "
      f"WHERE `{TABLE_TICKET}`.`ticket_id` = %s;"
      )

    # Run SQL cmd
    try:
      self.db_cursor.execute(sql_cmd, (str(self.ticket_id),))
      for row in self.db_cursor:
        flag_value = row[0]
    except mariadb.errors.Error as err_msg:
      std.print_error(err_msg)
      self.errors = True

    # Done
    return str(flag_value)

  def _get_ticket_field(self, ticket_id, field_name):
    """Get field for ticket_id from osTicket, returns str.

    Returns None if no matching row was found or the query failed.
    """
    field_data = None

    # Build SQL cmd
    # NOTE: field_name is inserted as-is (identifiers can't be bound), only
    #       the ticket ID is passed as a parameter.
    sql_cmd = (
      f"SELECT {field_name} FROM `{SQL['DB']}`.`{TABLE_TICKET}` "
      f"WHERE `{TABLE_TICKET}`.`ticket_id` = %s;"
      )

    # Lookup data
    # NOTE: If multiple entries are found it will return the last
    try:
      self.db_cursor.execute(sql_cmd, (str(ticket_id),))
      for result in self.db_cursor:
        field_data = result[0]
    except mariadb.errors.Error as err_msg:
      # Show error and return None
      std.print_error(err_msg)

    # Done
    return field_data

  def _set_flag(self, flag_name, flag_value):
    """Set flag_name to flag_value for ticket_id in osTicket.

    NOTE: This will overwrite any existing value.

    Raises RuntimeError if no ticket has been selected; SQL errors are
    printed and recorded in self.errors.
    """
    self._verify_ticket_id()
    sql_cmd = (
      f"UPDATE `{SQL['DB']}`.`{TABLE_TICKET}` "
      f"SET `{flag_name}` = %s "
      f"WHERE `{TABLE_TICKET}`.`ticket_id` = %s;"
      )

    # Run SQL cmd
    # NOTE: The value is sent as a string, matching the quoted literal that
    #       was used before the statement was parameterized.
    try:
      self.db_cursor.execute(
        sql_cmd, (str(flag_value), str(self.ticket_id)),
        )
    except mariadb.errors.Error as err_msg:
      std.print_error(err_msg)
      self.errors = True

  def _verify_ticket_id(self):
    """Verify that ticket_id has been set, raises RuntimeError if not."""
    if not self.ticket_id:
      LOG.error('Ticket ID not set')
      raise RuntimeError('Ticket ID not set')

  def add_note(self):
    """Add note to be included in osTicket replies.

    Lines are read until an empty one is entered; continuation lines are
    prefixed with '...'.  If nothing is entered the current note is left
    unchanged.
    """
    lines = []

    # Instructions
    std.print_standard('Please enter the additional information for this ticket')
    std.print_info(' (End note with an empty line)')
    std.print_standard(' ')

    # Get note
    while True:
      text = std.input_text('> ')
      if not text:
        break
      lines.append(text.strip())

    # Nothing entered, avoid popping from an empty list
    if not lines:
      return

    # Save note
    self.note = '\n...'.join(lines)

  def init(self):
    """Revert to defaults."""
    self._disconnect()
    self.disabled = False
    self.errors = False
    self.note = None
    self.ticket_id = None
    self.ticket_name = None

  def post_response(self, response, color='Normal'):
    """Post a reply to a ticket in osTicket.

    The reply is prefixed with the test station (if any), the ticket
    header and the note (if any).  color selects a RESPONSE_COLOR_CODES
    entry; unknown names fall back to the 'Normal' code.

    Does nothing if the integration is disabled.  Raises RuntimeError if
    enabled but no ticket has been selected.  SQL errors are logged and
    recorded in self.errors.
    """
    self._connect(silent=True)

    # Bail if disabled
    # NOTE: Checked before verifying the ticket so a session with osTicket
    #       disabled (and thus no ticket selected) doesn't raise.
    if self.disabled:
      return

    try:
      self._verify_ticket_id()

      # Format response
      lines = []
      test_station = get_test_station_name()
      if test_station:
        lines.append(f'[Test-Station: {test_station}]')
      lines.append(f'[Report for ticket #{self.ticket_id} {self.ticket_name}]')
      if self.note:
        lines.append(f'[Note] {self.note}\n')
      lines.append(str(response))
      response = std.strip_colors('\n'.join(lines))

      # Backticks are still swapped for single quotes so the stored text
      # matches previous replies; quoting is handled by the connector.
      response = response.replace("`", "'")

      # Build SQL cmd
      sql_cmd = (
        f"INSERT INTO `{SQL['DB']}`.`{TABLE_RESPONSE}` "
        "(ticket_id, staff_id, staff_name, response, created, code) "
        "VALUES (%s, %s, %s, %s, %s, %s);"
        )
      sql_args = (
        str(self.ticket_id),
        STAFF['ID'],
        STAFF['Name'],
        response,
        time.strftime('%Y-%m-%d %H:%M:%S'),
        RESPONSE_COLOR_CODES.get(color, RESPONSE_COLOR_CODES['Normal']),
        )

      # Run SQL cmd
      try:
        self.db_cursor.execute(sql_cmd, sql_args)
      except mariadb.errors.Error as err_msg:
        LOG.error('Failed to post response to osTicket: %s', err_msg)
        self.errors = True
    finally:
      # Done
      self._disconnect()

  def select_ticket(self):
    """Set ticket number and name from osTicket DB.

    Prompts until a ticket is confirmed or the user disables the
    integration for this session.
    """
    # Bail if disabled
    if self.disabled:
      return

    std.print_standard('Connecting to osTicket...')

    # Connect
    while True:
      try:
        self._connect(silent=False)
      except (mariadb.errors.Error, RuntimeError):
        std.print_warning('Failed to connect to osTicket')
        if not std.ask('Try again?'):
          std.print_standard('Integration disabled for this session')
          self.disabled = True
          return
      else:
        # Connection successful
        break

    # Main loop
    try:
      while self.ticket_id is None:
        std.print_standard(' ')
        _id = std.input_text('Enter ticket number (or leave blank to disable): ')
        _id = _id.strip()

        # Nothing entered
        if not _id:
          print(' ')
          if std.ask('Disable osTicket integration for this session?'):
            self.disabled = True
            break
          continue

        # Invalid ID entered
        if not _id.isnumeric():
          continue

        # Valid ID entered, lookup name
        _name = self._get_ticket_field(_id, 'name')

        # Invalid ticket selected
        if _name is None:
          std.print_error(f'Ticket #{_id} not found')
          continue

        # Valid ticket selected, lookup subject
        _subject = self._get_ticket_field(_id, 'subject')

        # Verify selection
        std.print_colored(
          ['You have selected ticket', f'#{_id}', _name],
          [None, 'BLUE', None],
          )
        std.print_colored(f' {_subject}', 'CYAN')
        std.print_standard(' ')
        if std.ask('Is this correct?'):
          self.ticket_id = _id
          self.ticket_name = _name
    finally:
      # Done
      self._disconnect()

  def set_cpu_max_temp(self, temp):
    """Set CPU max temp in osTicket for ticket_id.

    NOTE: This will not replace a higher temp value.

    Does nothing if the integration is disabled.  Raises RuntimeError if
    enabled but no ticket has been selected.
    """
    LOG.info('Setting max CPU temp to %s', temp)
    self._connect(silent=True)

    # Bail if disabled
    if self.disabled:
      return

    try:
      # Compare to current temp
      current_temp = self._get_flag(FLAG_MAX_TEMP)
      if str(current_temp).isnumeric() and int(current_temp) > temp:
        std.print_warning('Not replacing higher temp in osTicket')
        return

      # Update temp
      self._set_flag(FLAG_MAX_TEMP, temp)
    finally:
      # Done
      self._disconnect()

  def set_flag_failed(self, flag_name):
    """Set flag as failed in osTicket for ticket_id.

    flag_name 'CPU' selects the CPU flag, anything else the disk flag.
    Does nothing if the integration is disabled.
    """
    LOG.warning('Setting osTicket %s checkbox to FAILED', flag_name)
    real_flag_name = FLAG_CPU if flag_name == 'CPU' else FLAG_DISK
    self._connect(silent=True)

    # Bail if disabled
    if self.disabled:
      return

    # Set flag
    try:
      self._set_flag(real_flag_name, FLAG_CODES['Fail'])
    finally:
      # Done
      self._disconnect()

  def set_flag_passed(self, flag_name):
    """Set flag as passed in osTicket for ticket_id.

    NOTE: This will not overwrite a failed status.

    flag_name 'CPU' selects the CPU flag, anything else the disk flag.
    Does nothing if the integration is disabled.
    """
    real_flag_name = FLAG_CPU if flag_name == 'CPU' else FLAG_DISK
    self._connect(silent=True)

    # Bail if disabled
    if self.disabled:
      return

    try:
      # Bail if flag checkbox set as FAILED
      if self._get_flag(real_flag_name) == str(FLAG_CODES['Fail']):
        std.print_warning(
          f'Not replacing osTicket {flag_name} checkbox FAILED value',
          )
        return

      # Current value != to FAILED, updating checkbox
      LOG.info('Setting osTicket %s checkbox to PASSED', flag_name)
      self._set_flag(real_flag_name, FLAG_CODES['Pass'])
    finally:
      # Done
      self._disconnect()
|
|
|
|
|
|
# Functions
|
|
def get_test_station_name():
  """Get test station name from hostname, returns str.

  The FQDN must contain TEST_STATIONS['Domain'], otherwise an empty string
  is returned.  The domain is removed and, if the (lowercased) remainder is
  listed in TEST_STATIONS['Known Names'], the mapped name is returned;
  unknown names are returned as-is.

  NOTE: This is quite broad and may include false-positives.
  """
  fqdn = socket.getfqdn()
  domain = TEST_STATIONS['Domain']

  # Not a test station
  if domain not in fqdn:
    return ''

  # Strip domain and translate known names
  short_name = fqdn.replace(domain, '')
  lookup_key = short_name.lower()
  if lookup_key in TEST_STATIONS['Known Names']:
    return TEST_STATIONS['Known Names'][lookup_key]
  return short_name
|
|
|
|
|
|
def pad_with_dots(text, pad_right=False):
  """Replace space padding with dots, returns str.

  Every space in str(text) becomes two dots.  If the result contains a dot
  one more is added: at the end if pad_right is set, else at the start.

  NOTE: This is a dumb hack to better align text in osTicket.
  """
  dotted = str(text).replace(' ', '..')

  # Nothing to pad
  if '.' not in dotted:
    return dotted

  return f'{dotted}.' if pad_right else f'.{dotted}'
|
|
|
|
|
|
if __name__ == '__main__':
  # Import-only module, just tell the user when run as a script
  print("This file is not meant to be called directly.")
|