# WizardKit/scripts/wk/hw/diags.py
#
# 1053 lines
# 30 KiB
# Python

"""WizardKit: Hardware diagnostics"""
# pylint: disable=too-many-lines
# vim: sts=2 sw=2 ts=2
import atexit
import logging
import os
import pathlib
import subprocess
import time
from docopt import docopt
from wk import cfg, debug, exe, log, osticket, std, tmux
from wk.hw import benchmark as hw_benchmark
from wk.hw import cpu as hw_cpu
from wk.hw import disk as hw_disk
from wk.hw import osticket as hw_osticket
from wk.hw import sensors as hw_sensors
from wk.hw import smart as hw_smart
from wk.hw import surface_scan as hw_surface_scan
from wk.hw import system as hw_system
from wk.hw import volumes as hw_volumes
from wk.hw.audio import audio_test
from wk.hw.keyboard import keyboard_test
from wk.hw.network import network_test
from wk.hw.screensavers import screensaver
from wk.hw.test import Test, TestGroup
# STATIC VARIABLES
# Usage/help text; main() passes this string to docopt() to parse arguments.
DOCSTRING = f'''{cfg.main.KIT_NAME_FULL}: Hardware Diagnostics
Usage:
hw-diags [options]
hw-diags (-h | --help)
Options:
-c --cli Force CLI mode
-h --help Show this page
-q --quick Skip menu and perform a quick check
-t --test-mode Run diags in test mode
--ignore-smart-errors NOT RECOMMENDED!
Only use if you have RTFM,
know what you're doing,
understand the risks,
and accept responsibililty.
'''
# Module-level logger
LOG = logging.getLogger(__name__)
# Name of the menu toggle that is passed to disk_io_benchmark() as skip_usb;
# the size shown is cfg.hw.IO_SMALL_DISK rendered in decimal units.
IO_SIZE_SKIP_NAME = (
'Skip USB Benchmarks '
f'(< {std.bytes_to_string(cfg.hw.IO_SMALL_DISK, use_binary=False)})'
)
# Maps each menu option name to the name of the module-level function that
# runs it (State.init_diags() resolves the function via globals()).
TEST_GROUPS = {
# Also used to build the menu options
## NOTE: This needs to be above MENU_SETS
'CPU & Cooling': 'cpu_stress_tests',
'Disk Attributes': 'disk_attribute_check',
'Disk Self-Test': 'disk_self_test',
'Disk Surface Scan': 'disk_surface_scan',
'Disk I/O Benchmark': 'disk_io_benchmark',
'Disk Utilization': 'disk_volume_utilization',
}
# Actions always added to the main menu by build_menu()
MENU_ACTIONS = (
'Audio Test',
'Keyboard Test',
'Network Test',
'Clock Sync',
'Start',
'Quit')
# Actions added to the main menu with 'Hidden' set (screensavers)
MENU_ACTIONS_SECRET = (
'Matrix',
'Tubes',
)
# Options left selected when the menu is built in quick mode
MENU_OPTIONS_QUICK = ('Disk Attributes',)
# Named groups of options; each is added to the menu as a set with 'Targets'
MENU_SETS = {
'Full Diagnostic': (*TEST_GROUPS,),
'Disk Diagnostic': (
'Disk Attributes',
'Disk Self-Test',
'Disk Surface Scan',
'Disk I/O Benchmark',
'Disk Utilization',
),
'Disk Diagnostic (Quick)': ('Disk Attributes',),
}
# Toggles added to the main menu (all selected by default in build_menu(),
# which then deselects 'osTicket Tech Note')
MENU_TOGGLES = (
'osTicket Integration',
'osTicket Tech Note',
IO_SIZE_SKIP_NAME,
)
# Number of TEST_GROUPS entries whose name starts with 'Disk'
NUM_DISK_TESTS = len([s for s in TEST_GROUPS if s.startswith('Disk')])
# Local alias of std.PLATFORM; compared against 'Darwin' and 'Linux' below
PLATFORM = std.PLATFORM
# Classes
class State():
  # pylint: disable=too-many-instance-attributes
  """Object for tracking hardware diagnostic data.

  Holds the detected system/disk objects, the selected test groups, the
  osTicket session, and the tmux pane and layout bookkeeping for a run.
  """
  def __init__(self):
    """Set default values, build the tmux layout and start the layout thread."""
    # Max CPU temp; -1 until cpu_stress_tests() records a value
    self.cpu_max_temp = -1
    # Disk objects found by hw_disk.get_disks() in init_diags()
    self.disks = []
    # Private copy so per-run changes don't alter cfg.hw.TMUX_LAYOUT
    self.layout = cfg.hw.TMUX_LAYOUT.copy()
    # Per-run log directory (pathlib.Path); None until init_diags() runs
    self.log_dir = None
    self.ost = osticket.osTicket()
    # Pane name -> tmux pane id (str) or list of pane ids
    self.panes = {}
    # hw_system.System object; None until init_diags() runs
    self.system = None
    # TestGroup objects for the selected menu options
    self.test_groups = []
    self.top_text = std.color_string('Hardware Diagnostics', 'GREEN')

    # Init tmux and start a background process to maintain layout
    self.init_tmux()
    exe.start_thread(self.fix_tmux_layout_loop)

  def abort_testing(self) -> None:
    """Set unfinished tests as aborted and cleanup tmux panes."""
    for group in self.test_groups:
      for test in group.test_objects:
        if test.status in ('Pending', 'Working'):
          test.set_status('Aborted')

    # Cleanup tmux
    self.panes.pop('Current', None)
    # Iterate over a copy since entries are removed while looping
    for key, pane_ids in self.panes.copy().items():
      if key in ('Top', 'Started', 'Progress'):
        # These panes persist between runs
        continue
      if isinstance(pane_ids, str):
        tmux.kill_pane(self.panes.pop(key))
      else:
        for _id in pane_ids:
          tmux.kill_pane(_id)
        self.panes.pop(key)

  def disk_safety_checks(self) -> None:
    """Check for mid-run SMART failures and failed test(s)."""
    for dev in self.disks:
      disk_smart_status_check(dev, mid_run=True)
      for test in dev.tests:
        # A failed test other than the attribute check disables the
        # disk's tests
        if test.failed and 'Attributes' not in test.name:
          dev.disable_disk_tests()
          break

  def fix_tmux_layout(self, forced=True) -> None:
    """Fix tmux layout based on cfg.hw.TMUX_LAYOUT."""
    try:
      tmux.fix_layout(self.panes, self.layout, forced=forced)
    except RuntimeError:
      # Assuming self.panes changed while running
      pass

  def fix_tmux_layout_loop(self) -> None:
    """Fix tmux layout on a loop.

    NOTE: This should be called as a thread.
    """
    while True:
      self.fix_tmux_layout(forced=False)
      std.sleep(1)

  def init_diags(self, menu) -> None:
    """Initialize diagnostic pass.

    Resets per-run state, points logging at a new timestamped directory,
    detects hardware and builds the Test/TestGroup objects for every
    selected option in menu.options.
    """
    # Reset objects
    self.disks.clear()
    self.layout.clear()
    self.layout.update(cfg.hw.TMUX_LAYOUT)
    self.test_groups.clear()

    # osTicket
    self.top_text = std.color_string('Hardware Diagnostics', 'GREEN')
    self.ost.init()
    self.ost.disabled = not menu.toggles['osTicket Integration']['Selected']

    # Set log
    self.log_dir = log.format_log_path()
    self.log_dir = pathlib.Path(
      f'{self.log_dir.parent}/'
      f'Hardware-Diagnostics_{time.strftime("%Y-%m-%d_%H%M%S%z")}/'
      )
    log.update_log_path(
      dest_dir=self.log_dir,
      dest_name='main',
      keep_history=False,
      timestamp=False,
      )
    std.clear_screen()
    std.print_info('Initializing...')

    # Progress Pane
    self.update_progress_pane()
    tmux.respawn_pane(
      pane_id=self.panes['Progress'],
      watch_file=f'{self.log_dir}/progress.out',
      )

    # Add HW Objects
    self.system = hw_system.System()
    self.disks = hw_disk.get_disks(skip_kits=True)

    # Add test objects
    for name, details in menu.options.items():
      if not details['Selected']:
        # Only add selected options
        continue

      if 'CPU' in name:
        # Create two Test objects which will both be used by cpu_stress_tests
        # NOTE: Prime95 should be added first
        self.system.tests.append(
          Test(dev=self.system, label='Prime95', name=name),
          )
        self.system.tests.append(
          Test(dev=self.system, label='Cooling', name=name),
          )
        self.test_groups.append(
          TestGroup(
            name=name,
            function=globals()[TEST_GROUPS[name]],
            test_objects=self.system.tests,
            ),
          )

      if 'Disk' in name:
        test_group = TestGroup(
          name=name, function=globals()[TEST_GROUPS[name]],
          )
        for disk in self.disks:
          test_obj = Test(dev=disk, label=disk.path.name, name=name)
          disk.tests.append(test_obj)
          test_group.test_objects.append(test_obj)
        self.test_groups.append(test_group)

  def init_tmux(self) -> None:
    """Initialize tmux layout."""
    tmux.kill_all_panes()

    # Top
    self.panes['Top'] = tmux.split_window(
      behind=True,
      lines=2,
      vertical=True,
      text=f'{self.top_text}\nMain Menu',
      )

    # Started
    self.panes['Started'] = tmux.split_window(
      lines=cfg.hw.TMUX_SIDE_WIDTH,
      target_id=self.panes['Top'],
      text=std.color_string(
        ['Started', time.strftime("%Y-%m-%d %H:%M %Z")],
        ['BLUE', None],
        sep='\n',
        ),
      )

    # Progress
    self.panes['Progress'] = tmux.split_window(
      lines=cfg.hw.TMUX_SIDE_WIDTH,
      text=' ',
      )

  def save_debug_reports(self) -> None:
    """Save debug reports to disk.

    Reports are appended to files under '<log_dir>/debug'. This method is
    registered with atexit by run_diags() before init_diags() is called, so
    it must tolerate a partially initialized state.
    """
    LOG.info('Saving debug reports')
    if self.log_dir is None:
      # init_diags() never set a log dir so there is nowhere to save to
      LOG.warning('Unable to save debug reports (log dir not set)')
      return
    debug_dir = pathlib.Path(f'{self.log_dir}/debug')
    debug_dir.mkdir(parents=True, exist_ok=True)

    # State (self)
    std.save_pickles({'state': self}, debug_dir)
    with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f:
      _f.write('\n'.join(debug.generate_object_report(self)))

    # Disks
    for disk in self.disks:
      with open(
          f'{debug_dir}/disk_{disk.path.name}.report', 'a',
          encoding='utf-8') as _f:
        _f.write('\n'.join(debug.generate_object_report(disk)))
        _f.write('\n\n[Tests]')
        for test in disk.tests:
          _f.write(f'\n{test.name}:\n')
          _f.write('\n'.join(debug.generate_object_report(test, indent=1)))
      # Best effort: the exit status is ignored (check=False)
      cmd = [(
        f'sudo gpt -r show "{disk.path}"'
        f' >> {debug_dir}/gpt_{disk.path.name}.info'
        )]
      exe.run_program(cmd, check=False, shell=True)

    # osTicket
    with open(f'{debug_dir}/osTicket.report', 'a', encoding='utf-8') as _f:
      _f.write('\n'.join(debug.generate_object_report(self.ost)))

    # SMC
    if os.path.exists('/.wk-live-macos'):
      data = []
      try:
        proc = exe.run_program(['smc', '-f'])
        data.extend(proc.stdout.splitlines())
        data.append('----')
        proc = exe.run_program(['smc', '-l'])
        data.extend(proc.stdout.splitlines())
      except Exception: # pylint: disable=broad-except
        # Keep whatever was collected; exporting SMC data is best effort
        LOG.error(
          'Error(s) encountered while exporting SMC data', exc_info=True)
      data = [line.strip() for line in data]
      with open(f'{debug_dir}/smc.data', 'a', encoding='utf-8') as _f:
        _f.write('\n'.join(data))

    # System
    # NOTE: self.system is None if init_diags() did not get far enough
    system_tests = [] if self.system is None else self.system.tests
    with open(f'{debug_dir}/system.report', 'a', encoding='utf-8') as _f:
      _f.write('\n'.join(debug.generate_object_report(self.system)))
      _f.write('\n\n[Tests]')
      for test in system_tests:
        _f.write(f'\n{test.name}:\n')
        _f.write('\n'.join(debug.generate_object_report(test, indent=1)))

  def update_clock(self) -> None:
    """Update 'Started' pane following clock sync."""
    tmux.respawn_pane(
      pane_id=self.panes['Started'],
      text=std.color_string(
        ['Started', time.strftime("%Y-%m-%d %H:%M %Z")],
        ['BLUE', None],
        sep='\n',
        ),
      )

  def update_progress_pane(self) -> None:
    """Update progress pane.

    Rewrites '<log_dir>/progress.out' with one section per test group.
    """
    report = []
    width = cfg.hw.TMUX_SIDE_WIDTH

    for group in self.test_groups:
      report.append(std.color_string(group.name, 'BLUE'))
      for test in group.test_objects:
        # Right-align the status so label + status fill the pane width
        report.append(std.color_string(
          [test.label, f'{test.status:>{width-len(test.label)}}'],
          [None, cfg.hw.STATUS_COLORS.get(test.status, None)],
          sep='',
          ))

      # Add spacer
      report.append(' ')

    # Write to progress file
    out_path = pathlib.Path(f'{self.log_dir}/progress.out')
    with open(out_path, 'w', encoding='utf-8') as _f:
      _f.write('\n'.join(report))

  def update_top_pane(self, text) -> None:
    """Update top pane with text."""
    tmux.respawn_pane(self.panes['Top'], text=f'{self.top_text}\n{text}')
