Add test mode to Hardware Diagnostics

Addresses issue #192
This commit is contained in:
2Shirt 2022-05-14 17:34:11 -07:00
parent 47308c1508
commit 0ecc4d4146
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
4 changed files with 36 additions and 18 deletions

View file

@ -151,6 +151,8 @@ TEMP_COLORS = {
100: 'ORANGE_RED', 100: 'ORANGE_RED',
} }
TESTSTATION_FILE = '/run/archiso/bootmnt/teststation.name' TESTSTATION_FILE = '/run/archiso/bootmnt/teststation.name'
TEST_MODE_BADBLOCKS_LIMIT = '10000' # Last block to read
TEST_MODE_CPU_LIMIT = 0.25 # Number of minutes to test
# THRESHOLDS: Rates used to determine HDD/SSD pass/fail # THRESHOLDS: Rates used to determine HDD/SSD pass/fail
THRESH_HDD_MIN = 50 * 1024**2 THRESH_HDD_MIN = 50 * 1024**2
THRESH_HDD_AVG_HIGH = 75 * 1024**2 THRESH_HDD_AVG_HIGH = 75 * 1024**2

View file

@ -38,7 +38,7 @@ class DeviceTooSmallError(RuntimeError):
# Functions # Functions
def calc_io_dd_values(dev_size) -> dict[str, int]: def calc_io_dd_values(dev_size, test_mode=False) -> dict[str, int]:
"""Calculate I/O benchmark dd values, returns dict. """Calculate I/O benchmark dd values, returns dict.
Calculations: Calculations:
@ -63,7 +63,11 @@ def calc_io_dd_values(dev_size) -> dict[str, int]:
This is needed to ensure an even testing across the dev This is needed to ensure an even testing across the dev
This is calculated by using the fractional amount left off This is calculated by using the fractional amount left off
of the skip_blocks variable of the skip_blocks variable
test_mode limits the benchmark to IO_MINIMUM_TEST_SIZE (if possible)
""" """
if test_mode:
dev_size = min(IO_MINIMUM_TEST_SIZE, dev_size)
read_total = min(IO_MINIMUM_TEST_SIZE, dev_size) read_total = min(IO_MINIMUM_TEST_SIZE, dev_size)
read_total = max(read_total, dev_size*IO_ALT_TEST_SIZE_FACTOR) read_total = max(read_total, dev_size*IO_ALT_TEST_SIZE_FACTOR)
read_chunks = int(read_total // IO_CHUNK_SIZE) read_chunks = int(read_total // IO_CHUNK_SIZE)
@ -135,7 +139,7 @@ def check_io_results(test_obj, rate_list, graph_width) -> None:
test_obj.set_status('Unknown') test_obj.set_status('Unknown')
def run_io_test(test_obj, log_path) -> None: def run_io_test(test_obj, log_path, test_mode=False) -> None:
"""Run I/O benchmark and handle exceptions.""" """Run I/O benchmark and handle exceptions."""
dev_path = test_obj.dev.path dev_path = test_obj.dev.path
if PLATFORM == 'Darwin': if PLATFORM == 'Darwin':
@ -148,7 +152,7 @@ def run_io_test(test_obj, log_path) -> None:
# Get dd values or bail # Get dd values or bail
try: try:
dd_values = calc_io_dd_values(test_obj.dev.size) dd_values = calc_io_dd_values(test_obj.dev.size, test_mode=test_mode)
except DeviceTooSmallError: except DeviceTooSmallError:
test_obj.set_status('N/A') test_obj.set_status('N/A')
test_obj.report.append( test_obj.report.append(

View file

@ -37,6 +37,7 @@ Options:
-c --cli Force CLI mode -c --cli Force CLI mode
-h --help Show this page -h --help Show this page
-q --quick Skip menu and perform a quick check -q --quick Skip menu and perform a quick check
-t --test-mode Run diags in test mode
''' '''
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
TEST_GROUPS = { TEST_GROUPS = {
@ -368,7 +369,7 @@ def build_menu(cli_mode=False, quick_mode=False) -> std.Menu:
return menu return menu
def cpu_stress_tests(state, test_objects) -> None: def cpu_stress_tests(state, test_objects, test_mode=False) -> None:
# pylint: disable=too-many-statements # pylint: disable=too-many-statements
"""CPU & cooling check using Prime95 and Sysbench.""" """CPU & cooling check using Prime95 and Sysbench."""
LOG.info('CPU Test (Prime95)') LOG.info('CPU Test (Prime95)')
@ -376,6 +377,9 @@ def cpu_stress_tests(state, test_objects) -> None:
prime_log = pathlib.Path(f'{state.log_dir}/prime.log') prime_log = pathlib.Path(f'{state.log_dir}/prime.log')
run_sysbench = False run_sysbench = False
sensors_out = pathlib.Path(f'{state.log_dir}/sensors.out') sensors_out = pathlib.Path(f'{state.log_dir}/sensors.out')
test_minutes = cfg.hw.CPU_TEST_MINUTES
if test_mode:
test_minutes = cfg.hw.TEST_MODE_CPU_LIMIT
test_mprime_obj, test_cooling_obj = test_objects test_mprime_obj, test_cooling_obj = test_objects
# Bail early # Bail early
@ -420,7 +424,7 @@ def cpu_stress_tests(state, test_objects) -> None:
# Show countdown # Show countdown
print('') print('')
try: try:
print_countdown(proc=proc_mprime, seconds=cfg.hw.CPU_TEST_MINUTES*60) print_countdown(proc=proc_mprime, seconds=test_minutes*60)
except KeyboardInterrupt: except KeyboardInterrupt:
aborted = True aborted = True
@ -464,7 +468,7 @@ def cpu_stress_tests(state, test_objects) -> None:
pane=state.panes['Prime95'], pane=state.panes['Prime95'],
) )
try: try:
print_countdown(proc=proc_sysbench, seconds=cfg.hw.CPU_TEST_MINUTES*60) print_countdown(proc=proc_sysbench, seconds=test_minutes*60)
except AttributeError: except AttributeError:
# Assuming the sysbench process wasn't found and proc was set to None # Assuming the sysbench process wasn't found and proc was set to None
LOG.error('Failed to find sysbench process', exc_info=True) LOG.error('Failed to find sysbench process', exc_info=True)
@ -496,7 +500,8 @@ def cpu_stress_tests(state, test_objects) -> None:
raise std.GenericAbort('Aborted') raise std.GenericAbort('Aborted')
def disk_attribute_check(state, test_objects) -> None: def disk_attribute_check(state, test_objects, test_mode=False) -> None:
# pylint: disable=unused-argument
"""Disk attribute check.""" """Disk attribute check."""
LOG.info('Disk Attribute Check') LOG.info('Disk Attribute Check')
for test in test_objects: for test in test_objects:
@ -510,7 +515,8 @@ def disk_attribute_check(state, test_objects) -> None:
state.update_progress_pane() state.update_progress_pane()
def disk_io_benchmark(state, test_objects, skip_usb=True) -> None: def disk_io_benchmark(
state, test_objects, skip_usb=True, test_mode=False) -> None:
"""Disk I/O benchmark using dd.""" """Disk I/O benchmark using dd."""
LOG.info('Disk I/O Benchmark (dd)') LOG.info('Disk I/O Benchmark (dd)')
aborted = False aborted = False
@ -546,7 +552,7 @@ def disk_io_benchmark(state, test_objects, skip_usb=True) -> None:
) )
state.update_progress_pane() state.update_progress_pane()
try: try:
hw_benchmark.run_io_test(test, test_log) hw_benchmark.run_io_test(test, test_log, test_mode=test_mode)
except KeyboardInterrupt: except KeyboardInterrupt:
aborted = True aborted = True
except (subprocess.CalledProcessError, TypeError, ValueError) as err: except (subprocess.CalledProcessError, TypeError, ValueError) as err:
@ -573,7 +579,8 @@ def disk_io_benchmark(state, test_objects, skip_usb=True) -> None:
raise std.GenericAbort('Aborted') raise std.GenericAbort('Aborted')
def disk_self_test(state, test_objects) -> None: def disk_self_test(state, test_objects, test_mode=False) -> None:
# pylint: disable=unused-argument
"""Disk self-test if available.""" """Disk self-test if available."""
LOG.info('Disk Self-Test(s)') LOG.info('Disk Self-Test(s)')
aborted = False aborted = False
@ -652,7 +659,7 @@ def disk_smart_status_check(dev, mid_run=True) -> None:
# Set Disk Attributes test result # Set Disk Attributes test result
for test in dev.tests: for test in dev.tests:
if test.name == 'Disk Attributes': if test.name == 'Disk Attributes':
test.failed = test.failed or msg test.failed = bool(test.failed or msg)
test.passed = not test.failed test.passed = not test.failed
if test.failed: if test.failed:
test.set_status('Failed') test.set_status('Failed')
@ -664,7 +671,7 @@ def disk_smart_status_check(dev, mid_run=True) -> None:
dev.disable_disk_tests() dev.disable_disk_tests()
def disk_surface_scan(state, test_objects) -> None: def disk_surface_scan(state, test_objects, test_mode=False) -> None:
"""Read-only disk surface scan using badblocks.""" """Read-only disk surface scan using badblocks."""
LOG.info('Disk Surface Scan (badblocks)') LOG.info('Disk Surface Scan (badblocks)')
aborted = False aborted = False
@ -698,7 +705,7 @@ def disk_surface_scan(state, test_objects) -> None:
# Start thread # Start thread
test_log = f'{state.log_dir}/{test.dev.path.name}_badblocks.log' test_log = f'{state.log_dir}/{test.dev.path.name}_badblocks.log'
threads.append(exe.start_thread( threads.append(exe.start_thread(
hw_surface_scan.run_scan, args=(test, test_log), hw_surface_scan.run_scan, args=(test, test_log, test_mode),
)) ))
# Show progress # Show progress
@ -758,7 +765,7 @@ def main() -> None:
# Quick Mode # Quick Mode
if args['--quick']: if args['--quick']:
run_diags(state, menu, quick_mode=True) run_diags(state, menu, quick_mode=True, test_mode=args['--test-mode'])
return return
# Show menu # Show menu
@ -807,7 +814,7 @@ def main() -> None:
# Start diagnostics # Start diagnostics
if 'Start' in selection: if 'Start' in selection:
run_diags(state, menu, quick_mode=False) run_diags(state, menu, quick_mode=False, test_mode=args['--test-mode'])
# Reset top pane # Reset top pane
state.update_top_pane('Main Menu') state.update_top_pane('Main Menu')
@ -841,7 +848,7 @@ def print_countdown(proc, seconds) -> None:
print('') print('')
def run_diags(state, menu, quick_mode=False) -> None: def run_diags(state, menu, quick_mode=False, test_mode=False) -> None:
"""Run selected diagnostics.""" """Run selected diagnostics."""
aborted = False aborted = False
atexit.register(state.save_debug_reports) atexit.register(state.save_debug_reports)
@ -863,7 +870,7 @@ def run_diags(state, menu, quick_mode=False) -> None:
args.append(menu.toggles['Skip USB Benchmarks']['Selected']) args.append(menu.toggles['Skip USB Benchmarks']['Selected'])
std.clear_screen() std.clear_screen()
try: try:
function(state, *args) function(state, *args, test_mode=test_mode)
except (KeyboardInterrupt, std.GenericAbort): except (KeyboardInterrupt, std.GenericAbort):
aborted = True aborted = True
state.abort_testing() state.abort_testing()

View file

@ -9,6 +9,7 @@ from wk.cfg.hw import (
BADBLOCKS_LARGE_DISK, BADBLOCKS_LARGE_DISK,
BADBLOCKS_REGEX, BADBLOCKS_REGEX,
BADBLOCKS_SKIP_REGEX, BADBLOCKS_SKIP_REGEX,
TEST_MODE_BADBLOCKS_LIMIT,
) )
from wk.exe import run_program from wk.exe import run_program
from wk.std import ( from wk.std import (
@ -48,7 +49,7 @@ def check_surface_scan_results(test_obj, log_path) -> None:
test_obj.set_status('Unknown') test_obj.set_status('Unknown')
def run_scan(test_obj, log_path) -> None: def run_scan(test_obj, log_path, test_mode=False) -> None:
"""Run surface scan and handle exceptions.""" """Run surface scan and handle exceptions."""
block_size = '1024' block_size = '1024'
dev = test_obj.dev dev = test_obj.dev
@ -67,6 +68,10 @@ def run_scan(test_obj, log_path) -> None:
# Start scan # Start scan
cmd = ['sudo', 'badblocks', '-sv', '-b', block_size, '-e', '1', dev_path] cmd = ['sudo', 'badblocks', '-sv', '-b', block_size, '-e', '1', dev_path]
if test_mode:
# Only test a limited scope instead of the whole device
cmd.append(TEST_MODE_BADBLOCKS_LIMIT)
with open(log_path, 'a', encoding='utf-8') as _f: with open(log_path, 'a', encoding='utf-8') as _f:
size_str = bytes_to_string(dev.size, use_binary=False) size_str = bytes_to_string(dev.size, use_binary=False)
_f.write( _f.write(