# Wizard Kit: Functions - osTicket
|
|
|
|
import mysql.connector as mariadb
|
|
|
|
from functions.common import *
|
|
from settings.osticket import *
|
|
|
|
# Classes
|
|
class osTicket():
  """Track an osTicket database session and the operations run over it.

  The database is reached through an SSH tunnel that forwards local port
  3306 to the database port on the osTicket host. All connection details
  are read from the OSTICKET settings dict.

  Attributes:
    db_connection: Open database connection, or None when disconnected.
    db_cursor: Cursor belonging to db_connection, or None when disconnected.
    errors: True once any database operation has failed.
    tunnel_proc: Handle of the SSH tunnel process, or None if never started.
  """
  def __init__(self):
    self.db_connection = None
    self.db_cursor = None
    self.errors = False
    self.tunnel_proc = None

  def connect(self):
    """Establish connection to osTicket via a SSH tunnel.

    An existing, still-running tunnel is reused. Up to five connection
    attempts are made with a two second pause before each one. If all of
    them fail no exception is raised and db_cursor is left unset; the
    query methods then record the failure in self.errors.
    """
    cmd = [
      'ssh', '-N',
      # str() so an integer port in the settings can't break the argv list
      '-p', str(OSTICKET['SSH']['Port']),
      '-L3306:127.0.0.1:{Port}'.format(**OSTICKET['Database']),
      '{User}@{Host}'.format(**OSTICKET['SSH']),
      ]

    # Only open tunnel if one doesn't exist (or the old one has exited)
    if self.tunnel_proc is None or self.tunnel_proc.poll() is not None:
      self.tunnel_proc = popen_program(cmd)

    # Connect to database
    # NOTE: No host/port is passed, so the connector's defaults are used;
    #       presumably these resolve to the tunnel's local port 3306.
    for _ in range(5):
      # Pause first to give the tunnel time to come up
      sleep(2)
      try:
        self.db_connection = mariadb.connect(
          user=OSTICKET['Database']['User'],
          password=OSTICKET['Database']['Pass'],
          database=OSTICKET['Database']['Name'],
          )
        self.db_cursor = self.db_connection.cursor()
      except Exception:
        # Deliberately broad: any failure just triggers another attempt
        # TODO: Refine exception handling
        continue
      else:
        # Connection established
        break

  def disconnect(self, reset_errors=False):
    """Close osTicket connection.

    Closing is best-effort: the cursor and the connection are closed
    independently so a failure on one can't leave the other open.
    The SSH tunnel is left running so a later connect() can reuse it.

    Args:
      reset_errors: When True, also clear the errors flag.
    """
    for handle in (self.db_cursor, self.db_connection):
      if handle is None:
        continue
      try:
        handle.close()
      except Exception:
        # Best-effort cleanup; the references are dropped below regardless
        pass

    # Reset errors
    if reset_errors:
      self.errors = False

    # Reset vars
    self.db_cursor = None
    self.db_connection = None

  def get_ticket_name(self, ticket_id):
    """Lookup ticket and return name as str.

    Uses the current cursor; this method does not call connect() itself.

    Args:
      ticket_id: Ticket ID to look up.

    Returns:
      The name stored for the ticket, or None if no row matched or the
      lookup failed (in which case self.errors is set).
    """
    name = None
    # The table name comes from settings; the ticket ID is passed as a
    # query parameter so it is treated as a value, not an identifier.
    sql_cmd = "SELECT name FROM `{Ticket}`".format(**OSTICKET['Tables'])
    sql_cmd += " WHERE `ticket_id` = %s"

    # Lookup name
    # NOTE: If multiple entries are found it will return the last
    try:
      self.db_cursor.execute(sql_cmd, (ticket_id,))
      for row in self.db_cursor:
        name = row[0]
    except Exception:
      # Also reached when db_cursor is None (not connected)
      # TODO: Fix exception handling
      self.errors = True

    return name

  def get_ticket_number(self):
    """Get ticket number and confirm with name from osTicket DB.

    Prompts until the user confirms a ticket that exists in the database
    or chooses to disable the integration.

    Returns:
      The confirmed ticket number as a str of digits, or None if the user
      disabled osTicket integration for this run.
    """
    ticket_number = None

    # Main loop
    while ticket_number is None:
      print_standard(' ')
      _input = input('Enter ticket number (or leave blank to disable): ')
      _input = _input.strip()

      # No ticket ID entered
      if not _input:
        if ask('Disable osTicket integration for this run?'):
          break
        continue

      # Invalid ID entered
      if not re.match(r'^([0-9]+)$', _input):
        continue

      # Valid ID entered, lookup name and verify
      _name = self.get_ticket_name(_input)
      if _name:
        print_standard('You have selected ticket #{} {}'.format(
          _input, _name))
        if ask('Is this correct?'):
          ticket_number = _input

    # Done
    return ticket_number

  def post_response(self, ticket_id, response):
    """Post a reply to a ticket in osTicket.

    Args:
      ticket_id: ID of the ticket to reply to.
      response: Text of the reply; passed as a query parameter so quotes
        and other special characters are stored verbatim.

    On failure self.errors is set; no exception is raised.
    """
    # Build SQL cmd
    sql_cmd = "INSERT INTO `{Name}`.`{Response}`".format(
      **OSTICKET['Database'], **OSTICKET['Tables'])
    sql_cmd += " (ticket_id, staff_id, staff_name, response, created)"
    sql_cmd += " VALUES (%s, %s, %s, %s, %s)"
    params = (
      ticket_id,
      OSTICKET['Staff']['ID'],
      OSTICKET['Staff']['Name'],
      response,
      time.strftime("%Y-%m-%d %H:%M:%S"),
      )

    # Run SQL cmd
    self._run_write(sql_cmd, params)

  def set_disk_failed(self, ticket_id):
    """Mark disk as failed in osTicket."""
    self.set_flag(
      ticket_id,
      OSTICKET['Disk Flag']['Name'],
      OSTICKET['Disk Flag']['Fail'])

  def set_disk_passed(self, ticket_id):
    """Mark disk as passed in osTicket."""
    self.set_flag(
      ticket_id,
      OSTICKET['Disk Flag']['Name'],
      OSTICKET['Disk Flag']['Pass'])

  def set_flag(self, ticket_id, flag_name, flag_value):
    """Set flag in osTicket.

    Args:
      ticket_id: ID of the ticket to update.
      flag_name: Column to set. It is inserted into the statement as an
        identifier, so it must come from trusted settings.
      flag_value: Value to store; passed as a query parameter.

    On failure self.errors is set; no exception is raised.
    """
    # Build SQL cmd
    sql_cmd = "UPDATE `{Name}`.`{Ticket}`".format(
      **OSTICKET['Database'], **OSTICKET['Tables'])
    sql_cmd += " SET `{}` = %s".format(flag_name)
    sql_cmd += " WHERE `{Ticket}`.`ticket_id` = %s".format(
      **OSTICKET['Tables'])

    # Run SQL cmd
    self._run_write(sql_cmd, (flag_value, ticket_id))

  def _run_write(self, sql_cmd, params):
    """Connect, run one data-modifying statement, commit, and disconnect."""
    self.connect()
    try:
      self.db_cursor.execute(sql_cmd, params)
      # The connector does not autocommit by default, so without this the
      # change could be discarded when the connection is closed.
      self.db_connection.commit()
    except Exception:
      # Also reached when connect() failed and db_cursor is None
      # TODO: Fix exception handling
      self.errors = True
    finally:
      # Done
      self.disconnect()
|
|
|
|
# Functions
|
|
|
|
# This module only provides definitions for other scripts to import;
# when executed directly it just prints a notice.
if __name__ == '__main__':
  print("This file is not meant to be called directly.")
|
|
|
|
# vim: sts=2 sw=2 ts=2
|