# Functions
def build_menu(cli_mode=False, quick_mode=False) -> std.Menu:
  # pylint: disable=too-many-branches
  """Assemble the main menu, returns wk.std.Menu.

  cli_mode adds the Reboot/Power Off actions (they are also added when
  DISPLAY is not set). quick_mode limits the default selection to the
  options listed in MENU_OPTIONS_QUICK.
  """
  menu = std.Menu(title=None)

  # Populate actions, options, toggles, and sets
  for entry in MENU_ACTIONS:
    menu.add_action(entry)
  for entry in MENU_ACTIONS_SECRET:
    menu.add_action(entry, {'Hidden': True})
  for entry in TEST_GROUPS:
    menu.add_option(entry, {'Selected': True})
  for entry in MENU_TOGGLES:
    menu.add_toggle(entry, {'Selected': True})
  for set_name, set_targets in MENU_SETS.items():
    menu.add_set(set_name, {'Targets': set_targets})
  menu.actions['Start']['Separator'] = True

  # Tech notes are opt-in
  menu.toggles['osTicket Tech Note']['Selected'] = False

  # Quick mode: only the quick option(s) stay selected
  if quick_mode:
    for option_name in menu.options:
      is_quick = option_name in MENU_OPTIONS_QUICK
      menu.options[option_name]['Selected'] = is_quick

  # TestStations skip the CPU tests
  if os.path.exists(cfg.hw.TESTSTATION_FILE):
    menu.options['CPU & Cooling']['Selected'] = False

  # Power actions for CLI sessions
  if cli_mode or 'DISPLAY' not in os.environ:
    for entry in ('Reboot', 'Power Off'):
      menu.add_action(entry)

  # Disable actions that are not supported on this platform
  unsupported = []
  if PLATFORM != 'Linux':
    unsupported.extend(('Audio Test', 'Keyboard Test'))
  if PLATFORM not in ('Darwin', 'Linux'):
    unsupported.extend(('Matrix', 'Network Test', 'Tubes'))
  for entry in unsupported:
    menu.actions[entry]['Disabled'] = True

  # Clock sync is only offered under live macOS
  clock_sync = menu.actions['Clock Sync']
  if os.path.exists('/.wk-live-macos'):
    clock_sync['Separator'] = True
  else:
    clock_sync['Disabled'] = True
    clock_sync['Hidden'] = True

  return menu
