866 lines
25 KiB
Python
866 lines
25 KiB
Python
"""WizardKit: Hardware diagnostics"""
|
|
# vim: sts=2 sw=2 ts=2
|
|
|
|
import atexit
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import subprocess
|
|
import time
|
|
|
|
from docopt import docopt
|
|
|
|
from wk import cfg, debug, exe, log, std
|
|
from wk.cfg.hw import STATUS_COLORS
|
|
from wk.hw import benchmark as hw_benchmark
|
|
from wk.hw import cpu as hw_cpu
|
|
from wk.hw import disk as hw_disk
|
|
from wk.hw import sensors as hw_sensors
|
|
from wk.hw import smart as hw_smart
|
|
from wk.hw import surface_scan as hw_surface_scan
|
|
from wk.hw import system as hw_system
|
|
from wk.hw.audio import audio_test
|
|
from wk.hw.keyboard import keyboard_test
|
|
from wk.hw.network import network_test
|
|
from wk.hw.screensavers import screensaver
|
|
from wk.hw.test import Test, TestGroup
|
|
|
|
from wk.ui import ansi, cli, tui
|
|
|
|
|
|
# STATIC VARIABLES
# NOTE: docopt parses DOCSTRING at runtime. Each option must be separated from
#       its description by at least two spaces, otherwise the description
#       words are treated as option arguments.
DOCSTRING = f'''{cfg.main.KIT_NAME_FULL}: Hardware Diagnostics

Usage:
  hw-diags [options]
  hw-diags (-h | --help)

Options:
  -c --cli        Force CLI mode
  -h --help       Show this page
  -q --quick      Skip menu and perform a quick check
  -t --test-mode  Run diags in test mode
'''
LOG = logging.getLogger(__name__)
# Maps menu option name -> name of the module-level function that runs it
# (looked up via globals() in State.init_diags)
TEST_GROUPS = {
  # Also used to build the menu options
  ## NOTE: This needs to be above MENU_SETS
  'CPU & Cooling': 'cpu_stress_tests',
  'Disk Attributes': 'disk_attribute_check',
  'Disk Self-Test': 'disk_self_test',
  'Disk Surface Scan': 'disk_surface_scan',
  'Disk I/O Benchmark': 'disk_io_benchmark',
  }
# Actions always added to the main menu (see build_menu)
MENU_ACTIONS = (
  'Audio Test',
  'Keyboard Test',
  'Network Test',
  'Clock Sync',
  'Start',
  'Quit')
# Actions added to the menu with the 'Hidden' flag set
MENU_ACTIONS_SECRET = (
  'Matrix',
  'Tubes',
  )
# Options left selected when running in quick mode
MENU_OPTIONS_QUICK = ('Disk Attributes',)
# Named sets of options that can be selected together
MENU_SETS = {
  'Full Diagnostic': (*TEST_GROUPS,),
  'Disk Diagnostic': (
    'Disk Attributes',
    'Disk Self-Test',
    'Disk Surface Scan',
    'Disk I/O Benchmark',
    ),
  'Disk Diagnostic (Quick)': ('Disk Attributes',),
  }
MENU_TOGGLES = (
  'Skip USB Benchmarks',
  )
