Posting CPU results is working

This commit is contained in:
2Shirt 2018-12-24 21:11:13 -07:00
parent ad6980f82b
commit 8d6b29be53
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
2 changed files with 95 additions and 66 deletions

View file

@ -138,6 +138,7 @@ class CpuObj():
class DiskObj():
"""Object for tracking disk specific data."""
def __init__(self, disk_path):
self.checkbox = None
self.disk_ok = True
self.labels = []
self.lsblk = {}

View file

@ -5,18 +5,24 @@ import mysql.connector as mariadb
from functions.common import *
from settings.osticket import *
# STATIC VARIABLES
KNOWN_DEV_TYPES = ('CPU', 'Disk')
# Regex
REGEX_BLOCK_GRAPH = re.compile(r'(▁|▂|▃|▄|▅|▆|▇|█)')
REGEX_NVME_SMART_ATTRIBUTES = re.compile(r'^\s*(\d+) / (\w+): (.{28})(.*)$')
REGEX_TEMPS = re.compile(r'^\s*(.*?)\s+(idle.*)$')
REGEX_SENSOR = re.compile(r'^(.*?)(\s*)$')
# Classes
class osTicket():
"""Class to track osTicket data and functions."""
def __init__(self):
def __init__(self, tests_cpu, tests_disk):
self.db_connection = None
self.db_cursor = None
self.errors = False
self.tests_cpu = tests_cpu
self.tests_disk = tests_disk
self.tunnel_proc = None
def connect(self):
@ -49,10 +55,10 @@ class osTicket():
# Connection established
break
def convert_report(name, test):
def convert_report(self, name, test):
"""Convert report into an osTicket friendly format, returns list."""
out_report = []
source_report = test.source_report
source_report = test.report
status = strip_colors(test.status)
status = status.replace(test.label, '').strip()
@ -62,7 +68,7 @@ class osTicket():
out_report.append('{} ({})'.format(name, status))
if not source_report:
index = 0
source_report = test.dev.generate_attribute_source_report()
source_report = test.dev.generate_attribute_report()
else:
out_report.append('{} ({})'.format(strip_colors(source_report[0]), status))
@ -79,9 +85,10 @@ class osTicket():
if r:
_sensor = '{:<20}'.format(r.group(1))
_temps = r.group(2)
line = '{} {}'.format(
pad_with_dots(_sensor, pad_right=True),
_temps)
r2 = REGEX_SENSOR.match(_sensor)
_sensor = r2.group(1)
_spacer = pad_with_dots(r2.group(2))
line = '{}{} {}'.format(_sensor, _spacer, _temps)
elif name == 'NVMe / SMART':
r = REGEX_NVME_SMART_ATTRIBUTES.match(line)
if r:
@ -110,10 +117,10 @@ class osTicket():
# Done
return out_report
def generate_report(dev):
def generate_report(self, dev):
"""Generate device report for osTicket, returns list."""
report = []
results = get_device_overall_results(dev)
results = self.get_device_overall_results(dev)
# Header
if results['Full Diag']:
@ -127,7 +134,7 @@ class osTicket():
# Test reports
for name, test in dev.tests.items():
report.extend(convert_report(name, test))
report.extend(self.convert_report(name, test))
if name == 'I/O Benchmark':
# TODO: Create PNG graph and upload to imgur/Nextcloud
report.append('Imgur: TODO')
@ -165,11 +172,74 @@ class osTicket():
self.db_cursor = None
self.db_connection = None
def get_device_overall_results(self, dev):
"""Get overall results from tests for device, returns dict."""
results = {
'Dev Type': self.get_device_type(dev),
'Full Diag': False,
'Asterisk': None,
'Failed': 0,
'N/A': 0,
'Passed': 0,
'Status': 'Unknown',
}
# Bail on unknown device type
if results['Dev Type'] not in KNOWN_DEV_TYPES:
raise GenericError(
'Unrecognized device type: {}.'.format(results['Dev Type']))
# Get test list for device type
test_list = []
if results['Dev Type'] == 'CPU':
test_list = self.tests_cpu
elif results['Dev Type'] == 'Disk':
test_list = self.tests_disk
# Check if a full diag was run (i.e. all dev tests were enabled)
results['Full Diag'] = len(dev.tests) == len(test_list)
# Tally test results
for test in dev.tests.values():
if test.failed:
results['Failed'] += 1
if test.passed:
results['Passed'] += 1
if 'N/A' in test.status:
results['N/A'] += 1
# Set overall status
if results['Failed'] > 0:
dev.checkbox = False
results['Status'] = 'FAILED'
elif results['Passed'] + results['N/A'] == len(dev.tests):
dev.checkbox = True
results['Status'] = 'PASSED'
else:
results['Status'] = 'UNKNOWN'
if results['Full Diag'] and results['N/A'] > 0:
results['Asterisk'] = True
results['Status'] += '*'
# Done
return results
def get_device_type(self, dev):
"""Terrible hack to determine device type, returns str."""
# TODO: Fix with proper isinstance() call
type_str = str(dev.__class__)
if 'CpuObj' in type_str:
type_str = 'CPU'
elif 'DiskObj' in type_str:
type_str = 'Disk'
return type_str
def get_ticket_name(self, ticket_id):
"""Lookup ticket and return name as str."""
name = None
sql_cmd = "SELECT name FROM `{Ticket}`".format(**OSTICKET['Tables'])
sql_cmd += " WHERE `ticket_id` = `{}`".format(ticket_id)
sql_cmd += " WHERE `ticket_id` = {}".format(ticket_id)
sql_cmd += ";"
# TODO: Is the ';' needed above? It wasn't in the prev version??
@ -183,9 +253,13 @@ class osTicket():
# TODO: Fix exception handling
self.errors = True
# Done
return name
def get_ticket_number(self):
"""Get ticket number and confirm with name from osTicket DB."""
ticket_number = None
self.connect()
# Main loop
while ticket_number is None:
@ -199,7 +273,7 @@ class osTicket():
break
# Invalid ID entered
if not re.match(r'^([0-9]+)$', _input):
if not re.match(r'^(\d+)$', _input):
continue
# Valid ID entered, lookup name and verify
@ -211,17 +285,22 @@ class osTicket():
ticket_number = _input
# Done
self.disconnect()
return ticket_number
def post_device_results(self, dev, ticket_id):
"""Generate osTicket friendly report and post as response to ticket."""
response = self.generate_report(dev)
post_response(response, ticket_id)
self.post_response(response, ticket_id)
def post_response(self, response, ticket_id):
"""Post a reply to a ticket in osTicket."""
self.connect()
# Convert response to string
response = '\n'.join(response)
response = response.replace('`', '')
# Build SQL cmd
sql_cmd = "INSERT INTO `{Name}`.`{Response}`".format(
**OSTICKET['Database'], **OSTICKET['Tables'])
@ -235,7 +314,7 @@ class osTicket():
# Run SQL cmd
try:
self.cursor.execute(sql_cmd)
self.db_cursor.execute(sql_cmd)
except Exception:
# TODO: Fix exception handling
self.errors = True
@ -271,7 +350,7 @@ class osTicket():
# Run SQL cmd
try:
self.cursor.execute(sql_cmd)
self.db_cursor.execute(sql_cmd)
except Exception:
# TODO: Fix exception handling
self.errors = True
@ -280,57 +359,6 @@ class osTicket():
self.disconnect()
# Functions
def get_device_overall_results(dev):
"""Get overall results from tests for device, returns dict."""
results = {
'Dev Type': 'Unknown',
'Full Diag': False,
'Asterisk': None,
'Failed': 0,
'N/A': 0,
'Passed': 0,
'Status': 'Unknown',
}
# Get test list for device type
test_list = []
if isinstance(dev, CpuObj):
results['Dev Type'] = 'CPU'
test_list = TESTS_CPU
elif isinstance(dev, DiskObj):
results['Dev Type'] = 'Disk'
test_list = TESTS_DISK
else:
raise GenericError('Unrecognized device type.')
# Check if a full diag was run (i.e. all dev tests were enabled)
results['Full Diag'] = len(dev.tests) == len(test_list)
# Tally test results
for test in dev.tests.value():
if test.failed:
results['Failed'] += 1
if test.passed:
results['Passed'] += 1
if 'N/A' in test.status:
results['N/A'] += 1
# Set overall status
if results['Failed'] > 0:
dev.checkbox = False
results['Status'] = 'FAILED'
elif results['Passed'] + results['N/A'] == len(dev.tests):
dev.checkbox = True
results['Status'] = 'PASSED'
else:
results['Status'] = 'UNKNOWN'
if results['Full Diag'] and results['N/A'] > 0:
results['Asterisk'] = True
results['Status'] += '*'
# Done
return results
def pad_with_dots(s, pad_right=False):
"""Replace space padding with dots, returns str."""
s = str(s).replace(' ', '..')