Finished osTicket checkbox functions

This commit is contained in:
2Shirt 2020-01-10 16:29:20 -07:00
parent df2a7c03c1
commit e9e19053f4
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
5 changed files with 354 additions and 176 deletions

View file

@ -1,63 +0,0 @@
# Wizard Kit: Settings - osTicket
OSTICKET = {
'Color Codes': {
'Normal': '0',
'Contact': '1',
'Diags': '2',
'Diags FAIL': '3',
},
'CPU Flag': {
'Name': 'zTemps',
'Pass': 1,
'Fail': 2,
},
'CPU Temp': {
'Name': 'zMaxTemp',
},
'Database': {
'Name': 'osticket',
'User': 'wizardkit',
'Pass': 'U9bJnF9eamVkfsVw',
'Port': '3306',
},
'Disk Flag': {
'Name': 'zHDTune',
'Pass': 1,
'Fail': 2,
},
'SSH': {
'Host': 'osticket.1201.com',
'Port': '22',
'User': 'sql_tunnel',
},
'Staff': {
'ID': '23',
'Name': 'Wizard Kit',
},
'Tables': {
'Response': 'ost_ticket_response',
'Ticket': 'ost_ticket',
},
}
TEST_STATIONS = {
'bender': 'Bender',
'combine': 'Combine',
'control': 'Control',
'cortana': 'Cortana',
'data': 'Data',
'glados': 'GLaDOS',
'locutus': 'Locutus',
'lore': 'Lore',
'sex-robot': 'Sex-Robot',
'shodan': 'Shodan',
'six': 'Six',
'skynet': 'Skynet',
'supremo': 'Supremo',
'unicron': 'Unicron',
}
if __name__ == '__main__':
print("This file is not meant to be called directly.")
# vim: sts=2 sw=2 ts=2

View file

@ -13,6 +13,7 @@ from wk import kit
from wk import log
from wk import net
from wk import os
from wk import osticket
from wk import std
from wk import sw
from wk import tmux

View file

@ -0,0 +1,37 @@
"""Wizard Kit: Config - osTicket"""
# vim: sts=2 sw=2 ts=2
SQL = {
'DB': 'osticket',
'Host': 'osticket.1201.com',
'Port': '3306',
'User': 'wizardkit',
'Pass': 'U9bJnF9eamVkfsVw',
}
STAFF = {
'ID': '23',
'Name': 'Wizard Kit',
}
TEST_STATIONS = {
# Domain will be stripped from the FQDN so the initial '.' is included
'Domain': '.1201.com',
# The rest are used to get the test-station name with proper capitalization
'bender': 'Bender',
'combine': 'Combine',
'control': 'Control',
'cortana': 'Cortana',
'data': 'Data',
'deepmind': 'DeepMind',
'glados': 'GLaDOS',
'locutus': 'Locutus',
'lore': 'Lore',
'sex-robot': 'Sex-Robot',
'shodan': 'Shodan',
'six': 'Six',
'skynet': 'Skynet',
'supremo': 'Supremo',
'unicron': 'Unicron',
}
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -1,113 +0,0 @@
"""WizardKit: osTicket Functions"""
# vim: sts=2 sw=2 ts=2
import atexit
import socket
import mysql.connector as mariadb
# Classes
class osTicket(): # pylint: disable=invalid-name
"""Class to track osTicket data and functions."""
def __init__(self):
self.db_connection = None
self.db_cursor = None
self.disabled = False
self.tunnel_proc = None
def connect(self, silent=True):
"""Establish connection to osTicket via SSH tunnel."""
# TODO connect
def convert_report(self, name, report):
"""Convert report into an osTicket friendly format, returns list."""
out_report = []
#TODO: Convert report
def disconnect(self):
"""Close osTicket connection."""
# TODO: disconnect
def get_flag(self, ticket_id, flag_name):
"""Get flag for ticket_id from osTicket, returns int."""
# TODO Get flag value, is it a consistent type? ^^
def get_ticket_id(self):
"""Get ticket number and name from osTicket DB."""
# TODO Get ticket number and name, save to self
def get_ticket_field(self, ticket_id, field):
"""Get field for ticket_id from osTicket, returns str."""
# TODO: Get ticket field
def set_cpu_failed(self, ticket_id):
"""Set CPU as failed in osTicket for ticket_id."""
# TODO Set CPU failed
def set_cpu_passed(self, ticket_id):
"""Set CPU as passed in osTicket for ticket_id.
NOTE: This will not overwrite a failed status.
"""
# TODO Set CPU passed
def set_disk_failed(self, ticket_id):
"""Set disk as failed in osTicket for ticket_id."""
# TODO Set disk failed
def set_disk_passed(self, ticket_id):
"""Set disk as passed in osTicket for ticket_id.
NOTE: This will not overwrite a failed status.
"""
# TODO Set disk passed
def set_flag(self, ticket_id, flag_name, flag_value):
"""Set flag_name to flag_value for ticket_id in osTicket.
NOTE: This will overwrite any existing value.
"""
# TODO: Set flag value
# Functions
def get_test_station_name():
"""Get test station name from hostname, returns str.
NOTES: This is quite broad and may include false-positives.
If not a test station then an empty string is returned.
"""
hostname = socket.getfqdn()
# Check if this is a test station
if TEST_STATIONS['Domain'] in hostname:
hostname = hostname.replace(TEST_STATIONS['Domain'], '')
if hostname.lower() in TEST_STATIONS['Known Names']:
hostname = TEST_STATIONS['Known Names'][hostname.lower()]
else:
hostname = ''
# Done
return hostname
def pad_with_dots(text, pad_right=False):
"""Replace space padding with dots, returns str.
NOTE: This is a dumb hack to better align text in osTicket.
"""
text = str(text)
text = text.replace(' ', '..')
if '.' in text:
if pad_right:
text = f'{text}.'
else:
text = f'.{text}'
# Done
return text
if __name__ == '__main__':
print("This file is not meant to be called directly.")