PLATFORM = std.PLATFORM
|
|
|
|
# Classes
|
|
class State():
  """Object for tracking hardware diagnostic data.

  Holds the detected system/disk objects, the test groups built from the
  menu selection, the log/progress paths and the TUI used for output.
  """
  def __init__(self, test_mode=False):
    self.disks: list[hw_disk.Disk] = []
    self.log_dir: pathlib.Path | None = None
    self.progress_file: pathlib.Path | None = None
    self.system: hw_system.System | None = None
    self.test_groups: list[TestGroup] = []
    self.title_text: str = ansi.color_string('Hardware Diagnostics', 'GREEN')
    if test_mode:
      self.title_text += ansi.color_string(' (Test Mode)', 'YELLOW')
    self.ui: tui.TUI = tui.TUI(f'{self.title_text}\nMain Menu')

  def abort_testing(self) -> None:
    """Set unfinished tests as aborted and cleanup panes."""
    for group in self.test_groups:
      for test in group.test_objects:
        if test.status in ('Pending', 'Working'):
          test.set_status('Aborted')

    # Cleanup panes
    self.ui.remove_all_info_panes()
    self.ui.remove_all_worker_panes()

  def disk_safety_checks(self) -> None:
    """Check for mid-run SMART failures and failed test(s).

    Any disk with a failed test (other than the acceptable failure states
    below) has its remaining tests disabled.
    """
    for dev in self.disks:
      disk_smart_status_check(dev, mid_run=True)
      for test in dev.tests:
        if test.failed:
          # Skip acceptable failure states
          if 'Attributes' in test.name:
            continue
          if 'Self-Test' in test.name and 'TimedOut' in test.status:
            continue
          # Disable remaining tests
          dev.disable_disk_tests()
          break

  def init_diags(self, menu) -> None:
    """Initialize diagnostic pass.

    Resets the tracked objects, moves logging to a new timestamped
    directory, detects hardware and builds one TestGroup per selected
    menu option.
    """

    # Reset objects
    self.disks.clear()
    self.test_groups.clear()

    # Set log
    self.log_dir = log.format_log_path()
    self.log_dir = pathlib.Path(
      f'{self.log_dir.parent}/'
      f'Hardware-Diagnostics_{time.strftime("%Y-%m-%d_%H%M%S%z")}/'
      )
    log.update_log_path(
      dest_dir=self.log_dir,
      dest_name='main',
      keep_history=False,
      timestamp=False,
      )
    cli.clear_screen()
    cli.print_info('Initializing...')

    # Progress Pane
    self.progress_file = pathlib.Path(f'{self.log_dir}/progress.out')
    self.update_progress_file()
    self.ui.set_progress_file(self.progress_file)

    # Add HW Objects
    self.system = hw_system.System()
    self.disks = hw_disk.get_disks(skip_kits=True)
    for disk in self.disks:
      hw_smart.enable_smart(disk)
      hw_smart.update_smart_details(disk)

    # Add test objects
    for name, details in menu.options.items():
      if not details['Selected']:
        # Only add selected options
        continue

      if 'CPU' in name:
        # Create two Test objects which will both be used by cpu_stress_tests
        # NOTE: Prime95 should be added first
        self.system.tests.append(
          Test(dev=self.system, label='Prime95', name=name),
          )
        self.system.tests.append(
          Test(dev=self.system, label='Cooling', name=name),
          )
        self.test_groups.append(
          TestGroup(
            name=name,
            function=globals()[TEST_GROUPS[name]],
            test_objects=self.system.tests,
            ),
          )

      if 'Disk' in name:
        # One Test object per disk, all collected in a single group
        test_group = TestGroup(
          name=name, function=globals()[TEST_GROUPS[name]],
          )
        for disk in self.disks:
          test_obj = Test(dev=disk, label=disk.path.name, name=name)
          disk.tests.append(test_obj)
          test_group.test_objects.append(test_obj)
        self.test_groups.append(test_group)

  def save_debug_reports(self) -> None:
    """Save debug reports to disk.

    Writes a pickle and text report for this object plus text reports for
    each disk and the system under '<log_dir>/debug'. SMC data is also
    exported when '/.wk-live-macos' exists.
    """
    LOG.info('Saving debug reports')
    debug_dir = pathlib.Path(f'{self.log_dir}/debug')
    # exist_ok avoids the exists()/mkdir() race; parents are not created
    debug_dir.mkdir(exist_ok=True)

    # State (self)
    debug.save_pickles({'state': self}, debug_dir)
    with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f:
      _f.write('\n'.join(debug.generate_object_report(self)))

    # Disks
    for disk in self.disks:
      with open(
          f'{debug_dir}/disk_{disk.path.name}.report', 'a',
          encoding='utf-8') as _f:
        _f.write('\n'.join(debug.generate_object_report(disk)))
        _f.write('\n\n[Tests]')
        for test in disk.tests:
          _f.write(f'\n{test.name}:\n')
          _f.write('\n'.join(debug.generate_object_report(test, indent=1)))

    # SMC
    if os.path.exists('/.wk-live-macos'):
      data = []
      try:
        proc = exe.run_program(['smc', '-f'])
        data.extend(proc.stdout.splitlines())
        data.append('----')
        proc = exe.run_program(['smc', '-l'])
        data.extend(proc.stdout.splitlines())
      except Exception:
        # Best effort: keep whatever was collected and carry on
        # NOTE: Logger has no ERROR() method, error() is the correct call
        LOG.error(
          'Error(s) encountered while exporting SMC data', exc_info=True,
          )
      data = [line.strip() for line in data]
      with open(f'{debug_dir}/smc.data', 'a', encoding='utf-8') as _f:
        _f.write('\n'.join(data))

    # System
    # NOTE: self.system is None until init_diags() has detected the hardware
    system_tests = self.system.tests if self.system is not None else []
    with open(f'{debug_dir}/system.report', 'a', encoding='utf-8') as _f:
      _f.write('\n'.join(debug.generate_object_report(self.system)))
      _f.write('\n\n[Tests]')
      for test in system_tests:
        _f.write(f'\n{test.name}:\n')
        _f.write('\n'.join(debug.generate_object_report(test, indent=1)))

  def update_progress_file(self) -> None:
    """Update progress file.

    Writes one colored heading per test group followed by a line per test
    with its status right-aligned to the UI side width.
    """
    report = []

    for group in self.test_groups:
      report.append(ansi.color_string(group.name, 'BLUE'))
      for test in group.test_objects:
        report.append(ansi.color_string(
          [test.label, f'{test.status:>{self.ui.side_width-len(test.label)}}'],
          [None, STATUS_COLORS.get(test.status, None)],
          sep='',
          ))

      # Add spacer
      report.append(' ')

    # Write to progress file
    self.progress_file.write_text('\n'.join(report), encoding='utf-8')

  def update_title_text(self, text) -> None:
    """Update top pane with text."""
    self.ui.set_title(self.title_text, text)
