diff --git a/scripts/wk/cfg/hw.py b/scripts/wk/cfg/hw.py index e0ae4cd8..c9bc28f0 100644 --- a/scripts/wk/cfg/hw.py +++ b/scripts/wk/cfg/hw.py @@ -151,6 +151,8 @@ TEMP_COLORS = { 100: 'ORANGE_RED', } TESTSTATION_FILE = '/run/archiso/bootmnt/teststation.name' +TEST_MODE_BADBLOCKS_LIMIT = '10000' # Last block to read +TEST_MODE_CPU_LIMIT = 0.25 # Number of minutes to test # THRESHOLDS: Rates used to determine HDD/SSD pass/fail THRESH_HDD_MIN = 50 * 1024**2 THRESH_HDD_AVG_HIGH = 75 * 1024**2 diff --git a/scripts/wk/hw/benchmark.py b/scripts/wk/hw/benchmark.py index 2ee08577..fd8e86db 100644 --- a/scripts/wk/hw/benchmark.py +++ b/scripts/wk/hw/benchmark.py @@ -38,7 +38,7 @@ class DeviceTooSmallError(RuntimeError): # Functions -def calc_io_dd_values(dev_size) -> dict[str, int]: +def calc_io_dd_values(dev_size, test_mode=False) -> dict[str, int]: """Calculate I/O benchmark dd values, returns dict. Calculations: @@ -63,7 +63,11 @@ def calc_io_dd_values(dev_size) -> dict[str, int]: This is needed to ensure an even testing across the dev This is calculated by using the fractional amount left off of the skip_blocks variable + + test_mode limits the benchmark to IO_MINIMUM_TEST_SIZE (if possible) """ + if test_mode: + dev_size = min(IO_MINIMUM_TEST_SIZE, dev_size) read_total = min(IO_MINIMUM_TEST_SIZE, dev_size) read_total = max(read_total, dev_size*IO_ALT_TEST_SIZE_FACTOR) read_chunks = int(read_total // IO_CHUNK_SIZE) @@ -135,7 +139,7 @@ def check_io_results(test_obj, rate_list, graph_width) -> None: test_obj.set_status('Unknown') -def run_io_test(test_obj, log_path) -> None: +def run_io_test(test_obj, log_path, test_mode=False) -> None: """Run I/O benchmark and handle exceptions.""" dev_path = test_obj.dev.path if PLATFORM == 'Darwin': @@ -148,7 +152,7 @@ def run_io_test(test_obj, log_path) -> None: # Get dd values or bail try: - dd_values = calc_io_dd_values(test_obj.dev.size) + dd_values = calc_io_dd_values(test_obj.dev.size, test_mode=test_mode) except DeviceTooSmallError: test_obj.set_status('N/A') test_obj.report.append( diff --git a/scripts/wk/hw/diags.py b/scripts/wk/hw/diags.py index f0b047af..05fef3e3 100644 --- a/scripts/wk/hw/diags.py +++ b/scripts/wk/hw/diags.py @@ -37,6 +37,7 @@ Options: -c --cli Force CLI mode -h --help Show this page -q --quick Skip menu and perform a quick check + -t --test-mode Run diags in test mode ''' LOG = logging.getLogger(__name__) TEST_GROUPS = { @@ -368,7 +369,7 @@ def build_menu(cli_mode=False, quick_mode=False) -> std.Menu: return menu -def cpu_stress_tests(state, test_objects) -> None: +def cpu_stress_tests(state, test_objects, test_mode=False) -> None: # pylint: disable=too-many-statements """CPU & cooling check using Prime95 and Sysbench.""" LOG.info('CPU Test (Prime95)') @@ -376,6 +377,9 @@ def cpu_stress_tests(state, test_objects) -> None: prime_log = pathlib.Path(f'{state.log_dir}/prime.log') run_sysbench = False sensors_out = pathlib.Path(f'{state.log_dir}/sensors.out') + test_minutes = cfg.hw.CPU_TEST_MINUTES + if test_mode: + test_minutes = cfg.hw.TEST_MODE_CPU_LIMIT test_mprime_obj, test_cooling_obj = test_objects # Bail early @@ -420,7 +424,7 @@ def cpu_stress_tests(state, test_objects) -> None: # Show countdown print('') try: - print_countdown(proc=proc_mprime, seconds=cfg.hw.CPU_TEST_MINUTES*60) + print_countdown(proc=proc_mprime, seconds=test_minutes*60) except KeyboardInterrupt: aborted = True @@ -464,7 +468,7 @@ def cpu_stress_tests(state, test_objects) -> None: pane=state.panes['Prime95'], ) try: - print_countdown(proc=proc_sysbench, seconds=cfg.hw.CPU_TEST_MINUTES*60) + print_countdown(proc=proc_sysbench, seconds=test_minutes*60) except AttributeError: # Assuming the sysbench process wasn't found and proc was set to None LOG.error('Failed to find sysbench process', exc_info=True) @@ -496,7 +500,8 @@ def cpu_stress_tests(state, test_objects) -> None: raise std.GenericAbort('Aborted') -def disk_attribute_check(state, test_objects) -> None: +def disk_attribute_check(state, test_objects, test_mode=False) -> None: + # pylint: disable=unused-argument """Disk attribute check.""" LOG.info('Disk Attribute Check') for test in test_objects: @@ -510,7 +515,8 @@ def disk_attribute_check(state, test_objects) -> None: state.update_progress_pane() -def disk_io_benchmark(state, test_objects, skip_usb=True) -> None: +def disk_io_benchmark( + state, test_objects, skip_usb=True, test_mode=False) -> None: """Disk I/O benchmark using dd.""" LOG.info('Disk I/O Benchmark (dd)') aborted = False @@ -546,7 +552,7 @@ def disk_io_benchmark(state, test_objects, skip_usb=True) -> None: ) state.update_progress_pane() try: - hw_benchmark.run_io_test(test, test_log) + hw_benchmark.run_io_test(test, test_log, test_mode=test_mode) except KeyboardInterrupt: aborted = True except (subprocess.CalledProcessError, TypeError, ValueError) as err: @@ -573,7 +579,8 @@ def disk_io_benchmark(state, test_objects, skip_usb=True) -> None: raise std.GenericAbort('Aborted') -def disk_self_test(state, test_objects) -> None: +def disk_self_test(state, test_objects, test_mode=False) -> None: + # pylint: disable=unused-argument """Disk self-test if available.""" LOG.info('Disk Self-Test(s)') aborted = False @@ -652,7 +659,7 @@ def disk_smart_status_check(dev, mid_run=True) -> None: # Set Disk Attributes test result for test in dev.tests: if test.name == 'Disk Attributes': - test.failed = test.failed or msg + test.failed = bool(test.failed or msg) test.passed = not test.failed if test.failed: test.set_status('Failed') @@ -664,7 +671,7 @@ def disk_smart_status_check(dev, mid_run=True) -> None: dev.disable_disk_tests() -def disk_surface_scan(state, test_objects) -> None: +def disk_surface_scan(state, test_objects, test_mode=False) -> None: """Read-only disk surface scan using badblocks.""" LOG.info('Disk Surface Scan (badblocks)') aborted = False @@ -698,7 +705,7 @@ def disk_surface_scan(state, test_objects) -> None: # Start thread test_log = f'{state.log_dir}/{test.dev.path.name}_badblocks.log' threads.append(exe.start_thread( - hw_surface_scan.run_scan, args=(test, test_log), + hw_surface_scan.run_scan, args=(test, test_log, test_mode), )) # Show progress @@ -758,7 +765,7 @@ def main() -> None: # Quick Mode if args['--quick']: - run_diags(state, menu, quick_mode=True) + run_diags(state, menu, quick_mode=True, test_mode=args['--test-mode']) return # Show menu @@ -807,7 +814,7 @@ def main() -> None: # Start diagnostics if 'Start' in selection: - run_diags(state, menu, quick_mode=False) + run_diags(state, menu, quick_mode=False, test_mode=args['--test-mode']) # Reset top pane state.update_top_pane('Main Menu') @@ -841,7 +848,7 @@ def print_countdown(proc, seconds) -> None: print('') -def run_diags(state, menu, quick_mode=False) -> None: +def run_diags(state, menu, quick_mode=False, test_mode=False) -> None: """Run selected diagnostics.""" aborted = False atexit.register(state.save_debug_reports) @@ -863,7 +870,7 @@ def run_diags(state, menu, quick_mode=False) -> None: args.append(menu.toggles['Skip USB Benchmarks']['Selected']) std.clear_screen() try: - function(state, *args) + function(state, *args, test_mode=test_mode) except (KeyboardInterrupt, std.GenericAbort): aborted = True state.abort_testing() diff --git a/scripts/wk/hw/surface_scan.py b/scripts/wk/hw/surface_scan.py index f3a7212d..f6452bc5 100644 --- a/scripts/wk/hw/surface_scan.py +++ b/scripts/wk/hw/surface_scan.py @@ -9,6 +9,7 @@ from wk.cfg.hw import ( BADBLOCKS_LARGE_DISK, BADBLOCKS_REGEX, BADBLOCKS_SKIP_REGEX, + TEST_MODE_BADBLOCKS_LIMIT, ) from wk.exe import run_program from wk.std import ( @@ -48,7 +49,7 @@ def check_surface_scan_results(test_obj, log_path) -> None: test_obj.set_status('Unknown') -def run_scan(test_obj, log_path) -> None: +def run_scan(test_obj, log_path, test_mode=False) -> None: """Run surface scan and handle exceptions.""" block_size = '1024' dev = test_obj.dev @@ -67,6 +68,10 @@ def run_scan(test_obj, log_path) -> None: # Start scan cmd = ['sudo', 'badblocks', '-sv', '-b', block_size, '-e', '1', dev_path] + if test_mode: + # Only test a limited scope instead of the whole device + cmd.append(TEST_MODE_BADBLOCKS_LIMIT) + with open(log_path, 'a', encoding='utf-8') as _f: size_str = bytes_to_string(dev.size, use_binary=False) _f.write(