Move HW diagnostic osTicket functions to new file

This commit is contained in:
2Shirt 2022-05-21 18:16:40 -07:00
parent d039fb962d
commit 7ffbcc83fa
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
3 changed files with 271 additions and 255 deletions

View file

@ -7,6 +7,7 @@ from . import diags
from . import disk
from . import keyboard
from . import network
from . import osticket
from . import screensavers
from . import sensors
from . import smart

View file

@ -6,24 +6,16 @@ import atexit
import logging
import os
import pathlib
import re
import subprocess
import time
from docopt import docopt
from wk import cfg, debug, exe, log, osticket, std, tmux
from wk import os as wk_os
from wk.cfg.hw import (
IO_SMALL_DISK,
REGEX_BLOCK_GRAPH,
REGEX_SMART_ATTRIBUTES,
REGEX_VOLUME,
STATUS_COLORS,
)
from wk.hw import benchmark as hw_benchmark
from wk.hw import cpu as hw_cpu
from wk.hw import disk as hw_disk
from wk.hw import osticket as hw_osticket
from wk.hw import sensors as hw_sensors
from wk.hw import smart as hw_smart
from wk.hw import surface_scan as hw_surface_scan
@ -57,7 +49,7 @@ Options:
LOG = logging.getLogger(__name__)
IO_SIZE_SKIP_NAME = (
'Skip USB Benchmarks '
f'(< {std.bytes_to_string(IO_SMALL_DISK, use_binary=False)})'
f'(< {std.bytes_to_string(cfg.hw.IO_SMALL_DISK, use_binary=False)})'
)
TEST_GROUPS = {
# Also used to build the menu options
@ -340,7 +332,7 @@ class State():
for test in group.test_objects:
report.append(std.color_string(
[test.label, f'{test.status:>{width-len(test.label)}}'],
[None, STATUS_COLORS.get(test.status, None)],
[None, cfg.hw.STATUS_COLORS.get(test.status, None)],
sep='',
))
@ -538,7 +530,7 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
std.print_info('Posting results to osTicket...')
state.cpu_max_temp = sensors.cpu_max_temp()
state.ost.post_response(
ost_build_report(state.system, 'CPU'),
hw_osticket.build_report(state.system, 'CPU'),
color='Diags FAIL' if _failed else 'Diags',
)
@ -883,247 +875,6 @@ def main() -> None:
state.update_top_pane('Main Menu')
def ost_build_report(dev, dev_type):
# pylint: disable=too-many-branches
"""Build report for posting to osTicket, returns str."""
report = []
# Combined result
if dev_type == 'CPU' or len(dev.tests) == NUM_DISK_TESTS:
# Build list of failed tests (if any)
failed_tests = [t.name for t in dev.tests if t.failed]
failed_tests = [name.replace('Disk ', '') for name in failed_tests]
if len(failed_tests) > 2:
failed_tests = f'{", ".join(failed_tests[:-1])}, & {failed_tests[-1]}'
else:
failed_tests = ' & '.join(failed_tests)
# Get overall result
result = 'UNKNOWN'
if any(t.failed for t in dev.tests):
result = 'FAILED'
elif all(t.passed for t in dev.tests):
result = 'PASSED'
# Add to report
report.append(
f'{dev_type} hardware diagnostic tests: {result}'
f'{" ("+failed_tests+")" if failed_tests else ""}'
)
report.append('')
# Description
if hasattr(dev, 'cpu_description'):
report.append(dev.cpu_description)
else:
report.append(dev.description)
if hasattr(dev, 'ram_total'):
if len(dev.ram_dimms) == 1 and 'justTotalRAM' in dev.ram_dimms[0]:
report.append(f'{dev.ram_total} (Total - no DIMM info available)')
else:
report.append(f'{dev.ram_total} ({", ".join(dev.ram_dimms)})')
if hasattr(dev, 'serial') and dev.serial:
report.append(f'Serial Number: {dev.serial}')
report.append('')
# Notes
if hasattr(dev, 'notes') and dev.notes:
report.append('Notes')
report.extend([f'... {note}' for note in dev.notes])
report.append('')
# Tests
for test in dev.tests:
report.append(f'{test.name} ({test.status})')
# Report
if test.name == 'Disk Attributes' and dev.attributes:
report.extend(
ost_convert_report(
hw_smart.generate_attribute_report(dev),
start_index=0,
),
)
else:
report.extend(ost_convert_report(test.report, start_index=1))
# I/O graph upload report
report.extend(getattr(test, 'upload_report', []))
# Spacer
report.append('')
# Volume report
if dev_type == 'Disk' and len(dev.tests) == NUM_DISK_TESTS:
report.append('Volumes:')
report.extend(ost_generate_volume_report(dev))
# Remove last line if empty
if not report[-1].strip():
report.pop(-1)
# Done
return std.strip_colors('\n'.join(report))
def ost_convert_report(original_report, start_index):
"""Convert report to an osTicket compatible type, returns list."""
report = []
# Convert report
for line in original_report[start_index:]:
# Remove colors and leading spaces
line = std.strip_colors(line)
line = re.sub(r'^\s+', '', line)
# Disk I/O Benchmark
if REGEX_BLOCK_GRAPH.search(line):
line = REGEX_BLOCK_GRAPH.sub('', line)
line = line.strip()
# SMART attributes
match = REGEX_SMART_ATTRIBUTES.search(line)
if match:
# Switch decimal and hex labels
_dec = f'{match.group("decimal"):>3}'
_dec = osticket.pad_with_dots(_dec)
_hex = match.group('hex')
_data = match.group('data')
line = f'{_hex}/{_dec}: {_data}'
line = line.replace('failed', 'FAILED')
# Skip empty lines
if not line.strip():
continue
# Fix inner spacing
for spacing in re.findall(r'\s\s+', line):
new_padding = osticket.pad_with_dots(spacing)
new_padding += ' '
line = line.replace(spacing, new_padding)
# Indent line
line = f'... {line}'
# Add to (converted) report
report.append(line)
# Done
return report
def ost_generate_volume_report(dev):
"""Generate volume report for dev, returns list."""
report = []
vol_report = None
# OS Check
if PLATFORM == 'Darwin':
vol_report = wk_os.mac.mount_disk(device_path=dev.path)
elif PLATFORM == 'Linux':
vol_report = wk_os.linux.mount_volumes(
device_path=dev.path,
read_write=False,
scan_corestorage=not any(t.failed for t in dev.tests),
)
else:
# Volume report unavailable
return report
# Convert mount_volume report
for line in vol_report:
line = std.strip_colors(line)
match = REGEX_VOLUME.match(line)
if match:
if match.group('result') == 'Mounted on':
report.append(
f'... {match.group("dev")}'
f'... Mounted on {match.group("path")}'
f'... ({match.group("details")})'
)
else:
# Assuming either failed to mount or info line about a skipped dev
report.append(f'... {match.group("dev")}... {match.group("result")}')
else:
# Unknown result, just print the whole line
report.append(f'... {line}')
# Done
return report
def ost_post_disk_results(state):
"""Post disk test results for all disks."""
disk_tests = []
for group in state.test_groups:
if group.name.startswith('Disk'):
disk_tests.extend(group.test_objects)
# Bail if no disk tests were run
if not disk_tests or state.ost.disabled:
return
# Post disk results
std.print_info('Posting results to osTicket...')
for disk in state.disks:
state.ost.post_response(
ost_build_report(disk, 'Disk'),
color='Diags FAIL' if any(t.failed for t in disk.tests) else 'Diags',
)
def ost_update_checkboxes(state):
# pylint: disable=too-many-branches
"""Update osTicket checkboxes after confirmation."""
cpu_tests = []
disk_tests = []
num_disk_tests_run = len(state.test_groups)
# Build list of tests
for group in state.test_groups:
if group.name.startswith('CPU'):
cpu_tests.extend(group.test_objects)
num_disk_tests_run -= 1
elif group.name.startswith('Disk'):
disk_tests.extend(group.test_objects)
# Bail if osTicket integration disabled
if state.ost.disabled:
return
# Bail if values not confirmed
if not std.ask('Update osTicket checkboxes using the data above?'):
return
# CPU max temp and pass/fail
if cpu_tests:
state.ost.set_cpu_max_temp(
state.cpu_max_temp,
)
if any(t.failed for t in cpu_tests):
state.ost.set_flag_failed('CPU')
elif all(t.passed for t in cpu_tests):
state.ost.set_flag_passed('CPU')
# Check results for all disks
if state.disks:
all_disks_passed = True
for disk in state.disks:
if any(t.failed for t in disk.tests):
# Mark failed disk in osTicket and stop checking results
all_disks_passed = False
state.ost.set_flag_failed('Disk')
break
if not all(t.passed for t in disk.tests):
all_disks_passed = False
break
# All disks passed
if all_disks_passed and num_disk_tests_run == NUM_DISK_TESTS:
# Only mark as passed if a full disk diagnostic passed
state.ost.set_flag_passed('Disk')
def print_countdown(proc, seconds) -> None:
"""Print countdown to screen while proc is alive."""
seconds = int(seconds)
@ -1214,13 +965,13 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None:
test.set_status('Aborted')
# Post disk results
ost_post_disk_results(state)
hw_osticket.post_disk_results(state, NUM_DISK_TESTS)
# Show results
show_results(state)
# Update checkboxes
ost_update_checkboxes(state)
hw_osticket.update_checkboxes(state, NUM_DISK_TESTS)
# Done
state.save_debug_reports()

264
scripts/wk/hw/osticket.py Normal file
View file

@ -0,0 +1,264 @@
"""WizardKit: osTicket hardware diagnostic functions"""
# vim: sts=2 sw=2 ts=2
import logging
import re
from wk import std, osticket
from wk import os as wk_os
from wk.cfg.hw import (
REGEX_BLOCK_GRAPH,
REGEX_SMART_ATTRIBUTES,
)
from wk.hw import smart as hw_smart
from wk.std import PLATFORM
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Functions
def build_report(dev, dev_type, num_disk_tests=None):
# pylint: disable=too-many-branches
"""Build report for posting to osTicket, returns str."""
report = []
# Combined result
if dev_type == 'CPU' or len(dev.tests) == num_disk_tests:
# Build list of failed tests (if any)
failed_tests = [t.name for t in dev.tests if t.failed]
failed_tests = [name.replace('Disk ', '') for name in failed_tests]
if len(failed_tests) > 2:
failed_tests = f'{", ".join(failed_tests[:-1])}, & {failed_tests[-1]}'
else:
failed_tests = ' & '.join(failed_tests)
# Get overall result
result = 'UNKNOWN'
if any(t.failed for t in dev.tests):
result = 'FAILED'
elif all(t.passed for t in dev.tests):
result = 'PASSED'
# Add to report
report.append(
f'{dev_type} hardware diagnostic tests: {result}'
f'{" ("+failed_tests+")" if failed_tests else ""}'
)
report.append('')
# Description
if hasattr(dev, 'cpu_description'):
report.append(dev.cpu_description)
else:
report.append(dev.description)
if hasattr(dev, 'ram_total'):
if len(dev.ram_dimms) == 1 and 'justTotalRAM' in dev.ram_dimms[0]:
report.append(f'{dev.ram_total} (Total - no DIMM info available)')
else:
report.append(f'{dev.ram_total} ({", ".join(dev.ram_dimms)})')
if hasattr(dev, 'serial') and dev.serial:
report.append(f'Serial Number: {dev.serial}')
report.append('')
# Notes
if hasattr(dev, 'notes') and dev.notes:
report.append('Notes')
report.extend([f'... {note}' for note in dev.notes])
report.append('')
# Tests
for test in dev.tests:
report.append(f'{test.name} ({test.status})')
# Report
if test.name == 'Disk Attributes' and dev.attributes:
report.extend(
convert_report(
hw_smart.generate_attribute_report(dev),
start_index=0,
),
)
else:
report.extend(convert_report(test.report, start_index=1))
# I/O graph upload report
report.extend(getattr(test, 'upload_report', []))
# Spacer
report.append('')
# Volume report
if dev_type == 'Disk' and len(dev.tests) == num_disk_tests:
report.append('Volumes:')
report.extend(generate_volume_report(dev))
# Remove last line if empty
if not report[-1].strip():
report.pop(-1)
# Done
return std.strip_colors('\n'.join(report))
def convert_report(original_report, start_index):
"""Convert report to an osTicket compatible type, returns list."""
report = []
# Convert report
for line in original_report[start_index:]:
# Remove colors and leading spaces
line = std.strip_colors(line)
line = re.sub(r'^\s+', '', line)
# Disk I/O Benchmark
if REGEX_BLOCK_GRAPH.search(line):
line = REGEX_BLOCK_GRAPH.sub('', line)
line = line.strip()
# SMART attributes
match = REGEX_SMART_ATTRIBUTES.search(line)
if match:
# Switch decimal and hex labels
_dec = f'{match.group("decimal"):>3}'
_dec = osticket.pad_with_dots(_dec)
_hex = match.group('hex')
_data = match.group('data')
line = f'{_hex}/{_dec}: {_data}'
line = line.replace('failed', 'FAILED')
# Skip empty lines
if not line.strip():
continue
# Fix inner spacing
for spacing in re.findall(r'\s\s+', line):
new_padding = osticket.pad_with_dots(spacing)
new_padding += ' '
line = line.replace(spacing, new_padding)
# Indent line
line = f'... {line}'
# Add to (converted) report
report.append(line)
# Done
return report
def generate_volume_report(dev):
"""Generate volume report for dev, returns list."""
report = []
vol_report = None
# OS Check
if PLATFORM == 'Darwin':
vol_report = wk_os.mac.mount_disk(device_path=dev.path)
elif PLATFORM == 'Linux':
vol_report = wk_os.linux.mount_volumes(
device_path=dev.path,
read_write=False,
scan_corestorage=not any(t.failed for t in dev.tests),
)
else:
# Volume report unavailable
return report
# Convert mount_volume report
for line in vol_report:
line = std.strip_colors(line)
match = REGEX_VOLUME.match(line)
if match:
if match.group('result') == 'Mounted on':
report.append(
f'... {match.group("dev")}'
f'... Mounted on {match.group("path")}'
f'... ({match.group("details")})'
)
else:
# Assuming either failed to mount or info line about a skipped dev
report.append(f'... {match.group("dev")}... {match.group("result")}')
else:
# Unknown result, just print the whole line
report.append(f'... {line}')
# Done
return report
def post_disk_results(state, num_disk_tests):
"""Post disk test results for all disks."""
disk_tests = []
for group in state.test_groups:
if group.name.startswith('Disk'):
disk_tests.extend(group.test_objects)
# Bail if no disk tests were run
if not disk_tests or state.ost.disabled:
return
# Post disk results
std.print_info('Posting results to osTicket...')
for disk in state.disks:
state.ost.post_response(
build_report(disk, 'Disk', num_disk_tests),
color='Diags FAIL' if any(t.failed for t in disk.tests) else 'Diags',
)
def update_checkboxes(state, num_disk_tests):
# pylint: disable=too-many-branches
"""Update osTicket checkboxes after confirmation."""
cpu_tests = []
disk_tests = []
num_disk_tests_run = len(state.test_groups)
# Build list of tests
for group in state.test_groups:
if group.name.startswith('CPU'):
cpu_tests.extend(group.test_objects)
num_disk_tests_run -= 1
elif group.name.startswith('Disk'):
disk_tests.extend(group.test_objects)
# Bail if osTicket integration disabled
if state.ost.disabled:
return
# Bail if values not confirmed
if not std.ask('Update osTicket checkboxes using the data above?'):
return
# CPU max temp and pass/fail
if cpu_tests:
state.ost.set_cpu_max_temp(
state.cpu_max_temp,
)
if any(t.failed for t in cpu_tests):
state.ost.set_flag_failed('CPU')
elif all(t.passed for t in cpu_tests):
state.ost.set_flag_passed('CPU')
# Check results for all disks
if state.disks:
all_disks_passed = True
for disk in state.disks:
if any(t.failed for t in disk.tests):
# Mark failed disk in osTicket and stop checking results
all_disks_passed = False
state.ost.set_flag_failed('Disk')
break
if not all(t.passed for t in disk.tests):
all_disks_passed = False
break
# All disks passed
if all_disks_passed and num_disk_tests_run == num_disk_tests:
# Only mark as passed if a full disk diagnostic passed
state.ost.set_flag_passed('Disk')
if __name__ == '__main__':
print("This file is not meant to be called directly.")