diff --git a/.bin/Scripts/functions/hw_diags.py b/.bin/Scripts/functions/hw_diags.py index 9f10f997..7e06ecf9 100644 --- a/.bin/Scripts/functions/hw_diags.py +++ b/.bin/Scripts/functions/hw_diags.py @@ -5,6 +5,7 @@ import re import time from collections import OrderedDict +from functions.osticket import * from functions.sensors import * from functions.tmux import * @@ -65,6 +66,11 @@ KEY_NVME = 'nvme_smart_health_information_log' KEY_SMART = 'ata_smart_attributes' QUICK_LABEL = '{YELLOW}(Quick){CLEAR}'.format(**COLORS) SIDE_PANE_WIDTH = 20 +STATUSES = { + 'RED': ['Denied', 'ERROR', 'NS', 'TimedOut'], + 'YELLOW': ['Aborted', 'N/A', 'OVERRIDE', 'Unknown', 'Working'], + 'GREEN': ['CS'], +} TESTS_CPU = ['Prime95'] TESTS_DISK = [ 'I/O Benchmark', @@ -78,7 +84,10 @@ TMUX_LAYOUT = OrderedDict({ 'Progress': {'x': SIDE_PANE_WIDTH, 'Check': True}, }) -# Error Classe +# Regex +REGEX_ERROR_STATUS = re.compile('|'.join(STATUSES['RED'])) + +# Error Classes class DeviceTooSmallError(Exception): pass @@ -90,6 +99,7 @@ class CpuObj(): self.tests = OrderedDict() self.get_details() self.name = self.lscpu.get('Model name', 'Unknown CPU') + self.description = self.name def get_details(self): """Get CPU details from lscpu.""" @@ -125,6 +135,7 @@ class CpuObj(): class DiskObj(): """Object for tracking disk specific data.""" def __init__(self, disk_path): + self.checkbox = None self.disk_ok = True self.labels = [] self.lsblk = {} @@ -234,7 +245,9 @@ class DiskObj(): print_standard(' (Have you tried swapping the disk cable?)') else: # Override? - show_report(self.generate_attribute_report(description=True)) + show_report( + self.generate_attribute_report(description=True), + log_report=True) print_warning(' {} error(s) detected.'.format(attr_type)) if override_disabled: print_standard('Tests disabled for this device') @@ -482,6 +495,7 @@ class State(): def __init__(self): self.cpu = None self.disks = [] + self.ost = osTicket(TESTS_CPU, TESTS_DISK) self.panes = {} self.quick_mode = False self.tests = OrderedDict({ @@ -506,6 +520,7 @@ class State(): 'Objects': [], }, }) + self.ticket_id = None def init(self): """Remove test objects, set log, and add devices.""" @@ -566,7 +581,8 @@ class TestObj(): def update_status(self, new_status=None): """Update status strings.""" - if self.disabled or re.search(r'ERROR|OVERRIDE', self.status): + if self.disabled or REGEX_ERROR_STATUS.search(self.status): + # Don't update error statuses if test is enabled return if new_status: self.status = build_status_string( @@ -603,12 +619,9 @@ def build_outer_panes(state): def build_status_string(label, status, info_label=False): """Build status string with appropriate colors.""" status_color = COLORS['CLEAR'] - if status in ['Denied', 'ERROR', 'NS', 'OVERRIDE', 'TimedOut']: - status_color = COLORS['RED'] - elif status in ['Aborted', 'N/A', 'Skipped', 'Unknown', 'Working']: - status_color = COLORS['YELLOW'] - elif status in ['CS']: - status_color = COLORS['GREEN'] + for k, v in STATUSES.items(): + if status in v: + status_color = COLORS[k] return '{l_c}{l}{CLEAR}{s_c}{s:>{s_w}}{CLEAR}'.format( l_c=COLORS['BLUE'] if info_label else '', @@ -726,17 +739,6 @@ def get_read_rate(s): real_rate = convert_to_bytes(human_rate) return real_rate -def get_status_color(s): - """Get color based on status, returns str.""" - color = COLORS['CLEAR'] - if s in ['Denied', 'ERROR', 'NS', 'OVERRIDE']: - color = COLORS['RED'] - elif s in ['Aborted', 'N/A', 'Unknown', 'Working', 'Skipped']: - color = COLORS['YELLOW'] - elif s in ['CS']: - color = COLORS['GREEN'] - return color - def menu_diags(state, args): """Main menu to select and run HW tests.""" args = [a.lower() for a in args] @@ -746,6 +748,7 @@ def menu_diags(state, args): {'Base Name': 'Full Diagnostic', 'Enabled': False}, {'Base Name': 'Disk Diagnostic', 'Enabled': False}, {'Base Name': 'Disk Diagnostic (Quick)', 'Enabled': False}, + {'Base Name': 'osTicket Integration', 'Enabled': True}, {'Base Name': 'Prime95', 'Enabled': False, 'CRLF': True}, {'Base Name': 'NVMe / SMART', 'Enabled': False}, {'Base Name': 'badblocks', 'Enabled': False}, @@ -777,11 +780,11 @@ def menu_diags(state, args): while True: # Set quick mode as necessary - if main_options[2]['Enabled'] and main_options[4]['Enabled']: + if main_options[2]['Enabled'] and main_options[5]['Enabled']: # Check if only Disk Diags (Quick) and NVMe/SMART are enabled # If so, verify no other tests are enabled and set quick_mode state.quick_mode = True - for opt in main_options[3:4] + main_options[5:]: + for opt in main_options[4:5] + main_options[6:]: state.quick_mode &= not opt['Enabled'] else: state.quick_mode = False @@ -795,13 +798,13 @@ def menu_diags(state, args): # Verify preset selections num_tests_selected = 0 - for opt in main_options[3:]: + for opt in main_options[4:]: if opt['Enabled']: num_tests_selected += 1 if num_tests_selected == 4: # Full main_options[0]['Enabled'] = True - elif num_tests_selected == 3 and not main_options[3]['Enabled']: + elif num_tests_selected == 3 and not main_options[4]['Enabled']: # Disk main_options[1]['Enabled'] = True @@ -946,6 +949,10 @@ def run_hw_tests(state): """Run enabled hardware tests.""" print_standard('Scanning devices...') state.init() + tests_enabled = False + + # Disable osTicket Integration if in quick mode + state.ost.disabled |= state.quick_mode # Build Panes update_progress_pane(state) @@ -961,6 +968,8 @@ def run_hw_tests(state): COLORS['CLEAR'], QUICK_LABEL if state.quick_mode and 'NVMe' in k else '')) if v['Enabled']: + tests_enabled = True + # Create TestObj and track under both CpuObj/DiskObj and State if k in TESTS_CPU: test_obj = TestObj( @@ -974,6 +983,15 @@ def run_hw_tests(state): v['Objects'].append(test_obj) print_standard('') + # Bail if no tests selected + if not tests_enabled: + tmux_kill_pane(*state.panes.values()) + return + + # Get ticket_number + if not state.ost.disabled: + state.ticket_id = state.ost.get_ticket_number() + # Run disk safety checks (if necessary) _disk_tests_enabled = False for k in TESTS_DISK: @@ -991,6 +1009,9 @@ def run_hw_tests(state): f = v['Function'] for test_obj in v['Objects']: f(state, test_obj) + if k == TESTS_CPU[-1]: + # Last CPU test run, post CPU results + state.ost.post_device_results(state.cpu, state.ticket_id) except GenericAbort: # Cleanup tmux_kill_pane(*state.panes.values()) @@ -1009,14 +1030,48 @@ def run_hw_tests(state): # Update side pane update_progress_pane(state) - # Done + # Show results show_results(state) + + # Post disk results + if not state.ost.disabled: + print_standard('Posting results to osTicket...') + for disk in state.disks: + state.ost.post_device_results(disk, state.ticket_id) + + # Check if disk checkbox needs updating + all_disks_passed = True + disk_failures = False + for disk in state.disks: + if disk.checkbox is None: + # Aborted/Unknown/etc + all_disks_passed = False + else: + all_disks_passed &= disk.checkbox + disk_failures |= not disk.checkbox + + # Update checkbox if necessary + if disk_failures: + state.ost.set_disk_failed(state.ticket_id) + elif all_disks_passed: + state.ost.set_disk_passed(state.ticket_id) + + # Spacer + print_standard(' ') + + # Check for osTicket errors + if state.ost.errors: + print_warning('Errors encountered posting results to osTicket.') + print_standard(' ') + + # Done if state.quick_mode: pause('Press Enter to exit...') else: pause('Press Enter to return to main menu... ') # Cleanup + state.ost.disconnect(full=True) tmux_kill_pane(*state.panes.values()) def run_io_benchmark(state, test): @@ -1060,7 +1115,6 @@ def run_io_benchmark(state, test): try: test.merged_rates = [] test.read_rates = [] - test.vertical_graph = [] test.dev.calc_io_dd_values() # Run dd read tests @@ -1087,10 +1141,6 @@ def run_io_benchmark(state, test): # Add rate to lists test.read_rates.append(cur_rate) - test.vertical_graph.append( - '{percent:0.1f} {rate}'.format( - percent=(i/test.dev.dd_chunks)*100, - rate=int(cur_rate/(1024**2)))) # Show progress if i % IO_VARS['Progress Refresh Rate'] == 0: @@ -1175,10 +1225,6 @@ def run_io_benchmark(state, test): elif not 'N/A' in test.status: test.update_status('Unknown') - # Save log - with open(test.io_benchmark_out.replace('.', '-raw.'), 'a') as f: - f.write('\n'.join(test.vertical_graph)) - # Done update_progress_pane(state) @@ -1453,8 +1499,6 @@ def run_nvme_smart_tests(state, test): # Show attributes clear_screen() - print_info('Device ({})'.format(test.dev.name)) - print_standard(' {}'.format(test.dev.description)) show_report(test.dev.generate_attribute_report()) print_standard(' ') @@ -1518,7 +1562,7 @@ def run_nvme_smart_tests(state, test): test.update_status('TimedOut') # Disable other drive tests if necessary - if not test.passed: + if test.failed: for t in ['badblocks', 'I/O Benchmark']: test.dev.disable_test(t, 'Denied') @@ -1542,11 +1586,12 @@ def secret_screensaver(screensaver=None): raise Exception('Invalid screensaver') run_program(cmd, check=False, pipe=False) -def show_report(report): - """Show report on screen and save to log w/out color.""" +def show_report(report, log_report=False): + """Show report on screen and optionally save to log w/out color.""" for line in report: print(line) - print_log(strip_colors(line)) + if log_report: + print_log(strip_colors(line)) def show_results(state): """Show results for all tests.""" @@ -1561,7 +1606,7 @@ def show_results(state): _enabled |= state.tests[k]['Enabled'] if _enabled: print_success('CPU:'.format(k)) - show_report(state.cpu.generate_cpu_report()) + show_report(state.cpu.generate_cpu_report(), log_report=True) print_standard(' ') # Disk tests @@ -1572,7 +1617,7 @@ def show_results(state): print_success('Disk{}:'.format( '' if len(state.disks) == 1 else 's')) for disk in state.disks: - show_report(disk.generate_disk_report()) + show_report(disk.generate_disk_report(), log_report=True) print_standard(' ') def update_main_options(state, selection, main_options): @@ -1586,33 +1631,34 @@ def update_main_options(state, selection, main_options): if main_options[index]['Enabled']: for opt in main_options[1:3]: opt['Enabled'] = False - for opt in main_options[3:]: + for opt in main_options[4:]: opt['Enabled'] = True else: - for opt in main_options[3:]: + for opt in main_options[4:]: opt['Enabled'] = False elif index == 1: # Disk if main_options[index]['Enabled']: main_options[0]['Enabled'] = False - for opt in main_options[2:4]: - opt['Enabled'] = False - for opt in main_options[4:]: + main_options[2]['Enabled'] = False + main_options[4]['Enabled'] = False + for opt in main_options[5:]: opt['Enabled'] = True else: - for opt in main_options[4:]: + for opt in main_options[5:]: opt['Enabled'] = False elif index == 2: # Disk (Quick) if main_options[index]['Enabled']: - for opt in main_options[:2] + main_options[3:]: + for opt in main_options[:2] + main_options[4:]: opt['Enabled'] = False - main_options[4]['Enabled'] = True + main_options[5]['Enabled'] = True else: - main_options[4]['Enabled'] = False + main_options[5]['Enabled'] = False # Update state - for opt in main_options[3:]: + state.ost.disabled = not main_options[3]['Enabled'] + for opt in main_options[4:]: state.tests[opt['Base Name']]['Enabled'] = opt['Enabled'] # Done diff --git a/.bin/Scripts/functions/io_graph.py b/.bin/Scripts/functions/io_graph.py new file mode 100644 index 00000000..fb909c9f --- /dev/null +++ b/.bin/Scripts/functions/io_graph.py @@ -0,0 +1,113 @@ +# Wizard Kit: Functions - PNG graph for I/O Benchmark + +import base64 +import Gnuplot +import json +import math +import requests + +from functions.common import * + +# Functions +def export_io_graph(disk): + """Exports PNG graph using gnuplot, returns file path as str.""" + read_rates = disk.tests['I/O Benchmark'].read_rates + max_rate = max(read_rates) / (1024**2) + max_rate = max(800, max_rate) + out_path = '{}/iobenchmark-{}.png'.format( + global_vars['LogDir'], disk.name) + plot_data = '{}/iobenchmark-{}-plot.data'.format( + global_vars['LogDir'], disk.name) + + # Adjust Y-axis range to either 1000 or roughly max rate + 200 + ## Round up to the nearest 100 and then add 200 + y_range = (math.ceil(max_rate/100)*100) + 200 + + # Save plot data to file for Gnuplot + with open(plot_data, 'w') as f: + for i in range(len(read_rates)): + _percent = ( (i+1) / len(read_rates) ) * 100 + _rate = int( read_rates[i] / (1024**2) ) + f.write('{:0.1f} {}\n'.format(_percent, _rate)) + + # Run gnuplot commands + g = Gnuplot.Gnuplot() + g('reset') + g('set output "{}"'.format(out_path)) + g('set terminal png large size 660,300 truecolor font "Noto Sans,11"') + g('set title "I/O Benchmark"') + g('set yrange [0:{}]'.format(y_range)) + g('set style data lines') + g('plot "{}" title "{}"'.format( + plot_data, + disk.description.replace('_', ' '), + )) + + # Cleanup + g.close() + del(g) + + return out_path + +def upload_to_imgur(image_path): + """Upload image to Imgur and return image url as str.""" + image_data = None + image_link = None + + # Bail early + if not image_path: + raise GenericError + + # Read image file and convert to base64 then convert to str + with open(image_path, 'rb') as f: + image_data = base64.b64encode(f.read()).decode() + + # POST image + url = "https://api.imgur.com/3/image" + boundary = '----WebKitFormBoundary7MA4YWxkTrZu0gW' + payload = ('--{boundary}\r\nContent-Disposition: form-data; ' + 'name="image"\r\n\r\n{data}\r\n--{boundary}--') + payload = payload.format(boundary=boundary, data=image_data) + headers = { + 'content-type': 'multipart/form-data; boundary={}'.format(boundary), + 'Authorization': 'Client-ID {}'.format(IMGUR_CLIENT_ID), + } + response = requests.request("POST", url, data=payload, headers=headers) + + # Return image link + if response.ok: + json_data = json.loads(response.text) + image_link = json_data['data']['link'] + return image_link + +def upload_to_nextcloud(image_path, ticket_number, dev_name): + """Upload image to Nextcloud server and return folder url as str.""" + image_data = None + + # Bail early + if not image_path: + raise GenericError + + # Read image file and convert to base64 + with open(image_path, 'rb') as f: + image_data = f.read() + + # PUT image + url = '{base_url}/{ticket_number}_iobenchmark_{dev_name}_{date}.png'.format( + base_url=BENCHMARK_SERVER['Url'], + ticket_number=ticket_number, + dev_name=dev_name, + date=global_vars.get('Date-Time', 'Unknown Date-Time')) + requests.put( + url, + data=image_data, + headers = {'X-Requested-With': 'XMLHttpRequest'}, + auth = (BENCHMARK_SERVER['User'], BENCHMARK_SERVER['Pass'])) + + # Return folder link + return BENCHMARK_SERVER['Short Url'] + +if __name__ == '__main__': + print("This file is not meant to be called directly.") + +# vim: sts=2 sw=2 ts=2 diff --git a/.bin/Scripts/functions/osticket.py b/.bin/Scripts/functions/osticket.py new file mode 100644 index 00000000..13490e92 --- /dev/null +++ b/.bin/Scripts/functions/osticket.py @@ -0,0 +1,486 @@ +# Wizard Kit: Functions - osTicket + +import mysql.connector as mariadb + +from functions.data import * +from functions.io_graph import * +from settings.osticket import * + +# STATIC VARIABLES +KNOWN_DEV_TYPES = ('CPU', 'Disk') + +# Regex +REGEX_BLOCK_GRAPH = re.compile(r'(▁|▂|▃|▄|▅|▆|▇|█)') +REGEX_NVME_SMART_ATTRIBUTES = re.compile(r'^\s*(\d+) / (\w+): (.{28})(.*)$') +REGEX_TEMPS = re.compile(r'^\s*(.*?)\s+(idle.*)$') +REGEX_SENSOR = re.compile(r'^(.*?)(\s*)$') + +# Classes +class osTicket(): + """Class to track osTicket data and functions.""" + def __init__(self, tests_cpu, tests_disk): + self.db_connection = None + self.db_cursor = None + self.disabled = False + self.errors = False + self.tests_cpu = tests_cpu + self.tests_disk = tests_disk + self.tunnel_proc = None + + def connect(self): + """Establish connection to osTicket via a SSH tunnel.""" + cmd = [ + 'ssh', '-N', + '-p', OSTICKET['SSH']['Port'], + '-L3306:127.0.0.1:{Port}'.format(**OSTICKET['Database']), + '{User}@{Host}'.format(**OSTICKET['SSH']), + ] + + # Bail if disabled + if self.disabled: + return + + # Only open tunnel if one doesn't exist + if self.tunnel_proc is None or self.tunnel_proc.poll() is not None: + self.tunnel_proc = popen_program(cmd) + + # Connect to database + for x in range(5): + sleep(2) + try: + self.db_connection = mariadb.connect( + user=OSTICKET['Database']['User'], + password=OSTICKET['Database']['Pass'], + database=OSTICKET['Database']['Name'], + ) + self.db_cursor = self.db_connection.cursor() + except mariadb.errors.InterfaceError: + # SSH issue?, try again + pass + except mariadb.errors.Error: + # Bad creds or other SQL error, bail + break + except Exception: + # Unknown error + break + else: + # Connection established + break + + # Disable if necessary + if self.db_cursor is None: + self.disabled = True + self.tunnel_proc.kill() + + def convert_report(self, name, test): + """Convert report into an osTicket friendly format, returns list.""" + out_report = [] + source_report = test.report + status = strip_colors(test.status) + status = status.replace(test.label, '').strip() + + # Header + index = 1 + if name == 'NVMe / SMART': + out_report.append('{} ({})'.format(name, status)) + if not source_report: + index = 0 + source_report = test.dev.generate_attribute_report() + elif not source_report: + index = 0 + out_report.append('{} ({})'.format(name, status)) + else: + out_report.append('{} ({})'.format(strip_colors(source_report[0]), status)) + + # Body + for line in source_report[index:]: + # Remove colors and leading spaces + line = strip_colors(line) + if line[:2] == ' ': + line = line[2:] + + # Test-specific modifications + if name == 'Prime95': + r = REGEX_TEMPS.match(line) + if r: + _sensor = '{:<20}'.format(r.group(1)) + _temps = r.group(2) + r2 = REGEX_SENSOR.match(_sensor) + _sensor = r2.group(1) + _spacer = pad_with_dots(r2.group(2)) + line = '{}{} {}'.format(_sensor, _spacer, _temps) + if line == 'Temps': + out_report.append(' ') + elif name == 'NVMe / SMART': + r = REGEX_NVME_SMART_ATTRIBUTES.match(line) + if r: + _dec = '{:>3}'.format(r.group(1)) + _hex = r.group(2) + _atr = r.group(3).strip() + _val = '{:<20}'.format(r.group(4)) + line = '{}/{}: {} {}'.format( + _hex, + pad_with_dots(_dec), + pad_with_dots(_val, pad_right=True), + _atr) + elif name == 'I/O Benchmark': + line = REGEX_BLOCK_GRAPH.sub('', line) + line = line.strip() + if not line: + continue + + # Remove extra spaces + line = line.strip() + line = re.sub(r'(\s+)', ' ', line) + + # Add line to report + out_report.append(line) + + # Done + return out_report + + def disconnect(self, full=False): + """Close osTicket connection.""" + try: + self.db_cursor.close() + self.db_connection.close() + except Exception: + # Ignore errors since vars will be reset below + pass + + # Reset errors + if full: + self.errors = False + + # Reset vars + self.db_cursor = None + self.db_connection = None + + # Close tunnel + if full: + try: + self.tunnel_proc.kill() + self.tunnel_proc.wait(timeout=2) + except (AttributeError, subprocess.TimeoutExpired): + # Ignore and continue + pass + self.tunnel_proc = None + + def generate_report(self, dev, ticket_id): + """Generate device report for osTicket, returns list.""" + report = [] + results = self.get_device_overall_results(dev) + + # Header + if results['Full Diag']: + report.append( + '{Dev Type} hardware diagnostic tests: {Status}'.format(**results)) + report.append(' ') + + # Device + report.append(dev.description) + report.append(' ') + + # Test reports + for name, test in dev.tests.items(): + report.extend(self.convert_report(name, test)) + if name == 'I/O Benchmark': + # Create PNG graph + try: + graph_file = export_io_graph(dev) + except (AttributeError, KeyError): + report.append('Failed to export graph') + else: + # Upload to Imgur + try: + url = upload_to_imgur(graph_file) + report.append('Imgur: {}'.format(url)) + except Exception: + report.append('Imgur: Failed to upload graph') + + # Upload to Nextcloud + try: + url = upload_to_nextcloud(graph_file, ticket_id, dev.name) + report.append('Nextcloud: {}'.format(url)) + except Exception: + report.append('Nextcloud: Failed to upload graph') + report.append(' ') + + # Volumes + if results['Dev Type'] == 'Disk' and results['Full Diag']: + # Mount all volumes and extend report + report.append('Volumes:') + report.extend(self.generate_volume_report(dev, results)) + report.append(' ') + + # Asterisk + if results['Asterisk']: + report.append('* NOTE: One or more tests were not run on this device') + + # Done + return report + + def generate_volume_report(self, dev, results): + """Mount all volumes for dev and generate report, returns list.""" + report = [] + mount_report = mount_volumes( + all_devices=False, + device_path='{}'.format(dev.path), + core_storage=results['Core']) + + # Format report + for v_path, v_data in sorted(mount_report.items()): + label = v_data.get('label', '') + if label: + label = '"{}"'.format(label) + else: + # Ensure string type + label = '' + size = v_data.get('size', '') + if size: + size = '{} {}B'.format(size[:-1], size[-1:]).upper() + else: + size = 'UNKNOWN' + size_used = v_data.get('size_used', 'UNKNOWN').upper() + size_avail = v_data.get('size_avail', 'UNKNOWN').upper() + v_data = [v_path, label, size, size_used, size_avail] + v_data = [v.strip().replace(' ', '_') for v in v_data] + for i in range(len(v_data)): + pad = 8 + if i < 2: + pad += 4 * (2 - i) + v_data[i] = pad_with_dots( + '{s:<{p}}'.format(s=v_data[i], p=pad), + pad_right=True) + v_data[-1] = re.sub(r'\.*$', '', v_data[-1]) + v_data = [v.replace('_', ' ') for v in v_data] + report.append( + '{}..{}..Total..{}..(Used..{}..Free..{})'.format(*v_data)) + + # Done + return report + + def get_device_overall_results(self, dev): + """Get overall results from tests for device, returns dict.""" + results = { + 'Core': False, + 'Dev Type': self.get_device_type(dev), + 'Full Diag': False, + 'Asterisk': None, + 'Failed': 0, + 'N/A': 0, + 'OVERRIDE': 0, + 'Passed': 0, + 'Status': 'Unknown', + } + + # Bail on unknown device type + if results['Dev Type'] not in KNOWN_DEV_TYPES: + raise GenericError( + 'Unrecognized device type: {}.'.format(results['Dev Type'])) + + # Get test list for device type + test_list = [] + if results['Dev Type'] == 'CPU': + test_list = self.tests_cpu + elif results['Dev Type'] == 'Disk': + test_list = self.tests_disk + + # Check if a full diag was run (i.e. all dev tests were enabled) + results['Full Diag'] = len(dev.tests) == len(test_list) + + # Tally test results + for test in dev.tests.values(): + if test.failed: + results['Failed'] += 1 + if test.passed: + results['Passed'] += 1 + if 'N/A' in test.status: + results['N/A'] += 1 + if 'OVERRIDE' in test.status: + results['OVERRIDE'] += 1 + + # Set overall status + if results['Failed'] > 0: + dev.checkbox = False + results['Status'] = 'FAILED' + elif results['Passed'] + results['N/A'] == len(test_list): + # Only mark true if all tests are enabled and all are "Passed" / "N/A" + dev.checkbox = True + results['Status'] = 'PASSED' + else: + results['Status'] = 'UNKNOWN' + if results['Full Diag'] and results['N/A'] > 0: + results['Asterisk'] = True + results['Status'] += '*' + + # Enable CoreStorage searches + results['Core'] = (results['Full Diag'] and + results['Passed']+results['N/A']+results['OVERRIDE'] == len(test_list)) + + # Done + return results + + def get_device_type(self, dev): + """Terrible hack to determine device type, returns str.""" + # TODO: Fix with proper isinstance() call + type_str = str(dev.__class__) + if 'CpuObj' in type_str: + type_str = 'CPU' + elif 'DiskObj' in type_str: + type_str = 'Disk' + + return type_str + + def get_ticket_name(self, ticket_id): + """Lookup ticket and return name as str.""" + name = None + sql_cmd = "SELECT name FROM `{Ticket}`".format(**OSTICKET['Tables']) + sql_cmd += " WHERE `ticket_id` = {}".format(ticket_id) + sql_cmd += ";" + + # Lookup name + # NOTE: If multiple entries are found it will return the last + self.db_cursor.execute(sql_cmd) + for s in self.db_cursor: + name = s[0] + + # Done + return name + + def get_ticket_number(self): + """Get ticket number and confirm with name from osTicket DB.""" + ticket_number = None + self.connect() + + # Bail if disabled + if self.disabled: + return None + + # Main loop + while ticket_number is None: + print_standard(' ') + _input = input('Enter ticket number (or leave blank to disable): ') + _input = _input.strip() + + # No ticket ID entered + if re.match(r'^\s*$', _input): + if ask('Disable osTicket integration for this run?'): + break + + # Invalid ID entered + if not re.match(r'^(\d+)$', _input): + continue + + # Valid ID entered, lookup name and verify + try: + _name = self.get_ticket_name(_input) + except Exception: + # Ignore and return None below + break + if _name: + print_standard('You have selected ticket #{} {}'.format( + _input, _name)) + if ask('Is this correct?'): + ticket_number = _input + + # Done + self.disconnect() + return ticket_number + + def post_device_results(self, dev, ticket_id): + """Generate osTicket friendly report and post as response to ticket.""" + if not dev.tests: + # No test results available, aborting post + return + response = self.generate_report(dev, ticket_id) + self.post_response(response, ticket_id) + + def post_response(self, response, ticket_id): + """Post a reply to a ticket in osTicket.""" + self.connect() + + # Bail if disabled + if self.disabled: + return + + # Convert response to string + response = '\n'.join(response) + response = response.replace('`', '') + + # Build SQL cmd + sql_cmd = "INSERT INTO `{Name}`.`{Response}`".format( + **OSTICKET['Database'], **OSTICKET['Tables']) + sql_cmd += " (ticket_id, staff_id, staff_name, response, created)" + sql_cmd += " VALUES (" + sql_cmd += " '{}',".format(ticket_id) + sql_cmd += " '{ID}', '{Name}',".format(**OSTICKET['Staff']) + sql_cmd += " '{}',".format(response) + sql_cmd += " '{}'".format(time.strftime("%Y-%m-%d %H:%M:%S")) + sql_cmd += " );" + + # Run SQL cmd + try: + self.db_cursor.execute(sql_cmd) + except mariadb.errors.Error: + # Set self.errors to enable warning line on results screen + self.errors = True + + # Done + self.disconnect() + + def set_disk_failed(self, ticket_id): + """Mark disk as failed in osTicket.""" + self.set_flag( + ticket_id, + OSTICKET['Disk Flag']['Name'], + OSTICKET['Disk Flag']['Fail']) + + def set_disk_passed(self, ticket_id): + """Mark disk as passed in osTicket.""" + self.set_flag( + ticket_id, + OSTICKET['Disk Flag']['Name'], + OSTICKET['Disk Flag']['Pass']) + + def set_flag(self, ticket_id, flag_name, flag_value): + """Set flag in osTicket.""" + self.connect() + + # Bail if disabled + if self.disabled: + return + + # Build SQL cmd + sql_cmd = "UPDATE `{Name}`.`{Ticket}`".format( + **OSTICKET['Database'], **OSTICKET['Tables']) + sql_cmd += " SET `{}` = '{}'".format(flag_name, flag_value) + sql_cmd += " WHERE `{Ticket}`.`ticket_id` = {ticket_id}".format( + ticket_id=ticket_id, **OSTICKET['Tables']) + sql_cmd += ";" + + # Run SQL cmd + try: + self.db_cursor.execute(sql_cmd) + except mariadb.errors.Error: + # Set self.errors to enable warning line on results screen + self.errors = True + + # Done + self.disconnect() + +# Functions +def pad_with_dots(s, pad_right=False): + """Replace space padding with dots, returns str.""" + s = str(s).replace(' ', '..') + if '.' in s: + if pad_right: + s = s + '.' + else: + s = '.' + s + return s + +if __name__ == '__main__': + print("This file is not meant to be called directly.") + +# vim: sts=2 sw=2 ts=2 diff --git a/.bin/Scripts/settings/main.py b/.bin/Scripts/settings/main.py index e6aff08f..5e9002f5 100644 --- a/.bin/Scripts/settings/main.py +++ b/.bin/Scripts/settings/main.py @@ -14,13 +14,6 @@ ARCHIVE_PASSWORD='Sorted1201' KIT_NAME_FULL='1201-WizardKit' KIT_NAME_SHORT='1201' SUPPORT_MESSAGE='Please let support know by opening an issue on Gogs' -# osTicket -DB_HOST='osticket.1201.com' -DB_NAME='osticket' -DB_USER='wizardkit' -DB_PASS='U9bJnF9eamVkfsVw' -SSH_PORT='22' -SSH_USER='sql_tunnel' # imgur IMGUR_CLIENT_ID='3d1ee1d38707b85' # Live Linux diff --git a/.bin/Scripts/settings/osticket.py b/.bin/Scripts/settings/osticket.py new file mode 100644 index 00000000..fe998eb2 --- /dev/null +++ b/.bin/Scripts/settings/osticket.py @@ -0,0 +1,33 @@ +# Wizard Kit: Settings - osTicket + +OSTICKET = { + 'Database': { + 'Name': 'osticket', + 'User': 'wizardkit', + 'Pass': 'U9bJnF9eamVkfsVw', + 'Port': '3306', + }, + 'Disk Flag': { + 'Name': 'zHDTune', + 'Pass': 1, + 'Fail': 2, + }, + 'SSH': { + 'Host': 'osticket.1201.com', + 'Port': '22', + 'User': 'sql_tunnel', + }, + 'Staff': { + 'ID': '23', + 'Name': 'Wizard Kit', + }, + 'Tables': { + 'Response': 'ost_ticket_response', + 'Ticket': 'ost_ticket', + }, + } + +if __name__ == '__main__': + print("This file is not meant to be called directly.") + +# vim: sts=2 sw=2 ts=2