|
|
|
|
|
|
# Functions
|
|
def build_menu(cli_mode=False, quick_mode=False) -> cli.Menu:
  """Assemble the main menu and return it as a wk.ui.cli.Menu.

  cli_mode adds the Reboot/Power Off actions (also added when DISPLAY is
  not set); quick_mode leaves only the quick option(s) selected.
  """
  menu = cli.Menu(title=None)

  # Populate the menu (visible actions first, then the hidden ones)
  for visible_action in MENU_ACTIONS:
    menu.add_action(visible_action)
  for hidden_action in MENU_ACTIONS_SECRET:
    menu.add_action(hidden_action, {'Hidden': True})
  for group_name in TEST_GROUPS:
    menu.add_option(group_name, {'Selected': True})
  for toggle_name in MENU_TOGGLES:
    menu.add_toggle(toggle_name, {'Selected': True})
  for set_name, set_targets in MENU_SETS.items():
    menu.add_set(set_name, {'Targets': set_targets})
  menu.actions['Start']['Separator'] = True

  # Quick mode: keep only the quick option(s) selected
  if quick_mode:
    for option_name, option in menu.options.items():
      option['Selected'] = option_name in MENU_OPTIONS_QUICK

  # TestStations never run the CPU tests by default
  if os.path.exists(cfg.hw.TESTSTATION_FILE):
    menu.options['CPU & Cooling']['Selected'] = False

  # Power actions are only offered in CLI mode or without a DISPLAY
  no_display = 'DISPLAY' not in os.environ
  if cli_mode or no_display:
    for power_action in ('Reboot', 'Power Off'):
      menu.add_action(power_action)

  # Disable actions that are not supported on this platform
  unsupported = []
  if PLATFORM != 'Linux':
    unsupported.extend(('Audio Test', 'Keyboard Test'))
  if PLATFORM not in ('Darwin', 'Linux'):
    unsupported.extend(('Matrix', 'Network Test', 'Tubes'))
  for action_name in unsupported:
    menu.actions[action_name]['Disabled'] = True

  # Clock Sync is only shown under live macOS
  clock_sync = menu.actions['Clock Sync']
  if os.path.exists('/.wk-live-macos'):
    clock_sync['Separator'] = True
  else:
    clock_sync['Disabled'] = True
    clock_sync['Hidden'] = True

  return menu
