601 lines
17 KiB
Python
601 lines
17 KiB
Python
# Wizard Kit: Functions - osTicket
|
|
|
|
import mysql.connector as mariadb
|
|
|
|
from functions.data import *
|
|
from functions.io_graph import *
|
|
from settings.osticket import *
|
|
|
|
|
|
# STATIC VARIABLES
|
|
KNOWN_DEV_TYPES = ('CPU', 'Disk')
|
|
|
|
|
|
# Regex
|
|
REGEX_BLOCK_GRAPH = re.compile(r'(▁|▂|▃|▄|▅|▆|▇|█)')
|
|
REGEX_NVME_SMART_ATTRIBUTES = re.compile(r'^\s*(\d+) / (\w+): (.{28})(.*)$')
|
|
REGEX_TEMPS = re.compile(r'^\s*(.*?)\s+(idle:)(.*)$')
|
|
REGEX_SENSOR = re.compile(r'^(.*?)(\s*)$')
|
|
|
|
|
|
# Error Classes
|
|
class osTicketConnectionError(Exception):
|
|
pass
|
|
|
|
|
|
# Classes
|
|
class osTicket():
|
|
"""Class to track osTicket data and functions."""
|
|
def __init__(self, tests_cpu, tests_disk):
|
|
self.db_connection = None
|
|
self.db_cursor = None
|
|
self.disabled = False
|
|
self.errors = False
|
|
self.tests_cpu = tests_cpu
|
|
self.tests_disk = tests_disk
|
|
self.tunnel_proc = None
|
|
|
|
def connect(self, silent=True):
|
|
"""Establish connection to osTicket via a SSH tunnel."""
|
|
cmd = [
|
|
'ssh', '-N',
|
|
'-p', OSTICKET['SSH']['Port'],
|
|
'-L3306:127.0.0.1:{Port}'.format(**OSTICKET['Database']),
|
|
'{User}@{Host}'.format(**OSTICKET['SSH']),
|
|
]
|
|
|
|
# Bail if disabled
|
|
if self.disabled:
|
|
return
|
|
|
|
# Only open tunnel if one doesn't exist
|
|
if self.tunnel_proc is None or self.tunnel_proc.poll() is not None:
|
|
self.tunnel_proc = popen_program(cmd)
|
|
|
|
# Connect to database
|
|
for x in range(5):
|
|
sleep(2)
|
|
try:
|
|
self.db_connection = mariadb.connect(
|
|
user=OSTICKET['Database']['User'],
|
|
password=OSTICKET['Database']['Pass'],
|
|
database=OSTICKET['Database']['Name'],
|
|
)
|
|
self.db_cursor = self.db_connection.cursor()
|
|
except mariadb.errors.InterfaceError:
|
|
# SSH issue?, try again
|
|
pass
|
|
except mariadb.errors.Error:
|
|
# Bad creds or other SQL error, bail
|
|
break
|
|
except Exception:
|
|
# Unknown error
|
|
break
|
|
else:
|
|
# Connection established
|
|
break
|
|
|
|
# Raise exception unless silenced
|
|
if self.db_cursor is None and not silent:
|
|
raise osTicketConnectionError('Failed to connect to osTicket.')
|
|
|
|
def convert_report(self, name, test):
|
|
"""Convert report into an osTicket friendly format, returns list."""
|
|
dev = test.dev
|
|
out_report = []
|
|
source_report = test.report.copy()
|
|
status = strip_colors(test.status)
|
|
status = status.replace(test.label, '').strip()
|
|
|
|
# Header
|
|
index = 1
|
|
if name == 'NVMe / SMART':
|
|
out_report.append('{} ({})'.format(name, status))
|
|
source_report = dev.generate_attribute_report()
|
|
|
|
# Notes
|
|
if dev.nvme_smart_notes:
|
|
source_report.append('{} Notes'.format(dev.attr_type))
|
|
source_report.extend(sorted(dev.nvme_smart_notes.keys()))
|
|
|
|
# Test Report
|
|
source_report.extend(test.report.copy())
|
|
elif not source_report:
|
|
index = 0
|
|
out_report.append('{} ({})'.format(name, status))
|
|
else:
|
|
out_report.append('{} ({})'.format(strip_colors(source_report[0]), status))
|
|
|
|
# Body
|
|
for line in source_report[index:]:
|
|
# Remove colors and leading spaces
|
|
line = strip_colors(line)
|
|
if line[:2] == ' ':
|
|
line = line[2:]
|
|
|
|
# Test-specific modifications
|
|
if name == 'Prime95':
|
|
r = REGEX_TEMPS.match(line)
|
|
if r:
|
|
_sensor = '{:<20}'.format(r.group(1))
|
|
_temps = r.group(2)
|
|
_temps += re.sub(r'\s+(\w+:)', r' ...... \1', r.group(3))
|
|
r2 = REGEX_SENSOR.match(_sensor)
|
|
_sensor = r2.group(1)
|
|
_spacer = pad_with_dots(r2.group(2))
|
|
line = '{}{} {}'.format(_sensor, _spacer, _temps)
|
|
if line == 'Temps':
|
|
out_report.append(' ')
|
|
out_report.append('Temps')
|
|
continue
|
|
elif name == 'NVMe / SMART':
|
|
r = REGEX_NVME_SMART_ATTRIBUTES.match(line)
|
|
if r:
|
|
_dec = '{:>3}'.format(r.group(1))
|
|
_hex = r.group(2)
|
|
_atr = r.group(3).strip()
|
|
_val = '{:<20}'.format(r.group(4))
|
|
line = '{}/{}: {} {}'.format(
|
|
_hex,
|
|
pad_with_dots(_dec),
|
|
pad_with_dots(_val, pad_right=True),
|
|
_atr)
|
|
elif name == 'I/O Benchmark':
|
|
line = REGEX_BLOCK_GRAPH.sub('', line)
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
# Remove extra spaces
|
|
line = line.strip()
|
|
line = re.sub(r'(\s+)', ' ', line)
|
|
|
|
# Indent line
|
|
if not re.match(r'^(NVMe|SMART)', line):
|
|
line = '... {}'.format(line)
|
|
|
|
# Add line to report
|
|
out_report.append(line)
|
|
|
|
# Done
|
|
return out_report
|
|
|
|
def disconnect(self, full=False):
|
|
"""Close osTicket connection."""
|
|
try:
|
|
self.db_cursor.close()
|
|
self.db_connection.close()
|
|
except Exception:
|
|
# Ignore errors since vars will be reset below
|
|
pass
|
|
|
|
# Reset errors
|
|
if full:
|
|
self.errors = False
|
|
|
|
# Reset vars
|
|
self.db_cursor = None
|
|
self.db_connection = None
|
|
|
|
# Close tunnel
|
|
if full:
|
|
try:
|
|
self.tunnel_proc.kill()
|
|
self.tunnel_proc.wait(timeout=2)
|
|
except (AttributeError, subprocess.TimeoutExpired):
|
|
# Ignore and continue
|
|
pass
|
|
self.tunnel_proc = None
|
|
|
|
def generate_report(self, dev, ticket_id, ticket_name):
|
|
"""Generate device report for osTicket, returns list."""
|
|
report = []
|
|
results = self.get_device_overall_results(dev)
|
|
|
|
# Header
|
|
report.append('[Report for ticket #{} {}]'.format(ticket_id, ticket_name))
|
|
if results['Full Diag']:
|
|
report.append(
|
|
'{Dev Type} hardware diagnostic tests: {Status}'.format(**results))
|
|
report.append(' ')
|
|
|
|
# Device
|
|
report.append(dev.description)
|
|
report.append(' ')
|
|
|
|
# Test reports
|
|
for name, test in dev.tests.items():
|
|
report.extend(self.convert_report(name, test))
|
|
if name == 'I/O Benchmark' and not test.disabled:
|
|
# Create PNG graph
|
|
try:
|
|
graph_file = export_io_graph(dev)
|
|
except GenericError:
|
|
# No data to build graph, ignoring
|
|
pass
|
|
except (AttributeError, KeyError):
|
|
report.append('Failed to export graph')
|
|
else:
|
|
# Upload to Imgur
|
|
try:
|
|
url = upload_to_imgur(graph_file)
|
|
report.append('Imgur: {}'.format(url))
|
|
except Exception:
|
|
report.append('Imgur: Failed to upload graph')
|
|
|
|
# Upload to Nextcloud
|
|
try:
|
|
url = upload_to_nextcloud(graph_file, ticket_id, dev.name)
|
|
report.append('Nextcloud: {}'.format(url))
|
|
except Exception:
|
|
report.append('Nextcloud: Failed to upload graph')
|
|
report.append(' ')
|
|
|
|
# Volumes
|
|
if results['Dev Type'] == 'Disk' and results['Full Diag']:
|
|
# Mount all volumes and extend report
|
|
report.append('Volumes:')
|
|
report.extend(self.generate_volume_report(dev, results))
|
|
report.append(' ')
|
|
|
|
# Asterisk
|
|
if results['Asterisk']:
|
|
report.append('* NOTE: One or more tests were not run on this device')
|
|
|
|
# Done
|
|
return report
|
|
|
|
def generate_volume_report(self, dev, results):
|
|
"""Mount all volumes for dev and generate report, returns list."""
|
|
report = []
|
|
mount_report = mount_volumes(
|
|
all_devices=False,
|
|
device_path='{}'.format(dev.path),
|
|
core_storage=results['Core'])
|
|
|
|
# Format report
|
|
for v_path, v_data in sorted(mount_report.items()):
|
|
label = v_data.get('label', '')
|
|
if label:
|
|
label = '"{}"'.format(label)
|
|
else:
|
|
# Ensure string type
|
|
label = ''
|
|
fstype = v_data.get('fstype', 'UNKNOWN FS')
|
|
if not fstype:
|
|
# Either empty string or None
|
|
fstype = 'UNKNOWN FS'
|
|
fstype = str(fstype).upper()
|
|
size = v_data.get('size', '')
|
|
if size:
|
|
size = '{} {}B'.format(size[:-1], size[-1:]).upper()
|
|
else:
|
|
size = 'UNKNOWN'
|
|
size_used = v_data.get('size_used', 'UNKNOWN').upper()
|
|
size_avail = v_data.get('size_avail', 'UNKNOWN').upper()
|
|
v_data = [v_path, label, size, fstype, size_used, size_avail]
|
|
v_data = [str(v) for v in v_data]
|
|
v_data = [v.strip().replace(' ', '_') for v in v_data]
|
|
for i in range(len(v_data)):
|
|
pad = 8
|
|
if i < 2:
|
|
pad += 4 * (2 - i)
|
|
v_data[i] = pad_with_dots(
|
|
'{s:<{p}}'.format(s=v_data[i], p=pad),
|
|
pad_right=True)
|
|
v_data[-1] = re.sub(r'\.*$', '', v_data[-1])
|
|
v_data = [v.replace('_', ' ') for v in v_data]
|
|
report.append(
|
|
'... {}..{}..Total..{}..({}..Used..{}..Free..{})'.format(*v_data))
|
|
|
|
# Done
|
|
return report
|
|
|
|
def get_device_overall_results(self, dev):
|
|
"""Get overall results from tests for device, returns dict."""
|
|
results = {
|
|
'Core': False,
|
|
'Dev Type': self.get_device_type(dev),
|
|
'Full Diag': False,
|
|
'Asterisk': None,
|
|
'Failed': 0,
|
|
'N/A': 0,
|
|
'OVERRIDE': 0,
|
|
'Passed': 0,
|
|
'Status': 'Unknown',
|
|
}
|
|
|
|
# Bail on unknown device type
|
|
if results['Dev Type'] not in KNOWN_DEV_TYPES:
|
|
raise GenericError(
|
|
'Unrecognized device type: {}.'.format(results['Dev Type']))
|
|
|
|
# Get test list for device type
|
|
test_list = []
|
|
if results['Dev Type'] == 'CPU':
|
|
test_list = self.tests_cpu
|
|
elif results['Dev Type'] == 'Disk':
|
|
test_list = self.tests_disk
|
|
|
|
# Check if a full diag was run (i.e. all dev tests were enabled)
|
|
results['Full Diag'] = len(dev.tests) == len(test_list)
|
|
|
|
# Tally test results
|
|
for test in dev.tests.values():
|
|
if test.failed:
|
|
results['Failed'] += 1
|
|
if test.passed:
|
|
results['Passed'] += 1
|
|
if 'N/A' in test.status:
|
|
results['N/A'] += 1
|
|
if 'OVERRIDE' in test.status:
|
|
results['OVERRIDE'] += 1
|
|
|
|
# Set overall status
|
|
if results['Failed'] > 0:
|
|
dev.checkbox = False
|
|
results['Status'] = 'FAILED'
|
|
elif results['Passed'] + results['N/A'] == len(test_list):
|
|
# Only mark true if all tests are enabled and all are "Passed" / "N/A"
|
|
dev.checkbox = True
|
|
results['Status'] = 'PASSED'
|
|
else:
|
|
results['Status'] = 'UNKNOWN'
|
|
if results['Full Diag'] and results['N/A'] > 0:
|
|
results['Asterisk'] = True
|
|
results['Status'] += '*'
|
|
|
|
# Enable CoreStorage searches
|
|
results['Core'] = False
|
|
if results['Full Diag']:
|
|
num_passed = results['Passed'] + results['N/A'] + results['OVERRIDE']
|
|
if num_passed == len(test_list):
|
|
# We ran all disk tests and all results were acceptable
|
|
results['Core'] = True
|
|
elif results['Failed'] == 1 and dev.tests['I/O Benchmark'].failed:
|
|
# We ran all disk tests and only I/O Benchmark failed
|
|
results['Core'] = True
|
|
|
|
# Done
|
|
return results
|
|
|
|
def get_device_type(self, dev):
|
|
"""Terrible hack to determine device type, returns str."""
|
|
# TODO: Fix with proper isinstance() call
|
|
type_str = str(dev.__class__)
|
|
if 'CpuObj' in type_str:
|
|
type_str = 'CPU'
|
|
elif 'DiskObj' in type_str:
|
|
type_str = 'Disk'
|
|
|
|
return type_str
|
|
|
|
def get_flag(self, ticket_id, flag_name):
|
|
"""Get flag in osTicket."""
|
|
flag_value = None
|
|
self.connect()
|
|
|
|
# Bail if disabled
|
|
if self.disabled:
|
|
return
|
|
|
|
# Build SQL cmd
|
|
sql_cmd = "SELECT `{column}` FROM `{Name}`.`{Ticket}`".format(
|
|
column=flag_name,
|
|
**OSTICKET['Database'],
|
|
**OSTICKET['Tables'])
|
|
sql_cmd += "WHERE `{Ticket}`.`ticket_id` = {ticket_id}".format(
|
|
ticket_id=ticket_id,
|
|
**OSTICKET['Tables'])
|
|
sql_cmd += ";"
|
|
|
|
# Run SQL cmd and get value
|
|
try:
|
|
self.db_cursor.execute(sql_cmd)
|
|
for s in self.db_cursor:
|
|
flag_value = s[0]
|
|
except mariadb.errors.Error:
|
|
# Set self.errors to enable warning line on results screen
|
|
self.errors = True
|
|
|
|
# Done
|
|
self.disconnect()
|
|
return flag_value
|
|
|
|
def get_ticket_details(self):
|
|
"""Get ticket number and name from osTicket DB, returns tuple."""
|
|
ticket_name = None
|
|
ticket_number = None
|
|
|
|
# Connect
|
|
while True:
|
|
try:
|
|
self.connect(silent=False)
|
|
except osTicketConnectionError:
|
|
print_warning('Failed to connect to osTicket')
|
|
if not ask('Try again?'):
|
|
print_standard('Integration disabled for this session')
|
|
self.disabled = True
|
|
self.tunnel_proc.kill()
|
|
return None
|
|
else:
|
|
# Connection successful
|
|
break
|
|
|
|
# Main loop
|
|
while ticket_number is None:
|
|
print_standard(' ')
|
|
_ticket_id = input('Enter ticket number (or leave blank to disable): ')
|
|
_ticket_id = _ticket_id.strip()
|
|
|
|
# No ticket ID entered
|
|
if re.match(r'^\s*$', _ticket_id):
|
|
if ask('Disable osTicket integration for this run?'):
|
|
self.disabled = True
|
|
break
|
|
|
|
# Invalid ID entered
|
|
if not re.match(r'^(\d+)$', _ticket_id):
|
|
continue
|
|
|
|
# Valid ID entered, lookup name
|
|
try:
|
|
_name = self.get_ticket_field(_ticket_id, 'name')
|
|
except Exception:
|
|
# Ignore and return None below
|
|
break
|
|
|
|
# Verify ticket exists
|
|
if _name is None:
|
|
print_error('ERROR: Ticket {} not found.'.format(_ticket_id))
|
|
continue
|
|
|
|
# Lookup subject
|
|
try:
|
|
_subject = self.get_ticket_field(_ticket_id, 'subject')
|
|
except Exception:
|
|
# Ignore and set to None
|
|
_subject = None
|
|
|
|
# Verify the selected ticket is correct
|
|
print_standard(
|
|
'You have selected ticket {BLUE}#{ticket_id}{CLEAR} {name}'.format(
|
|
ticket_id=_ticket_id,
|
|
name=_name,
|
|
**COLORS))
|
|
print_standard('{CYAN} {subject}{CLEAR}'.format(
|
|
subject=_subject,
|
|
**COLORS))
|
|
print_standard(' ')
|
|
if ask('Is this correct?'):
|
|
ticket_name = _name
|
|
ticket_number = _ticket_id
|
|
|
|
# Done
|
|
self.disconnect()
|
|
return (ticket_number, ticket_name)
|
|
|
|
def get_ticket_field(self, ticket_id, field):
|
|
"""Lookup ticket and return field as str."""
|
|
data = None
|
|
sql_cmd = "SELECT {field} FROM `{Ticket}`".format(
|
|
field=field,
|
|
**OSTICKET['Tables'])
|
|
sql_cmd += " WHERE `ticket_id` = {}".format(ticket_id)
|
|
sql_cmd += ";"
|
|
|
|
# Lookup data
|
|
# NOTE: If multiple entries are found it will return the last
|
|
self.db_cursor.execute(sql_cmd)
|
|
for s in self.db_cursor:
|
|
data = s[0]
|
|
|
|
# Done
|
|
return data
|
|
|
|
def post_device_results(self, dev, ticket_id, ticket_name):
|
|
"""Generate osTicket friendly report and post as response to ticket."""
|
|
if not dev.tests:
|
|
# No test results available, aborting post
|
|
return
|
|
response = self.generate_report(dev, ticket_id, ticket_name)
|
|
self.post_response(response, ticket_id)
|
|
|
|
def post_response(self, response, ticket_id):
|
|
"""Post a reply to a ticket in osTicket."""
|
|
self.connect()
|
|
|
|
# Bail if disabled
|
|
if self.disabled:
|
|
return
|
|
|
|
# Convert response to string
|
|
response = '\n'.join(response)
|
|
response = response.replace('`', '')
|
|
|
|
# Build SQL cmd
|
|
sql_cmd = "INSERT INTO `{Name}`.`{Response}`".format(
|
|
**OSTICKET['Database'], **OSTICKET['Tables'])
|
|
sql_cmd += " (ticket_id, staff_id, staff_name, response, created)"
|
|
sql_cmd += " VALUES ("
|
|
sql_cmd += " '{}',".format(ticket_id)
|
|
sql_cmd += " '{ID}', '{Name}',".format(**OSTICKET['Staff'])
|
|
sql_cmd += " '{}',".format(response)
|
|
sql_cmd += " '{}'".format(time.strftime("%Y-%m-%d %H:%M:%S"))
|
|
sql_cmd += " );"
|
|
|
|
# Run SQL cmd
|
|
try:
|
|
self.db_cursor.execute(sql_cmd)
|
|
except mariadb.errors.Error:
|
|
# Set self.errors to enable warning line on results screen
|
|
self.errors = True
|
|
|
|
# Done
|
|
self.disconnect()
|
|
|
|
def set_disk_failed(self, ticket_id):
|
|
"""Mark disk as failed in osTicket."""
|
|
self.set_flag(
|
|
ticket_id,
|
|
OSTICKET['Disk Flag']['Name'],
|
|
OSTICKET['Disk Flag']['Fail'])
|
|
|
|
def set_disk_passed(self, ticket_id):
|
|
"""Mark disk as passed in osTicket."""
|
|
current_value = self.get_flag(ticket_id, OSTICKET['Disk Flag']['Name'])
|
|
|
|
# Bail early?
|
|
if current_value == OSTICKET['Disk Flag']['Fail']:
|
|
print_warning('Not replacing osTicket disk checkbox FAILED value')
|
|
return
|
|
|
|
# Current value != FAILED, set to passed
|
|
self.set_flag(
|
|
ticket_id,
|
|
OSTICKET['Disk Flag']['Name'],
|
|
OSTICKET['Disk Flag']['Pass'])
|
|
|
|
def set_flag(self, ticket_id, flag_name, flag_value):
|
|
"""Set flag in osTicket."""
|
|
self.connect()
|
|
|
|
# Bail if disabled
|
|
if self.disabled:
|
|
return
|
|
|
|
# Build SQL cmd
|
|
sql_cmd = "UPDATE `{Name}`.`{Ticket}`".format(
|
|
**OSTICKET['Database'], **OSTICKET['Tables'])
|
|
sql_cmd += " SET `{}` = '{}'".format(flag_name, flag_value)
|
|
sql_cmd += " WHERE `{Ticket}`.`ticket_id` = {ticket_id}".format(
|
|
ticket_id=ticket_id, **OSTICKET['Tables'])
|
|
sql_cmd += ";"
|
|
|
|
# Run SQL cmd
|
|
try:
|
|
self.db_cursor.execute(sql_cmd)
|
|
except mariadb.errors.Error:
|
|
# Set self.errors to enable warning line on results screen
|
|
self.errors = True
|
|
|
|
# Done
|
|
self.disconnect()
|
|
|
|
|
|
# Functions
|
|
def pad_with_dots(s, pad_right=False):
|
|
"""Replace space padding with dots, returns str."""
|
|
s = str(s).replace(' ', '..')
|
|
if '.' in s:
|
|
if pad_right:
|
|
s = s + '.'
|
|
else:
|
|
s = '.' + s
|
|
return s
|
|
|
|
|
|
if __name__ == '__main__':
|
|
print("This file is not meant to be called directly.")
|
|
|
|
# vim: sts=2 sw=2 ts=2
|