WizardKit/scripts/wk/hw/diags.py

1081 lines
31 KiB
Python

"""WizardKit: Hardware diagnostics"""
# vim: sts=2 sw=2 ts=2
import atexit
import logging
import os
import pathlib
import platform
import subprocess
import time
from docopt import docopt
from wk import cfg, debug, exe, log, osticket, std
from wk.cfg.hw import STATUS_COLORS
from wk.hw import benchmark as hw_benchmark
from wk.hw import cpu as hw_cpu
from wk.hw import disk as hw_disk
from wk.hw import osticket as hw_osticket
from wk.hw import sensors as hw_sensors
from wk.hw import smart as hw_smart
from wk.hw import surface_scan as hw_surface_scan
from wk.hw import system as hw_system
from wk.hw import volumes as hw_volumes
from wk.hw.audio import audio_test
from wk.hw.keyboard import keyboard_test
from wk.hw.network import network_test
from wk.hw.screensavers import screensaver
from wk.hw.test import Test, TestGroup
from wk.ui import ansi, cli, tui
# STATIC VARIABLES
DOCSTRING = f'''{cfg.main.KIT_NAME_FULL}: Hardware Diagnostics
Usage:
hw-diags [options]
hw-diags (-h | --help)
Options:
-c --cli Force CLI mode
-h --help Show this page
-q --quick Skip menu and perform a quick check
-t --test-mode Run diags in test mode
--ignore-smart-errors NOT RECOMMENDED!
Only use if you have RTFM,
know what you're doing,
understand the risks,
and accept responsibililty.
'''
LOG = logging.getLogger(__name__)
IO_SIZE_SKIP_NAME = (
'Skip USB Benchmarks '
f'(< {std.bytes_to_string(cfg.hw.IO_SMALL_DISK, use_binary=False)})'
)
TEST_GROUPS = {
# Also used to build the menu options
## NOTE: This needs to be above MENU_SETS
'System Info': 'post_system_info',
'CPU (Sysbench)': 'cpu_test_sysbench',
'CPU (Prime95)': 'cpu_test_mprime',
'CPU (Cooling)': 'cpu_test_cooling',
'Disk Attributes': 'disk_attribute_check',
'Disk Self-Test': 'disk_self_test',
'Disk Surface Scan': 'disk_surface_scan',
'Disk I/O Benchmark': 'disk_io_benchmark',
'Disk Utilization': 'disk_volume_utilization',
}
MENU_ACTIONS = (
'Audio Test',
'Keyboard Test',
'Network Test',
'Clock Sync',
'Start',
'Quit')
MENU_ACTIONS_SECRET = (
'Matrix',
'Tubes',
)
MENU_OPTIONS_QUICK = ('Disk Attributes',)
MENU_SETS = {
'Full Diagnostic': (*TEST_GROUPS,),
'CPU Diagnostic': (*[group for group in TEST_GROUPS if group.startswith('CPU')],),
'Disk Diagnostic': (
'Disk Attributes',
'Disk Self-Test',
'Disk Surface Scan',
'Disk I/O Benchmark',
'Disk Utilization',
),
'Disk Diagnostic (Quick)': ('Disk Attributes', 'Disk Utilization'),
}
MENU_TOGGLES = (
'osTicket Integration',
'osTicket Tech Note',
IO_SIZE_SKIP_NAME,
)
NUM_DISK_TESTS = len([s for s in TEST_GROUPS if s.startswith('Disk')])
PLATFORM = std.PLATFORM
# Classes
class State():
"""Object for tracking hardware diagnostic data."""
def __init__(self, test_mode=False):
self.disks: list[hw_disk.Disk] = []
self.log_dir: pathlib.Path | None = None
self.ost = osticket.osTicket()
self.progress_file: pathlib.Path | None = None
self.sensors: hw_sensors.Sensors = hw_sensors.Sensors()
self.system: hw_system.System | None = None
self.test_groups: list[TestGroup] = []
self.title_text: str = ansi.color_string('Hardware Diagnostics', 'GREEN')
if test_mode:
self.title_text += ansi.color_string(' (Test Mode)', 'YELLOW')
self.ui: tui.TUI = tui.TUI(f'{self.title_text}\nMain Menu')
def abort_testing(self) -> None:
"""Set unfinished tests as aborted and cleanup panes."""
for group in self.test_groups:
for test in group.test_objects:
if test.status in ('Pending', 'Working'):
test.set_status('Aborted')
# Cleanup panes
self.reset_layout()
def disk_safety_checks(self) -> None:
"""Check for mid-run SMART failures and failed test(s)."""
for dev in self.disks:
disk_smart_status_check(dev, mid_run=True)
for test in dev.tests:
if test.failed:
# Skip acceptable failure states
if 'Attributes' in test.name:
continue
if 'Self-Test' in test.name and 'TimedOut' in test.status:
continue
# Disable remaining tests
dev.disable_disk_tests()
break
def init_diags(self, menu) -> None:
"""Initialize diagnostic pass."""
# Reset objects
self.disks.clear()
self.sensors = hw_sensors.Sensors()
self.test_groups.clear()
self.ui.remove_all_subtitle_panes()
# osTicket
self.ost.init()
self.ost.disabled = not menu.toggles['osTicket Integration']['Selected']
# Set log
self.log_dir = log.format_log_path()
self.log_dir = pathlib.Path(
f'{self.log_dir.parent}/'
f'Hardware-Diagnostics_{time.strftime("%Y-%m-%d_%H%M%S%z")}/'
)
log.update_log_path(
dest_dir=self.log_dir,
dest_name='main',
keep_history=False,
timestamp=False,
)
cli.clear_screen()
cli.print_info('Initializing...')
# Progress Pane
self.progress_file = pathlib.Path(f'{self.log_dir}/progress.out')
self.update_progress_file()
self.ui.set_progress_file(self.progress_file)
# Add HW Objects
self.system = hw_system.System()
self.disks = hw_disk.get_disks(skip_kits=True)
for disk in self.disks:
hw_smart.enable_smart(disk)
hw_smart.update_smart_details(disk)
# Add test objects
for name, details in menu.options.items():
if not details['Selected']:
# Only add selected options
continue
if 'System' in name:
test = Test(dev=self.system, label=name, name=name)
self.test_groups.append(
TestGroup(
name=name,
function=globals()[TEST_GROUPS[name]],
test_objects=[test],
)
)
if 'CPU' in name:
self.system.tests.append(
Test(dev=self.system, label=name[5:-1], name=name),
)
if 'Disk' in name:
test_group = TestGroup(
name=name, function=globals()[TEST_GROUPS[name]],
)
for disk in self.disks:
test_obj = Test(dev=disk, label=disk.path.name, name=name)
disk.tests.append(test_obj)
test_group.test_objects.append(test_obj)
self.test_groups.append(test_group)
# Group CPU tests
if self.system.tests:
self.test_groups.insert(
0,
TestGroup(
name='CPU & Cooling',
function=run_cpu_tests,
test_objects=self.system.tests,
),
)
def reset_layout(self) -> None:
"""Reset layout to avoid flickering."""
self.ui.clear_current_pane_height()
self.ui.remove_all_info_panes()
self.ui.remove_all_worker_panes()
def save_debug_reports(self) -> None:
"""Save debug reports to disk."""
LOG.info('Saving debug reports')
debug_dir = pathlib.Path(f'{self.log_dir}/debug')
if not debug_dir.exists():
debug_dir.mkdir()
# State (self)
debug.save_pickles({'state': self}, debug_dir)
with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f:
_f.write('\n'.join(debug.generate_object_report(self)))
# Disks
for disk in self.disks:
with open(
f'{debug_dir}/disk_{disk.path.name}.report', 'a',
encoding='utf-8') as _f:
_f.write('\n'.join(debug.generate_object_report(disk)))
_f.write('\n\n[Tests]')
for test in disk.tests:
_f.write(f'\n{test.name}:\n')
_f.write('\n'.join(debug.generate_object_report(test, indent=1)))
if platform.system() == 'Darwin':
cmd = [(
f'sudo gpt -r show "{disk.path}"'
f' >> {debug_dir}/gpt_{disk.path.name}.info'
)]
exe.run_program(cmd, check=False, shell=True)
# osTicket
with open(f'{debug_dir}/osTicket.report', 'a', encoding='utf-8') as _f:
_f.write('\n'.join(debug.generate_object_report(self.ost)))
# SMC
if os.path.exists('/.wk-live-macos'):
data = []
try:
proc = exe.run_program(['smc', '-f'])
data.extend(proc.stdout.splitlines())
data.append('----')
proc = exe.run_program(['smc', '-l'])
data.extend(proc.stdout.splitlines())
except Exception:
LOG.ERROR('Error(s) encountered while exporting SMC data')
data = [line.strip() for line in data]
with open(f'{debug_dir}/smc.data', 'a', encoding='utf-8') as _f:
_f.write('\n'.join(data))
# System
with open(f'{debug_dir}/system.report', 'a', encoding='utf-8') as _f:
_f.write('\n'.join(debug.generate_object_report(self.system)))
_f.write('\n\n[Tests]')
for test in self.system.tests:
_f.write(f'\n{test.name}:\n')
_f.write('\n'.join(debug.generate_object_report(test, indent=1)))
def update_progress_file(self) -> None:
"""Update progress file."""
report = []
for group in self.test_groups:
report.append(ansi.color_string(group.name, 'BLUE'))
for test in group.test_objects:
report.append(ansi.color_string(
[test.label, f'{test.status:>{self.ui.side_width-len(test.label)}}'],
[None, STATUS_COLORS.get(test.status, None)],
sep='',
))
# Add spacer
report.append(' ')
# Write to progress file
self.progress_file.write_text('\n'.join(report), encoding='utf-8')
def update_title_text(self, text) -> None:
"""Update top pane with text."""
self.ui.set_title(self.title_text, text)
# Functions
def build_menu(cli_mode=False, quick_mode=False) -> cli.Menu:
"""Build main menu, returns wk.ui.cli.Menu."""
menu = cli.Menu(title='')
# Add actions, options, etc
for action in MENU_ACTIONS:
menu.add_action(action)
for action in MENU_ACTIONS_SECRET:
menu.add_action(action, {'Hidden': True})
for option in TEST_GROUPS:
menu.add_option(option, {'Selected': True})
for toggle in MENU_TOGGLES:
menu.add_toggle(toggle, {'Selected': True})
for name, targets in MENU_SETS.items():
menu.add_set(name, {'Targets': targets})
menu.actions['Start']['Separator'] = True
# osTicket
menu.toggles['osTicket Tech Note']['Selected'] = False
# Update default selections for quick mode if necessary
if quick_mode:
for name, details in menu.options.items():
# Only select quick option(s)
details['Selected'] = name in MENU_OPTIONS_QUICK
# Skip CPU tests for TestStations
if os.path.exists(cfg.hw.TESTSTATION_FILE):
menu.options['CPU & Cooling']['Selected'] = False
menu.options['System Info']['Selected'] = False
# Add CLI actions if necessary
if cli_mode or 'DISPLAY' not in os.environ:
menu.add_action('Reboot')
menu.add_action('Power Off')
# Compatibility checks
if PLATFORM != 'Linux':
for name in ('Audio Test', 'Keyboard Test'):
menu.actions[name]['Disabled'] = True
if PLATFORM not in ('Darwin', 'Linux'):
for name in ('Matrix', 'Network Test', 'Tubes'):
menu.actions[name]['Disabled'] = True
# Live macOS actions
if os.path.exists('/.wk-live-macos'):
menu.actions['Clock Sync']['Separator'] = True
else:
menu.actions['Clock Sync']['Disabled'] = True
menu.actions['Clock Sync']['Hidden'] = True
# Done
return menu
def cpu_tests_init(state) -> None:
"""Initialize CPU tests."""
sensors_out = pathlib.Path(f'{state.log_dir}/sensors.out')
state.update_title_text(state.system.cpu_description)
# Start monitor
if PLATFORM == 'Darwin':
state.ui.add_info_pane(
percent=80, cmd='./hw-sensors', update_layout=False,
)
elif PLATFORM == 'Linux':
state.ui.add_info_pane(
percent=80,
watch_file=pathlib.Path(f'{state.log_dir}/sensors.out'),
update_layout=False,
)
state.sensors.start_background_monitor(sensors_out)
state.ui.set_current_pane_height(3)
# Save idle temps
cli.print_standard('Saving idle temps...')
state.sensors.save_average_temps(temp_label='Idle', seconds=5)
def cpu_tests_end(state) -> None:
"""End CPU tests."""
# Cleanup
state.sensors.stop_background_monitor()
state.ui.clear_current_pane_height()
state.ui.remove_all_info_panes()
state.ui.remove_all_worker_panes()
def cpu_test_cooling(state, test_object, test_mode=False) -> None:
"""CPU cooling test via sensor data assessment."""
LOG.info('CPU Test (Cooling)')
# Bail early
if test_object.disabled:
return
hw_cpu.check_cooling_results(state.sensors, test_object)
state.update_progress_file()
def cpu_test_mprime(state, test_object, test_mode=False) -> None:
"""CPU stress test using mprime."""
LOG.info('CPU Test (Prime95)')
aborted = False
log_path = pathlib.Path(f'{state.log_dir}/prime.log')
sensors_out = pathlib.Path(f'{state.log_dir}/sensors.out')
test_minutes = cfg.hw.CPU_TEST_MINUTES
if test_mode:
test_minutes = cfg.hw.TEST_MODE_CPU_LIMIT
# Bail early
if test_object.disabled:
return
if state.sensors.cpu_reached_critical_temp():
test_object.set_status('Denied')
test_object.disabled = True
return
# Prep
test_object.set_status('Working')
state.update_progress_file()
state.ui.clear_current_pane()
cli.print_info('Running stress test')
print('')
# Start sensors monitor
state.sensors.stop_background_monitor()
state.sensors.start_background_monitor(
sensors_out,
alt_max='Prime95',
thermal_action=('killall', '-INT', 'mprime'),
)
# Run Prime95
hw_cpu.set_apple_fan_speed('max')
proc = hw_cpu.start_mprime(state.log_dir, log_path)
state.ui.add_worker_pane(lines=10, watch_cmd='tail', watch_file=log_path)
try:
print_countdown(proc=proc, seconds=test_minutes*60)
except KeyboardInterrupt:
aborted = True
# Stop Prime95
hw_cpu.stop_mprime(proc)
# Get cooldown temp
if 'Cooldown' in state.sensors.temp_labels:
# Give Prime95 time to save the results
std.sleep(1)
else:
# Save cooldown temp
state.ui.clear_current_pane()
cli.print_standard('Letting CPU cooldown...')
std.sleep(5)
cli.print_standard('Saving cooldown temps...')
state.sensors.save_average_temps(temp_label='Cooldown', seconds=5)
# Check Prime95 results
test_object.report.append(ansi.color_string('Prime95', 'BLUE'))
hw_cpu.check_mprime_results(test_obj=test_object, working_dir=state.log_dir)
# Update progress
if state.sensors.cpu_reached_critical_temp() or aborted:
test_object.set_status('Aborted')
state.update_progress_file()
# Done
state.ui.remove_all_worker_panes()
if aborted:
cpu_tests_end(state)
raise std.GenericAbort('Aborted')
def cpu_test_sysbench(state, test_object, test_mode=False) -> None:
"""CPU stress test using Sysbench."""
LOG.info('CPU Test (Sysbench)')
aborted = False
log_path = pathlib.Path(f'{state.log_dir}/sysbench.log')
sensors_out = pathlib.Path(f'{state.log_dir}/sensors.out')
test_minutes = cfg.hw.CPU_TEST_MINUTES
if test_mode:
test_minutes = cfg.hw.TEST_MODE_CPU_LIMIT
# Bail early
if test_object.disabled:
return
# Prep
test_object.set_status('Working')
state.update_progress_file()
state.ui.clear_current_pane()
cli.print_info('Running stress test')
print('')
# Start sensors monitor
state.sensors.stop_background_monitor()
state.sensors.start_background_monitor(
sensors_out,
alt_max='Sysbench',
thermal_action=('killall', '-INT', 'sysbench'),
)
# Run sysbench
state.ui.add_worker_pane(lines=10, watch_cmd='tail', watch_file=log_path)
proc, filehandle = hw_cpu.start_sysbench(log_path=log_path)
try:
print_countdown(proc=proc, seconds=test_minutes*60)
except AttributeError:
# Assuming the sysbench process wasn't found and proc was set to None
LOG.error('Failed to find sysbench process', exc_info=True)
except KeyboardInterrupt:
aborted = True
hw_cpu.stop_sysbench(proc, filehandle)
# Get cooldown temp
if 'Cooldown' not in state.sensors.temp_labels:
state.ui.clear_current_pane()
cli.print_standard('Letting CPU cooldown...')
std.sleep(5)
cli.print_standard('Saving cooldown temps...')
state.sensors.save_average_temps(temp_label='Cooldown', seconds=5)
# Update progress
test_object.report.append(ansi.color_string('Sysbench', 'BLUE'))
if aborted:
test_object.set_status('Aborted')
test_object.report.append(ansi.color_string(' Aborted.', 'YELLOW'))
state.update_progress_file()
elif state.sensors.cpu_reached_critical_temp():
test_object.set_status('Aborted')
test_object.report.append(
ansi.color_string(' Aborted due to temps.', 'YELLOW'),
)
elif proc.returncode not in (-15, -2, 0):
# NOTE: Return codes:
# 0 == Completed w/out issue
# -2 == Stopped with INT signal
# -15 == Stopped with TERM signal
test_object.set_status('Failed')
test_object.report.append(f' Failed with return code: {proc.returncode}')
else:
test_object.set_status('Passed')
test_object.report.append(' Completed without issue.')
state.update_progress_file()
# Done
state.ui.remove_all_worker_panes()
if aborted:
cpu_tests_end(state)
raise std.GenericAbort('Aborted')
def disk_attribute_check(state, test_objects, test_mode=False) -> None:
"""Disk attribute check."""
LOG.info('Disk Attribute Check')
for test in test_objects:
disk_smart_status_check(test.dev, mid_run=False)
if not test.dev.attributes:
# No NVMe/SMART data
test.set_status('N/A')
continue
# Done
state.update_progress_file()
def disk_io_benchmark(
state, test_objects, skip_usb=True, test_mode=False) -> None:
"""Disk I/O benchmark using dd."""
LOG.info('Disk I/O Benchmark (dd)')
aborted = False
# Run benchmarks
state.update_title_text(
f'Disk I/O Benchmark{"s" if len(test_objects) > 1 else ""}',
)
state.ui.set_current_pane_height(10)
for test in test_objects:
if (
skip_usb
and test.dev.bus == 'USB'
and test.dev.size < cfg.hw.IO_SMALL_DISK
):
test.set_status('Skipped')
test.disabled = True
continue
# Start benchmark
for test in test_objects:
if test.disabled:
continue
# Start benchmark
state.ui.clear_current_pane()
cli.print_report(test.dev.generate_report())
test.set_status('Working')
test_log = f'{state.log_dir}/{test.dev.path.name}_benchmark.out'
state.ui.remove_all_worker_panes()
state.ui.add_worker_pane(
percent=50,
update_layout=False,
watch_cmd='tail',
watch_file=test_log,
)
state.update_progress_file()
try:
hw_benchmark.run_io_test(state, test, test_log, test_mode=test_mode)
except KeyboardInterrupt:
aborted = True
except (subprocess.CalledProcessError, TypeError, ValueError) as err:
# Something went wrong
LOG.error('%s', err)
test.set_status('ERROR')
test.report.append(ansi.color_string(' Unknown Error', 'RED'))
# Mark test(s) aborted if necessary
if aborted:
test.set_status('Aborted')
test.report.append(ansi.color_string(' Aborted', 'YELLOW'))
break
# Update progress after each test
state.update_progress_file()
# Cleanup
state.update_progress_file()
state.ui.clear_current_pane_height()
state.ui.remove_all_worker_panes()
# Done
if aborted:
raise std.GenericAbort('Aborted')
def disk_self_test(state, test_objects, test_mode=False) -> None:
"""Disk self-test if available."""
LOG.info('Disk Self-Test(s)')
aborted = False
threads = []
# Run self-tests
state.update_title_text(
f'Disk self-test{"s" if len(test_objects) > 1 else ""}',
)
cli.print_info(f'Starting self-test{"s" if len(test_objects) > 1 else ""}')
show_failed_attributes(state)
for test in reversed(test_objects):
if test.disabled:
# Skip
continue
# Start thread
test.set_status('Working')
test_log = f'{state.log_dir}/{test.dev.path.name}_selftest.log'
threads.append(exe.start_thread(hw_smart.run_self_test, args=(test, test_log)))
# Show progress
if threads[-1].is_alive():
state.ui.add_worker_pane(lines=4, watch_file=test_log)
# Wait for all tests to complete
state.update_progress_file()
try:
while True:
if any(t.is_alive() for t in threads):
std.sleep(1)
else:
break
except KeyboardInterrupt:
aborted = True
for test in test_objects:
hw_smart.abort_self_test(test.dev)
std.sleep(0.5)
hw_smart.build_self_test_report(test, aborted=True)
# Cleanup
state.update_progress_file()
state.ui.remove_all_worker_panes()
# Done
if aborted:
raise std.GenericAbort('Aborted')
def disk_smart_status_check(dev, mid_run=True) -> None:
"""Check SMART status."""
msg = None
color = None
disable_tests = False
# Bail if dev is missing
if not dev.present:
dev.disable_disk_tests()
return
# Check SMART status and attributes
if not hw_smart.smart_status_ok(dev):
msg = 'Critical SMART error detected'
color = 'RED'
disable_tests = True
elif not hw_smart.check_attributes(dev, only_blocking=False):
# Non-blocking errors
msg = 'SMART attribute failure(s) detected'
color = 'YELLOW'
# Log errors if detected
if msg and not dev.contains_note(msg):
msg = f'{msg}{" during diagnostics" if mid_run else ""}'
LOG.warning(msg)
dev.add_note(msg, color)
# Set Disk Attributes test result
for test in dev.tests:
if test.name == 'Disk Attributes':
test.failed = bool(test.failed or msg)
test.passed = not test.failed
if test.failed:
test.set_status('Failed')
elif 'N/A' not in test.status:
test.set_status('Passed')
# Disable further testing if needed
if disable_tests:
dev.disable_disk_tests()
def disk_surface_scan(state, test_objects, test_mode=False) -> None:
"""Read-only disk surface scan using badblocks."""
LOG.info('Disk Surface Scan (badblocks)')
aborted = False
threads = []
# Update panes
state.update_title_text(
f'Disk Surface Scan{"s" if len(test_objects) > 1 else ""}',
)
cli.print_info(
f'Starting disk surface scan{"s" if len(test_objects) > 1 else ""}',
)
show_failed_attributes(state)
# Run surface scans
for test in reversed([test for test in test_objects if not test.disabled]):
# Start thread
test_log = f'{state.log_dir}/{test.dev.path.name}_badblocks.log'
threads.append(exe.start_thread(
hw_surface_scan.run_scan, args=(test, test_log, test_mode),
))
# Show progress
if threads[-1].is_alive():
state.ui.add_worker_pane(lines=5, watch_cmd='tail', watch_file=test_log)
# Wait for all tests to complete
try:
while True:
if any(t.is_alive() for t in threads):
state.update_progress_file()
std.sleep(5)
else:
break
except KeyboardInterrupt:
aborted = True
std.sleep(0.5)
# Handle aborts
for test in test_objects:
if not (test.disabled or test.passed or test.failed):
test.set_status('Aborted')
test.report.append(ansi.color_string(' Aborted', 'YELLOW'))
# Cleanup
state.update_progress_file()
state.ui.remove_all_worker_panes()
# Done
if aborted:
raise std.GenericAbort('Aborted')
def disk_volume_utilization(state, test_objects, test_mode=False) -> None:
"""Check disk for full volumes."""
LOG.info('Disk Utilization')
for test in test_objects:
hw_volumes.check_volume_utilization(test)
# Done
state.update_progress_file()
def main() -> None:
"""Main function for hardware diagnostics."""
args = docopt(DOCSTRING)
log.update_log_path(dest_name='Hardware-Diagnostics', timestamp=True)
# Safety check
if 'TMUX' not in os.environ:
LOG.error('tmux session not found')
raise RuntimeError('tmux session not found')
# Init
menu = build_menu(cli_mode=args['--cli'], quick_mode=args['--quick'])
state = State(test_mode=args['--test-mode'])
state.override_all_smart_errors = args['--ignore-smart-errors']
# Quick Mode
if args['--quick']:
menu.toggles['osTicket Integration']['Selected'] = False
run_diags(state, menu, quick_mode=True, test_mode=args['--test-mode'])
return
# Show menu
while True:
action = None
selection = menu.advanced_select()
# Set action
if 'Audio Test' in selection:
action = audio_test
elif 'Keyboard Test' in selection:
action = keyboard_test
elif 'Network Test' in selection:
action = network_test
elif 'Clock Sync' in selection:
action = sync_clock
# Run simple test
if action:
state.update_title_text(selection[0])
try:
action()
except KeyboardInterrupt:
cli.print_warning('Aborted.')
cli.print_standard('')
cli.pause('Press Enter to return to main menu...')
if 'Clock Sync' in selection:
state.ui.update_clock()
# Secrets
if 'Matrix' in selection:
screensaver('matrix')
elif 'Tubes' in selection:
# Tubes ≈≈ Pipes?
screensaver('pipes')
# Quit
if 'Reboot' in selection:
cmd = ['/usr/local/bin/wk-power-command', 'reboot']
exe.run_program(cmd, check=False)
elif 'Power Off' in selection:
cmd = ['/usr/local/bin/wk-power-command', 'poweroff']
exe.run_program(cmd, check=False)
elif 'Quit' in selection:
break
# Start diagnostics
if 'Start' in selection:
run_diags(state, menu, quick_mode=False, test_mode=args['--test-mode'])
# Reset top pane
state.update_title_text('Main Menu')
def post_system_info(state, quick_mode=False, test_mode=False) -> None:
"""Post system info to osTicket."""
# Bail early
if state.ost.disabled:
return
# Build report
report = state.system.generate_full_report()
if state.disks:
report.append('\n[Disks]')
for disk in state.disks:
report.append(f'... {disk.description}')
# Post to osTicket
state.ost.post_response('\n'.join(report))
def print_countdown(proc, seconds) -> None:
"""Print countdown to screen while proc is alive."""
seconds = int(seconds)
for i in range(seconds):
sec_left = (seconds - i) % 60
min_left = int((seconds - i) / 60)
out_str = '\r '
if min_left:
out_str += f'{min_left} minute{"s" if min_left != 1 else ""}, '
out_str += f'{sec_left} second{"s" if sec_left != 1 else ""}'
out_str += ' remaining'
print(f'{out_str:<42}', end='', flush=True)
try:
proc.wait(1)
except subprocess.TimeoutExpired:
# proc still going, continue
pass
if ((hasattr(proc, 'poll') and proc.poll() is not None)
or (hasattr(proc, 'is_running') and not proc.is_running())):
# proc exited, stop countdown
break
# Done
print('')
def run_cpu_tests(state, test_objects, test_mode=False) -> None:
"""Run selected CPU test(s)."""
state.update_progress_file()
cpu_tests_init(state)
for obj in test_objects:
func = globals()[TEST_GROUPS[obj.name]]
func(state, obj, test_mode=test_mode)
cpu_tests_end(state)
state.update_progress_file()
# Post results to osTicket
if not state.ost.disabled:
failed = any(test.failed for test in state.system.tests)
cli.print_info('Posting results to osTicket...')
state.ost.post_response(
hw_osticket.build_report(state.system, 'CPU'),
color='Diags FAIL' if failed else 'Diags',
)
def run_diags(
state: State,
menu: cli.Menu,
quick_mode: bool = False,
test_mode: bool = False,
) -> None:
"""Run selected diagnostics."""
aborted = False
atexit.register(state.save_debug_reports)
state.init_diags(menu)
# Just return if no tests were selected
if not state.test_groups:
cli.print_warning('No tests selected?')
cli.pause()
return
# osTicket
if not state.ost.disabled:
# Select Ticket
state.ost.select_ticket()
# Update top_text
if state.ost.ticket_id:
state.ui.add_subtitle_pane(
cli.color_string(
[f'#{state.ost.ticket_id}', str(state.ost.ticket_name)],
[None, 'CYAN'],
),
str(state.ost.ticket_subject),
)
# Add note
if (state.ost.ticket_id
and menu.toggles['osTicket Tech Note']['Selected']):
note_lines = state.ost.add_note()
if note_lines:
state.ui.add_subtitle_pane(
cli.color_string('Tech Note', 'YELLOW'),
' | '.join(note_lines),
)
# Run tests
for group in state.test_groups:
# Run test(s)
function = group.function
args = [group.test_objects]
if group.name == 'Disk I/O Benchmark':
args.append(menu.toggles[IO_SIZE_SKIP_NAME]['Selected'])
state.ui.clear_current_pane()
try:
function(state, *args, test_mode=test_mode)
except (KeyboardInterrupt, std.GenericAbort):
aborted = True
state.abort_testing()
state.update_progress_file()
state.reset_layout()
break
else:
# Run safety checks after disk tests
if group.name.startswith('Disk'):
state.disk_safety_checks()
# Handle aborts
if aborted:
for group in state.test_groups:
for test in group.test_objects:
if test.status == 'Pending':
test.set_status('Aborted')
# Post disk results
hw_osticket.post_disk_results(state, NUM_DISK_TESTS)
# Show results
show_results(state)
# Update checkboxes
hw_osticket.update_checkboxes(state, NUM_DISK_TESTS)
# Done
state.ui.remove_all_subtitle_panes()
state.save_debug_reports()
atexit.unregister(state.save_debug_reports)
if quick_mode:
cli.pause('Press Enter to exit...')
else:
cli.pause('Press Enter to return to main menu...')
def show_failed_attributes(state) -> None:
"""Show failed attributes for all disks."""
for dev in state.disks:
cli.print_colored([dev.name, dev.description], ['CYAN', None])
cli.print_report(
hw_smart.generate_attribute_report(dev, only_failed=True),
)
cli.print_standard('')
def show_results(state) -> None:
"""Show test results by device."""
std.sleep(0.5)
state.ui.clear_current_pane()
state.update_title_text('Results')
# CPU Tests
cpu_tests_enabled = [
group.name for group in state.test_groups if 'CPU' in group.name
]
if cpu_tests_enabled:
cli.print_success('CPU:')
cli.print_report(state.system.generate_cpu_ram_report())
cli.print_standard(' ')
# Disk Tests
disk_tests_enabled = [
group.name for group in state.test_groups if 'Disk' in group.name
]
if disk_tests_enabled:
cli.print_success(f'Disk{"s" if len(state.disks) > 1 else ""}:')
for disk in state.disks:
cli.print_report(disk.generate_report())
cli.print_standard(' ')
if not state.disks:
cli.print_warning('No devices')
cli.print_standard(' ')
def sync_clock() -> None:
"""Sync clock under macOS using sntp."""
cmd = ['sudo', 'sntp', '-Ss', 'us.pool.ntp.org']
proc = exe.run_program(cmd, check=False)
if proc.returncode:
# Assuming we're running under an older version of macOS
cmd[2] = '-s'
exe.run_program(cmd, check=False)
if __name__ == '__main__':
print("This file is not meant to be called directly.")