diff --git a/scripts/wk/hw/__init__.py b/scripts/wk/hw/__init__.py index fd8daedb..55e66e08 100644 --- a/scripts/wk/hw/__init__.py +++ b/scripts/wk/hw/__init__.py @@ -7,6 +7,7 @@ from . import diags from . import disk from . import keyboard from . import network +from . import osticket from . import screensavers from . import sensors from . import smart diff --git a/scripts/wk/hw/diags.py b/scripts/wk/hw/diags.py index 9d9b7d56..10349386 100644 --- a/scripts/wk/hw/diags.py +++ b/scripts/wk/hw/diags.py @@ -6,24 +6,16 @@ import atexit import logging import os import pathlib -import re import subprocess import time from docopt import docopt from wk import cfg, debug, exe, log, osticket, std, tmux -from wk import os as wk_os -from wk.cfg.hw import ( - IO_SMALL_DISK, - REGEX_BLOCK_GRAPH, - REGEX_SMART_ATTRIBUTES, - REGEX_VOLUME, - STATUS_COLORS, - ) from wk.hw import benchmark as hw_benchmark from wk.hw import cpu as hw_cpu from wk.hw import disk as hw_disk +from wk.hw import osticket as hw_osticket from wk.hw import sensors as hw_sensors from wk.hw import smart as hw_smart from wk.hw import surface_scan as hw_surface_scan @@ -57,7 +49,7 @@ Options: LOG = logging.getLogger(__name__) IO_SIZE_SKIP_NAME = ( 'Skip USB Benchmarks ' - f'(< {std.bytes_to_string(IO_SMALL_DISK, use_binary=False)})' + f'(< {std.bytes_to_string(cfg.hw.IO_SMALL_DISK, use_binary=False)})' ) TEST_GROUPS = { # Also used to build the menu options @@ -340,7 +332,7 @@ class State(): for test in group.test_objects: report.append(std.color_string( [test.label, f'{test.status:>{width-len(test.label)}}'], - [None, STATUS_COLORS.get(test.status, None)], + [None, cfg.hw.STATUS_COLORS.get(test.status, None)], sep='', )) @@ -538,7 +530,7 @@ def cpu_stress_tests(state, test_objects, test_mode=False) -> None: std.print_info('Posting results to osTicket...') state.cpu_max_temp = sensors.cpu_max_temp() state.ost.post_response( - ost_build_report(state.system, 'CPU'), + hw_osticket.build_report(state.system, 'CPU'), color='Diags FAIL' if _failed else 'Diags', ) @@ -883,247 +875,6 @@ def main() -> None: state.update_top_pane('Main Menu') -def ost_build_report(dev, dev_type): - # pylint: disable=too-many-branches - """Build report for posting to osTicket, returns str.""" - report = [] - - # Combined result - if dev_type == 'CPU' or len(dev.tests) == NUM_DISK_TESTS: - # Build list of failed tests (if any) - failed_tests = [t.name for t in dev.tests if t.failed] - failed_tests = [name.replace('Disk ', '') for name in failed_tests] - if len(failed_tests) > 2: - failed_tests = f'{", ".join(failed_tests[:-1])}, & {failed_tests[-1]}' - else: - failed_tests = ' & '.join(failed_tests) - - # Get overall result - result = 'UNKNOWN' - if any(t.failed for t in dev.tests): - result = 'FAILED' - elif all(t.passed for t in dev.tests): - result = 'PASSED' - - # Add to report - report.append( - f'{dev_type} hardware diagnostic tests: {result}' - f'{" ("+failed_tests+")" if failed_tests else ""}' - ) - report.append('') - - # Description - if hasattr(dev, 'cpu_description'): - report.append(dev.cpu_description) - else: - report.append(dev.description) - if hasattr(dev, 'ram_total'): - if len(dev.ram_dimms) == 1 and 'justTotalRAM' in dev.ram_dimms[0]: - report.append(f'{dev.ram_total} (Total - no DIMM info available)') - else: - report.append(f'{dev.ram_total} ({", ".join(dev.ram_dimms)})') - if hasattr(dev, 'serial') and dev.serial: - report.append(f'Serial Number: {dev.serial}') - report.append('') - - # Notes - if hasattr(dev, 'notes') and dev.notes: - report.append('Notes') - report.extend([f'... {note}' for note in dev.notes]) - report.append('') - - # Tests - for test in dev.tests: - report.append(f'{test.name} ({test.status})') - - # Report - if test.name == 'Disk Attributes' and dev.attributes: - report.extend( - ost_convert_report( - hw_smart.generate_attribute_report(dev), - start_index=0, - ), - ) - else: - report.extend(ost_convert_report(test.report, start_index=1)) - - # I/O graph upload report - report.extend(getattr(test, 'upload_report', [])) - - # Spacer - report.append('') - - # Volume report - if dev_type == 'Disk' and len(dev.tests) == NUM_DISK_TESTS: - report.append('Volumes:') - report.extend(ost_generate_volume_report(dev)) - - # Remove last line if empty - if not report[-1].strip(): - report.pop(-1) - - # Done - return std.strip_colors('\n'.join(report)) - - -def ost_convert_report(original_report, start_index): - """Convert report to an osTicket compatible type, returns list.""" - report = [] - - # Convert report - for line in original_report[start_index:]: - # Remove colors and leading spaces - line = std.strip_colors(line) - line = re.sub(r'^\s+', '', line) - - # Disk I/O Benchmark - if REGEX_BLOCK_GRAPH.search(line): - line = REGEX_BLOCK_GRAPH.sub('', line) - line = line.strip() - - # SMART attributes - match = REGEX_SMART_ATTRIBUTES.search(line) - if match: - # Switch decimal and hex labels - _dec = f'{match.group("decimal"):>3}' - _dec = osticket.pad_with_dots(_dec) - _hex = match.group('hex') - _data = match.group('data') - line = f'{_hex}/{_dec}: {_data}' - line = line.replace('failed', 'FAILED') - - # Skip empty lines - if not line.strip(): - continue - - # Fix inner spacing - for spacing in re.findall(r'\s\s+', line): - new_padding = osticket.pad_with_dots(spacing) - new_padding += ' ' - line = line.replace(spacing, new_padding) - - # Indent line - line = f'... {line}' - - # Add to (converted) report - report.append(line) - - # Done - return report - - -def ost_generate_volume_report(dev): - """Generate volume report for dev, returns list.""" - report = [] - vol_report = None - - # OS Check - if PLATFORM == 'Darwin': - vol_report = wk_os.mac.mount_disk(device_path=dev.path) - elif PLATFORM == 'Linux': - vol_report = wk_os.linux.mount_volumes( - device_path=dev.path, - read_write=False, - scan_corestorage=not any(t.failed for t in dev.tests), - ) - else: - # Volume report unavailable - return report - - # Convert mount_volume report - for line in vol_report: - line = std.strip_colors(line) - match = REGEX_VOLUME.match(line) - if match: - if match.group('result') == 'Mounted on': - report.append( - f'... {match.group("dev")}' - f'... Mounted on {match.group("path")}' - f'... ({match.group("details")})' - ) - else: - # Assuming either failed to mount or info line about a skipped dev - report.append(f'... {match.group("dev")}... {match.group("result")}') - else: - # Unknown result, just print the whole line - report.append(f'... {line}') - - # Done - return report - - -def ost_post_disk_results(state): - """Post disk test results for all disks.""" - disk_tests = [] - for group in state.test_groups: - if group.name.startswith('Disk'): - disk_tests.extend(group.test_objects) - - # Bail if no disk tests were run - if not disk_tests or state.ost.disabled: - return - - # Post disk results - std.print_info('Posting results to osTicket...') - for disk in state.disks: - state.ost.post_response( - ost_build_report(disk, 'Disk'), - color='Diags FAIL' if any(t.failed for t in disk.tests) else 'Diags', - ) - - -def ost_update_checkboxes(state): - # pylint: disable=too-many-branches - """Update osTicket checkboxes after confirmation.""" - cpu_tests = [] - disk_tests = [] - num_disk_tests_run = len(state.test_groups) - - # Build list of tests - for group in state.test_groups: - if group.name.startswith('CPU'): - cpu_tests.extend(group.test_objects) - num_disk_tests_run -= 1 - elif group.name.startswith('Disk'): - disk_tests.extend(group.test_objects) - - # Bail if osTicket integration disabled - if state.ost.disabled: - return - - # Bail if values not confirmed - if not std.ask('Update osTicket checkboxes using the data above?'): - return - - # CPU max temp and pass/fail - if cpu_tests: - state.ost.set_cpu_max_temp( - state.cpu_max_temp, - ) - if any(t.failed for t in cpu_tests): - state.ost.set_flag_failed('CPU') - elif all(t.passed for t in cpu_tests): - state.ost.set_flag_passed('CPU') - - # Check results for all disks - if state.disks: - all_disks_passed = True - for disk in state.disks: - if any(t.failed for t in disk.tests): - # Mark failed disk in osTicket and stop checking results - all_disks_passed = False - state.ost.set_flag_failed('Disk') - break - if not all(t.passed for t in disk.tests): - all_disks_passed = False - break - - # All disks passed - if all_disks_passed and num_disk_tests_run == NUM_DISK_TESTS: - # Only mark as passed if a full disk diagnostic passed - state.ost.set_flag_passed('Disk') - - def print_countdown(proc, seconds) -> None: """Print countdown to screen while proc is alive.""" seconds = int(seconds) @@ -1214,13 +965,13 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None: test.set_status('Aborted') # Post disk results - ost_post_disk_results(state) + hw_osticket.post_disk_results(state, NUM_DISK_TESTS) # Show results show_results(state) # Update checkboxes - ost_update_checkboxes(state) + hw_osticket.update_checkboxes(state, NUM_DISK_TESTS) # Done state.save_debug_reports() diff --git a/scripts/wk/hw/osticket.py b/scripts/wk/hw/osticket.py new file mode 100644 index 00000000..390fe31c --- /dev/null +++ b/scripts/wk/hw/osticket.py @@ -0,0 +1,264 @@ +"""WizardKit: osTicket hardware diagnostic functions""" +# vim: sts=2 sw=2 ts=2 + +import logging +import re + +from wk import std, osticket +from wk import os as wk_os +from wk.cfg.hw import ( + REGEX_BLOCK_GRAPH, + REGEX_SMART_ATTRIBUTES, + ) +from wk.hw import smart as hw_smart +from wk.std import PLATFORM + + +# STATIC VARIABLES +LOG = logging.getLogger(__name__) + + +# Functions +def build_report(dev, dev_type, num_disk_tests=None): + # pylint: disable=too-many-branches + """Build report for posting to osTicket, returns str.""" + report = [] + + # Combined result + if dev_type == 'CPU' or len(dev.tests) == num_disk_tests: + # Build list of failed tests (if any) + failed_tests = [t.name for t in dev.tests if t.failed] + failed_tests = [name.replace('Disk ', '') for name in failed_tests] + if len(failed_tests) > 2: + failed_tests = f'{", ".join(failed_tests[:-1])}, & {failed_tests[-1]}' + else: + failed_tests = ' & '.join(failed_tests) + + # Get overall result + result = 'UNKNOWN' + if any(t.failed for t in dev.tests): + result = 'FAILED' + elif all(t.passed for t in dev.tests): + result = 'PASSED' + + # Add to report + report.append( + f'{dev_type} hardware diagnostic tests: {result}' + f'{" ("+failed_tests+")" if failed_tests else ""}' + ) + report.append('') + + # Description + if hasattr(dev, 'cpu_description'): + report.append(dev.cpu_description) + else: + report.append(dev.description) + if hasattr(dev, 'ram_total'): + if len(dev.ram_dimms) == 1 and 'justTotalRAM' in dev.ram_dimms[0]: + report.append(f'{dev.ram_total} (Total - no DIMM info available)') + else: + report.append(f'{dev.ram_total} ({", ".join(dev.ram_dimms)})') + if hasattr(dev, 'serial') and dev.serial: + report.append(f'Serial Number: {dev.serial}') + report.append('') + + # Notes + if hasattr(dev, 'notes') and dev.notes: + report.append('Notes') + report.extend([f'... {note}' for note in dev.notes]) + report.append('') + + # Tests + for test in dev.tests: + report.append(f'{test.name} ({test.status})') + + # Report + if test.name == 'Disk Attributes' and dev.attributes: + report.extend( + convert_report( + hw_smart.generate_attribute_report(dev), + start_index=0, + ), + ) + else: + report.extend(convert_report(test.report, start_index=1)) + + # I/O graph upload report + report.extend(getattr(test, 'upload_report', [])) + + # Spacer + report.append('') + + # Volume report + if dev_type == 'Disk' and len(dev.tests) == num_disk_tests: + report.append('Volumes:') + report.extend(generate_volume_report(dev)) + + # Remove last line if empty + if not report[-1].strip(): + report.pop(-1) + + # Done + return std.strip_colors('\n'.join(report)) + + +def convert_report(original_report, start_index): + """Convert report to an osTicket compatible type, returns list.""" + report = [] + + # Convert report + for line in original_report[start_index:]: + # Remove colors and leading spaces + line = std.strip_colors(line) + line = re.sub(r'^\s+', '', line) + + # Disk I/O Benchmark + if REGEX_BLOCK_GRAPH.search(line): + line = REGEX_BLOCK_GRAPH.sub('', line) + line = line.strip() + + # SMART attributes + match = REGEX_SMART_ATTRIBUTES.search(line) + if match: + # Switch decimal and hex labels + _dec = f'{match.group("decimal"):>3}' + _dec = osticket.pad_with_dots(_dec) + _hex = match.group('hex') + _data = match.group('data') + line = f'{_hex}/{_dec}: {_data}' + line = line.replace('failed', 'FAILED') + + # Skip empty lines + if not line.strip(): + continue + + # Fix inner spacing + for spacing in re.findall(r'\s\s+', line): + new_padding = osticket.pad_with_dots(spacing) + new_padding += ' ' + line = line.replace(spacing, new_padding) + + # Indent line + line = f'... {line}' + + # Add to (converted) report + report.append(line) + + # Done + return report + + +def generate_volume_report(dev): + """Generate volume report for dev, returns list.""" + report = [] + vol_report = None + + # OS Check + if PLATFORM == 'Darwin': + vol_report = wk_os.mac.mount_disk(device_path=dev.path) + elif PLATFORM == 'Linux': + vol_report = wk_os.linux.mount_volumes( + device_path=dev.path, + read_write=False, + scan_corestorage=not any(t.failed for t in dev.tests), + ) + else: + # Volume report unavailable + return report + + # Convert mount_volume report + for line in vol_report: + line = std.strip_colors(line) + match = REGEX_VOLUME.match(line) + if match: + if match.group('result') == 'Mounted on': + report.append( + f'... {match.group("dev")}' + f'... Mounted on {match.group("path")}' + f'... ({match.group("details")})' + ) + else: + # Assuming either failed to mount or info line about a skipped dev + report.append(f'... {match.group("dev")}... {match.group("result")}') + else: + # Unknown result, just print the whole line + report.append(f'... {line}') + + # Done + return report + + +def post_disk_results(state, num_disk_tests): + """Post disk test results for all disks.""" + disk_tests = [] + for group in state.test_groups: + if group.name.startswith('Disk'): + disk_tests.extend(group.test_objects) + + # Bail if no disk tests were run + if not disk_tests or state.ost.disabled: + return + + # Post disk results + std.print_info('Posting results to osTicket...') + for disk in state.disks: + state.ost.post_response( + build_report(disk, 'Disk', num_disk_tests), + color='Diags FAIL' if any(t.failed for t in disk.tests) else 'Diags', + ) + + +def update_checkboxes(state, num_disk_tests): + # pylint: disable=too-many-branches + """Update osTicket checkboxes after confirmation.""" + cpu_tests = [] + disk_tests = [] + num_disk_tests_run = len(state.test_groups) + + # Build list of tests + for group in state.test_groups: + if group.name.startswith('CPU'): + cpu_tests.extend(group.test_objects) + num_disk_tests_run -= 1 + elif group.name.startswith('Disk'): + disk_tests.extend(group.test_objects) + + # Bail if osTicket integration disabled + if state.ost.disabled: + return + + # Bail if values not confirmed + if not std.ask('Update osTicket checkboxes using the data above?'): + return + + # CPU max temp and pass/fail + if cpu_tests: + state.ost.set_cpu_max_temp( + state.cpu_max_temp, + ) + if any(t.failed for t in cpu_tests): + state.ost.set_flag_failed('CPU') + elif all(t.passed for t in cpu_tests): + state.ost.set_flag_passed('CPU') + + # Check results for all disks + if state.disks: + all_disks_passed = True + for disk in state.disks: + if any(t.failed for t in disk.tests): + # Mark failed disk in osTicket and stop checking results + all_disks_passed = False + state.ost.set_flag_failed('Disk') + break + if not all(t.passed for t in disk.tests): + all_disks_passed = False + break + + # All disks passed + if all_disks_passed and num_disk_tests_run == num_disk_tests: + # Only mark as passed if a full disk diagnostic passed + state.ost.set_flag_passed('Disk') + + +if __name__ == '__main__': + print("This file is not meant to be called directly.")