def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
  # pylint: disable=too-many-statements
  """CPU & cooling check using Prime95 and Sysbench.

  state is the State object for this run. test_objects must unpack to
  exactly two Test objects: the Prime95 test first, then the Cooling test.
  test_mode swaps the run time from cfg.hw.CPU_TEST_MINUTES to
  cfg.hw.TEST_MODE_CPU_LIMIT.

  Sysbench is only run as an alternate stress test if Prime95 was not
  aborted and the max CPU temp reached cfg.hw.CPU_FAILURE_TEMP.

  Raises std.GenericAbort if the user interrupted either countdown.
  """
  LOG.info('CPU Test (Prime95)')
  aborted = False
  prime_log = pathlib.Path(f'{state.log_dir}/prime.log')
  run_sysbench = False
  sensors_out = pathlib.Path(f'{state.log_dir}/sensors.out')
  test_minutes = cfg.hw.CPU_TEST_MINUTES
  if test_mode:
    test_minutes = cfg.hw.TEST_MODE_CPU_LIMIT
  test_mprime_obj, test_cooling_obj = test_objects

  # Bail early
  if test_cooling_obj.disabled or test_mprime_obj.disabled:
    return

  # Prep
  state.update_top_pane(test_mprime_obj.dev.cpu_description)
  test_cooling_obj.set_status('Working')
  test_mprime_obj.set_status('Working')

  # Start sensors monitor
  # NOTE(review): thermal_action looks like the command the monitor runs to
  #               interrupt mprime when temps get too high; confirm in
  #               hw_sensors
  sensors = hw_sensors.Sensors()
  sensors.start_background_monitor(
    sensors_out,
    thermal_action=('killall', 'mprime', '-INT'),
    )

  # Create monitor and worker panes
  state.update_progress_pane()
  state.panes['Prime95'] = tmux.split_window(
    lines=10, vertical=True, watch_file=prime_log, watch_cmd='tail')
  if PLATFORM == 'Darwin':
    state.panes['Temps'] = tmux.split_window(
      behind=True, percent=80, vertical=True, cmd='./hw-sensors')
  elif PLATFORM == 'Linux':
    state.panes['Temps'] = tmux.split_window(
      behind=True, percent=80, vertical=True, watch_file=sensors_out)
  tmux.resize_pane(height=3)
  # Register the current pane with an empty id and a layout entry
  state.panes['Current'] = ''
  state.layout['Current'] = {'height': 3, 'Check': True}

  # Get idle temps
  std.print_standard('Saving idle temps...')
  sensors.save_average_temps(temp_label='Idle', seconds=5)

  # Stress CPU
  std.print_info('Running stress test')
  hw_cpu.set_apple_fan_speed('max')
  proc_mprime = hw_cpu.start_mprime(state.log_dir, prime_log)

  # Show countdown
  print('')
  try:
    print_countdown(proc=proc_mprime, seconds=test_minutes*60)
  except KeyboardInterrupt:
    # Remember the abort; cleanup below still runs before it is raised
    aborted = True

  # Stop Prime95
  hw_cpu.stop_mprime(proc_mprime)

  # Update progress if necessary
  if sensors.cpu_reached_critical_temp() or aborted:
    test_cooling_obj.set_status('Aborted')
    test_mprime_obj.set_status('Aborted')
    state.update_progress_pane()

  # Get cooldown temp
  std.clear_screen()
  std.print_standard('Letting CPU cooldown...')
  std.sleep(5)
  std.print_standard('Saving cooldown temps...')
  sensors.save_average_temps(temp_label='Cooldown', seconds=5)

  # Check Prime95 results
  test_mprime_obj.report.append(std.color_string('Prime95', 'BLUE'))
  hw_cpu.check_mprime_results(
    test_obj=test_mprime_obj, working_dir=state.log_dir,
    )

  # Run Sysbench test if necessary
  run_sysbench = (
    not aborted and sensors.cpu_max_temp() >= cfg.hw.CPU_FAILURE_TEMP
    )
  if run_sysbench:
    LOG.info('CPU Test (Sysbench)')
    std.print_standard('Letting CPU cooldown more...')
    std.sleep(30)
    std.clear_screen()
    std.print_info('Running alternate stress test')
    print('')
    proc_sysbench, filehandle_sysbench = hw_cpu.start_sysbench(
      sensors,
      sensors_out,
      log_path=prime_log.with_name('sysbench.log'),
      pane=state.panes['Prime95'],
      )
    try:
      print_countdown(proc=proc_sysbench, seconds=test_minutes*60)
    except AttributeError:
      # Assuming the sysbench process wasn't found and proc was set to None
      LOG.error('Failed to find sysbench process', exc_info=True)
    except KeyboardInterrupt:
      aborted = True
    hw_cpu.stop_sysbench(proc_sysbench, filehandle_sysbench)

    # Update progress
    # NOTE: CPU critical temp check isn't really necessary
    #       Hard to imagine it wasn't hit during Prime95 but was in sysbench
    if sensors.cpu_reached_critical_temp() or aborted:
      test_cooling_obj.set_status('Aborted')
      test_mprime_obj.set_status('Aborted')
      state.update_progress_pane()

  # Check Cooling results
  test_cooling_obj.report.append(std.color_string('Temps', 'BLUE'))
  hw_cpu.check_cooling_results(test_cooling_obj, sensors, run_sysbench)

  # Post results to osTicket
  if not state.ost.disabled:
    _failed = test_cooling_obj.failed or test_mprime_obj.failed
    std.print_info('Posting results to osTicket...')
    state.cpu_max_temp = sensors.cpu_max_temp()
    state.ost.post_response(
      hw_osticket.build_report(state.system, 'CPU'),
      color='Diags FAIL' if _failed else 'Diags',
      )

  # Cleanup
  state.update_progress_pane()
  sensors.stop_background_monitor()
  state.panes.pop('Current', None)
  # 'Temps' is only created on Darwin/Linux, hence the None defaults
  tmux.kill_pane(state.panes.pop('Prime95', None))
  tmux.kill_pane(state.panes.pop('Temps', None))

  # Done
  if aborted:
    raise std.GenericAbort('Aborted')