|
|
|
|
|
|
def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
  """CPU & cooling check using Prime95 and Sysbench.

  test_objects must unpack into exactly two Test objects: the Prime95
  test followed by the Cooling test. Nothing is run if either is disabled.

  Prime95 is run for cfg.hw.CPU_TEST_MINUTES (cfg.hw.TEST_MODE_CPU_LIMIT
  in test mode) while a background sensor monitor records temps. Sysbench
  is run afterwards only if the run was not aborted and the max CPU temp
  reached cfg.hw.CPU_FAILURE_TEMP.

  Raises std.GenericAbort if a countdown was interrupted by the user.
  """
  LOG.info('CPU Test (Prime95)')
  aborted = False
  prime_log = pathlib.Path(f'{state.log_dir}/prime.log')
  run_sysbench = False
  sensors_out = pathlib.Path(f'{state.log_dir}/sensors.out')
  test_minutes = cfg.hw.CPU_TEST_MINUTES
  if test_mode:
    test_minutes = cfg.hw.TEST_MODE_CPU_LIMIT
  # NOTE: Order matters, State.init_diags adds Prime95 first
  test_mprime_obj, test_cooling_obj = test_objects

  # Bail early
  if test_cooling_obj.disabled or test_mprime_obj.disabled:
    return

  # Prep
  state.update_title_text(test_mprime_obj.dev.cpu_description)
  test_cooling_obj.set_status('Working')
  test_mprime_obj.set_status('Working')

  # Start sensors monitor
  # NOTE(review): thermal_action presumably runs when a thermal limit is
  #               hit, sending SIGINT to mprime -- confirm in wk.hw.sensors
  sensors = hw_sensors.Sensors()
  sensors.start_background_monitor(
    sensors_out,
    thermal_action=('killall', 'mprime', '-INT'),
    )

  # Create monitor and worker panes
  state.update_progress_file()
  state.ui.add_worker_pane(lines=10, watch_cmd='tail', watch_file=prime_log)
  if PLATFORM == 'Darwin':
    state.ui.add_info_pane(
      percent=80, cmd='./hw-sensors', update_layout=False,
      )
  elif PLATFORM == 'Linux':
    state.ui.add_info_pane(
      percent=80, watch_file=sensors_out, update_layout=False,
      )
  state.ui.set_current_pane_height(3)

  # Get idle temps
  cli.print_standard('Saving idle temps...')
  sensors.save_average_temps(temp_label='Idle', seconds=5)

  # Stress CPU
  cli.print_info('Running stress test')
  hw_cpu.set_apple_fan_speed('max')
  proc_mprime = hw_cpu.start_mprime(state.log_dir, prime_log)

  # Show countdown
  print('')
  try:
    print_countdown(proc=proc_mprime, seconds=test_minutes*60)
  except KeyboardInterrupt:
    # Keep going so Prime95 is stopped and the panes are cleaned up
    aborted = True

  # Stop Prime95
  hw_cpu.stop_mprime(proc_mprime)

  # Update progress if necessary
  if sensors.cpu_reached_critical_temp() or aborted:
    test_cooling_obj.set_status('Aborted')
    test_mprime_obj.set_status('Aborted')
    state.update_progress_file()

  # Get cooldown temp
  state.ui.clear_current_pane()
  cli.print_standard('Letting CPU cooldown...')
  std.sleep(5)
  cli.print_standard('Saving cooldown temps...')
  sensors.save_average_temps(temp_label='Cooldown', seconds=5)

  # Check Prime95 results
  test_mprime_obj.report.append(ansi.color_string('Prime95', 'BLUE'))
  hw_cpu.check_mprime_results(
    test_obj=test_mprime_obj, working_dir=state.log_dir,
    )

  # Run Sysbench test if necessary
  run_sysbench = (
    not aborted and sensors.cpu_max_temp() >= cfg.hw.CPU_FAILURE_TEMP
    )
  if run_sysbench:
    LOG.info('CPU Test (Sysbench)')
    cli.print_standard('Letting CPU cooldown more...')
    std.sleep(10)
    state.ui.clear_current_pane()
    cli.print_info('Running alternate stress test')
    print('')
    sysbench_log = prime_log.with_name('sysbench.log')
    # Create the log before the worker pane starts tailing it
    sysbench_log.touch()
    state.ui.remove_all_worker_panes()
    state.ui.add_worker_pane(lines=10, watch_cmd='tail', watch_file=sysbench_log)
    proc_sysbench, filehandle_sysbench = hw_cpu.start_sysbench(
      sensors,
      sensors_out,
      log_path=sysbench_log,
      )
    try:
      print_countdown(proc=proc_sysbench, seconds=test_minutes*60)
    except AttributeError:
      # Assuming the sysbench process wasn't found and proc was set to None
      LOG.error('Failed to find sysbench process', exc_info=True)
    except KeyboardInterrupt:
      aborted = True
    hw_cpu.stop_sysbench(proc_sysbench, filehandle_sysbench)

    # Update progress
    # NOTE: CPU critical temp check isn't really necessary
    #       Hard to imagine it wasn't hit during Prime95 but was in sysbench
    if sensors.cpu_reached_critical_temp() or aborted:
      test_cooling_obj.set_status('Aborted')
      test_mprime_obj.set_status('Aborted')
      state.update_progress_file()

  # Check Cooling results
  test_cooling_obj.report.append(ansi.color_string('Temps', 'BLUE'))
  hw_cpu.check_cooling_results(test_cooling_obj, sensors, run_sysbench)

  # Cleanup
  state.update_progress_file()
  sensors.stop_background_monitor()
  state.ui.clear_current_pane_height()
  state.ui.remove_all_info_panes()
  state.ui.remove_all_worker_panes()

  # Done
  # NOTE: Raised last so the cleanup above always runs for user aborts
  if aborted:
    raise std.GenericAbort('Aborted')
