WizardKit/scripts/wk.prev/functions/osticket.py
2Shirt ac2e5a4fcf
New project orgnization
* Match upstream layout for the benefits listed there
2020-01-08 00:28:56 -07:00

655 lines
19 KiB
Python

# Wizard Kit: Functions - osTicket
import atexit
import mysql.connector as mariadb
from functions.data import *
from functions.io_graph import *
from settings.osticket import *
# STATIC VARIABLES
KNOWN_DEV_TYPES = ('CPU', 'Disk')
# Regex
REGEX_BLOCK_GRAPH = re.compile(r'(▁|▂|▃|▄|▅|▆|▇|█)')
REGEX_NVME_SMART_ATTRIBUTES = re.compile(r'^\s*(\d+) / (\w+): (.{28})(.*)$')
REGEX_TEMPS = re.compile(r'^\s*(.*?)\s+(idle:)(.*)$')
REGEX_SENSOR = re.compile(r'^(.*?)(\s*)$')
# Error Classes
class osTicketConnectionError(Exception):
pass
# Classes
class osTicket():
"""Class to track osTicket data and functions."""
def __init__(self, tests_cpu, tests_disk):
self.db_connection = None
self.db_cursor = None
self.disabled = False
self.errors = False
self.tests_cpu = tests_cpu
self.tests_disk = tests_disk
self.tunnel_proc = None
def connect(self, silent=True):
"""Establish connection to osTicket via a SSH tunnel."""
cmd = [
'ssh', '-N',
'-p', OSTICKET['SSH']['Port'],
'-L3306:127.0.0.1:{Port}'.format(**OSTICKET['Database']),
'{User}@{Host}'.format(**OSTICKET['SSH']),
]
# Bail if disabled
if self.disabled:
return
# Only open tunnel if one doesn't exist
if self.tunnel_proc is None or self.tunnel_proc.poll() is not None:
if self.tunnel_proc:
# Unregister previous terminate
atexit.unregister(self.tunnel_proc.terminate)
self.tunnel_proc = popen_program(cmd)
atexit.register(self.tunnel_proc.terminate)
# Connect to database
for x in range(5):
sleep(2)
try:
self.db_connection = mariadb.connect(
user=OSTICKET['Database']['User'],
password=OSTICKET['Database']['Pass'],
database=OSTICKET['Database']['Name'],
)
self.db_cursor = self.db_connection.cursor()
except mariadb.errors.InterfaceError:
# SSH issue?, try again
pass
except mariadb.errors.Error:
# Bad creds or other SQL error, bail
break
except Exception:
# Unknown error
break
else:
# Connection established
break
# Raise exception unless silenced
if self.db_cursor is None and not silent:
raise osTicketConnectionError('Failed to connect to osTicket.')
def convert_report(self, name, test):
"""Convert report into an osTicket friendly format, returns list."""
dev = test.dev
out_report = []
source_report = test.report.copy()
status = strip_colors(test.status)
status = status.replace(test.label, '').strip()
# Header
index = 1
if name == 'NVMe / SMART':
out_report.append('{} ({})'.format(name, status))
source_report = dev.generate_attribute_report()
# Notes
if dev.nvme_smart_notes:
source_report.append('{} Notes'.format(dev.attr_type))
source_report.extend(sorted(dev.nvme_smart_notes.keys()))
# Test Report
source_report.extend(test.report.copy())
elif not source_report:
index = 0
out_report.append('{} ({})'.format(name, status))
else:
out_report.append('{} ({})'.format(strip_colors(source_report[0]), status))
# Body
for line in source_report[index:]:
# Remove colors and leading spaces
line = strip_colors(line)
if line[:2] == ' ':
line = line[2:]
# Test-specific modifications
if name == 'Prime95':
r = REGEX_TEMPS.match(line)
if r:
_sensor = '{:<20}'.format(r.group(1))
_temps = r.group(2)
_temps += re.sub(r'\s+(\w+:)', r' ...... \1', r.group(3))
r2 = REGEX_SENSOR.match(_sensor)
_sensor = r2.group(1)
_spacer = pad_with_dots(r2.group(2))
line = '{}{} {}'.format(_sensor, _spacer, _temps)
if line == 'Temps':
out_report.append(' ')
out_report.append('Temps')
continue
elif name == 'NVMe / SMART':
r = REGEX_NVME_SMART_ATTRIBUTES.match(line)
if r:
_dec = '{:>3}'.format(r.group(1))
_hex = r.group(2)
_atr = r.group(3).strip()
_val = '{:<20}'.format(r.group(4))
line = '{}/{}: {} {}'.format(
_hex,
pad_with_dots(_dec),
pad_with_dots(_val, pad_right=True),
_atr)
elif name == 'I/O Benchmark':
line = REGEX_BLOCK_GRAPH.sub('', line)
line = line.strip()
if not line:
continue
# Remove extra spaces
line = line.strip()
line = re.sub(r'(\s+)', ' ', line)
# Indent line
if not re.match(r'^(NVMe|SMART)', line):
line = '... {}'.format(line)
# Add line to report
out_report.append(line)
# Done
return out_report
def disconnect(self, full=False):
"""Close osTicket connection."""
try:
self.db_cursor.close()
self.db_connection.close()
except Exception:
# Ignore errors since vars will be reset below
pass
# Reset errors
if full:
self.errors = False
# Reset vars
self.db_cursor = None
self.db_connection = None
# Close tunnel
if full:
try:
self.tunnel_proc.kill()
self.tunnel_proc.wait(timeout=2)
except (AttributeError, subprocess.TimeoutExpired):
# Ignore and continue
pass
self.tunnel_proc = None
def generate_report(self, dev, ticket_id, ticket_name):
"""Generate device report for osTicket, returns list."""
report = ['']
results = self.get_device_overall_results(dev)
test_station = TEST_STATIONS.get(get_hostname().lower(), '')
# Header
if test_station:
report[0] += '[Test-Station: {}] '.format(test_station)
report[0] += '[Report for ticket #{} {}]'.format(ticket_id, ticket_name)
if results['Full Diag']:
report.append(
'{Dev Type} hardware diagnostic tests: {Status}'.format(**results))
report.append(' ')
# Device
report.append(dev.description)
if hasattr(dev, 'ram_total'):
report.append('{} RAM ({})'.format(
dev.ram_total, ', '.join(dev.ram_dimms)))
report.append(' ')
# Test reports
for name, test in dev.tests.items():
report.extend(self.convert_report(name, test))
if name == 'I/O Benchmark' and not test.disabled:
# Create PNG graph
try:
graph_file = export_io_graph(dev)
except GenericError:
# No data to build graph, ignoring
pass
except (AttributeError, KeyError):
report.append('Failed to export graph')
else:
# Upload to Imgur
try:
url = upload_to_imgur(graph_file)
report.append('Imgur: {}'.format(url))
except Exception:
report.append('Imgur: Failed to upload graph')
# Upload to Nextcloud
try:
url = upload_to_nextcloud(graph_file, ticket_id, dev.name)
report.append('Nextcloud: {}'.format(url))
except Exception:
report.append('Nextcloud: Failed to upload graph')
report.append(' ')
# Volumes
if results['Dev Type'] == 'Disk' and results['Full Diag']:
# Mount all volumes and extend report
report.append('Volumes:')
report.extend(self.generate_volume_report(dev, results))
report.append(' ')
if not dev.is_4k_aligned():
report.append('! NOTE: One or more partitions are not 4K aligned')
# Asterisk
if results['Asterisk']:
report.append('* NOTE: One or more tests were not run on this device')
# Done
return report
def generate_volume_report(self, dev, results):
"""Mount all volumes for dev and generate report, returns list."""
report = []
mount_report = mount_volumes(
all_devices=False,
device_path='{}'.format(dev.path),
core_storage=results['Core'])
# Format report
for v_path, v_data in sorted(mount_report.items()):
label = v_data.get('label', '')
if label:
label = '"{}"'.format(label)
else:
# Ensure string type
label = ''
fstype = v_data.get('fstype', 'UNKNOWN FS')
if not fstype:
# Either empty string or None
fstype = 'UNKNOWN FS'
fstype = str(fstype).upper()
size = v_data.get('size', '')
if size:
size = '{} {}B'.format(size[:-1], size[-1:]).upper()
else:
size = 'UNKNOWN'
size_used = v_data.get('size_used', 'UNKNOWN').upper()
size_avail = v_data.get('size_avail', 'UNKNOWN').upper()
v_data = [v_path, label, size, fstype, size_used, size_avail]
v_data = [str(v) for v in v_data]
v_data = [v.strip().replace(' ', '_') for v in v_data]
for i in range(len(v_data)):
pad = 8
if i < 2:
pad += 4 * (2 - i)
v_data[i] = pad_with_dots(
'{s:<{p}}'.format(s=v_data[i], p=pad),
pad_right=True)
v_data[-1] = re.sub(r'\.*$', '', v_data[-1])
v_data = [v.replace('_', ' ') for v in v_data]
report.append(
'... {}..{}..Total..{}..({}..Used..{}..Free..{})'.format(*v_data))
# Done
return report
def get_device_overall_results(self, dev):
"""Get overall results from tests for device, returns dict."""
results = {
'Core': False,
'Dev Type': self.get_device_type(dev),
'Full Diag': False,
'Asterisk': None,
'Failed': 0,
'N/A': 0,
'OVERRIDE': 0,
'Passed': 0,
'Status': 'Unknown',
}
# Bail on unknown device type
if results['Dev Type'] not in KNOWN_DEV_TYPES:
raise GenericError(
'Unrecognized device type: {}.'.format(results['Dev Type']))
# Get test list for device type
test_list = []
if results['Dev Type'] == 'CPU':
test_list = self.tests_cpu
elif results['Dev Type'] == 'Disk':
test_list = self.tests_disk
# Check if a full diag was run (i.e. all dev tests were enabled)
results['Full Diag'] = len(dev.tests) == len(test_list)
# Tally test results
for test in dev.tests.values():
if test.failed:
results['Failed'] += 1
if test.passed:
results['Passed'] += 1
if 'N/A' in test.status:
results['N/A'] += 1
if 'OVERRIDE' in test.status:
results['OVERRIDE'] += 1
# Set overall status
if results['Failed'] > 0:
dev.checkbox = False
results['Status'] = 'FAILED'
elif results['Passed'] + results['N/A'] == len(test_list):
# Only mark true if all tests are enabled and all are "Passed" / "N/A"
dev.checkbox = True
results['Status'] = 'PASSED'
else:
results['Status'] = 'UNKNOWN'
if results['Full Diag'] and results['N/A'] > 0:
results['Asterisk'] = True
results['Status'] += '*'
# Enable CoreStorage searches
results['Core'] = False
if results['Dev Type'] == 'Disk' and results['Full Diag']:
num_passed = results['Passed'] + results['N/A'] + results['OVERRIDE']
if num_passed == len(test_list):
# We ran all disk tests and all results were acceptable
results['Core'] = True
elif results['Failed'] == 1 and dev.tests['I/O Benchmark'].failed:
# We ran all disk tests and only I/O Benchmark failed
results['Core'] = True
# Done
return results
def get_device_type(self, dev):
"""Terrible hack to determine device type, returns str."""
# TODO: Fix with proper isinstance() call
type_str = str(dev.__class__)
if 'CpuObj' in type_str:
type_str = 'CPU'
elif 'DiskObj' in type_str:
type_str = 'Disk'
return type_str
def get_flag(self, ticket_id, flag_name):
"""Get flag in osTicket."""
flag_value = None
self.connect()
# Bail if disabled
if self.disabled:
return
# Build SQL cmd
sql_cmd = "SELECT `{column}` FROM `{Name}`.`{Ticket}`".format(
column=flag_name,
**OSTICKET['Database'],
**OSTICKET['Tables'])
sql_cmd += "WHERE `{Ticket}`.`ticket_id` = {ticket_id}".format(
ticket_id=ticket_id,
**OSTICKET['Tables'])
sql_cmd += ";"
# Run SQL cmd and get value
try:
self.db_cursor.execute(sql_cmd)
for s in self.db_cursor:
flag_value = s[0]
except mariadb.errors.Error:
# Set self.errors to enable warning line on results screen
self.errors = True
# Done
self.disconnect()
return flag_value
def get_ticket_details(self):
"""Get ticket number and name from osTicket DB, returns tuple."""
ticket_name = None
ticket_number = None
# Connect
print_standard('Connecting to osTicket...')
while True:
try:
self.connect(silent=False)
except osTicketConnectionError:
print_warning('Failed to connect to osTicket')
if not ask('Try again?'):
print_standard('Integration disabled for this session')
self.disabled = True
self.tunnel_proc.kill()
return None
else:
# Connection successful
break
# Main loop
while ticket_number is None:
print_standard(' ')
_ticket_id = input_text('Enter ticket number (or leave blank to disable): ')
_ticket_id = _ticket_id.strip()
# No ticket ID entered
if re.match(r'^\s*$', _ticket_id):
if ask('Disable osTicket integration for this run?'):
self.disabled = True
break
# Invalid ID entered
if not re.match(r'^(\d+)$', _ticket_id):
continue
# Valid ID entered, lookup name
try:
_name = self.get_ticket_field(_ticket_id, 'name')
except Exception:
# Ignore and return None below
break
# Verify ticket exists
if _name is None:
print_error('ERROR: Ticket {} not found.'.format(_ticket_id))
continue
# Lookup subject
try:
_subject = self.get_ticket_field(_ticket_id, 'subject')
except Exception:
# Ignore and set to None
_subject = None
# Verify the selected ticket is correct
print_standard(
'You have selected ticket {BLUE}#{ticket_id}{CLEAR} {name}'.format(
ticket_id=_ticket_id,
name=_name,
**COLORS))
print_standard('{CYAN} {subject}{CLEAR}'.format(
subject=_subject,
**COLORS))
print_standard(' ')
if ask('Is this correct?'):
ticket_name = _name
ticket_number = _ticket_id
# Done
self.disconnect()
return (ticket_number, ticket_name)
def get_ticket_field(self, ticket_id, field):
"""Lookup ticket and return field as str."""
data = None
sql_cmd = "SELECT {field} FROM `{Ticket}`".format(
field=field,
**OSTICKET['Tables'])
sql_cmd += " WHERE `ticket_id` = {}".format(ticket_id)
sql_cmd += ";"
# Lookup data
# NOTE: If multiple entries are found it will return the last
self.db_cursor.execute(sql_cmd)
for s in self.db_cursor:
data = s[0]
# Done
return data
def post_device_results(self, dev, ticket_id, ticket_name, color='Diags'):
"""Generate osTicket friendly report and post as response to ticket."""
if not dev.tests:
# No test results available, aborting post
return
response = self.generate_report(dev, ticket_id, ticket_name)
self.post_response(response, ticket_id, color)
def post_response(self, response, ticket_id, color='Normal'):
"""Post a reply to a ticket in osTicket."""
self.connect()
# Bail if disabled
if self.disabled:
return
# Convert response to string
response = '\n'.join(response)
response = response.replace('`', '')
# Build SQL cmd
sql_cmd = "INSERT INTO `{Name}`.`{Response}`".format(
**OSTICKET['Database'], **OSTICKET['Tables'])
sql_cmd += " (ticket_id, staff_id, staff_name, response, created, code)"
sql_cmd += " VALUES ("
sql_cmd += " '{}',".format(ticket_id)
sql_cmd += " '{ID}', '{Name}',".format(**OSTICKET['Staff'])
sql_cmd += " '{}',".format(response.replace("'", "\\'"))
sql_cmd += " '{}',".format(time.strftime("%Y-%m-%d %H:%M:%S"))
sql_cmd += " '{}'".format(OSTICKET['Color Codes'][color])
sql_cmd += " );"
# Run SQL cmd
try:
self.db_cursor.execute(sql_cmd)
except mariadb.errors.Error:
# Set self.errors to enable warning line on results screen
self.errors = True
# Done
self.disconnect()
def set_cpu_failed(self, ticket_id):
"""Mark cpu as failed in osTicket."""
self.set_flag(
ticket_id,
OSTICKET['CPU Flag']['Name'],
OSTICKET['CPU Flag']['Fail'])
def set_cpu_passed(self, ticket_id):
"""Mark cpu as passed in osTicket."""
current_value = self.get_flag(ticket_id, OSTICKET['CPU Flag']['Name'])
# Bail early?
if current_value == OSTICKET['CPU Flag']['Fail']:
print_warning('Not replacing osTicket cpu checkbox FAILED value')
return
# Current value != FAILED, set to passed
self.set_flag(
ticket_id,
OSTICKET['CPU Flag']['Name'],
OSTICKET['CPU Flag']['Pass'])
def set_cpu_max_temp(self, ticket_id, temp):
"""Set CPU temp string in osTicket."""
self.set_flag(
ticket_id,
OSTICKET['CPU Temp']['Name'],
temp,
)
def set_disk_failed(self, ticket_id):
"""Mark disk as failed in osTicket."""
self.set_flag(
ticket_id,
OSTICKET['Disk Flag']['Name'],
OSTICKET['Disk Flag']['Fail'])
def set_disk_passed(self, ticket_id):
"""Mark disk as passed in osTicket."""
current_value = self.get_flag(ticket_id, OSTICKET['Disk Flag']['Name'])
# Bail early?
if current_value == OSTICKET['Disk Flag']['Fail']:
print_warning('Not replacing osTicket disk checkbox FAILED value')
return
# Current value != FAILED, set to passed
self.set_flag(
ticket_id,
OSTICKET['Disk Flag']['Name'],
OSTICKET['Disk Flag']['Pass'])
def set_flag(self, ticket_id, flag_name, flag_value):
"""Set flag in osTicket."""
self.connect()
# Bail if disabled
if self.disabled:
return
# Build SQL cmd
sql_cmd = "UPDATE `{Name}`.`{Ticket}`".format(
**OSTICKET['Database'], **OSTICKET['Tables'])
sql_cmd += " SET `{}` = '{}'".format(flag_name, flag_value)
sql_cmd += " WHERE `{Ticket}`.`ticket_id` = {ticket_id}".format(
ticket_id=ticket_id, **OSTICKET['Tables'])
sql_cmd += ";"
# Run SQL cmd
try:
self.db_cursor.execute(sql_cmd)
except mariadb.errors.Error:
# Set self.errors to enable warning line on results screen
self.errors = True
# Done
self.disconnect()
# Functions
def get_hostname():
"""Get hostname, returns str."""
cmd = ['hostnamectl', '--static']
result = run_program(cmd, check=False, encoding='utf-8', errors='ignore')
result = result.stdout.strip()
result = result.replace('.1201.com', '')
return result
def pad_with_dots(s, pad_right=False):
"""Replace space padding with dots, returns str."""
s = str(s).replace(' ', '..')
if '.' in s:
if pad_right:
s = s + '.'
else:
s = '.' + s
return s
if __name__ == '__main__':
print("This file is not meant to be called directly.")
# vim: sts=2 sw=2 ts=2