def disk_attribute_check(state, test_objects, test_mode=False) -> None:
  # pylint: disable=unused-argument
  """Check SMART status/attributes for each disk under test.

  Disks without any attribute data have their test marked 'N/A'.
  """
  LOG.info('Disk Attribute Check')
  for attr_test in test_objects:
    disk_smart_status_check(attr_test.dev, mid_run=False)
    if attr_test.dev.attributes:
      continue
    # No NVMe/SMART data
    attr_test.set_status('N/A')

  # Done
  state.update_progress_pane()
def disk_io_benchmark(
    state, test_objects, skip_usb=True, test_mode=False) -> None:
  """Benchmark disk I/O using dd, one disk at a time.

  If skip_usb is set, USB disks smaller than cfg.hw.IO_SMALL_DISK are
  marked 'Skipped' and disabled. Raises std.GenericAbort if the user
  interrupted a benchmark.
  """
  LOG.info('Disk I/O Benchmark (dd)')
  was_aborted = False
  pane_name = 'I/O Benchmark'
  plural = 's' if len(test_objects) > 1 else ''

  # Prep panes
  state.update_top_pane(f'Disk I/O Benchmark{plural}')
  state.panes[pane_name] = tmux.split_window(
    percent=50, vertical=True, text=' ')

  # Skip small USB devices if requested
  if skip_usb:
    for bench in test_objects:
      is_small_usb = (
        bench.dev.bus == 'USB' and bench.dev.size < cfg.hw.IO_SMALL_DISK
        )
      if is_small_usb:
        bench.set_status('Skipped')
        bench.disabled = True

  # Run the benchmarks
  for bench in test_objects:
    if bench.disabled:
      continue

    std.clear_screen()
    std.print_report(bench.dev.generate_report())
    bench.set_status('Working')
    log_path = f'{state.log_dir}/{bench.dev.path.name}_benchmark.out'
    tmux.respawn_pane(
      state.panes[pane_name], watch_cmd='tail', watch_file=log_path)
    state.update_progress_pane()

    try:
      hw_benchmark.run_io_test(bench, log_path, test_mode=test_mode)
    except KeyboardInterrupt:
      # Mark this test aborted and stop running benchmarks
      was_aborted = True
      bench.set_status('Aborted')
      bench.report.append(std.color_string(' Aborted', 'YELLOW'))
      break
    except (subprocess.CalledProcessError, TypeError, ValueError) as err:
      # Something went wrong
      LOG.error('%s', err)
      bench.set_status('ERROR')
      bench.report.append(std.color_string(' Unknown Error', 'RED'))

    # Update progress after each test
    state.update_progress_pane()

  # Cleanup
  state.update_progress_pane()
  tmux.kill_pane(state.panes.pop(pane_name, None))

  # Done
  if was_aborted:
    raise std.GenericAbort('Aborted')
