Merge remote-tracking branch 'origin/hw-diags-rewrite' into dev

This commit is contained in:
2Shirt 2018-12-26 21:54:07 -07:00
commit c33d1b9706
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
5 changed files with 730 additions and 59 deletions

View file

@ -5,6 +5,7 @@ import re
import time
from collections import OrderedDict
from functions.osticket import *
from functions.sensors import *
from functions.tmux import *
@ -65,6 +66,11 @@ KEY_NVME = 'nvme_smart_health_information_log'
KEY_SMART = 'ata_smart_attributes'
QUICK_LABEL = '{YELLOW}(Quick){CLEAR}'.format(**COLORS)
SIDE_PANE_WIDTH = 20
STATUSES = {
'RED': ['Denied', 'ERROR', 'NS', 'TimedOut'],
'YELLOW': ['Aborted', 'N/A', 'OVERRIDE', 'Unknown', 'Working'],
'GREEN': ['CS'],
}
TESTS_CPU = ['Prime95']
TESTS_DISK = [
'I/O Benchmark',
@ -78,7 +84,10 @@ TMUX_LAYOUT = OrderedDict({
'Progress': {'x': SIDE_PANE_WIDTH, 'Check': True},
})
# Error Classe
# Regex
REGEX_ERROR_STATUS = re.compile('|'.join(STATUSES['RED']))
# Error Classes
class DeviceTooSmallError(Exception):
pass
@ -90,6 +99,7 @@ class CpuObj():
self.tests = OrderedDict()
self.get_details()
self.name = self.lscpu.get('Model name', 'Unknown CPU')
self.description = self.name
def get_details(self):
"""Get CPU details from lscpu."""
@ -125,6 +135,7 @@ class CpuObj():
class DiskObj():
"""Object for tracking disk specific data."""
def __init__(self, disk_path):
self.checkbox = None
self.disk_ok = True
self.labels = []
self.lsblk = {}
@ -234,7 +245,9 @@ class DiskObj():
print_standard(' (Have you tried swapping the disk cable?)')
else:
# Override?
show_report(self.generate_attribute_report(description=True))
show_report(
self.generate_attribute_report(description=True),
log_report=True)
print_warning(' {} error(s) detected.'.format(attr_type))
if override_disabled:
print_standard('Tests disabled for this device')
@ -482,6 +495,7 @@ class State():
def __init__(self):
self.cpu = None
self.disks = []
self.ost = osTicket(TESTS_CPU, TESTS_DISK)
self.panes = {}
self.quick_mode = False
self.tests = OrderedDict({
@ -506,6 +520,7 @@ class State():
'Objects': [],
},
})
self.ticket_id = None
def init(self):
"""Remove test objects, set log, and add devices."""
@ -566,7 +581,8 @@ class TestObj():
def update_status(self, new_status=None):
"""Update status strings."""
if self.disabled or re.search(r'ERROR|OVERRIDE', self.status):
if self.disabled or REGEX_ERROR_STATUS.search(self.status):
# Don't update error statuses if test is enabled
return
if new_status:
self.status = build_status_string(
@ -603,12 +619,9 @@ def build_outer_panes(state):
def build_status_string(label, status, info_label=False):
"""Build status string with appropriate colors."""
status_color = COLORS['CLEAR']
if status in ['Denied', 'ERROR', 'NS', 'OVERRIDE', 'TimedOut']:
status_color = COLORS['RED']
elif status in ['Aborted', 'N/A', 'Skipped', 'Unknown', 'Working']:
status_color = COLORS['YELLOW']
elif status in ['CS']:
status_color = COLORS['GREEN']
for k, v in STATUSES.items():
if status in v:
status_color = COLORS[k]
return '{l_c}{l}{CLEAR}{s_c}{s:>{s_w}}{CLEAR}'.format(
l_c=COLORS['BLUE'] if info_label else '',
@ -726,17 +739,6 @@ def get_read_rate(s):
real_rate = convert_to_bytes(human_rate)
return real_rate
def get_status_color(s):
"""Get color based on status, returns str."""
color = COLORS['CLEAR']
if s in ['Denied', 'ERROR', 'NS', 'OVERRIDE']:
color = COLORS['RED']
elif s in ['Aborted', 'N/A', 'Unknown', 'Working', 'Skipped']:
color = COLORS['YELLOW']
elif s in ['CS']:
color = COLORS['GREEN']
return color
def menu_diags(state, args):
"""Main menu to select and run HW tests."""
args = [a.lower() for a in args]
@ -746,6 +748,7 @@ def menu_diags(state, args):
{'Base Name': 'Full Diagnostic', 'Enabled': False},
{'Base Name': 'Disk Diagnostic', 'Enabled': False},
{'Base Name': 'Disk Diagnostic (Quick)', 'Enabled': False},
{'Base Name': 'osTicket Integration', 'Enabled': True},
{'Base Name': 'Prime95', 'Enabled': False, 'CRLF': True},
{'Base Name': 'NVMe / SMART', 'Enabled': False},
{'Base Name': 'badblocks', 'Enabled': False},
@ -777,11 +780,11 @@ def menu_diags(state, args):
while True:
# Set quick mode as necessary
if main_options[2]['Enabled'] and main_options[4]['Enabled']:
if main_options[2]['Enabled'] and main_options[5]['Enabled']:
# Check if only Disk Diags (Quick) and NVMe/SMART are enabled
# If so, verify no other tests are enabled and set quick_mode
state.quick_mode = True
for opt in main_options[3:4] + main_options[5:]:
for opt in main_options[4:5] + main_options[6:]:
state.quick_mode &= not opt['Enabled']
else:
state.quick_mode = False
@ -795,13 +798,13 @@ def menu_diags(state, args):
# Verify preset selections
num_tests_selected = 0
for opt in main_options[3:]:
for opt in main_options[4:]:
if opt['Enabled']:
num_tests_selected += 1
if num_tests_selected == 4:
# Full
main_options[0]['Enabled'] = True
elif num_tests_selected == 3 and not main_options[3]['Enabled']:
elif num_tests_selected == 3 and not main_options[4]['Enabled']:
# Disk
main_options[1]['Enabled'] = True
@ -946,6 +949,10 @@ def run_hw_tests(state):
"""Run enabled hardware tests."""
print_standard('Scanning devices...')
state.init()
tests_enabled = False
# Disable osTicket Integration if in quick mode
state.ost.disabled |= state.quick_mode
# Build Panes
update_progress_pane(state)
@ -961,6 +968,8 @@ def run_hw_tests(state):
COLORS['CLEAR'],
QUICK_LABEL if state.quick_mode and 'NVMe' in k else ''))
if v['Enabled']:
tests_enabled = True
# Create TestObj and track under both CpuObj/DiskObj and State
if k in TESTS_CPU:
test_obj = TestObj(
@ -974,6 +983,15 @@ def run_hw_tests(state):
v['Objects'].append(test_obj)
print_standard('')
# Bail if no tests selected
if not tests_enabled:
tmux_kill_pane(*state.panes.values())
return
# Get ticket_number
if not state.ost.disabled:
state.ticket_id = state.ost.get_ticket_number()
# Run disk safety checks (if necessary)
_disk_tests_enabled = False
for k in TESTS_DISK:
@ -991,6 +1009,9 @@ def run_hw_tests(state):
f = v['Function']
for test_obj in v['Objects']:
f(state, test_obj)
if k == TESTS_CPU[-1]:
# Last CPU test run, post CPU results
state.ost.post_device_results(state.cpu, state.ticket_id)
except GenericAbort:
# Cleanup
tmux_kill_pane(*state.panes.values())
@ -1009,14 +1030,48 @@ def run_hw_tests(state):
# Update side pane
update_progress_pane(state)
# Done
# Show results
show_results(state)
# Post disk results
if not state.ost.disabled:
print_standard('Posting results to osTicket...')
for disk in state.disks:
state.ost.post_device_results(disk, state.ticket_id)
# Check if disk checkbox needs updating
all_disks_passed = True
disk_failures = False
for disk in state.disks:
if disk.checkbox is None:
# Aborted/Unknown/etc
all_disks_passed = False
else:
all_disks_passed &= disk.checkbox
disk_failures |= not disk.checkbox
# Update checkbox if necessary
if disk_failures:
state.ost.set_disk_failed(state.ticket_id)
elif all_disks_passed:
state.ost.set_disk_passed(state.ticket_id)
# Spacer
print_standard(' ')
# Check for osTicket errors
if state.ost.errors:
print_warning('Errors encountered posting results to osTicket.')
print_standard(' ')
# Done
if state.quick_mode:
pause('Press Enter to exit...')
else:
pause('Press Enter to return to main menu... ')
# Cleanup
state.ost.disconnect(full=True)
tmux_kill_pane(*state.panes.values())
def run_io_benchmark(state, test):
@ -1060,7 +1115,6 @@ def run_io_benchmark(state, test):
try:
test.merged_rates = []
test.read_rates = []
test.vertical_graph = []
test.dev.calc_io_dd_values()
# Run dd read tests
@ -1087,10 +1141,6 @@ def run_io_benchmark(state, test):
# Add rate to lists
test.read_rates.append(cur_rate)
test.vertical_graph.append(
'{percent:0.1f} {rate}'.format(
percent=(i/test.dev.dd_chunks)*100,
rate=int(cur_rate/(1024**2))))
# Show progress
if i % IO_VARS['Progress Refresh Rate'] == 0:
@ -1175,10 +1225,6 @@ def run_io_benchmark(state, test):
elif not 'N/A' in test.status:
test.update_status('Unknown')
# Save log
with open(test.io_benchmark_out.replace('.', '-raw.'), 'a') as f:
f.write('\n'.join(test.vertical_graph))
# Done
update_progress_pane(state)
@ -1453,8 +1499,6 @@ def run_nvme_smart_tests(state, test):
# Show attributes
clear_screen()
print_info('Device ({})'.format(test.dev.name))
print_standard(' {}'.format(test.dev.description))
show_report(test.dev.generate_attribute_report())
print_standard(' ')
@ -1518,7 +1562,7 @@ def run_nvme_smart_tests(state, test):
test.update_status('TimedOut')
# Disable other drive tests if necessary
if not test.passed:
if test.failed:
for t in ['badblocks', 'I/O Benchmark']:
test.dev.disable_test(t, 'Denied')
@ -1542,11 +1586,12 @@ def secret_screensaver(screensaver=None):
raise Exception('Invalid screensaver')
run_program(cmd, check=False, pipe=False)
def show_report(report):
"""Show report on screen and save to log w/out color."""
def show_report(report, log_report=False):
"""Show report on screen and optionally save to log w/out color."""
for line in report:
print(line)
print_log(strip_colors(line))
if log_report:
print_log(strip_colors(line))
def show_results(state):
"""Show results for all tests."""
@ -1561,7 +1606,7 @@ def show_results(state):
_enabled |= state.tests[k]['Enabled']
if _enabled:
print_success('CPU:'.format(k))
show_report(state.cpu.generate_cpu_report())
show_report(state.cpu.generate_cpu_report(), log_report=True)
print_standard(' ')
# Disk tests
@ -1572,7 +1617,7 @@ def show_results(state):
print_success('Disk{}:'.format(
'' if len(state.disks) == 1 else 's'))
for disk in state.disks:
show_report(disk.generate_disk_report())
show_report(disk.generate_disk_report(), log_report=True)
print_standard(' ')
def update_main_options(state, selection, main_options):
@ -1586,33 +1631,34 @@ def update_main_options(state, selection, main_options):
if main_options[index]['Enabled']:
for opt in main_options[1:3]:
opt['Enabled'] = False
for opt in main_options[3:]:
for opt in main_options[4:]:
opt['Enabled'] = True
else:
for opt in main_options[3:]:
for opt in main_options[4:]:
opt['Enabled'] = False
elif index == 1:
# Disk
if main_options[index]['Enabled']:
main_options[0]['Enabled'] = False
for opt in main_options[2:4]:
opt['Enabled'] = False
for opt in main_options[4:]:
main_options[2]['Enabled'] = False
main_options[4]['Enabled'] = False
for opt in main_options[5:]:
opt['Enabled'] = True
else:
for opt in main_options[4:]:
for opt in main_options[5:]:
opt['Enabled'] = False
elif index == 2:
# Disk (Quick)
if main_options[index]['Enabled']:
for opt in main_options[:2] + main_options[3:]:
for opt in main_options[:2] + main_options[4:]:
opt['Enabled'] = False
main_options[4]['Enabled'] = True
main_options[5]['Enabled'] = True
else:
main_options[4]['Enabled'] = False
main_options[5]['Enabled'] = False
# Update state
for opt in main_options[3:]:
state.ost.disabled = not main_options[3]['Enabled']
for opt in main_options[4:]:
state.tests[opt['Base Name']]['Enabled'] = opt['Enabled']
# Done

View file

@ -0,0 +1,113 @@
# Wizard Kit: Functions - PNG graph for I/O Benchmark
import base64
import Gnuplot
import json
import math
import requests
from functions.common import *
# Functions
def export_io_graph(disk):
"""Exports PNG graph using gnuplot, returns file path as str."""
read_rates = disk.tests['I/O Benchmark'].read_rates
max_rate = max(read_rates) / (1024**2)
max_rate = max(800, max_rate)
out_path = '{}/iobenchmark-{}.png'.format(
global_vars['LogDir'], disk.name)
plot_data = '{}/iobenchmark-{}-plot.data'.format(
global_vars['LogDir'], disk.name)
# Adjust Y-axis range to either 1000 or roughly max rate + 200
## Round up to the nearest 100 and then add 200
y_range = (math.ceil(max_rate/100)*100) + 200
# Save plot data to file for Gnuplot
with open(plot_data, 'w') as f:
for i in range(len(read_rates)):
_percent = ( (i+1) / len(read_rates) ) * 100
_rate = int( read_rates[i] / (1024**2) )
f.write('{:0.1f} {}\n'.format(_percent, _rate))
# Run gnuplot commands
g = Gnuplot.Gnuplot()
g('reset')
g('set output "{}"'.format(out_path))
g('set terminal png large size 660,300 truecolor font "Noto Sans,11"')
g('set title "I/O Benchmark"')
g('set yrange [0:{}]'.format(y_range))
g('set style data lines')
g('plot "{}" title "{}"'.format(
plot_data,
disk.description.replace('_', ' '),
))
# Cleanup
g.close()
del(g)
return out_path
def upload_to_imgur(image_path):
"""Upload image to Imgur and return image url as str."""
image_data = None
image_link = None
# Bail early
if not image_path:
raise GenericError
# Read image file and convert to base64 then convert to str
with open(image_path, 'rb') as f:
image_data = base64.b64encode(f.read()).decode()
# POST image
url = "https://api.imgur.com/3/image"
boundary = '----WebKitFormBoundary7MA4YWxkTrZu0gW'
payload = ('--{boundary}\r\nContent-Disposition: form-data; '
'name="image"\r\n\r\n{data}\r\n--{boundary}--')
payload = payload.format(boundary=boundary, data=image_data)
headers = {
'content-type': 'multipart/form-data; boundary={}'.format(boundary),
'Authorization': 'Client-ID {}'.format(IMGUR_CLIENT_ID),
}
response = requests.request("POST", url, data=payload, headers=headers)
# Return image link
if response.ok:
json_data = json.loads(response.text)
image_link = json_data['data']['link']
return image_link
def upload_to_nextcloud(image_path, ticket_number, dev_name):
"""Upload image to Nextcloud server and return folder url as str."""
image_data = None
# Bail early
if not image_path:
raise GenericError
# Read image file and convert to base64
with open(image_path, 'rb') as f:
image_data = f.read()
# PUT image
url = '{base_url}/{ticket_number}_iobenchmark_{dev_name}_{date}.png'.format(
base_url=BENCHMARK_SERVER['Url'],
ticket_number=ticket_number,
dev_name=dev_name,
date=global_vars.get('Date-Time', 'Unknown Date-Time'))
requests.put(
url,
data=image_data,
headers = {'X-Requested-With': 'XMLHttpRequest'},
auth = (BENCHMARK_SERVER['User'], BENCHMARK_SERVER['Pass']))
# Return folder link
return BENCHMARK_SERVER['Short Url']
if __name__ == '__main__':
print("This file is not meant to be called directly.")
# vim: sts=2 sw=2 ts=2

View file

@ -0,0 +1,486 @@
# Wizard Kit: Functions - osTicket
import mysql.connector as mariadb
from functions.data import *
from functions.io_graph import *
from settings.osticket import *
# STATIC VARIABLES
KNOWN_DEV_TYPES = ('CPU', 'Disk')
# Regex
REGEX_BLOCK_GRAPH = re.compile(r'(▁|▂|▃|▄|▅|▆|▇|█)')
REGEX_NVME_SMART_ATTRIBUTES = re.compile(r'^\s*(\d+) / (\w+): (.{28})(.*)$')
REGEX_TEMPS = re.compile(r'^\s*(.*?)\s+(idle.*)$')
REGEX_SENSOR = re.compile(r'^(.*?)(\s*)$')
# Classes
class osTicket():
"""Class to track osTicket data and functions."""
def __init__(self, tests_cpu, tests_disk):
self.db_connection = None
self.db_cursor = None
self.disabled = False
self.errors = False
self.tests_cpu = tests_cpu
self.tests_disk = tests_disk
self.tunnel_proc = None
def connect(self):
"""Establish connection to osTicket via a SSH tunnel."""
cmd = [
'ssh', '-N',
'-p', OSTICKET['SSH']['Port'],
'-L3306:127.0.0.1:{Port}'.format(**OSTICKET['Database']),
'{User}@{Host}'.format(**OSTICKET['SSH']),
]
# Bail if disabled
if self.disabled:
return
# Only open tunnel if one doesn't exist
if self.tunnel_proc is None or self.tunnel_proc.poll() is not None:
self.tunnel_proc = popen_program(cmd)
# Connect to database
for x in range(5):
sleep(2)
try:
self.db_connection = mariadb.connect(
user=OSTICKET['Database']['User'],
password=OSTICKET['Database']['Pass'],
database=OSTICKET['Database']['Name'],
)
self.db_cursor = self.db_connection.cursor()
except mariadb.errors.InterfaceError:
# SSH issue?, try again
pass
except mariadb.errors.Error:
# Bad creds or other SQL error, bail
break
except Exception:
# Unknown error
break
else:
# Connection established
break
# Disable if necessary
if self.db_cursor is None:
self.disabled = True
self.tunnel_proc.kill()
def convert_report(self, name, test):
"""Convert report into an osTicket friendly format, returns list."""
out_report = []
source_report = test.report
status = strip_colors(test.status)
status = status.replace(test.label, '').strip()
# Header
index = 1
if name == 'NVMe / SMART':
out_report.append('{} ({})'.format(name, status))
if not source_report:
index = 0
source_report = test.dev.generate_attribute_report()
elif not source_report:
index = 0
out_report.append('{} ({})'.format(name, status))
else:
out_report.append('{} ({})'.format(strip_colors(source_report[0]), status))
# Body
for line in source_report[index:]:
# Remove colors and leading spaces
line = strip_colors(line)
if line[:2] == ' ':
line = line[2:]
# Test-specific modifications
if name == 'Prime95':
r = REGEX_TEMPS.match(line)
if r:
_sensor = '{:<20}'.format(r.group(1))
_temps = r.group(2)
r2 = REGEX_SENSOR.match(_sensor)
_sensor = r2.group(1)
_spacer = pad_with_dots(r2.group(2))
line = '{}{} {}'.format(_sensor, _spacer, _temps)
if line == 'Temps':
out_report.append(' ')
elif name == 'NVMe / SMART':
r = REGEX_NVME_SMART_ATTRIBUTES.match(line)
if r:
_dec = '{:>3}'.format(r.group(1))
_hex = r.group(2)
_atr = r.group(3).strip()
_val = '{:<20}'.format(r.group(4))
line = '{}/{}: {} {}'.format(
_hex,
pad_with_dots(_dec),
pad_with_dots(_val, pad_right=True),
_atr)
elif name == 'I/O Benchmark':
line = REGEX_BLOCK_GRAPH.sub('', line)
line = line.strip()
if not line:
continue
# Remove extra spaces
line = line.strip()
line = re.sub(r'(\s+)', ' ', line)
# Add line to report
out_report.append(line)
# Done
return out_report
def disconnect(self, full=False):
"""Close osTicket connection."""
try:
self.db_cursor.close()
self.db_connection.close()
except Exception:
# Ignore errors since vars will be reset below
pass
# Reset errors
if full:
self.errors = False
# Reset vars
self.db_cursor = None
self.db_connection = None
# Close tunnel
if full:
try:
self.tunnel_proc.kill()
self.tunnel_proc.wait(timeout=2)
except (AttributeError, subprocess.TimeoutExpired):
# Ignore and continue
pass
self.tunnel_proc = None
def generate_report(self, dev, ticket_id):
"""Generate device report for osTicket, returns list."""
report = []
results = self.get_device_overall_results(dev)
# Header
if results['Full Diag']:
report.append(
'{Dev Type} hardware diagnostic tests: {Status}'.format(**results))
report.append(' ')
# Device
report.append(dev.description)
report.append(' ')
# Test reports
for name, test in dev.tests.items():
report.extend(self.convert_report(name, test))
if name == 'I/O Benchmark':
# Create PNG graph
try:
graph_file = export_io_graph(dev)
except (AttributeError, KeyError):
report.append('Failed to export graph')
else:
# Upload to Imgur
try:
url = upload_to_imgur(graph_file)
report.append('Imgur: {}'.format(url))
except Exception:
report.append('Imgur: Failed to upload graph')
# Upload to Nextcloud
try:
url = upload_to_nextcloud(graph_file, ticket_id, dev.name)
report.append('Nextcloud: {}'.format(url))
except Exception:
report.append('Nextcloud: Failed to upload graph')
report.append(' ')
# Volumes
if results['Dev Type'] == 'Disk' and results['Full Diag']:
# Mount all volumes and extend report
report.append('Volumes:')
report.extend(self.generate_volume_report(dev, results))
report.append(' ')
# Asterisk
if results['Asterisk']:
report.append('* NOTE: One or more tests were not run on this device')
# Done
return report
def generate_volume_report(self, dev, results):
"""Mount all volumes for dev and generate report, returns list."""
report = []
mount_report = mount_volumes(
all_devices=False,
device_path='{}'.format(dev.path),
core_storage=results['Core'])
# Format report
for v_path, v_data in sorted(mount_report.items()):
label = v_data.get('label', '')
if label:
label = '"{}"'.format(label)
else:
# Ensure string type
label = ''
size = v_data.get('size', '')
if size:
size = '{} {}B'.format(size[:-1], size[-1:]).upper()
else:
size = 'UNKNOWN'
size_used = v_data.get('size_used', 'UNKNOWN').upper()
size_avail = v_data.get('size_avail', 'UNKNOWN').upper()
v_data = [v_path, label, size, size_used, size_avail]
v_data = [v.strip().replace(' ', '_') for v in v_data]
for i in range(len(v_data)):
pad = 8
if i < 2:
pad += 4 * (2 - i)
v_data[i] = pad_with_dots(
'{s:<{p}}'.format(s=v_data[i], p=pad),
pad_right=True)
v_data[-1] = re.sub(r'\.*$', '', v_data[-1])
v_data = [v.replace('_', ' ') for v in v_data]
report.append(
'{}..{}..Total..{}..(Used..{}..Free..{})'.format(*v_data))
# Done
return report
def get_device_overall_results(self, dev):
"""Get overall results from tests for device, returns dict."""
results = {
'Core': False,
'Dev Type': self.get_device_type(dev),
'Full Diag': False,
'Asterisk': None,
'Failed': 0,
'N/A': 0,
'OVERRIDE': 0,
'Passed': 0,
'Status': 'Unknown',
}
# Bail on unknown device type
if results['Dev Type'] not in KNOWN_DEV_TYPES:
raise GenericError(
'Unrecognized device type: {}.'.format(results['Dev Type']))
# Get test list for device type
test_list = []
if results['Dev Type'] == 'CPU':
test_list = self.tests_cpu
elif results['Dev Type'] == 'Disk':
test_list = self.tests_disk
# Check if a full diag was run (i.e. all dev tests were enabled)
results['Full Diag'] = len(dev.tests) == len(test_list)
# Tally test results
for test in dev.tests.values():
if test.failed:
results['Failed'] += 1
if test.passed:
results['Passed'] += 1
if 'N/A' in test.status:
results['N/A'] += 1
if 'OVERRIDE' in test.status:
results['OVERRIDE'] += 1
# Set overall status
if results['Failed'] > 0:
dev.checkbox = False
results['Status'] = 'FAILED'
elif results['Passed'] + results['N/A'] == len(test_list):
# Only mark true if all tests are enabled and all are "Passed" / "N/A"
dev.checkbox = True
results['Status'] = 'PASSED'
else:
results['Status'] = 'UNKNOWN'
if results['Full Diag'] and results['N/A'] > 0:
results['Asterisk'] = True
results['Status'] += '*'
# Enable CoreStorage searches
results['Core'] = (results['Full Diag'] and
results['Passed']+results['N/A']+results['OVERRIDE'] == len(test_list))
# Done
return results
def get_device_type(self, dev):
"""Terrible hack to determine device type, returns str."""
# TODO: Fix with proper isinstance() call
type_str = str(dev.__class__)
if 'CpuObj' in type_str:
type_str = 'CPU'
elif 'DiskObj' in type_str:
type_str = 'Disk'
return type_str
def get_ticket_name(self, ticket_id):
"""Lookup ticket and return name as str."""
name = None
sql_cmd = "SELECT name FROM `{Ticket}`".format(**OSTICKET['Tables'])
sql_cmd += " WHERE `ticket_id` = {}".format(ticket_id)
sql_cmd += ";"
# Lookup name
# NOTE: If multiple entries are found it will return the last
self.db_cursor.execute(sql_cmd)
for s in self.db_cursor:
name = s[0]
# Done
return name
def get_ticket_number(self):
"""Get ticket number and confirm with name from osTicket DB."""
ticket_number = None
self.connect()
# Bail if disabled
if self.disabled:
return None
# Main loop
while ticket_number is None:
print_standard(' ')
_input = input('Enter ticket number (or leave blank to disable): ')
_input = _input.strip()
# No ticket ID entered
if re.match(r'^\s*$', _input):
if ask('Disable osTicket integration for this run?'):
break
# Invalid ID entered
if not re.match(r'^(\d+)$', _input):
continue
# Valid ID entered, lookup name and verify
try:
_name = self.get_ticket_name(_input)
except Exception:
# Ignore and return None below
break
if _name:
print_standard('You have selected ticket #{} {}'.format(
_input, _name))
if ask('Is this correct?'):
ticket_number = _input
# Done
self.disconnect()
return ticket_number
def post_device_results(self, dev, ticket_id):
"""Generate osTicket friendly report and post as response to ticket."""
if not dev.tests:
# No test results available, aborting post
return
response = self.generate_report(dev, ticket_id)
self.post_response(response, ticket_id)
def post_response(self, response, ticket_id):
"""Post a reply to a ticket in osTicket."""
self.connect()
# Bail if disabled
if self.disabled:
return
# Convert response to string
response = '\n'.join(response)
response = response.replace('`', '')
# Build SQL cmd
sql_cmd = "INSERT INTO `{Name}`.`{Response}`".format(
**OSTICKET['Database'], **OSTICKET['Tables'])
sql_cmd += " (ticket_id, staff_id, staff_name, response, created)"
sql_cmd += " VALUES ("
sql_cmd += " '{}',".format(ticket_id)
sql_cmd += " '{ID}', '{Name}',".format(**OSTICKET['Staff'])
sql_cmd += " '{}',".format(response)
sql_cmd += " '{}'".format(time.strftime("%Y-%m-%d %H:%M:%S"))
sql_cmd += " );"
# Run SQL cmd
try:
self.db_cursor.execute(sql_cmd)
except mariadb.errors.Error:
# Set self.errors to enable warning line on results screen
self.errors = True
# Done
self.disconnect()
def set_disk_failed(self, ticket_id):
"""Mark disk as failed in osTicket."""
self.set_flag(
ticket_id,
OSTICKET['Disk Flag']['Name'],
OSTICKET['Disk Flag']['Fail'])
def set_disk_passed(self, ticket_id):
"""Mark disk as passed in osTicket."""
self.set_flag(
ticket_id,
OSTICKET['Disk Flag']['Name'],
OSTICKET['Disk Flag']['Pass'])
def set_flag(self, ticket_id, flag_name, flag_value):
"""Set flag in osTicket."""
self.connect()
# Bail if disabled
if self.disabled:
return
# Build SQL cmd
sql_cmd = "UPDATE `{Name}`.`{Ticket}`".format(
**OSTICKET['Database'], **OSTICKET['Tables'])
sql_cmd += " SET `{}` = '{}'".format(flag_name, flag_value)
sql_cmd += " WHERE `{Ticket}`.`ticket_id` = {ticket_id}".format(
ticket_id=ticket_id, **OSTICKET['Tables'])
sql_cmd += ";"
# Run SQL cmd
try:
self.db_cursor.execute(sql_cmd)
except mariadb.errors.Error:
# Set self.errors to enable warning line on results screen
self.errors = True
# Done
self.disconnect()
# Functions
def pad_with_dots(s, pad_right=False):
"""Replace space padding with dots, returns str."""
s = str(s).replace(' ', '..')
if '.' in s:
if pad_right:
s = s + '.'
else:
s = '.' + s
return s
if __name__ == '__main__':
print("This file is not meant to be called directly.")
# vim: sts=2 sw=2 ts=2

View file

@ -14,13 +14,6 @@ ARCHIVE_PASSWORD='Sorted1201'
KIT_NAME_FULL='1201-WizardKit'
KIT_NAME_SHORT='1201'
SUPPORT_MESSAGE='Please let support know by opening an issue on Gogs'
# osTicket
DB_HOST='osticket.1201.com'
DB_NAME='osticket'
DB_USER='wizardkit'
DB_PASS='U9bJnF9eamVkfsVw'
SSH_PORT='22'
SSH_USER='sql_tunnel'
# imgur
IMGUR_CLIENT_ID='3d1ee1d38707b85'
# Live Linux

View file

@ -0,0 +1,33 @@
# Wizard Kit: Settings - osTicket
OSTICKET = {
'Database': {
'Name': 'osticket',
'User': 'wizardkit',
'Pass': 'U9bJnF9eamVkfsVw',
'Port': '3306',
},
'Disk Flag': {
'Name': 'zHDTune',
'Pass': 1,
'Fail': 2,
},
'SSH': {
'Host': 'osticket.1201.com',
'Port': '22',
'User': 'sql_tunnel',
},
'Staff': {
'ID': '23',
'Name': 'Wizard Kit',
},
'Tables': {
'Response': 'ost_ticket_response',
'Ticket': 'ost_ticket',
},
}
if __name__ == '__main__':
print("This file is not meant to be called directly.")
# vim: sts=2 sw=2 ts=2