From e9e19053f421cdd3d5f6b3339e4c2d77e96c5747 Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Fri, 10 Jan 2020 16:29:20 -0700 Subject: [PATCH] Finished osTicket checkbox functions --- scripts/wk.prev/settings/osticket.py | 63 ------ scripts/wk/__init__.py | 1 + scripts/wk/cfg/osticket.py | 37 ++++ scripts/wk/ost.py | 113 ---------- scripts/wk/osticket.py | 316 +++++++++++++++++++++++++++ 5 files changed, 354 insertions(+), 176 deletions(-) delete mode 100644 scripts/wk.prev/settings/osticket.py create mode 100644 scripts/wk/cfg/osticket.py delete mode 100644 scripts/wk/ost.py create mode 100644 scripts/wk/osticket.py diff --git a/scripts/wk.prev/settings/osticket.py b/scripts/wk.prev/settings/osticket.py deleted file mode 100644 index 3c10ba83..00000000 --- a/scripts/wk.prev/settings/osticket.py +++ /dev/null @@ -1,63 +0,0 @@ -# Wizard Kit: Settings - osTicket - -OSTICKET = { - 'Color Codes': { - 'Normal': '0', - 'Contact': '1', - 'Diags': '2', - 'Diags FAIL': '3', - }, - 'CPU Flag': { - 'Name': 'zTemps', - 'Pass': 1, - 'Fail': 2, - }, - 'CPU Temp': { - 'Name': 'zMaxTemp', - }, - 'Database': { - 'Name': 'osticket', - 'User': 'wizardkit', - 'Pass': 'U9bJnF9eamVkfsVw', - 'Port': '3306', - }, - 'Disk Flag': { - 'Name': 'zHDTune', - 'Pass': 1, - 'Fail': 2, - }, - 'SSH': { - 'Host': 'osticket.1201.com', - 'Port': '22', - 'User': 'sql_tunnel', - }, - 'Staff': { - 'ID': '23', - 'Name': 'Wizard Kit', - }, - 'Tables': { - 'Response': 'ost_ticket_response', - 'Ticket': 'ost_ticket', - }, - } -TEST_STATIONS = { - 'bender': 'Bender', - 'combine': 'Combine', - 'control': 'Control', - 'cortana': 'Cortana', - 'data': 'Data', - 'glados': 'GLaDOS', - 'locutus': 'Locutus', - 'lore': 'Lore', - 'sex-robot': 'Sex-Robot', - 'shodan': 'Shodan', - 'six': 'Six', - 'skynet': 'Skynet', - 'supremo': 'Supremo', - 'unicron': 'Unicron', - } - -if __name__ == '__main__': - print("This file is not meant to be called directly.") - -# vim: sts=2 sw=2 ts=2 diff --git a/scripts/wk/__init__.py b/scripts/wk/__init__.py index b6a11b56..298c07fd 100644 --- a/scripts/wk/__init__.py +++ b/scripts/wk/__init__.py @@ -13,6 +13,7 @@ from wk import kit from wk import log from wk import net from wk import os +from wk import osticket from wk import std from wk import sw from wk import tmux diff --git a/scripts/wk/cfg/osticket.py b/scripts/wk/cfg/osticket.py new file mode 100644 index 00000000..1ba4fe5a --- /dev/null +++ b/scripts/wk/cfg/osticket.py @@ -0,0 +1,37 @@ +"""Wizard Kit: Config - osTicket""" +# vim: sts=2 sw=2 ts=2 + +SQL = { + 'DB': 'osticket', + 'Host': 'osticket.1201.com', + 'Port': '3306', + 'User': 'wizardkit', + 'Pass': 'U9bJnF9eamVkfsVw', + } +STAFF = { + 'ID': '23', + 'Name': 'Wizard Kit', + } +TEST_STATIONS = { + # Domain will be stripped from the FQDN so the initial '.' is included + 'Domain': '.1201.com', + # The rest are used to get the test-station name with proper capitalization + 'bender': 'Bender', + 'combine': 'Combine', + 'control': 'Control', + 'cortana': 'Cortana', + 'data': 'Data', + 'deepmind': 'DeepMind', + 'glados': 'GLaDOS', + 'locutus': 'Locutus', + 'lore': 'Lore', + 'sex-robot': 'Sex-Robot', + 'shodan': 'Shodan', + 'six': 'Six', + 'skynet': 'Skynet', + 'supremo': 'Supremo', + 'unicron': 'Unicron', + } + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/scripts/wk/ost.py b/scripts/wk/ost.py deleted file mode 100644 index 8894ee7a..00000000 --- a/scripts/wk/ost.py +++ /dev/null @@ -1,113 +0,0 @@ -"""WizardKit: osTicket Functions""" -# vim: sts=2 sw=2 ts=2 - -import atexit -import socket - -import mysql.connector as mariadb - - -# Classes -class osTicket(): # pylint: disable=invalid-name - """Class to track osTicket data and functions.""" - def __init__(self): - self.db_connection = None - self.db_cursor = None - self.disabled = False - self.tunnel_proc = None - - def connect(self, silent=True): - """Establish connection to osTicket via SSH tunnel.""" - # TODO connect - - def convert_report(self, name, report): - """Convert report into an osTicket friendly format, returns list.""" - out_report = [] - #TODO: Convert report - - def disconnect(self): - """Close osTicket connection.""" - # TODO: disconnect - - def get_flag(self, ticket_id, flag_name): - """Get flag for ticket_id from osTicket, returns int.""" - # TODO Get flag value, is it a consistent type? ^^ - - def get_ticket_id(self): - """Get ticket number and name from osTicket DB.""" - # TODO Get ticket number and name, save to self - - def get_ticket_field(self, ticket_id, field): - """Get field for ticket_id from osTicket, returns str.""" - # TODO: Get ticket field - - def set_cpu_failed(self, ticket_id): - """Set CPU as failed in osTicket for ticket_id.""" - # TODO Set CPU failed - - def set_cpu_passed(self, ticket_id): - """Set CPU as passed in osTicket for ticket_id. - - NOTE: This will not overwrite a failed status. - """ - # TODO Set CPU passed - - def set_disk_failed(self, ticket_id): - """Set disk as failed in osTicket for ticket_id.""" - # TODO Set disk failed - - def set_disk_passed(self, ticket_id): - """Set disk as passed in osTicket for ticket_id. - - NOTE: This will not overwrite a failed status. - """ - # TODO Set disk passed - - def set_flag(self, ticket_id, flag_name, flag_value): - """Set flag_name to flag_value for ticket_id in osTicket. - - NOTE: This will overwrite any existing value. - """ - # TODO: Set flag value - - -# Functions -def get_test_station_name(): - """Get test station name from hostname, returns str. - - NOTES: This is quite broad and may include false-positives. - If not a test station then an empty string is returned. - """ - hostname = socket.getfqdn() - - # Check if this is a test station - if TEST_STATIONS['Domain'] in hostname: - hostname = hostname.replace(TEST_STATIONS['Domain'], '') - if hostname.lower() in TEST_STATIONS['Known Names']: - hostname = TEST_STATIONS['Known Names'][hostname.lower()] - else: - hostname = '' - - # Done - return hostname - - -def pad_with_dots(text, pad_right=False): - """Replace space padding with dots, returns str. - - NOTE: This is a dumb hack to better align text in osTicket. - """ - text = str(text) - text = text.replace(' ', '..') - if '.' in text: - if pad_right: - text = f'{text}.' - else: - text = f'.{text}' - - # Done - return text - - -if __name__ == '__main__': - print("This file is not meant to be called directly.") diff --git a/scripts/wk/osticket.py b/scripts/wk/osticket.py new file mode 100644 index 00000000..a969937d --- /dev/null +++ b/scripts/wk/osticket.py @@ -0,0 +1,316 @@ +"""WizardKit: osTicket Functions""" +# vim: sts=2 sw=2 ts=2 + +import atexit +import logging +import socket + +import mysql.connector as mariadb + +from wk.cfg.osticket import SQL, STAFF, TEST_STATIONS +from wk.std import ( + ask, + input_text, + print_colored, + print_error, + print_standard, + print_warning, + sleep, + ) + + +# STATIC_VARIABLES +LOG = logging.getLogger(__name__) +FLAG_CODES = { + 'Pass': 1, + 'Fail': 2, + } +FLAG_CPU = 'zTemps' +FLAG_DISK = 'zHDTune' +FLAG_MAX_TEMP = 'zMaxTemp' +RESPONSE_COLOR_CODES = { + 'Normal': '0', + 'Contact': '1', + 'Diags': '2', + 'Diags FAIL': '3', + } +TABLE_RESPONSE = 'ost_ticket_response' +TABLE_TICKET = 'ost_ticket' + + +# Classes +class osTicket(): # pylint: disable=invalid-name + """Class to track osTicket data and functions.""" + def __init__(self): + self.db_connection = None + self.db_cursor = None + self.disabled = False + self.errors = False + self.ticket_id = None + self.ticket_name = None + + # Ensure connection is closed atexit + atexit.register(self._disconnect) + + def _connect(self, silent=True): + """Establish connection to osTicket.""" + if self.disabled: + return + + # Connect to database + for i in range(3): + i += 1 + try: + self.db_connection = mariadb.connect( + host=SQL['Host'], + port=SQL['Port'], + database=SQL['DB'], + user=SQL['User'], + password=SQL['Pass'], + ) + self.db_cursor = self.db_connection.cursor() + except mariadb.errors.InterfaceError: + # Network issue? try again + sleep(2) + except mariadb.errors.Error: + # Bad creds or other SQL error, bail + break + else: + # Connection established + break + + # Raise exception if necessary + if self.db_cursor is None and not silent: + LOG.error('Failed to connect to osTicket database') + raise RuntimeError('Failed to connect to osTicket database') + + def _convert_report(self, name, report): + """Convert report into an osTicket friendly format, returns list.""" + out_report = [] + #TODO: Convert report + + def _disconnect(self): + """Close osTicket connection.""" + for db_obj in (self.db_cursor, self.db_connection): + if db_obj: + try: + db_obj.close() + except mariadb.errors.Error: + # Ignore errors since vars will be reset below + pass + + # Reset db objects + self.db_cursor = None + self.db_connection = None + + def _get_flag(self, flag_name): + """Get flag for self.ticket_id from osTicket, returns str.""" + flag_value = None + self._verify_ticket_id() + + # Build SQL cmd + sql_cmd = ( + f"SELECT `{flag_name}` FROM `{SQL['DB']}`.`{TABLE_TICKET}` " + f"WHERE `{TABLE_TICKET}`.`ticket_id` = {self.ticket_id};" + ) + + # Run SQL cmd + try: + self.db_cursor.execute(sql_cmd) + for s in self.db_cursor: + flag_value = s[0] + except mariadb.errors.Error as err_msg: + print_error(err_msg) + self.errors = True + + # Done + return str(flag_value) + + def _get_ticket_field(self, ticket_id, field_name): + """Get field for ticket_id from osTicket, returns str.""" + field_data = None + + # Build SQL cmd + sql_cmd = ( + f"SELECT {field_name} FROM `{SQL['DB']}`.`{TABLE_TICKET}` " + f"WHERE `{TABLE_TICKET}`.`ticket_id` = {ticket_id};" + ) + + # Lookup data + # NOTE: If multiple entries are found it will return the last + try: + self.db_cursor.execute(sql_cmd) + for result in self.db_cursor: + field_data = result[0] + except mariadb.errors.Error as err_msg: + # Show error and return None + print_error(err_msg) + + # Done + return field_data + + def _set_flag(self, flag_name, flag_value): + """Set flag_name to flag_value for ticket_id in osTicket. + + NOTE: This will overwrite any existing value. + """ + self._verify_ticket_id() + sql_cmd = ( + f"UPDATE `{SQL['DB']}`.`{TABLE_TICKET}` " + f"SET `{flag_name}` = '{flag_value}' " + f"WHERE `{TABLE_TICKET}`.`ticket_id` = {self.ticket_id};" + ) + + # Run SQL cmd + try: + self.db_cursor.execute(sql_cmd) + except mariadb.errors.Error as err_msg: + print_error(err_msg) + self.errors = True + + def _verify_ticket_id(self): + """Verify that ticket_id has been set.""" + if not self.ticket_id: + LOG.error('Ticket ID not set') + raise RuntimeError('Ticket ID not set') + + def select_ticket(self): + """Set ticket number and name from osTicket DB.""" + print_standard('Connecting to osTicket...') + + # Connect + while True: + try: + self._connect(silent=False) + except (mariadb.errors.Error, RuntimeError): + print_warning('Failed to connect to osTicket') + if not ask('Try again?'): + print_standard('Integration disabled for this session') + self.disabled = True + return + else: + # Connection successful + break + + # Main loop + while self.ticket_id is None: + print_standard(' ') + _id = input_text('Enter ticket number (or leave blank to disable): ') + _id = _id.strip() + + # Nothing entered + if not _id and ask('Disable osTicket integration for this session?'): + self.disabled = True + break + + # Invalid ID entered + if not _id.isnumeric(): + continue + + # Valid ID entered, lookup name + _name = self._get_ticket_field(_id, 'name') + + # Invalid ticket selected + if _name is None: + print_error(f'Ticket #{_id} not found') + continue + + # Valid ticket selected, lookup subject + _subject = self._get_ticket_field(_id, 'subject') + + # Verify selection + print_colored( + ['You have selected ticket', f'#{_id}', _name], + [None, 'BLUE', None], + ) + print_colored(f' {_subject}', 'CYAN') + print_standard(' ') + if ask('Is this correct?'): + self.ticket_id = _id + self.ticket_name = _name + + # Done + self._disconnect() + + def set_cpu_max_temp(self, temp): + """Set CPU max temp in osTicket for ticket_id.""" + LOG.info(f'Setting max CPU temp to {temp}') + self._connect() + self._set_flag(FLAG_MAX_TEMP, temp) + self._disconnect() + + def set_flag_failed(self, flag_name): + """Set flag as failed in osTicket for ticket_id.""" + LOG.warning(f'Setting osTicket {flag_name} checkbox to FAILED') + real_flag_name = FLAG_CPU if flag_name == 'CPU' else FLAG_DISK + self._connect() + + # Set flag + self._set_flag(real_flag_name, FLAG_CODES['Fail']) + + # Done + self._disconnect() + + def set_flag_passed(self, flag_name): + """Set flag as passed in osTicket for ticket_id. + + NOTE: This will not overwrite a failed status. + """ + real_flag_name = FLAG_CPU if flag_name == 'CPU' else FLAG_DISK + self._connect() + + # Bail if flag checkbox set as FAILED + if self._get_flag(real_flag_name) == str(FLAG_CODES['Fail']): + print_warning( + f'Not replacing osTicket {flag_name} checkbox FAILED value', + ) + return + + # Current value != to FAILED, updating checkbox + LOG.info(f'Setting osTicket {flag_name} checkbox to PASSED') + self._set_flag(real_flag_name, FLAG_CODES['Pass']) + + # Done + self._disconnect() + + +# Functions +def get_test_station_name(): + """Get test station name from hostname, returns str. + + NOTES: This is quite broad and may include false-positives. + If not a test station then an empty string is returned. + """ + hostname = socket.getfqdn() + + # Check if this is a test station + if TEST_STATIONS['Domain'] in hostname: + hostname = hostname.replace(TEST_STATIONS['Domain'], '') + if hostname.lower() in TEST_STATIONS['Known Names']: + hostname = TEST_STATIONS['Known Names'][hostname.lower()] + else: + hostname = '' + + # Done + return hostname + + +def pad_with_dots(text, pad_right=False): + """Replace space padding with dots, returns str. + + NOTE: This is a dumb hack to better align text in osTicket. + """ + text = str(text) + text = text.replace(' ', '..') + if '.' in text: + if pad_right: + text = f'{text}.' + else: + text = f'.{text}' + + # Done + return text + + +if __name__ == '__main__': + print("This file is not meant to be called directly.")