|
|
|
|
|
|
def disk_attribute_check(state, test_objects, test_mode=False) -> None:
  """Run the SMART status check for each test's disk.

  Tests for disks without any attribute data are marked 'N/A'. The
  test_mode argument is accepted for a uniform call signature only.
  """
  LOG.info('Disk Attribute Check')
  for test_obj in test_objects:
    disk_smart_status_check(test_obj.dev, mid_run=False)
    if test_obj.dev.attributes:
      continue
    # No NVMe/SMART data
    test_obj.set_status('N/A')

  # Done
  state.update_progress_file()
|
|
|
|
|
|
def disk_io_benchmark(
    state, test_objects, skip_usb=True, test_mode=False) -> None:
  """Benchmark disk I/O (using dd) one disk at a time.

  Disabled tests are ignored and, if skip_usb is set, tests for USB disks
  are marked 'Skipped'. Raises std.GenericAbort if the user interrupted
  a benchmark.
  """
  LOG.info('Disk I/O Benchmark (dd)')
  was_aborted = False
  plural = 's' if len(test_objects) > 1 else ''

  # Run benchmarks
  state.update_title_text(f'Disk I/O Benchmark{plural}')
  state.ui.set_current_pane_height(10)
  for test_obj in test_objects:
    if test_obj.disabled:
      continue
    if skip_usb and test_obj.dev.bus == 'USB':
      # USB devices were excluded by request
      test_obj.set_status('Skipped')
      continue

    # Prepare panes for this disk
    state.ui.clear_current_pane()
    cli.print_report(test_obj.dev.generate_report())
    test_obj.set_status('Working')
    log_path = f'{state.log_dir}/{test_obj.dev.path.name}_benchmark.out'
    state.ui.remove_all_worker_panes()
    state.ui.add_worker_pane(
      percent=50, update_layout=False, watch_cmd='tail', watch_file=log_path,
      )
    state.update_progress_file()

    # Run the benchmark
    try:
      hw_benchmark.run_io_test(test_obj, log_path, test_mode=test_mode)
    except KeyboardInterrupt:
      # User abort: mark this test and skip the remaining disks
      was_aborted = True
      test_obj.set_status('Aborted')
      test_obj.report.append(ansi.color_string(' Aborted', 'YELLOW'))
      break
    except (subprocess.CalledProcessError, TypeError, ValueError) as err:
      # Something went wrong
      LOG.error('%s', err)
      test_obj.set_status('ERROR')
      test_obj.report.append(ansi.color_string(' Unknown Error', 'RED'))

    # Refresh progress once this disk is finished
    state.update_progress_file()

  # Cleanup
  state.update_progress_file()
  state.ui.clear_current_pane_height()
  state.ui.remove_all_worker_panes()

  # Done
  if was_aborted:
    raise std.GenericAbort('Aborted')
