From 8d6b29be53c50cbd7aabafe00c60e7831a8d0145 Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Mon, 24 Dec 2018 21:11:13 -0700 Subject: [PATCH] Posting CPU results is working --- .bin/Scripts/functions/hw_diags.py | 1 + .bin/Scripts/functions/osticket.py | 160 +++++++++++++++++------------ 2 files changed, 95 insertions(+), 66 deletions(-) diff --git a/.bin/Scripts/functions/hw_diags.py b/.bin/Scripts/functions/hw_diags.py index 48962299..b50744aa 100644 --- a/.bin/Scripts/functions/hw_diags.py +++ b/.bin/Scripts/functions/hw_diags.py @@ -138,6 +138,7 @@ class CpuObj(): class DiskObj(): """Object for tracking disk specific data.""" def __init__(self, disk_path): + self.checkbox = None self.disk_ok = True self.labels = [] self.lsblk = {} diff --git a/.bin/Scripts/functions/osticket.py b/.bin/Scripts/functions/osticket.py index ccfc59bc..fb5649ba 100644 --- a/.bin/Scripts/functions/osticket.py +++ b/.bin/Scripts/functions/osticket.py @@ -5,18 +5,24 @@ import mysql.connector as mariadb from functions.common import * from settings.osticket import * +# STATIC VARIABLES +KNOWN_DEV_TYPES = ('CPU', 'Disk') + # Regex REGEX_BLOCK_GRAPH = re.compile(r'(▁|▂|▃|▄|▅|▆|▇|█)') REGEX_NVME_SMART_ATTRIBUTES = re.compile(r'^\s*(\d+) / (\w+): (.{28})(.*)$') REGEX_TEMPS = re.compile(r'^\s*(.*?)\s+(idle.*)$') +REGEX_SENSOR = re.compile(r'^(.*?)(\s*)$') # Classes class osTicket(): """Class to track osTicket data and functions.""" - def __init__(self): + def __init__(self, tests_cpu, tests_disk): self.db_connection = None self.db_cursor = None self.errors = False + self.tests_cpu = tests_cpu + self.tests_disk = tests_disk self.tunnel_proc = None def connect(self): @@ -49,10 +55,10 @@ class osTicket(): # Connection established break - def convert_report(name, test): + def convert_report(self, name, test): """Convert report into an osTicket friendly format, returns list.""" out_report = [] - source_report = test.source_report + source_report = test.report status = strip_colors(test.status) status = status.replace(test.label, '').strip() @@ -62,7 +68,7 @@ class osTicket(): out_report.append('{} ({})'.format(name, status)) if not source_report: index = 0 - source_report = test.dev.generate_attribute_source_report() + source_report = test.dev.generate_attribute_report() else: out_report.append('{} ({})'.format(strip_colors(source_report[0]), status)) @@ -79,9 +85,10 @@ class osTicket(): if r: _sensor = '{:<20}'.format(r.group(1)) _temps = r.group(2) - line = '{} {}'.format( - pad_with_dots(_sensor, pad_right=True), - _temps) + r2 = REGEX_SENSOR.match(_sensor) + _sensor = r2.group(1) + _spacer = pad_with_dots(r2.group(2)) + line = '{}{} {}'.format(_sensor, _spacer, _temps) elif name == 'NVMe / SMART': r = REGEX_NVME_SMART_ATTRIBUTES.match(line) if r: @@ -110,10 +117,10 @@ class osTicket(): # Done return out_report - def generate_report(dev): + def generate_report(self, dev): """Generate device report for osTicket, returns list.""" report = [] - results = get_device_overall_results(dev) + results = self.get_device_overall_results(dev) # Header if results['Full Diag']: @@ -127,7 +134,7 @@ class osTicket(): # Test reports for name, test in dev.tests.items(): - report.extend(convert_report(name, test)) + report.extend(self.convert_report(name, test)) if name == 'I/O Benchmark': # TODO: Create PNG graph and upload to imgur/Nextcloud report.append('Imgur: TODO') @@ -165,11 +172,74 @@ class osTicket(): self.db_cursor = None self.db_connection = None + def get_device_overall_results(self, dev): + """Get overall results from tests for device, returns dict.""" + results = { + 'Dev Type': self.get_device_type(dev), + 'Full Diag': False, + 'Asterisk': None, + 'Failed': 0, + 'N/A': 0, + 'Passed': 0, + 'Status': 'Unknown', + } + + # Bail on unknown device type + if results['Dev Type'] not in KNOWN_DEV_TYPES: + raise GenericError( + 'Unrecognized device type: {}.'.format(results['Dev Type'])) + + # Get test list for device type + test_list = [] + if results['Dev Type'] == 'CPU': + test_list = self.tests_cpu + elif results['Dev Type'] == 'Disk': + test_list = self.tests_disk + + # Check if a full diag was run (i.e. all dev tests were enabled) + results['Full Diag'] = len(dev.tests) == len(test_list) + + # Tally test results + for test in dev.tests.values(): + if test.failed: + results['Failed'] += 1 + if test.passed: + results['Passed'] += 1 + if 'N/A' in test.status: + results['N/A'] += 1 + + # Set overall status + if results['Failed'] > 0: + dev.checkbox = False + results['Status'] = 'FAILED' + elif results['Passed'] + results['N/A'] == len(dev.tests): + dev.checkbox = True + results['Status'] = 'PASSED' + else: + results['Status'] = 'UNKNOWN' + if results['Full Diag'] and results['N/A'] > 0: + results['Asterisk'] = True + results['Status'] += '*' + + # Done + return results + + def get_device_type(self, dev): + """Terrible hack to determine device type, returns str.""" + # TODO: Fix with proper isinstance() call + type_str = str(dev.__class__) + if 'CpuObj' in type_str: + type_str = 'CPU' + elif 'DiskObj' in type_str: + type_str = 'Disk' + + return type_str + def get_ticket_name(self, ticket_id): """Lookup ticket and return name as str.""" name = None sql_cmd = "SELECT name FROM `{Ticket}`".format(**OSTICKET['Tables']) - sql_cmd += " WHERE `ticket_id` = `{}`".format(ticket_id) + sql_cmd += " WHERE `ticket_id` = {}".format(ticket_id) sql_cmd += ";" # TODO: Is the ';' needed above? It wasn't in the prev version?? @@ -183,9 +253,13 @@ class osTicket(): # TODO: Fix exception handling self.errors = True + # Done + return name + def get_ticket_number(self): """Get ticket number and confirm with name from osTicket DB.""" ticket_number = None + self.connect() # Main loop while ticket_number is None: @@ -199,7 +273,7 @@ class osTicket(): break # Invalid ID entered - if not re.match(r'^([0-9]+)$', _input): + if not re.match(r'^(\d+)$', _input): continue # Valid ID entered, lookup name and verify @@ -211,17 +285,22 @@ class osTicket(): ticket_number = _input # Done + self.disconnect() return ticket_number def post_device_results(self, dev, ticket_id): """Generate osTicket friendly report and post as response to ticket.""" response = self.generate_report(dev) - post_response(response, ticket_id) + self.post_response(response, ticket_id) def post_response(self, response, ticket_id): """Post a reply to a ticket in osTicket.""" self.connect() + # Convert response to string + response = '\n'.join(response) + response = response.replace('`', '') + # Build SQL cmd sql_cmd = "INSERT INTO `{Name}`.`{Response}`".format( **OSTICKET['Database'], **OSTICKET['Tables']) @@ -235,7 +314,7 @@ class osTicket(): # Run SQL cmd try: - self.cursor.execute(sql_cmd) + self.db_cursor.execute(sql_cmd) except Exception: # TODO: Fix exception handling self.errors = True @@ -271,7 +350,7 @@ class osTicket(): # Run SQL cmd try: - self.cursor.execute(sql_cmd) + self.db_cursor.execute(sql_cmd) except Exception: # TODO: Fix exception handling self.errors = True @@ -280,57 +359,6 @@ class osTicket(): self.disconnect() # Functions -def get_device_overall_results(dev): - """Get overall results from tests for device, returns dict.""" - results = { - 'Dev Type': 'Unknown', - 'Full Diag': False, - 'Asterisk': None, - 'Failed': 0, - 'N/A': 0, - 'Passed': 0, - 'Status': 'Unknown', - } - - # Get test list for device type - test_list = [] - if isinstance(dev, CpuObj): - results['Dev Type'] = 'CPU' - test_list = TESTS_CPU - elif isinstance(dev, DiskObj): - results['Dev Type'] = 'Disk' - test_list = TESTS_DISK - else: - raise GenericError('Unrecognized device type.') - - # Check if a full diag was run (i.e. all dev tests were enabled) - results['Full Diag'] = len(dev.tests) == len(test_list) - - # Tally test results - for test in dev.tests.value(): - if test.failed: - results['Failed'] += 1 - if test.passed: - results['Passed'] += 1 - if 'N/A' in test.status: - results['N/A'] += 1 - - # Set overall status - if results['Failed'] > 0: - dev.checkbox = False - results['Status'] = 'FAILED' - elif results['Passed'] + results['N/A'] == len(dev.tests): - dev.checkbox = True - results['Status'] = 'PASSED' - else: - results['Status'] = 'UNKNOWN' - if results['Full Diag'] and results['N/A'] > 0: - results['Asterisk'] = True - results['Status'] += '*' - - # Done - return results - def pad_with_dots(s, pad_right=False): """Replace space padding with dots, returns str.""" s = str(s).replace(' ', '..')