WizardKit/.bin/Scripts/functions/osticket.py

347 lines
9.5 KiB
Python

# Wizard Kit: Functions - osTicket
import mysql.connector as mariadb
from functions.common import *
from settings.osticket import *
# Regex
REGEX_BLOCK_GRAPH = re.compile(r'(▁|▂|▃|▄|▅|▆|▇|█)')
REGEX_NVME_SMART_ATTRIBUTES = re.compile(r'^\s*(\d+) / (\w+): (.{28})(.*)$')
REGEX_TEMPS = re.compile(r'^\s*(.*?)\s+(idle.*)$')
# Classes
class osTicket():
"""Class to track osTicket data and functions."""
def __init__(self):
self.db_connection = None
self.db_cursor = None
self.errors = False
self.tunnel_proc = None
def connect(self):
"""Establish connection to osTicket via a SSH tunnel."""
cmd = [
'ssh', '-N',
'-p', OSTICKET['SSH']['Port'],
'-L3306:127.0.0.1:{Port}'.format(**OSTICKET['Database']),
'{User}@{Host}'.format(**OSTICKET['SSH']),
]
# Only open tunnel if one doesn't exist
if self.tunnel_proc is None or self.tunnel_proc.poll() is not None:
self.tunnel_proc = popen_program(cmd)
# Connect to database
for x in range(5):
sleep(2)
try:
self.db_connection = mariadb.connect(
user=OSTICKET['Database']['User'],
password=OSTICKET['Database']['Pass'],
database=OSTICKET['Database']['Name'],
)
self.db_cursor = self.db_connection.cursor()
except Exception:
# TODO: Refine exception handling
pass
else:
# Connection established
break
def convert_report(name, test):
"""Convert report into an osTicket friendly format, returns list."""
out_report = []
source_report = test.source_report
status = strip_colors(test.status)
status = status.replace(test.label, '').strip()
# Header
index = 1
if name == 'NVMe / SMART':
out_report.append('{} ({})'.format(name, status))
if not source_report:
index = 0
source_report = test.dev.generate_attribute_source_report()
else:
out_report.append('{} ({})'.format(strip_colors(source_report[0]), status))
# Body
for line in source_report[index:]:
# Remove colors and leading spaces
line = strip_colors(line)
if line[:2] == ' ':
line = line[2:]
# Test-specific modifications
if name == 'Prime95':
r = REGEX_TEMPS.match(line)
if r:
_sensor = '{:<20}'.format(r.group(1))
_temps = r.group(2)
line = '{} {}'.format(
pad_with_dots(_sensor, pad_right=True),
_temps)
elif name == 'NVMe / SMART':
r = REGEX_NVME_SMART_ATTRIBUTES.match(line)
if r:
_dec = '{:>3}'.format(r.group(1))
_hex = r.group(2)
_atr = r.group(3).strip()
_val = '{:<20}'.format(r.group(4))
line = '{}/{}: {} {}'.format(
_hex,
pad_with_dots(_dec),
pad_with_dots(_val, pad_right=True),
_atr)
elif name == 'I/O Benchmark':
line = REGEX_BLOCK_GRAPH.sub('', line)
line = line.strip()
if not line:
continue
# Remove extra spaces
line = line.strip()
line = re.sub(r'(\s+)', ' ', line)
# Add line to report
out_report.append(line)
# Done
return out_report
def generate_report(dev):
"""Generate device report for osTicket, returns list."""
report = []
results = get_device_overall_results(dev)
# Header
if results['Full Diag']:
report.append(
'{Dev Type} hardware diagnostic tests: {Status}'.format(**results))
report.append(' ')
# Device
report.append(dev.description)
report.append(' ')
# Test reports
for name, test in dev.tests.items():
report.extend(convert_report(name, test))
if name == 'I/O Benchmark':
# TODO: Create PNG graph and upload to imgur/Nextcloud
report.append('Imgur: TODO')
report.append('Nextcloud: TODO')
report.append(' ')
# Volumes
if results['Dev Type'] == 'Disk':
# TODO: Mount all volumes and extend report
report.append('Volumes:')
report.append('TODO')
report.append(' ')
# Asterisk
if results['Asterisk']:
report.append('* NOTE: One or more tests were not run on this device')
# Done
return report
def disconnect(self, reset_errors=False):
"""Close osTicket connection."""
try:
self.db_cursor.close()
self.db_connection.close()
except Exception:
# TODO: Fix exception handling
pass
# Reset errors
if reset_errors:
self.errors = False
# Reset vars
self.db_cursor = None
self.db_connection = None
def get_ticket_name(self, ticket_id):
"""Lookup ticket and return name as str."""
name = None
sql_cmd = "SELECT name FROM `{Ticket}`".format(**OSTICKET['Tables'])
sql_cmd += " WHERE `ticket_id` = `{}`".format(ticket_id)
sql_cmd += ";"
# TODO: Is the ';' needed above? It wasn't in the prev version??
# Lookup name
# NOTE: If multiple entries are found it will return the last
try:
self.db_cursor.execute(sql_cmd)
for s in self.db_cursor:
name = s[0]
except Exception:
# TODO: Fix exception handling
self.errors = True
def get_ticket_number(self):
"""Get ticket number and confirm with name from osTicket DB."""
ticket_number = None
# Main loop
while ticket_number is None:
print_standard(' ')
_input = input('Enter ticket number (or leave blank to disable): ')
_input = _input.strip()
# No ticket ID entered
if re.match(r'^\s*$', _input):
if ask('Disable osTicket integration for this run?'):
break
# Invalid ID entered
if not re.match(r'^([0-9]+)$', _input):
continue
# Valid ID entered, lookup name and verify
_name = self.get_ticket_name(_input)
if _name:
print_standard('You have selected ticket #{} {}'.format(
_input, _name))
if ask('Is this correct?'):
ticket_number = _input
# Done
return ticket_number
def post_device_results(self, dev, ticket_id):
"""Generate osTicket friendly report and post as response to ticket."""
response = self.generate_report(dev)
post_response(response, ticket_id)
def post_response(self, response, ticket_id):
"""Post a reply to a ticket in osTicket."""
self.connect()
# Build SQL cmd
sql_cmd = "INSERT INTO `{Name}`.`{Response}`".format(
**OSTICKET['Database'], **OSTICKET['Tables'])
sql_cmd += " (ticket_id, staff_id, staff_name, response, created)"
sql_cmd += " VALUES ("
sql_cmd += " '{}',".format(ticket_id)
sql_cmd += " '{ID}', '{Name}',".format(**OSTICKET['Staff'])
sql_cmd += " '{}',".format(response)
sql_cmd += " '{}'".format(time.strftime("%Y-%m-%d %H:%M:%S"))
sql_cmd += " );"
# Run SQL cmd
try:
self.cursor.execute(sql_cmd)
except Exception:
# TODO: Fix exception handling
self.errors = True
# Done
self.disconnect()
def set_disk_failed(self, ticket_id):
"""Mark disk as failed in osTicket."""
self.set_flag(
ticket_id,
OSTICKET['Disk Flag']['Name'],
OSTICKET['Disk Flag']['Fail'])
def set_disk_passed(self, ticket_id):
"""Mark disk as passed in osTicket."""
self.set_flag(
ticket_id,
OSTICKET['Disk Flag']['Name'],
OSTICKET['Disk Flag']['Pass'])
def set_flag(self, ticket_id, flag_name, flag_value):
"""Set flag in osTicket."""
self.connect()
# Build SQL cmd
sql_cmd = "UPDATE `{Name}`.`{Ticket}`".format(
**OSTICKET['Database'], **OSTICKET['Tables'])
sql_cmd += " SET `{}` = `{}`".format(flag_name, flag_value)
sql_cmd += " WHERE `{Ticket}`.`ticket_id` = `{ticket_id}`".format(
ticket_id=ticket_id, **OSTICKET['Tables'])
sql_cmd += ";"
# Run SQL cmd
try:
self.cursor.execute(sql_cmd)
except Exception:
# TODO: Fix exception handling
self.errors = True
# Done
self.disconnect()
# Functions
def get_device_overall_results(dev):
"""Get overall results from tests for device, returns dict."""
results = {
'Dev Type': 'Unknown',
'Full Diag': False,
'Asterisk': None,
'Failed': 0,
'N/A': 0,
'Passed': 0,
'Status': 'Unknown',
}
# Get test list for device type
test_list = []
if isinstance(dev, CpuObj):
results['Dev Type'] = 'CPU'
test_list = TESTS_CPU
elif isinstance(dev, DiskObj):
results['Dev Type'] = 'Disk'
test_list = TESTS_DISK
else:
raise GenericError('Unrecognized device type.')
# Check if a full diag was run (i.e. all dev tests were enabled)
results['Full Diag'] = len(dev.tests) == len(test_list)
# Tally test results
for test in dev.tests.value():
if test.failed:
results['Failed'] += 1
if test.passed:
results['Passed'] += 1
if 'N/A' in test.status:
results['N/A'] += 1
# Set overall status
if results['Failed'] > 0:
dev.checkbox = False
results['Status'] = 'FAILED'
elif results['Passed'] + results['N/A'] == len(dev.tests):
dev.checkbox = True
results['Status'] = 'PASSED'
else:
results['Status'] = 'UNKNOWN'
if results['Full Diag'] and results['N/A'] > 0:
results['Asterisk'] = True
results['Status'] += '*'
# Done
return results
def pad_with_dots(s, pad_right=False):
"""Replace space padding with dots, returns str."""
s = str(s).replace(' ', '..')
if '.' in s:
if pad_right:
s = s + '.'
else:
s = '.' + s
return s
if __name__ == '__main__':
print("This file is not meant to be called directly.")
# vim: sts=2 sw=2 ts=2