|
|
|
|
|
|
def disk_self_test(state, test_objects, test_mode=False) -> None:
  """Start a self-test thread per enabled disk and wait for all of them.

  Raises std.GenericAbort if the user interrupted the wait; in that case
  the self-tests are aborted and the reports are built as aborted.
  """
  LOG.info('Disk Self-Test(s)')
  was_aborted = False
  workers = []
  plural = 's' if len(test_objects) > 1 else ''

  # Announce and show any failed attributes first
  state.update_title_text(f'Disk self-test{plural}')
  cli.print_info(f'Starting self-test{plural}')
  show_failed_attributes(state)

  # Start the self-tests (in reverse order)
  for test_obj in reversed(test_objects):
    if test_obj.disabled:
      continue

    test_obj.set_status('Working')
    log_path = f'{state.log_dir}/{test_obj.dev.path.name}_selftest.log'
    worker = exe.start_thread(
      hw_smart.run_self_test, args=(test_obj, log_path),
      )
    workers.append(worker)

    # Only add a pane for tests that are still running
    if worker.is_alive():
      state.ui.add_worker_pane(lines=4, watch_file=log_path)

  # Block until every thread has finished
  state.update_progress_file()
  try:
    while any(worker.is_alive() for worker in workers):
      std.sleep(1)
  except KeyboardInterrupt:
    was_aborted = True
    for test_obj in test_objects:
      hw_smart.abort_self_test(test_obj.dev)
      std.sleep(0.5)
      hw_smart.build_self_test_report(test_obj, aborted=True)

  # Cleanup
  state.update_progress_file()
  state.ui.remove_all_worker_panes()

  # Done
  if was_aborted:
    raise std.GenericAbort('Aborted')
|
|
|
|
|
|
def disk_smart_status_check(dev, mid_run=True) -> None:
  """Check the SMART status of dev and record the outcome.

  A failed SMART status adds a RED note and disables the disk's tests; a
  failed attribute check only adds a YELLOW note. Either one marks the
  'Disk Attributes' test(s) of dev as failed. If dev is not present its
  tests are disabled and nothing else is checked.
  """
  note = None
  note_color = None
  critical = False

  # A missing device can't be checked
  if not dev.present:
    dev.disable_disk_tests()
    return

  # Overall status first, then the individual attributes
  if not hw_smart.smart_status_ok(dev):
    note, note_color, critical = 'Critical SMART error detected', 'RED', True
  elif not hw_smart.check_attributes(dev, only_blocking=False):
    # Non-blocking errors
    note, note_color = 'SMART attribute failure(s) detected', 'YELLOW'

  # Record the problem unless it was already noted
  if note and not dev.contains_note(note):
    if mid_run:
      note = f'{note} during diagnostics'
    LOG.warning(note)
    dev.add_note(note, note_color)

  # Update the Disk Attributes test result(s)
  for test_obj in dev.tests:
    if test_obj.name != 'Disk Attributes':
      continue
    test_obj.failed = bool(test_obj.failed or note)
    test_obj.passed = not test_obj.failed
    if test_obj.failed:
      test_obj.set_status('Failed')
    elif 'N/A' not in test_obj.status:
      test_obj.set_status('Passed')

  # Critical errors stop all further testing of this disk
  if critical:
    dev.disable_disk_tests()