316
scripts/wk/osticket.py Normal file
View file

@ -0,0 +1,316 @@
"""WizardKit: osTicket Functions"""
# vim: sts=2 sw=2 ts=2
import atexit
import logging
import socket
import mysql.connector as mariadb
from wk.cfg.osticket import SQL, STAFF, TEST_STATIONS
from wk.std import (
ask,
input_text,
print_colored,
print_error,
print_standard,
print_warning,
sleep,
)
# STATIC_VARIABLES
LOG = logging.getLogger(__name__)
FLAG_CODES = {
'Pass': 1,
'Fail': 2,
}
FLAG_CPU = 'zTemps'
FLAG_DISK = 'zHDTune'
FLAG_MAX_TEMP = 'zMaxTemp'
RESPONSE_COLOR_CODES = {
'Normal': '0',
'Contact': '1',
'Diags': '2',
'Diags FAIL': '3',
}
TABLE_RESPONSE = 'ost_ticket_response'
TABLE_TICKET = 'ost_ticket'
# Classes
class osTicket(): # pylint: disable=invalid-name
"""Class to track osTicket data and functions."""
def __init__(self):
self.db_connection = None
self.db_cursor = None
self.disabled = False
self.errors = False
self.ticket_id = None
self.ticket_name = None
# Ensure connection is closed atexit
atexit.register(self._disconnect)
def _connect(self, silent=True):
"""Establish connection to osTicket."""
if self.disabled:
return
# Connect to database
for i in range(3):
i += 1
try:
self.db_connection = mariadb.connect(
host=SQL['Host'],
port=SQL['Port'],
database=SQL['DB'],
user=SQL['User'],
password=SQL['Pass'],
)
self.db_cursor = self.db_connection.cursor()
except mariadb.errors.InterfaceError:
# Network issue? try again
sleep(2)
except mariadb.errors.Error:
# Bad creds or other SQL error, bail
break
else:
# Connection established
break
# Raise exception if necessary
if self.db_cursor is None and not silent:
LOG.error('Failed to connect to osTicket database')
raise RuntimeError('Failed to connect to osTicket database')
def _convert_report(self, name, report):
"""Convert report into an osTicket friendly format, returns list."""
out_report = []
#TODO: Convert report
def _disconnect(self):
"""Close osTicket connection."""
for db_obj in (self.db_cursor, self.db_connection):
if db_obj:
try:
db_obj.close()
except mariadb.errors.Error:
# Ignore errors since vars will be reset below
pass
# Reset db objects
self.db_cursor = None
self.db_connection = None
def _get_flag(self, flag_name):
"""Get flag for self.ticket_id from osTicket, returns str."""
flag_value = None
self._verify_ticket_id()
# Build SQL cmd
sql_cmd = (
f"SELECT `{flag_name}` FROM `{SQL['DB']}`.`{TABLE_TICKET}` "
f"WHERE `{TABLE_TICKET}`.`ticket_id` = {self.ticket_id};"
)
# Run SQL cmd
try:
self.db_cursor.execute(sql_cmd)
for s in self.db_cursor:
flag_value = s[0]
except mariadb.errors.Error as err_msg:
print_error(err_msg)
self.errors = True
# Done
return str(flag_value)
def _get_ticket_field(self, ticket_id, field_name):
"""Get field for ticket_id from osTicket, returns str."""
field_data = None
# Build SQL cmd
sql_cmd = (
f"SELECT {field_name} FROM `{SQL['DB']}`.`{TABLE_TICKET}` "
f"WHERE `{TABLE_TICKET}`.`ticket_id` = {ticket_id};"
)
# Lookup data
# NOTE: If multiple entries are found it will return the last
try:
self.db_cursor.execute(sql_cmd)
for result in self.db_cursor:
field_data = result[0]
except mariadb.errors.Error as err_msg:
# Show error and return None
print_error(err_msg)
# Done
return field_data
def _set_flag(self, flag_name, flag_value):
"""Set flag_name to flag_value for ticket_id in osTicket.
NOTE: This will overwrite any existing value.
"""
self._verify_ticket_id()
sql_cmd = (
f"UPDATE `{SQL['DB']}`.`{TABLE_TICKET}` "
f"SET `{flag_name}` = '{flag_value}' "
f"WHERE `{TABLE_TICKET}`.`ticket_id` = {self.ticket_id};"
)
# Run SQL cmd
try:
self.db_cursor.execute(sql_cmd)
except mariadb.errors.Error as err_msg:
print_error(err_msg)
self.errors = True
def _verify_ticket_id(self):
"""Verify that ticket_id has been set."""
if not self.ticket_id:
LOG.error('Ticket ID not set')
raise RuntimeError('Ticket ID not set')
def select_ticket(self):
"""Set ticket number and name from osTicket DB."""
print_standard('Connecting to osTicket...')
# Connect
while True:
try:
self._connect(silent=False)
except (mariadb.errors.Error, RuntimeError):
print_warning('Failed to connect to osTicket')
if not ask('Try again?'):
print_standard('Integration disabled for this session')
self.disabled = True
return
else:
# Connection successful
break
# Main loop
while self.ticket_id is None:
print_standard(' ')
_id = input_text('Enter ticket number (or leave blank to disable): ')
_id = _id.strip()
# Nothing entered
if not _id and ask('Disable osTicket integration for this session?'):
self.disabled = True
break
# Invalid ID entered
if not _id.isnumeric():
continue
# Valid ID entered, lookup name
_name = self._get_ticket_field(_id, 'name')
# Invalid ticket selected
if _name is None:
print_error(f'Ticket #{_id} not found')
continue
# Valid ticket selected, lookup subject
_subject = self._get_ticket_field(_id, 'subject')
# Verify selection
print_colored(
['You have selected ticket', f'#{_id}', _name],
[None, 'BLUE', None],
)
print_colored(f' {_subject}', 'CYAN')
print_standard(' ')
if ask('Is this correct?'):
self.ticket_id = _id
self.ticket_name = _name
# Done
self._disconnect()
def set_cpu_max_temp(self, temp):
"""Set CPU max temp in osTicket for ticket_id."""
LOG.info(f'Setting max CPU temp to {temp}')
self._connect()
self._set_flag(FLAG_MAX_TEMP, temp)
self._disconnect()
def set_flag_failed(self, flag_name):
"""Set flag as failed in osTicket for ticket_id."""
LOG.warning(f'Setting osTicket {flag_name} checkbox to FAILED')
real_flag_name = FLAG_CPU if flag_name == 'CPU' else FLAG_DISK
self._connect()
# Set flag
self._set_flag(real_flag_name, FLAG_CODES['Fail'])
# Done
self._disconnect()
def set_flag_passed(self, flag_name):
"""Set flag as passed in osTicket for ticket_id.
NOTE: This will not overwrite a failed status.
"""
real_flag_name = FLAG_CPU if flag_name == 'CPU' else FLAG_DISK
self._connect()
# Bail if flag checkbox set as FAILED
if self._get_flag(real_flag_name) == str(FLAG_CODES['Fail']):
print_warning(
f'Not replacing osTicket {flag_name} checkbox FAILED value',
)
return
# Current value != to FAILED, updating checkbox
LOG.info(f'Setting osTicket {flag_name} checkbox to PASSED')
self._set_flag(real_flag_name, FLAG_CODES['Pass'])
# Done
self._disconnect()
# Functions
def get_test_station_name():
"""Get test station name from hostname, returns str.
NOTES: This is quite broad and may include false-positives.
If not a test station then an empty string is returned.
"""
hostname = socket.getfqdn()
# Check if this is a test station
if TEST_STATIONS['Domain'] in hostname:
hostname = hostname.replace(TEST_STATIONS['Domain'], '')
if hostname.lower() in TEST_STATIONS['Known Names']:
hostname = TEST_STATIONS['Known Names'][hostname.lower()]
else:
hostname = ''
# Done
return hostname
def pad_with_dots(text, pad_right=False):
"""Replace space padding with dots, returns str.
NOTE: This is a dumb hack to better align text in osTicket.
"""
text = str(text)
text = text.replace(' ', '..')
if '.' in text:
if pad_right:
text = f'{text}.'
else:
text = f'.{text}'
# Done
return text
if __name__ == '__main__':
print("This file is not meant to be called directly.")