def disk_self_test(state, test_objects, test_mode=False) -> None:
  # pylint: disable=unused-argument
  """Run the self-test for every enabled disk, each in its own thread.

  Raises std.GenericAbort if the user interrupted the wait.
  """
  LOG.info('Disk Self-Test(s)')
  was_aborted = False
  workers = []
  plural = 's' if len(test_objects) > 1 else ''
  state.panes['SMART'] = []

  # Start the self-tests
  state.update_top_pane(f'Disk self-test{plural}')
  std.print_info(f'Starting self-test{plural}')
  for self_test in reversed(test_objects):
    if self_test.disabled:
      # Skip
      continue

    self_test.set_status('Working')
    log_path = f'{state.log_dir}/{self_test.dev.path.name}_selftest.log'
    worker = exe.start_thread(
      hw_smart.run_self_test, args=(self_test, log_path))
    workers.append(worker)

    # Only show a progress pane if the test is still running
    if worker.is_alive():
      smart_pane = tmux.split_window(
        lines=4, vertical=True, watch_file=log_path)
      state.panes['SMART'].append(smart_pane)

  # Wait for all tests to complete
  state.update_progress_pane()
  try:
    while any(worker.is_alive() for worker in workers):
      std.sleep(1)
  except KeyboardInterrupt:
    was_aborted = True
    for self_test in test_objects:
      hw_smart.abort_self_test(self_test.dev)
      std.sleep(0.5)
      hw_smart.build_self_test_report(self_test, aborted=True)

  # Cleanup
  state.update_progress_pane()
  for smart_pane in state.panes['SMART']:
    tmux.kill_pane(smart_pane)
  state.panes.pop('SMART', None)

  # Done
  if was_aborted:
    raise std.GenericAbort('Aborted')