|
|
|
|
|
|
def disk_surface_scan(state, test_objects, test_mode=False) -> None:
  """Run a badblocks surface scan thread per enabled disk.

  The progress file is refreshed while scans are running. Raises
  std.GenericAbort if the user interrupted the wait.
  """
  LOG.info('Disk Surface Scan (badblocks)')
  was_aborted = False
  workers = []
  plural = 's' if len(test_objects) > 1 else ''

  # Update panes
  state.update_title_text(f'Disk Surface Scan{plural}')
  cli.print_info(f'Starting disk surface scan{plural}')
  show_failed_attributes(state)

  # Start the scans (enabled tests only, in reverse order)
  enabled_tests = [test_obj for test_obj in test_objects if not test_obj.disabled]
  for test_obj in reversed(enabled_tests):
    log_path = f'{state.log_dir}/{test_obj.dev.path.name}_badblocks.log'
    worker = exe.start_thread(
      hw_surface_scan.run_scan, args=(test_obj, log_path, test_mode),
      )
    workers.append(worker)

    # Only add a pane for scans that are still running
    if worker.is_alive():
      state.ui.add_worker_pane(lines=5, watch_cmd='tail', watch_file=log_path)

  # Block until every thread has finished
  try:
    while any(worker.is_alive() for worker in workers):
      state.update_progress_file()
      std.sleep(5)
  except KeyboardInterrupt:
    was_aborted = True
    std.sleep(0.5)
    # Anything without a result at this point was interrupted
    for test_obj in test_objects:
      if test_obj.disabled or test_obj.passed or test_obj.failed:
        continue
      test_obj.set_status('Aborted')
      test_obj.report.append(ansi.color_string(' Aborted', 'YELLOW'))

  # Cleanup
  state.update_progress_file()
  state.ui.remove_all_worker_panes()

  # Done
  if was_aborted:
    raise std.GenericAbort('Aborted')
|
|
|
|
|
|
def main() -> None:
  """Entry point: parse arguments then run quick mode or the menu loop.

  Raises RuntimeError if not running inside a tmux session.
  """
  args = docopt(DOCSTRING)
  log.update_log_path(dest_name='Hardware-Diagnostics', timestamp=True)

  # Safety check
  if 'TMUX' not in os.environ:
    LOG.error('tmux session not found')
    raise RuntimeError('tmux session not found')

  # Init
  test_mode = args['--test-mode']
  menu = build_menu(cli_mode=args['--cli'], quick_mode=args['--quick'])
  state = State(test_mode=test_mode)

  # Quick Mode (no menu)
  if args['--quick']:
    run_diags(state, menu, quick_mode=True, test_mode=test_mode)
    return

  # Lookup tables for the menu loop (checked in order, first match wins)
  simple_tests = (
    ('Audio Test', audio_test),
    ('Keyboard Test', keyboard_test),
    ('Network Test', network_test),
    ('Clock Sync', sync_clock),
    )
  screensavers = (
    ('Matrix', 'matrix'),
    # Tubes ≈≈ Pipes?
    ('Tubes', 'pipes'),
    )
  power_commands = (
    ('Reboot', 'reboot'),
    ('Power Off', 'poweroff'),
    )

  # Show menu
  while True:
    selection = menu.advanced_select()

    # Run simple test
    action = next(
      (func for name, func in simple_tests if name in selection), None,
      )
    if action:
      state.update_title_text(selection[0])
      try:
        action()
      except KeyboardInterrupt:
        cli.print_warning('Aborted.')
        cli.print_standard('')
        cli.pause('Press Enter to return to main menu...')
      if 'Clock Sync' in selection:
        state.ui.update_clock()

    # Secrets
    saver_name = next(
      (saver for name, saver in screensavers if name in selection), None,
      )
    if saver_name:
      screensaver(saver_name)

    # Power commands / Quit
    power_verb = next(
      (verb for name, verb in power_commands if name in selection), None,
      )
    if power_verb:
      exe.run_program(
        ['/usr/local/bin/wk-power-command', power_verb], check=False,
        )
    elif 'Quit' in selection:
      break

    # Start diagnostics
    if 'Start' in selection:
      run_diags(state, menu, quick_mode=False, test_mode=test_mode)

    # Reset top pane
    state.update_title_text('Main Menu')
