"""WizardKit: osTicket Functions""" # vim: sts=2 sw=2 ts=2 import atexit import logging import socket import time import mysql.connector as mariadb from wk import std from wk.cfg.osticket import SQL, STAFF, TEST_STATIONS # STATIC_VARIABLES LOG = logging.getLogger(__name__) FLAG_CODES = { 'Pass': 1, 'Fail': 2, } FLAG_CPU = 'zTemps' FLAG_DISK = 'zHDTune' FLAG_MAX_TEMP = 'zMaxTemp' RESPONSE_COLOR_CODES = { 'Normal': '0', 'Contact': '1', 'Diags': '2', 'Diags FAIL': '3', } TABLE_RESPONSE = 'ost_ticket_response' TABLE_TICKET = 'ost_ticket' # Classes class osTicket(): # pylint: disable=invalid-name """Class to track osTicket data and functions.""" def __init__(self): self.db_connection = None self.db_cursor = None self.disabled = False self.errors = False self.note = None self.ticket_id = None self.ticket_name = None # Ensure connection is closed atexit atexit.register(self._disconnect) def _connect(self, silent=True): """Establish connection to osTicket.""" if self.disabled: return # Connect to database for i in range(3): i += 1 try: self.db_connection = mariadb.connect( host=SQL['Host'], port=SQL['Port'], database=SQL['DB'], user=SQL['User'], password=SQL['Pass'], ) self.db_cursor = self.db_connection.cursor() except mariadb.errors.InterfaceError: # Network issue? try again std.sleep(2) except mariadb.errors.Error: # Bad creds or other SQL error, bail break else: # Connection established break # Raise exception if necessary if self.db_cursor is None: LOG.error('Failed to connect to osTicket database') if silent: # Don't raise exceptions, just disable ost self.disabled = True self.errors = True else: raise RuntimeError('Failed to connect to osTicket database') def _disconnect(self): """Close osTicket connection.""" for db_obj in (self.db_cursor, self.db_connection): if db_obj: try: db_obj.close() except mariadb.errors.Error: # Ignore errors since vars will be reset below pass # Reset db objects self.db_cursor = None self.db_connection = None def _get_flag(self, flag_name): """Get flag for self.ticket_id from osTicket, returns str.""" flag_value = None self._verify_ticket_id() # Build SQL cmd sql_cmd = ( f"SELECT `{flag_name}` FROM `{SQL['DB']}`.`{TABLE_TICKET}` " f"WHERE `{TABLE_TICKET}`.`ticket_id` = {self.ticket_id};" ) # Run SQL cmd try: self.db_cursor.execute(sql_cmd) for s in self.db_cursor: flag_value = s[0] except mariadb.errors.Error as err_msg: std.print_error(err_msg) self.errors = True # Done return str(flag_value) def _get_ticket_field(self, ticket_id, field_name): """Get field for ticket_id from osTicket, returns str.""" field_data = None # Build SQL cmd sql_cmd = ( f"SELECT {field_name} FROM `{SQL['DB']}`.`{TABLE_TICKET}` " f"WHERE `{TABLE_TICKET}`.`ticket_id` = {ticket_id};" ) # Lookup data # NOTE: If multiple entries are found it will return the last try: self.db_cursor.execute(sql_cmd) for result in self.db_cursor: field_data = result[0] except mariadb.errors.Error as err_msg: # Show error and return None std.print_error(err_msg) # Done return field_data def _set_flag(self, flag_name, flag_value): """Set flag_name to flag_value for ticket_id in osTicket. NOTE: This will overwrite any existing value. """ self._verify_ticket_id() sql_cmd = ( f"UPDATE `{SQL['DB']}`.`{TABLE_TICKET}` " f"SET `{flag_name}` = '{flag_value}' " f"WHERE `{TABLE_TICKET}`.`ticket_id` = {self.ticket_id};" ) # Run SQL cmd try: self.db_cursor.execute(sql_cmd) except mariadb.errors.Error as err_msg: std.print_error(err_msg) self.errors = True def _verify_ticket_id(self): """Verify that ticket_id has been set.""" if not self.ticket_id: LOG.error('Ticket ID not set') raise RuntimeError('Ticket ID not set') def add_note(self): """Add note to be included in osTicket replies.""" lines = [] # Instructions std.print_standard('Please enter the additional information for this ticket') std.print_info(' (End note with an empty line)') std.print_standard(' ') # Get note while True: text = std.input_text('> ') if not text: break lines.append(text.strip()) # Save note self.note = lines.pop(0) for line in lines: self.note += f'\n...{line}' def init(self): """Revert to defaults.""" self._disconnect() self.disabled = False self.errors = False self.note = None self.ticket_id = None self.ticket_name = None def post_response(self, response, color='Normal'): """Post a reply to a ticket in osTicket.""" lines = [] test_station = get_test_station_name() self._connect(silent=True) self._verify_ticket_id() # Bail if disabled if self.disabled: return # Format response if test_station: lines.append(f'[Test-Station: {test_station}]') lines.append(f'[Report for ticket #{self.ticket_id} {self.ticket_name}]') if self.note: lines.append(f'[Note] {self.note}\n') lines.append(str(response)) response = '\n'.join(lines) response = std.strip_colors(response) response = response.replace("`", "'").replace("'", "\\'") # Build SQL cmd sql_cmd = ( f"INSERT INTO `{SQL['DB']}`.`{TABLE_RESPONSE}` " f"(ticket_id, staff_id, staff_name, response, created, code) " f"VALUES (" f" '{self.ticket_id}'," f" '{STAFF['ID']}'," f" '{STAFF['Name']}'," f" '{response}'," f" '{time.strftime('%Y-%m-%d %H:%M:%S')}'," f" '{RESPONSE_COLOR_CODES.get(color, 'Normal')}'" f");" ) # Run SQL cmd try: self.db_cursor.execute(sql_cmd) except mariadb.errors.Error: self.errors = True # Done self._disconnect() def select_ticket(self): """Set ticket number and name from osTicket DB.""" std.print_standard('Connecting to osTicket...') # Bail if disabled if self.disabled: return # Connect while True: try: self._connect(silent=False) except (mariadb.errors.Error, RuntimeError): std.print_warning('Failed to connect to osTicket') if not std.ask('Try again?'): std.print_standard('Integration disabled for this session') self.disabled = True return else: # Connection successful break # Main loop while self.ticket_id is None: std.print_standard(' ') _id = std.input_text('Enter ticket number (or leave blank to disable): ') _id = _id.strip() # Nothing entered if not _id: print(' ') if std.ask('Disable osTicket integration for this session?'): self.disabled = True break # Invalid ID entered if not _id.isnumeric(): continue # Valid ID entered, lookup name _name = self._get_ticket_field(_id, 'name') # Invalid ticket selected if _name is None: std.print_error(f'Ticket #{_id} not found') continue # Valid ticket selected, lookup subject _subject = self._get_ticket_field(_id, 'subject') # Verify selection std.print_colored( ['You have selected ticket', f'#{_id}', _name], [None, 'BLUE', None], ) std.print_colored(f' {_subject}', 'CYAN') std.print_standard(' ') if std.ask('Is this correct?'): self.ticket_id = _id self.ticket_name = _name # Done self._disconnect() def set_cpu_max_temp(self, temp): """Set CPU max temp in osTicket for ticket_id. NOTE: This will not replace a higher temp value. """ LOG.info('Setting max CPU temp to %s', temp) self._connect(silent=True) # Bail if disabled if self.disabled: return # Compare to current temp current_temp = self._get_flag(FLAG_MAX_TEMP) if str(current_temp).isnumeric() and int(current_temp) > temp: std.print_warning('Not replacing higher temp in osTicket') self._disconnect() return # Update temp self._set_flag(FLAG_MAX_TEMP, int(temp)) # Done self._disconnect() def set_flag_failed(self, flag_name): """Set flag as failed in osTicket for ticket_id.""" LOG.warning('Setting osTicket %s checkbox to FAILED', flag_name) real_flag_name = FLAG_CPU if flag_name == 'CPU' else FLAG_DISK self._connect(silent=True) # Bail if disabled if self.disabled: return # Set flag self._set_flag(real_flag_name, FLAG_CODES['Fail']) # Done self._disconnect() def set_flag_passed(self, flag_name): """Set flag as passed in osTicket for ticket_id. NOTE: This will not overwrite a failed status. """ real_flag_name = FLAG_CPU if flag_name == 'CPU' else FLAG_DISK self._connect(silent=True) # Bail if disabled if self.disabled: return # Bail if flag checkbox set as FAILED if self._get_flag(real_flag_name) == str(FLAG_CODES['Fail']): std.print_warning( f'Not replacing osTicket {flag_name} checkbox FAILED value', ) self._disconnect() return # Current value != to FAILED, updating checkbox LOG.info('Setting osTicket %s checkbox to PASSED', flag_name) self._set_flag(real_flag_name, FLAG_CODES['Pass']) # Done self._disconnect() # Functions def get_test_station_name(): """Get test station name from hostname, returns str. NOTES: This is quite broad and may include false-positives. If not a test station then an empty string is returned. """ hostname = socket.getfqdn() # Check if this is a test station if TEST_STATIONS['Domain'] in hostname: hostname = hostname.replace(TEST_STATIONS['Domain'], '') if hostname.lower() in TEST_STATIONS['Known Names']: hostname = TEST_STATIONS['Known Names'][hostname.lower()] else: hostname = '' # Done return hostname def pad_with_dots(text, pad_right=False): """Replace space padding with dots, returns str. NOTE: This is a dumb hack to better align text in osTicket. """ text = str(text) text = text.replace(' ', '..') if '.' in text: if pad_right: text = f'{text}.' else: text = f'.{text}' # Done return text if __name__ == '__main__': print("This file is not meant to be called directly.")