def disk_smart_status_check(dev, mid_run=True) -> None:
  """Check SMART status and attributes of dev and record any problems.

  A critical SMART error disables further tests for the disk; attribute
  failures only add a note. Either one fails the 'Disk Attributes' test.
  mid_run appends ' during diagnostics' to the note.
  """
  # Bail if dev is missing
  if not dev.present:
    dev.disable_disk_tests()
    return

  # Check SMART status and attributes
  note, note_color, is_critical = None, None, False
  if not hw_smart.smart_status_ok(dev):
    note, note_color, is_critical = 'Critical SMART error detected', 'RED', True
  elif not hw_smart.check_attributes(dev, only_blocking=False):
    # Non-blocking errors
    note, note_color = 'SMART attribute failure(s) detected', 'YELLOW'

  # Log errors if detected (skipping notes that were already added)
  if note and not dev.contains_note(note):
    suffix = ' during diagnostics' if mid_run else ''
    note = f'{note}{suffix}'
    LOG.warning(note)
    dev.add_note(note, note_color)

  # Set Disk Attributes test result
  for attr_test in dev.tests:
    if attr_test.name != 'Disk Attributes':
      continue
    attr_test.failed = bool(attr_test.failed or note)
    attr_test.passed = not attr_test.failed
    if attr_test.failed:
      attr_test.set_status('Failed')
    elif 'N/A' not in attr_test.status:
      attr_test.set_status('Passed')

  # Disable further testing if needed
  if is_critical:
    dev.disable_disk_tests()