|
|
|
|
|
|
def print_countdown(proc, seconds) -> None:
  """Show the time remaining, once per second, until proc exits.

  The countdown ends early when proc reports that it has exited, either
  via poll() or via is_running(), whichever the object provides.
  """
  total = int(seconds)
  for elapsed in range(total):
    remaining = total - elapsed
    min_left = remaining // 60
    sec_left = remaining % 60

    # Build the status line (minutes are omitted during the last minute)
    pieces = ['\r ']
    if min_left:
      pieces.append(f'{min_left} minute{"s" if min_left != 1 else ""}, ')
    pieces.append(f'{sec_left} second{"s" if sec_left != 1 else ""}')
    pieces.append(' remaining')
    status_line = ''.join(pieces)

    # Pad so a shorter line fully overwrites the previous one
    print(f'{status_line:<42}', end='', flush=True)

    # Wait up to a second for proc to finish
    try:
      proc.wait(1)
    except subprocess.TimeoutExpired:
      # proc still going, continue
      pass

    has_exited = hasattr(proc, 'poll') and proc.poll() is not None
    if not has_exited:
      has_exited = hasattr(proc, 'is_running') and not proc.is_running()
    if has_exited:
      # proc exited, stop countdown
      break

  # Done
  print('')
|
|
|
|
|
|
def run_diags(state, menu, quick_mode=False, test_mode=False) -> None:
  """Run every selected test group and then show the results.

  Debug reports are saved at the end of the run and are also registered
  with atexit for the duration of the run.
  """
  was_aborted = False
  atexit.register(state.save_debug_reports)
  state.init_diags(menu)

  # Nothing to do without any test groups
  if not state.test_groups:
    cli.print_warning('No tests selected?')
    cli.pause()
    return

  # Run tests
  for group in state.test_groups:
    group_args = [group.test_objects]
    if group.name == 'Disk I/O Benchmark':
      # The benchmark also needs to know whether to skip USB disks
      group_args.append(menu.toggles['Skip USB Benchmarks']['Selected'])
    state.ui.clear_current_pane()
    try:
      group.function(state, *group_args, test_mode=test_mode)
    except (KeyboardInterrupt, std.GenericAbort):
      was_aborted = True
      state.abort_testing()
      state.update_progress_file()
      break

    # Run safety checks after disk tests
    if group.name.startswith('Disk'):
      state.disk_safety_checks()

  # Handle aborts
  if was_aborted:
    pending_tests = (
      test_obj
      for group in state.test_groups
      for test_obj in group.test_objects
      if test_obj.status == 'Pending'
      )
    for test_obj in pending_tests:
      test_obj.set_status('Aborted')

  # Show results
  show_results(state)

  # Done
  state.save_debug_reports()
  atexit.unregister(state.save_debug_reports)
  cli.pause(
    'Press Enter to exit...' if quick_mode
    else 'Press Enter to return to main menu...'
    )
|
|
|
|
|
|
def show_failed_attributes(state) -> None:
  """Print the failed-attribute report of every disk in state.disks."""
  for disk in state.disks:
    heading = [disk.name, disk.description]
    failed_report = hw_smart.generate_attribute_report(disk, only_failed=True)
    cli.print_colored(heading, ['CYAN', None])
    cli.print_report(failed_report)
    cli.print_standard('')
|
|
|
|
|
|
def show_results(state) -> None:
  """Print the CPU and disk reports for the test groups that were run."""
  std.sleep(0.5)
  state.ui.clear_current_pane()
  state.update_title_text('Results')

  group_names = [group.name for group in state.test_groups]

  # CPU Tests
  if any('CPU' in name for name in group_names):
    cli.print_success('CPU:')
    cli.print_report(state.system.generate_report())
    cli.print_standard(' ')

  # Disk Tests
  if any('Disk' in name for name in group_names):
    plural = 's' if len(state.disks) > 1 else ''
    cli.print_success(f'Disk{plural}:')
    for disk in state.disks:
      cli.print_report(disk.generate_report())
      cli.print_standard(' ')
    if not state.disks:
      # Disk tests were selected but no disks were detected
      cli.print_warning('No devices')
      cli.print_standard(' ')
|
|
|
|
|
|
def sync_clock() -> None:
  """Sync the clock under macOS using sntp.

  Tries 'sntp -Ss' first and falls back to 'sntp -s' if that fails.
  """
  sntp = ['sudo', 'sntp']
  server = 'us.pool.ntp.org'
  first_try = exe.run_program([*sntp, '-Ss', server], check=False)
  if first_try.returncode:
    # Assuming we're running under an older version of macOS
    exe.run_program([*sntp, '-s', server], check=False)
|
|
|
|
|
|
# This module is only meant to be imported, tell the user if it is executed
if __name__ == '__main__':
  print('This file is not meant to be called directly.')
|