WizardKit/.bin/Scripts/functions/osticket.py

468 lines
13 KiB
Python

# Wizard Kit: Functions - osTicket
import mysql.connector as mariadb
from functions.data import *
from functions.io_graph import *
from settings.osticket import *
# STATIC VARIABLES
# Device type names accepted by osTicket.get_device_overall_results();
# any other value makes that method raise GenericError.
KNOWN_DEV_TYPES = ('CPU', 'Disk')
# Regex
# Matches one Unicode block-element character; convert_report() removes
# these from 'I/O Benchmark' report lines.
REGEX_BLOCK_GRAPH = re.compile(r'(▁|▂|▃|▄|▅|▆|▇|█)')
# Splits an attribute line of the form '<digits> / <word>: <28 chars><rest>'
# into four groups; convert_report() treats them as decimal ID, hex ID,
# attribute name and value respectively.
REGEX_NVME_SMART_ATTRIBUTES = re.compile(r'^\s*(\d+) / (\w+): (.{28})(.*)$')
# Splits a line into (label, text starting at 'idle'); convert_report()
# uses it for 'Prime95' lines and treats the groups as sensor and temps.
REGEX_TEMPS = re.compile(r'^\s*(.*?)\s+(idle.*)$')
# Splits a string into (text, trailing whitespace) so the whitespace can be
# replaced with dot padding.
REGEX_SENSOR = re.compile(r'^(.*?)(\s*)$')
# Classes
class osTicket():
  """Class to track osTicket data and functions.

  The osTicket database is reached through an SSH tunnel that forwards
  local port 3306.  The tunnel is opened on demand by connect() and is left
  running between calls; connection settings are read from OSTICKET.

  Attributes:
    db_connection: open database connection, or None when disconnected.
    db_cursor: cursor belonging to db_connection, or None.
    disabled: True once connect() has failed; later calls become no-ops.
    errors: True when a write to osTicket failed (set by post_response()
      and set_flag()).
    tests_cpu: tests enabled for CPUs (only its length is used here).
    tests_disk: tests enabled for disks (only its length is used here).
    tunnel_proc: process object of the SSH tunnel, or None.
  """

  def __init__(self, tests_cpu, tests_disk):
    """Store the enabled test lists; no connection is made here."""
    self.db_connection = None
    self.db_cursor = None
    self.disabled = False
    self.errors = False
    self.tests_cpu = tests_cpu
    self.tests_disk = tests_disk
    self.tunnel_proc = None

  def connect(self):
    """Establish connection to osTicket via a SSH tunnel.

    Up to five attempts are made with a two second pause before each one.
    If no cursor could be opened the integration is disabled and the
    tunnel process is killed.
    """
    # Bail if disabled (checked first so no settings are read needlessly)
    if self.disabled:
      return

    # Only open tunnel if one doesn't exist
    if self.tunnel_proc is None or self.tunnel_proc.poll() is not None:
      cmd = [
        'ssh', '-N',
        '-p', OSTICKET['SSH']['Port'],
        '-L3306:127.0.0.1:{Port}'.format(**OSTICKET['Database']),
        '{User}@{Host}'.format(**OSTICKET['SSH']),
        ]
      self.tunnel_proc = popen_program(cmd)

    # Connect to database
    for _ in range(5):
      # NOTE(review): presumably gives the tunnel time to come up - confirm
      sleep(2)
      try:
        self.db_connection = mariadb.connect(
          user=OSTICKET['Database']['User'],
          password=OSTICKET['Database']['Pass'],
          database=OSTICKET['Database']['Name'],
          )
        self.db_cursor = self.db_connection.cursor()
      except mariadb.errors.InterfaceError:
        # SSH issue?, try again
        continue
      except mariadb.errors.Error:
        # Bad creds or other SQL error, bail
        break
      except Exception:
        # Unknown error
        break
      else:
        # Connection established
        break

    # Disable if necessary
    if self.db_cursor is None:
      self.disabled = True
      self.tunnel_proc.kill()

  def convert_report(self, name, test):
    """Convert report into an osTicket friendly format, returns list.

    Args:
      name: test name; 'Prime95', 'NVMe / SMART' and 'I/O Benchmark' get
        test-specific formatting.
      test: test object; its report, status, label and (for NVMe / SMART
        with an empty report) dev attributes are read.
    """
    out_report = []
    source_report = test.report
    status = strip_colors(test.status)
    status = status.replace(test.label, '').strip()

    # Header
    # index is the first body line; 1 skips the line used as the header
    index = 1
    if name == 'NVMe / SMART':
      out_report.append('{} ({})'.format(name, status))
      if not source_report:
        # No report was stored, fall back to a fresh attribute report
        index = 0
        source_report = test.dev.generate_attribute_report()
    elif not source_report:
      index = 0
      out_report.append('{} ({})'.format(name, status))
    else:
      out_report.append(
        '{} ({})'.format(strip_colors(source_report[0]), status))

    # Body
    for line in source_report[index:]:
      # Remove colors and leading spaces
      # (the old check compared a two character slice against a single
      #  space and so never matched an indented line)
      line = strip_colors(line)
      if line.startswith(' ' * 2):
        line = line[2:]

      # Test-specific modifications
      if name == 'Prime95':
        r = REGEX_TEMPS.match(line)
        if r:
          _sensor = '{:<20}'.format(r.group(1))
          _temps = r.group(2)
          r2 = REGEX_SENSOR.match(_sensor)
          _sensor = r2.group(1)
          _spacer = pad_with_dots(r2.group(2))
          line = '{}{} {}'.format(_sensor, _spacer, _temps)
        if line == 'Temps':
          # Separate the temps section from the preceding lines
          out_report.append(' ')
      elif name == 'NVMe / SMART':
        r = REGEX_NVME_SMART_ATTRIBUTES.match(line)
        if r:
          _dec = '{:>3}'.format(r.group(1))
          _hex = r.group(2)
          _atr = r.group(3).strip()
          _val = '{:<20}'.format(r.group(4))
          line = '{}/{}: {} {}'.format(
            _hex,
            pad_with_dots(_dec),
            pad_with_dots(_val, pad_right=True),
            _atr)
      elif name == 'I/O Benchmark':
        # Drop the block graph; skip lines that held nothing else
        line = REGEX_BLOCK_GRAPH.sub('', line)
        line = line.strip()
        if not line:
          continue

      # Remove extra spaces
      line = line.strip()
      line = re.sub(r'(\s+)', ' ', line)

      # Add line to report
      out_report.append(line)

    # Done
    return out_report

  def disconnect(self, reset_errors=False):
    """Close osTicket connection.

    The SSH tunnel is left running so a later connect() can reuse it.

    Args:
      reset_errors: when True the errors flag is cleared as well.
    """
    # Close each handle separately so a missing or broken cursor cannot
    # prevent the connection itself from being closed
    for handle in (self.db_cursor, self.db_connection):
      if handle is None:
        continue
      try:
        handle.close()
      except Exception:
        # Ignore errors since vars will be reset below
        pass

    # Reset errors
    if reset_errors:
      self.errors = False

    # Reset vars
    self.db_cursor = None
    self.db_connection = None

  def generate_report(self, dev, ticket_id):
    """Generate device report for osTicket, returns list.

    Args:
      dev: device object; its description, tests, name and path are read.
      ticket_id: passed to upload_to_nextcloud() for the I/O graph.
    """
    report = []
    results = self.get_device_overall_results(dev)

    # Header
    if results['Full Diag']:
      report.append(
        '{Dev Type} hardware diagnostic tests: {Status}'.format(**results))
      report.append(' ')

    # Device
    report.append(dev.description)
    report.append(' ')

    # Test reports
    for name, test in dev.tests.items():
      report.extend(self.convert_report(name, test))
      if name == 'I/O Benchmark':
        # Create PNG graph
        try:
          graph_file = export_io_graph(dev)
        except (AttributeError, KeyError):
          report.append('Failed to export graph')
        else:
          # Upload to Imgur
          try:
            url = upload_to_imgur(graph_file)
            report.append('Imgur: {}'.format(url))
          except Exception:
            report.append('Imgur: Failed to upload graph')

          # Upload to Nextcloud
          try:
            url = upload_to_nextcloud(graph_file, ticket_id, dev.name)
            report.append('Nextcloud: {}'.format(url))
          except Exception:
            report.append('Nextcloud: Failed to upload graph')
      report.append(' ')

    # Volumes
    if results['Dev Type'] == 'Disk' and results['Full Diag']:
      # Mount all volumes and extend report
      report.append('Volumes:')
      report.extend(self.generate_volume_report(dev, results))
      report.append(' ')

    # Asterisk
    if results['Asterisk']:
      report.append('* NOTE: One or more tests were not run on this device')

    # Done
    return report

  def generate_volume_report(self, dev, results):
    """Mount all volumes for dev and generate report, returns list.

    Args:
      dev: device object; dev.path is handed to mount_volumes().
      results: dict from get_device_overall_results(); 'Core' controls
        the core_storage argument of mount_volumes().
    """
    report = []
    mount_report = mount_volumes(
      all_devices=False,
      device_path='{}'.format(dev.path),
      core_storage=results['Core'])

    # Format report
    for v_path, v_data in sorted(mount_report.items()):
      label = v_data.get('label', '')
      if label:
        label = '"{}"'.format(label)
      else:
        # Ensure string type
        label = ''
      size = v_data.get('size', '')
      if size:
        size = '{} {}B'.format(size[:-1], size[-1:]).upper()
      else:
        size = 'UNKNOWN'
      # "or" also covers keys that are present but hold None/''
      size_used = str(v_data.get('size_used') or 'UNKNOWN').upper()
      size_avail = str(v_data.get('size_avail') or 'UNKNOWN').upper()

      # Protect embedded spaces with '_' so only the padding becomes dots
      fields = [v_path, label, size, size_used, size_avail]
      fields = [str(v).strip().replace(' ', '_') for v in fields]
      for i, field in enumerate(fields):
        # Column widths: 16 for the path, 12 for the label, 8 for the rest
        pad = 8
        if i < 2:
          pad += 4 * (2 - i)
        fields[i] = pad_with_dots(
          '{s:<{p}}'.format(s=field, p=pad),
          pad_right=True)

      # The last column needs no trailing padding
      fields[-1] = re.sub(r'\.*$', '', fields[-1])
      fields = [v.replace('_', ' ') for v in fields]
      report.append(
        '{}..{}..Total..{}..(Used..{}..Free..{})'.format(*fields))

    # Done
    return report

  def get_device_overall_results(self, dev):
    """Get overall results from tests for device, returns dict.

    Also sets dev.checkbox to False when any test failed and to True when
    every test either passed or was N/A.

    Raises:
      GenericError: the device type is not in KNOWN_DEV_TYPES.
    """
    results = {
      'Core': False,
      'Dev Type': self.get_device_type(dev),
      'Full Diag': False,
      'Asterisk': None,
      'Failed': 0,
      'N/A': 0,
      'OVERRIDE': 0,
      'Passed': 0,
      'Status': 'Unknown',
      }

    # Bail on unknown device type
    if results['Dev Type'] not in KNOWN_DEV_TYPES:
      raise GenericError(
        'Unrecognized device type: {}.'.format(results['Dev Type']))

    # Get test list for device type
    test_list = []
    if results['Dev Type'] == 'CPU':
      test_list = self.tests_cpu
    elif results['Dev Type'] == 'Disk':
      test_list = self.tests_disk

    # Check if a full diag was run (i.e. all dev tests were enabled)
    results['Full Diag'] = len(dev.tests) == len(test_list)

    # Tally test results
    for test in dev.tests.values():
      if test.failed:
        results['Failed'] += 1
      if test.passed:
        results['Passed'] += 1
      if 'N/A' in test.status:
        results['N/A'] += 1
      if 'OVERRIDE' in test.status:
        results['OVERRIDE'] += 1

    # Set overall status
    if results['Failed'] > 0:
      dev.checkbox = False
      results['Status'] = 'FAILED'
    elif results['Passed'] + results['N/A'] == len(dev.tests):
      dev.checkbox = True
      results['Status'] = 'PASSED'
    else:
      results['Status'] = 'UNKNOWN'
    if results['Full Diag'] and results['N/A'] > 0:
      results['Asterisk'] = True
      results['Status'] += '*'

    # Enable CoreStorage searches
    accounted_for = (
      results['Passed'] + results['N/A'] + results['OVERRIDE'])
    results['Core'] = (
      results['Full Diag'] and accounted_for == len(test_list))

    # Done
    return results

  def get_device_type(self, dev):
    """Terrible hack to determine device type, returns str.

    Returns 'CPU' or 'Disk' when the class string of dev contains 'CpuObj'
    or 'DiskObj'; otherwise the class string itself is returned.
    """
    # TODO: Fix with proper isinstance() call
    type_str = str(dev.__class__)
    if 'CpuObj' in type_str:
      type_str = 'CPU'
    elif 'DiskObj' in type_str:
      type_str = 'Disk'
    return type_str

  def get_ticket_name(self, ticket_id):
    """Lookup ticket and return name as str.

    Requires an open cursor (see connect()).  Returns None when no row
    matches.
    """
    name = None
    # Only the table name is formatted in; the ID is passed as a parameter
    sql_cmd = "SELECT name FROM `{Ticket}`".format(**OSTICKET['Tables'])
    sql_cmd += " WHERE `ticket_id` = %s"
    sql_cmd += ";"

    # Lookup name
    # NOTE: If multiple entries are found it will return the last
    self.db_cursor.execute(sql_cmd, (ticket_id,))
    for s in self.db_cursor:
      name = s[0]

    # Done
    return name

  def get_ticket_number(self):
    """Get ticket number and confirm with name from osTicket DB.

    Returns the confirmed ticket number as str, or None when the
    integration is disabled, the user opted out, or the lookup failed.
    """
    ticket_number = None
    self.connect()

    # Bail if disabled
    if self.disabled:
      return None

    # Main loop
    while ticket_number is None:
      print_standard(' ')
      _input = input('Enter ticket number (or leave blank to disable): ')
      _input = _input.strip()

      # No ticket ID entered
      if re.match(r'^\s*$', _input):
        if ask('Disable osTicket integration for this run?'):
          break

      # Invalid ID entered
      if not re.match(r'^(\d+)$', _input):
        continue

      # Valid ID entered, lookup name and verify
      try:
        _name = self.get_ticket_name(_input)
      except Exception:
        # Ignore and return None below
        break
      if _name:
        print_standard('You have selected ticket #{} {}'.format(
          _input, _name))
        if ask('Is this correct?'):
          ticket_number = _input

    # Done
    self.disconnect()
    return ticket_number

  def post_device_results(self, dev, ticket_id):
    """Generate osTicket friendly report and post as response to ticket."""
    response = self.generate_report(dev, ticket_id)
    self.post_response(response, ticket_id)

  def post_response(self, response, ticket_id):
    """Post a reply to a ticket in osTicket.

    Args:
      response: list of report lines; joined with newlines before posting.
      ticket_id: ID of the ticket to reply to.

    SQL errors are not raised; they set self.errors instead.
    """
    self.connect()

    # Bail if disabled
    if self.disabled:
      return

    # Convert response to string
    response = '\n'.join(response)
    response = response.replace('`', '')

    # Build SQL cmd
    # Identifiers come from settings; every value is passed as a parameter
    # so quotes or backslashes in the report cannot break the statement
    sql_cmd = "INSERT INTO `{}`.`{}`".format(
      OSTICKET['Database']['Name'], OSTICKET['Tables']['Response'])
    sql_cmd += " (ticket_id, staff_id, staff_name, response, created)"
    sql_cmd += " VALUES (%s, %s, %s, %s, %s);"
    sql_args = (
      ticket_id,
      OSTICKET['Staff']['ID'],
      OSTICKET['Staff']['Name'],
      response,
      time.strftime("%Y-%m-%d %H:%M:%S"),
      )

    # Run SQL cmd
    try:
      self.db_cursor.execute(sql_cmd, sql_args)
      # NOTE(review): assumes autocommit is off (connector default), in
      # which case transactional tables need an explicit commit - confirm
      self.db_connection.commit()
    except mariadb.errors.Error:
      # Set self.errors to enable warning line on results screen
      self.errors = True

    # Done
    self.disconnect()

  def set_disk_failed(self, ticket_id):
    """Mark disk as failed in osTicket."""
    self.set_flag(
      ticket_id,
      OSTICKET['Disk Flag']['Name'],
      OSTICKET['Disk Flag']['Fail'])

  def set_disk_passed(self, ticket_id):
    """Mark disk as passed in osTicket."""
    self.set_flag(
      ticket_id,
      OSTICKET['Disk Flag']['Name'],
      OSTICKET['Disk Flag']['Pass'])

  def set_flag(self, ticket_id, flag_name, flag_value):
    """Set flag in osTicket.

    Args:
      ticket_id: ID of the ticket to update.
      flag_name: column name; used as an identifier, so it must come from
        trusted settings.
      flag_value: value stored in that column.

    SQL errors are not raised; they set self.errors instead.
    """
    self.connect()

    # Bail if disabled
    if self.disabled:
      return

    # Build SQL cmd
    # Identifiers are formatted in, values are passed as parameters
    sql_cmd = "UPDATE `{}`.`{}`".format(
      OSTICKET['Database']['Name'], OSTICKET['Tables']['Ticket'])
    sql_cmd += " SET `{}` = %s".format(flag_name)
    sql_cmd += " WHERE `{Ticket}`.`ticket_id` = %s".format(
      **OSTICKET['Tables'])
    sql_cmd += ";"

    # Run SQL cmd
    try:
      self.db_cursor.execute(sql_cmd, (flag_value, ticket_id))
      # NOTE(review): assumes autocommit is off (connector default), in
      # which case transactional tables need an explicit commit - confirm
      self.db_connection.commit()
    except mariadb.errors.Error:
      # Set self.errors to enable warning line on results screen
      self.errors = True

    # Done
    self.disconnect()
# Functions
def pad_with_dots(s, pad_right=False):
  """Replace space padding with dots, returns str.

  Every space in str(s) becomes two dots.  If the result contains any dot,
  one more dot is added: at the end when pad_right is True, otherwise at
  the start.  Strings without spaces or dots are returned unchanged.
  """
  dotted = str(s).replace(' ', '..')

  # Nothing to extend
  if '.' not in dotted:
    return dotted

  return dotted + '.' if pad_right else '.' + dotted
if __name__ == '__main__':
  # This module only provides definitions; tell the user and do nothing else
  print("This file is not meant to be called directly.")
# vim: sts=2 sw=2 ts=2