def disk_surface_scan(state, test_objects, test_mode=False) -> None:
  """Run a read-only badblocks surface scan for every enabled disk.

  Each scan runs in its own thread. Raises std.GenericAbort if the user
  interrupted the wait.
  """
  LOG.info('Disk Surface Scan (badblocks)')
  was_aborted = False
  workers = []
  plural = 's' if len(test_objects) > 1 else ''
  state.panes['badblocks'] = []

  # Update panes
  state.update_top_pane(f'Disk Surface Scan{plural}')
  std.print_info(f'Starting disk surface scan{plural}')

  # Show the failed attributes (if any) for each disk
  for disk in state.disks:
    failed_lines = []
    for line in hw_smart.generate_attribute_report(disk):
      if 'failed' in line:
        failed_lines.append(line)
    if not failed_lines:
      continue
    disk_size = std.bytes_to_string(disk.size, use_binary=False)
    std.print_colored(
      ['[', disk.path.name, ' ', disk_size, ']'],
      [None, 'BLUE', None, 'CYAN', None],
      sep='',
      )
    std.print_report(failed_lines)
    std.print_standard('')

  # Run surface scans
  enabled_tests = [scan for scan in test_objects if not scan.disabled]
  for scan in reversed(enabled_tests):
    log_path = f'{state.log_dir}/{scan.dev.path.name}_badblocks.log'
    worker = exe.start_thread(
      hw_surface_scan.run_scan, args=(scan, log_path, test_mode))
    workers.append(worker)

    # Only show a progress pane if the scan is still running
    if worker.is_alive():
      scan_pane = tmux.split_window(
        lines=5, vertical=True, watch_cmd='tail', watch_file=log_path)
      state.panes['badblocks'].append(scan_pane)

  # Wait for all tests to complete
  try:
    while any(worker.is_alive() for worker in workers):
      state.update_progress_pane()
      std.sleep(5)
  except KeyboardInterrupt:
    was_aborted = True
    std.sleep(0.5)
    # Anything without a result at this point is marked aborted
    for scan in test_objects:
      if scan.disabled or scan.passed or scan.failed:
        continue
      scan.set_status('Aborted')
      scan.report.append(std.color_string(' Aborted', 'YELLOW'))

  # Cleanup
  state.update_progress_pane()
  for scan_pane in state.panes['badblocks']:
    tmux.kill_pane(scan_pane)
  state.panes.pop('badblocks', None)

  # Done
  if was_aborted:
    raise std.GenericAbort('Aborted')
def disk_volume_utilization(state, test_objects, test_mode=False) -> None:
  # pylint: disable=unused-argument
  """Run the volume utilization check for each test object."""
  LOG.info('Disk Utilization')
  for volume_test in test_objects:
    hw_volumes.check_volume_utilization(volume_test)

  # Refresh the progress pane once all disks have been checked
  state.update_progress_pane()
def main() -> None:
  # pylint: disable=too-many-branches
  """Entry point for hardware diagnostics.

  Parses the command line, then either runs a quick check and returns or
  loops on the main menu until 'Quit' is selected. Raises RuntimeError if
  not running inside a tmux session.
  """
  args = docopt(DOCSTRING)
  log.update_log_path(dest_name='Hardware-Diagnostics', timestamp=True)

  # Safety check
  if 'TMUX' not in os.environ:
    LOG.error('tmux session not found')
    raise RuntimeError('tmux session not found')

  # Init
  atexit.register(tmux.kill_all_panes)
  menu = build_menu(cli_mode=args['--cli'], quick_mode=args['--quick'])
  state = State()
  state.override_all_smart_errors = args['--ignore-smart-errors'] # pylint: disable=attribute-defined-outside-init
  test_mode = args['--test-mode']

  # Quick Mode
  if args['--quick']:
    menu.toggles['osTicket Integration']['Selected'] = False
    run_diags(state, menu, quick_mode=True, test_mode=test_mode)
    return

  # Lookup tables (checked in order, first match wins)
  simple_tests = {
    'Audio Test': audio_test,
    'Keyboard Test': keyboard_test,
    'Network Test': network_test,
    'Clock Sync': sync_clock,
    }
  screensavers = {
    'Matrix': 'matrix',
    # Tubes ≈≈ Pipes?
    'Tubes': 'pipes',
    }
  power_commands = {
    'Reboot': 'reboot',
    'Power Off': 'poweroff',
    }

  # Show menu
  while True:
    selection = menu.advanced_select()

    # Run simple test
    action = next(
      (func for name, func in simple_tests.items() if name in selection),
      None,
      )
    if action:
      state.update_top_pane(selection[0])
      try:
        action()
      except KeyboardInterrupt:
        std.print_warning('Aborted.')
      std.print_standard('')
      std.pause('Press Enter to return to main menu...')
      if 'Clock Sync' in selection:
        state.update_clock()

    # Secrets
    saver = next(
      (arg for name, arg in screensavers.items() if name in selection),
      None,
      )
    if saver:
      screensaver(saver)

    # Quit
    power_arg = next(
      (arg for name, arg in power_commands.items() if name in selection),
      None,
      )
    if power_arg:
      exe.run_program(
        ['/usr/local/bin/wk-power-command', power_arg], check=False)
    elif 'Quit' in selection:
      break

    # Start diagnostics
    if 'Start' in selection:
      run_diags(state, menu, quick_mode=False, test_mode=test_mode)

    # Reset top pane
    state.update_top_pane('Main Menu')
def print_countdown(proc, seconds) -> None:
  """Show a one-line countdown that stops early if proc exits.

  proc must provide wait(timeout) and either poll() or is_running().
  seconds is converted to int; each step waits up to one second on proc.
  """
  total = int(seconds)
  for remaining in range(total, 0, -1):
    min_left, sec_left = divmod(remaining, 60)
    parts = []
    if min_left:
      parts.append(f'{min_left} minute{"" if min_left == 1 else "s"}')
    parts.append(f'{sec_left} second{"" if sec_left == 1 else "s"}')
    line = '\r ' + ', '.join(parts) + ' remaining'
    print(f'{line:<42}', end='', flush=True)

    # Doubles as the one second delay between updates
    try:
      proc.wait(1)
    except subprocess.TimeoutExpired:
      # proc still going, continue
      pass

    has_exited = (
      (hasattr(proc, 'poll') and proc.poll() is not None)
      or (hasattr(proc, 'is_running') and not proc.is_running())
      )
    if has_exited:
      # proc exited, stop countdown
      break

  # Done
  print('')
def run_diags(state, menu, quick_mode=False, test_mode=False) -> None:
  """Run selected diagnostics.

  state is the State object and menu the main menu; the selected options
  and toggles are read from the menu. quick_mode only changes the final
  pause prompt. test_mode is passed through to each test group function.

  A KeyboardInterrupt or std.GenericAbort from a test group stops the
  remaining groups; results are still posted, shown, and saved.
  """
  aborted = False
  # Save debug reports even if we exit unexpectedly during the run
  atexit.register(state.save_debug_reports)
  state.init_diags(menu)

  def _init_osticket():
    """Dumb private function to avoid pylint error."""
    if not state.ost.disabled:
      # Select Ticket
      state.ost.select_ticket()

      # Update top_text
      if state.ost.ticket_id:
        state.top_text += std.color_string(
          [f' #{state.ost.ticket_id}', state.ost.ticket_name],
          [None, 'CYAN'],
          )

      # Add note
      if (state.ost.ticket_id
          and menu.toggles['osTicket Tech Note']['Selected']):
        state.ost.add_note()

  # Just return if no tests were selected
  # NOTE: The atexit handler registered above is left in place on this path
  if not state.test_groups:
    std.print_warning('No tests selected?')
    std.pause()
    return

  # osTicket
  _init_osticket()

  # Run tests
  for group in state.test_groups:

    # Run test(s)
    function = group.function
    args = [group.test_objects]
    if group.name == 'Disk I/O Benchmark':
      # Passed positionally as disk_io_benchmark()'s skip_usb argument
      args.append(menu.toggles[IO_SIZE_SKIP_NAME]['Selected'])
    std.clear_screen()
    try:
      function(state, *args, test_mode=test_mode)
    except (KeyboardInterrupt, std.GenericAbort):
      aborted = True
      state.abort_testing()
      state.update_progress_pane()
      break
    else:
      # Run safety checks after disk tests
      if group.name.startswith('Disk'):
        state.disk_safety_checks()

  # Handle aborts
  if aborted:
    for group in state.test_groups:
      for test in group.test_objects:
        if test.status == 'Pending':
          test.set_status('Aborted')

  # Post disk results
  hw_osticket.post_disk_results(state, NUM_DISK_TESTS)

  # Show results
  show_results(state)

  # Update checkboxes
  hw_osticket.update_checkboxes(state, NUM_DISK_TESTS)

  # Done
  # Reports are saved here so the atexit handler is no longer needed
  state.save_debug_reports()
  atexit.unregister(state.save_debug_reports)
  if quick_mode:
    std.pause('Press Enter to exit...')
  else:
    std.pause('Press Enter to return to main menu...')

  # osTicket
  # Drop the ticket info that _init_osticket() appended to the top text
  state.top_text = std.color_string('Hardware Diagnostics', 'GREEN')
def show_results(state) -> None:
  """Print the CPU and disk reports for the test groups that were run."""
  std.sleep(0.5)
  std.clear_screen()
  state.update_top_pane('Results')
  group_names = [group.name for group in state.test_groups]

  # CPU Tests
  if any('CPU' in name for name in group_names):
    std.print_success('CPU:')
    std.print_report(state.system.generate_report())
    std.print_standard(' ')

  # Drop Disk Utilization reports (only needed for OST)
  for group in state.test_groups:
    if group.name != 'Disk Utilization':
      continue
    for test in group.test_objects:
      test.report = []

  # Disk Tests
  if any('Disk' in name for name in group_names):
    plural = 's' if len(state.disks) > 1 else ''
    std.print_success(f'Disk{plural}:')
    for disk in state.disks:
      std.print_report(disk.generate_report())
      std.print_standard(' ')
    if not state.disks:
      std.print_warning('No devices')
      std.print_standard(' ')
def sync_clock() -> None:
  """Sync the clock under macOS using sntp.

  Tries 'sntp -Ss' first and retries with '-s' if that exits non-zero.
  """
  prefix = ['sudo', 'sntp']
  server = 'us.pool.ntp.org'
  first_try = exe.run_program([*prefix, '-Ss', server], check=False)
  if first_try.returncode:
    # Assuming we're running under an older version of macOS
    exe.run_program([*prefix, '-s', server], check=False)
# Running this module directly only prints a notice; main() is not called here.
if __name__ == '__main__':
  print("This file is not meant to be called directly.")