From 99dd7661d4a660811fdb7ad950cca3a65f96f7f0 Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Tue, 5 Apr 2022 18:11:06 -0600 Subject: [PATCH] Split hardware diagnostics into multiple files --- scripts/wk/hw/__init__.py | 7 + scripts/wk/hw/audio.py | 37 +++ scripts/wk/hw/benchmark.py | 212 ++++++++++++++ scripts/wk/hw/cpu.py | 205 +++++++++++++ scripts/wk/hw/diags.py | 538 ++-------------------------------- scripts/wk/hw/keyboard.py | 31 ++ scripts/wk/hw/network.py | 58 ++++ scripts/wk/hw/screensavers.py | 40 +++ scripts/wk/hw/surface_scan.py | 85 ++++++ 9 files changed, 697 insertions(+), 516 deletions(-) create mode 100644 scripts/wk/hw/audio.py create mode 100644 scripts/wk/hw/benchmark.py create mode 100644 scripts/wk/hw/cpu.py create mode 100644 scripts/wk/hw/keyboard.py create mode 100644 scripts/wk/hw/network.py create mode 100644 scripts/wk/hw/screensavers.py create mode 100644 scripts/wk/hw/surface_scan.py diff --git a/scripts/wk/hw/__init__.py b/scripts/wk/hw/__init__.py index 092a7428..821c9761 100644 --- a/scripts/wk/hw/__init__.py +++ b/scripts/wk/hw/__init__.py @@ -1,8 +1,15 @@ """WizardKit: hw module init""" +from . import audio +from . import benchmark +from . import cpu from . import ddrescue from . import diags from . import disk +from . import keyboard +from . import network +from . import screensavers from . import sensors +from . import surface_scan from . import system from . import test diff --git a/scripts/wk/hw/audio.py b/scripts/wk/hw/audio.py new file mode 100644 index 00000000..f6a984b5 --- /dev/null +++ b/scripts/wk/hw/audio.py @@ -0,0 +1,37 @@ +"""WizardKit: Audio test functions""" +# vim: sts=2 sw=2 ts=2 + +import logging + +from wk.exe import run_program +from wk.std import PLATFORM + + +# STATIC VARIABLES +LOG = logging.getLogger(__name__) + + +# Functions +def audio_test(): + """Run an OS-specific audio test.""" + if PLATFORM == 'Linux': + audio_test_linux() + + +def audio_test_linux(): + """Run an audio test using amixer and speaker-test.""" + LOG.info('Audio Test') + + # Set volume + for source in ('Master', 'PCM'): + cmd = f'amixer -q set "{source}" 80% unmute'.split() + run_program(cmd, check=False) + + # Run audio tests + for mode in ('pink', 'wav'): + cmd = f'speaker-test -c 2 -l 1 -t {mode}'.split() + run_program(cmd, check=False, pipe=False) + + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/scripts/wk/hw/benchmark.py b/scripts/wk/hw/benchmark.py new file mode 100644 index 00000000..78664192 --- /dev/null +++ b/scripts/wk/hw/benchmark.py @@ -0,0 +1,212 @@ +"""WizardKit: Benchmark test functions""" +# vim: sts=2 sw=2 ts=2 + +import logging + +from subprocess import PIPE, STDOUT + +from wk import graph +from wk.cfg.hw import ( + IO_ALT_TEST_SIZE_FACTOR, + IO_BLOCK_SIZE, + IO_CHUNK_SIZE, + IO_GRAPH_WIDTH, + IO_MINIMUM_TEST_SIZE, + IO_RATE_REGEX, + THRESH_HDD_AVG_HIGH, + THRESH_HDD_AVG_LOW, + THRESH_HDD_MIN, + THRESH_SSD_AVG_HIGH, + THRESH_SSD_AVG_LOW, + THRESH_SSD_MIN, + ) +from wk.exe import run_program +from wk.std import ( + PLATFORM, + strip_colors, + color_string, + ) + + +# STATIC VARIABLES +LOG = logging.getLogger(__name__) + + +# Error Classes +class DeviceTooSmallError(RuntimeError): + """Raised when a device is too small to test.""" + + +# Functions +def calc_io_dd_values(dev_size): + """Calculate I/O benchmark dd values, returns dict. + + Calculations: + The minimum dev size is IO_GRAPH_WIDTH * IO_CHUNK_SIZE + (e.g. 1.25 GB for a width of 40 and a chunk size of 32MB) + + read_total is the area to be read in bytes + If the dev is < IO_MINIMUM_TEST_SIZE then it's the whole dev + Else it's the larger of IO_MINIMUM_TEST_SIZE or the alt test size + (determined by dev * IO_ALT_TEST_SIZE_FACTOR) + + read_chunks is the number of groups of IO_CHUNK_SIZE in test_obj.dev + This number is reduced to a multiple of IO_GRAPH_WIDTH in order + to allow for the data to be condensed cleanly + + read_blocks is the chunk size in number of blocks + (e.g. 64 if block size is 512KB and chunk size is 32MB + + skip_total is the number of IO_BLOCK_SIZE groups not tested + skip_blocks is the number of blocks to skip per IO_CHUNK_SIZE + skip_extra_rate is how often to add an additional skip block + This is needed to ensure an even testing across the dev + This is calculated by using the fractional amount left off + of the skip_blocks variable + """ + read_total = min(IO_MINIMUM_TEST_SIZE, dev_size) + read_total = max(read_total, dev_size*IO_ALT_TEST_SIZE_FACTOR) + read_chunks = int(read_total // IO_CHUNK_SIZE) + read_chunks -= read_chunks % IO_GRAPH_WIDTH + if read_chunks < IO_GRAPH_WIDTH: + raise DeviceTooSmallError + read_blocks = int(IO_CHUNK_SIZE / IO_BLOCK_SIZE) + read_total = read_chunks * IO_CHUNK_SIZE + skip_total = int((dev_size - read_total) // IO_BLOCK_SIZE) + skip_blocks = int((skip_total / read_chunks) // 1) + skip_extra_rate = 0 + try: + skip_extra_rate = 1 + int(1 / ((skip_total / read_chunks) % 1)) + except ZeroDivisionError: + # skip_extra_rate == 0 is fine + pass + + # Done + return { + 'Read Chunks': read_chunks, + 'Read Blocks': read_blocks, + 'Skip Blocks': skip_blocks, + 'Skip Extra': skip_extra_rate, + } + + +def check_io_results(test_obj, rate_list, graph_width): + """Generate colored report using rate_list, returns list of str.""" + avg_read = sum(rate_list) / len(rate_list) + min_read = min(rate_list) + max_read = max(rate_list) + if test_obj.dev.ssd: + thresh_min = THRESH_SSD_MIN + thresh_avg_high = THRESH_SSD_AVG_HIGH + thresh_avg_low = THRESH_SSD_AVG_LOW + else: + thresh_min = THRESH_HDD_MIN + thresh_avg_high = THRESH_HDD_AVG_HIGH + thresh_avg_low = THRESH_HDD_AVG_LOW + + # Add horizontal graph to report + for line in graph.generate_horizontal_graph(rate_list, graph_width): + if not strip_colors(line).strip(): + # Skip empty lines + continue + test_obj.report.append(line) + + # Add read rates to report + test_obj.report.append( + f'Read speeds avg: {avg_read/(1000**2):3.1f}' + f' min: {min_read/(1000**2):3.1f}' + f' max: {max_read/(1000**2):3.1f}' + ) + + # Compare against thresholds + if min_read <= thresh_min and avg_read <= thresh_avg_high: + test_obj.failed = True + elif avg_read <= thresh_avg_low: + test_obj.failed = True + else: + test_obj.passed = True + + # Set status + if test_obj.failed: + test_obj.set_status('Failed') + elif test_obj.passed: + test_obj.set_status('Passed') + else: + test_obj.set_status('Unknown') + + +def run_io_test(test_obj, log_path): + """Run I/O benchmark and handle exceptions.""" + dev_path = test_obj.dev.path + if PLATFORM == 'Darwin': + # Use "RAW" disks under macOS + dev_path = dev_path.with_name(f'r{dev_path.name}') + LOG.info('Using %s for better performance', dev_path) + offset = 0 + read_rates = [] + test_obj.report.append(color_string('I/O Benchmark', 'BLUE')) + + # Get dd values or bail + try: + dd_values = calc_io_dd_values(test_obj.dev.size) + except DeviceTooSmallError: + test_obj.set_status('N/A') + test_obj.report.append( + color_string('Disk too small to test', 'YELLOW'), + ) + return + + # Run dd read tests + for _i in range(dd_values['Read Chunks']): + _i += 1 + + # Build cmd + skip = dd_values['Skip Blocks'] + if dd_values['Skip Extra'] and _i % dd_values['Skip Extra'] == 0: + skip += 1 + cmd = [ + 'sudo', 'dd', + f'bs={IO_BLOCK_SIZE}', + f'skip={offset+skip}', + f'count={dd_values["Read Blocks"]}', + f'if={dev_path}', + 'of=/dev/null', + ] + if PLATFORM == 'Linux': + cmd.append('iflag=direct') + + # Run and get read rate + try: + proc = run_program( + cmd, + pipe=False, + stdout=PIPE, + stderr=STDOUT, + ) + except PermissionError as err: + # Since we're using sudo we can't kill dd + # Assuming this happened during a CTRL+c + raise KeyboardInterrupt from err + match = IO_RATE_REGEX.search(proc.stdout) + if match: + read_rates.append( + int(match.group('bytes')) / float(match.group('seconds')), + ) + match.group(1) + + # Show progress + with open(log_path, 'a', encoding='utf-8') as _f: + if _i % 5 == 0: + percent = (_i / dd_values['Read Chunks']) * 100 + _f.write(f' {graph.vertical_graph_line(percent, read_rates[-1])}\n') + + # Update offset + offset += dd_values['Read Blocks'] + skip + + # Check results + check_io_results(test_obj, read_rates, IO_GRAPH_WIDTH) + + + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/scripts/wk/hw/cpu.py b/scripts/wk/hw/cpu.py new file mode 100644 index 00000000..44329e7a --- /dev/null +++ b/scripts/wk/hw/cpu.py @@ -0,0 +1,205 @@ +"""WizardKit: CPU test functions""" +# vim: sts=2 sw=2 ts=2 + +import logging +import re +import subprocess + +from wk import exe +from wk.cfg.hw import CPU_FAILURE_TEMP +from wk.os.mac import set_fans as macos_set_fans +from wk.std import ( + PLATFORM, + color_string, + print_error, + print_warning, + ) +from wk.tmux import respawn_pane as tmux_respawn_pane + + +# STATIC VARIABLES +LOG = logging.getLogger(__name__) + + +# Functions +def check_cooling_results(test_obj, sensors, run_sysbench=False): + """Check cooling results and update test_obj.""" + max_temp = sensors.cpu_max_temp() + temp_labels = ['Idle', 'Max', 'Cooldown'] + if run_sysbench: + temp_labels.append('Sysbench') + + # Check temps + if not max_temp: + test_obj.set_status('Unknown') + elif max_temp >= CPU_FAILURE_TEMP: + test_obj.failed = True + test_obj.set_status('Failed') + elif 'Aborted' not in test_obj.status: + test_obj.passed = True + test_obj.set_status('Passed') + + # Add temps to report + for line in sensors.generate_report(*temp_labels, only_cpu=True): + test_obj.report.append(f' {line}') + + +def check_mprime_results(test_obj, working_dir): + """Check mprime log files and update test_obj.""" + passing_lines = {} + warning_lines = {} + + def _read_file(log_name): + """Read file and split into lines, returns list.""" + lines = [] + try: + with open(f'{working_dir}/{log_name}', 'r', encoding='utf-8') as _f: + lines = _f.readlines() + except FileNotFoundError: + # File may be missing on older systems + lines = [] + + return lines + + # results.txt (check if failed) + for line in _read_file('results.txt'): + line = line.strip() + if re.search(r'(error|fail)', line, re.IGNORECASE): + warning_lines[line] = None + + # print.log (check if passed) + for line in _read_file('prime.log'): + line = line.strip() + match = re.search( + r'(completed.*(\d+) errors, (\d+) warnings)', line, re.IGNORECASE) + if match: + if int(match.group(2)) + int(match.group(3)) > 0: + # Errors and/or warnings encountered + warning_lines[match.group(1).capitalize()] = None + else: + # No errors/warnings + passing_lines[match.group(1).capitalize()] = None + + # Update status + if warning_lines: + test_obj.failed = True + test_obj.set_status('Failed') + elif passing_lines and 'Aborted' not in test_obj.status: + test_obj.passed = True + test_obj.set_status('Passed') + else: + test_obj.set_status('Unknown') + + # Update report + for line in passing_lines: + test_obj.report.append(f' {line}') + for line in warning_lines: + test_obj.report.append(color_string(f' {line}', 'YELLOW')) + if not (passing_lines or warning_lines): + test_obj.report.append(color_string(' Unknown result', 'YELLOW')) + + +def start_mprime(working_dir, log_path): + """Start mprime and save filtered output to log, returns Popen object.""" + set_apple_fan_speed('max') + proc_mprime = subprocess.Popen( # pylint: disable=consider-using-with + ['mprime', '-t'], + cwd=working_dir, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + proc_grep = subprocess.Popen( # pylint: disable=consider-using-with + 'grep --ignore-case --invert-match --line-buffered stress.txt'.split(), + stdin=proc_mprime.stdout, + stdout=subprocess.PIPE, + ) + proc_mprime.stdout.close() + save_nsbr = exe.NonBlockingStreamReader(proc_grep.stdout) + exe.start_thread( + save_nsbr.save_to_file, + args=(proc_grep, log_path), + ) + + # Return objects + return proc_mprime + + +def start_sysbench(sensors, sensors_out, log_path, pane): + """Start sysbench, returns tuple with Popen object and file handle.""" + set_apple_fan_speed('max') + sysbench_cmd = [ + 'sysbench', + f'--threads={exe.psutil.cpu_count()}', + '--cpu-max-prime=1000000000', + 'cpu', + 'run', + ] + + # Restart background monitor for Sysbench + sensors.stop_background_monitor() + sensors.start_background_monitor( + sensors_out, + alt_max='Sysbench', + thermal_action=('killall', 'sysbench', '-INT'), + ) + + # Update bottom pane + tmux_respawn_pane(pane, watch_file=log_path, watch_cmd='tail') + + # Start sysbench + filehandle_sysbench = open( # pylint: disable=consider-using-with + log_path, 'a', encoding='utf-8', + ) + proc_sysbench = exe.popen_program(sysbench_cmd, stdout=filehandle_sysbench) + + # Done + return (proc_sysbench, filehandle_sysbench) + + +def set_apple_fan_speed(speed): + """Set Apple fan speed.""" + cmd = None + + # Check + if speed not in ('auto', 'max'): + raise RuntimeError(f'Invalid speed {speed}') + + # Set cmd + if PLATFORM == 'Darwin': + try: + macos_set_fans(speed) + except (RuntimeError, ValueError, subprocess.CalledProcessError) as err: + LOG.error('Failed to set fans to %s', speed) + LOG.error('Error: %s', err) + print_error(f'Failed to set fans to {speed}') + for line in str(err).splitlines(): + print_warning(f' {line.strip()}') + elif PLATFORM == 'Linux': + cmd = ['apple-fans', speed] + exe.run_program(cmd, check=False) + + +def stop_mprime(proc_mprime): + """Stop mprime gracefully, then forcefully as needed.""" + proc_mprime.terminate() + try: + proc_mprime.wait(timeout=5) + except subprocess.TimeoutExpired: + proc_mprime.kill() + set_apple_fan_speed('auto') + + +def stop_sysbench(proc_sysbench, filehandle_sysbench): + """Stop sysbench.""" + proc_sysbench.terminate() + try: + proc_sysbench.wait(timeout=5) + except subprocess.TimeoutExpired: + proc_sysbench.kill() + filehandle_sysbench.flush() + filehandle_sysbench.close() + set_apple_fan_speed('auto') + + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/scripts/wk/hw/diags.py b/scripts/wk/hw/diags.py index fdfaba55..64fcabf9 100644 --- a/scripts/wk/hw/diags.py +++ b/scripts/wk/hw/diags.py @@ -1,32 +1,27 @@ """WizardKit: Hardware diagnostics""" -# pylint: disable=too-many-lines # vim: sts=2 sw=2 ts=2 import atexit import logging import os import pathlib -import re import subprocess import time from docopt import docopt -from wk import cfg, debug, exe, graph, log, net, std, tmux -from wk import os as wk_os -from wk.cfg.hw import ( - BADBLOCKS_REGEX, - IO_GRAPH_WIDTH, - IO_ALT_TEST_SIZE_FACTOR, - IO_BLOCK_SIZE, - IO_CHUNK_SIZE, - IO_MINIMUM_TEST_SIZE, - IO_RATE_REGEX, - STATUS_COLORS, - ) +from wk import cfg, debug, exe, log, std, tmux +from wk.cfg.hw import STATUS_COLORS +from wk.hw import benchmark as hw_benchmark +from wk.hw import cpu as hw_cpu from wk.hw import disk as hw_disk from wk.hw import sensors as hw_sensors +from wk.hw import surface_scan as hw_surface_scan from wk.hw import system as hw_system +from wk.hw.audio import audio_test +from wk.hw.keyboard import keyboard_test +from wk.hw.network import network_test +from wk.hw.screensavers import screensaver from wk.hw.test import Test, TestGroup @@ -79,11 +74,6 @@ MENU_TOGGLES = ( ) PLATFORM = std.PLATFORM -# Error Classes -class DeviceTooSmallError(RuntimeError): - """Raised when a device is too small to test.""" - - # Classes class State(): """Object for tracking hardware diagnostic data.""" @@ -407,27 +397,6 @@ class State(): # Functions -def audio_test(): - """Run an OS-specific audio test.""" - if PLATFORM == 'Linux': - audio_test_linux() - - -def audio_test_linux(): - """Run an audio test using amixer and speaker-test.""" - LOG.info('Audio Test') - - # Set volume - for source in ('Master', 'PCM'): - cmd = f'amixer -q set "{source}" 80% unmute'.split() - exe.run_program(cmd, check=False) - - # Run audio tests - for mode in ('pink', 'wav'): - cmd = f'speaker-test -c 2 -l 1 -t {mode}'.split() - exe.run_program(cmd, check=False, pipe=False) - - def build_menu(cli_mode=False, quick_mode=False): # pylint: disable=too-many-branches """Build main menu, returns wk.std.Menu.""" @@ -480,180 +449,6 @@ def build_menu(cli_mode=False, quick_mode=False): return menu -def calc_io_dd_values(dev_size): - """Calculate I/O benchmark dd values, returns dict. - - Calculations: - The minimum dev size is IO_GRAPH_WIDTH * IO_CHUNK_SIZE - (e.g. 1.25 GB for a width of 40 and a chunk size of 32MB) - - read_total is the area to be read in bytes - If the dev is < IO_MINIMUM_TEST_SIZE then it's the whole dev - Else it's the larger of IO_MINIMUM_TEST_SIZE or the alt test size - (determined by dev * IO_ALT_TEST_SIZE_FACTOR) - - read_chunks is the number of groups of IO_CHUNK_SIZE in test_obj.dev - This number is reduced to a multiple of IO_GRAPH_WIDTH in order - to allow for the data to be condensed cleanly - - read_blocks is the chunk size in number of blocks - (e.g. 64 if block size is 512KB and chunk size is 32MB - - skip_total is the number of IO_BLOCK_SIZE groups not tested - skip_blocks is the number of blocks to skip per IO_CHUNK_SIZE - skip_extra_rate is how often to add an additional skip block - This is needed to ensure an even testing across the dev - This is calculated by using the fractional amount left off - of the skip_blocks variable - """ - read_total = min(IO_MINIMUM_TEST_SIZE, dev_size) - read_total = max(read_total, dev_size*IO_ALT_TEST_SIZE_FACTOR) - read_chunks = int(read_total // IO_CHUNK_SIZE) - read_chunks -= read_chunks % IO_GRAPH_WIDTH - if read_chunks < IO_GRAPH_WIDTH: - raise DeviceTooSmallError - read_blocks = int(IO_CHUNK_SIZE / IO_BLOCK_SIZE) - read_total = read_chunks * IO_CHUNK_SIZE - skip_total = int((dev_size - read_total) // IO_BLOCK_SIZE) - skip_blocks = int((skip_total / read_chunks) // 1) - skip_extra_rate = 0 - try: - skip_extra_rate = 1 + int(1 / ((skip_total / read_chunks) % 1)) - except ZeroDivisionError: - # skip_extra_rate == 0 is fine - pass - - # Done - return { - 'Read Chunks': read_chunks, - 'Read Blocks': read_blocks, - 'Skip Blocks': skip_blocks, - 'Skip Extra': skip_extra_rate, - } - - -def check_cooling_results(test_obj, sensors, run_sysbench=False): - """Check cooling results and update test_obj.""" - max_temp = sensors.cpu_max_temp() - temp_labels = ['Idle', 'Max', 'Cooldown'] - if run_sysbench: - temp_labels.append('Sysbench') - - # Check temps - if not max_temp: - test_obj.set_status('Unknown') - elif max_temp >= cfg.hw.CPU_FAILURE_TEMP: - test_obj.failed = True - test_obj.set_status('Failed') - elif 'Aborted' not in test_obj.status: - test_obj.passed = True - test_obj.set_status('Passed') - - # Add temps to report - for line in sensors.generate_report(*temp_labels, only_cpu=True): - test_obj.report.append(f' {line}') - - -def check_io_benchmark_results(test_obj, rate_list, graph_width): - """Generate colored report using rate_list, returns list of str.""" - avg_read = sum(rate_list) / len(rate_list) - min_read = min(rate_list) - max_read = max(rate_list) - if test_obj.dev.ssd: - thresh_min = cfg.hw.THRESH_SSD_MIN - thresh_avg_high = cfg.hw.THRESH_SSD_AVG_HIGH - thresh_avg_low = cfg.hw.THRESH_SSD_AVG_LOW - else: - thresh_min = cfg.hw.THRESH_HDD_MIN - thresh_avg_high = cfg.hw.THRESH_HDD_AVG_HIGH - thresh_avg_low = cfg.hw.THRESH_HDD_AVG_LOW - - # Add horizontal graph to report - for line in graph.generate_horizontal_graph(rate_list, graph_width): - if not std.strip_colors(line).strip(): - # Skip empty lines - continue - test_obj.report.append(line) - - # Add read rates to report - test_obj.report.append( - f'Read speeds avg: {avg_read/(1000**2):3.1f}' - f' min: {min_read/(1000**2):3.1f}' - f' max: {max_read/(1000**2):3.1f}' - ) - - # Compare against thresholds - if min_read <= thresh_min and avg_read <= thresh_avg_high: - test_obj.failed = True - elif avg_read <= thresh_avg_low: - test_obj.failed = True - else: - test_obj.passed = True - - # Set status - if test_obj.failed: - test_obj.set_status('Failed') - elif test_obj.passed: - test_obj.set_status('Passed') - else: - test_obj.set_status('Unknown') - - -def check_mprime_results(test_obj, working_dir): - """Check mprime log files and update test_obj.""" - passing_lines = {} - warning_lines = {} - - def _read_file(log_name): - """Read file and split into lines, returns list.""" - lines = [] - try: - with open(f'{working_dir}/{log_name}', 'r', encoding='utf-8') as _f: - lines = _f.readlines() - except FileNotFoundError: - # File may be missing on older systems - lines = [] - - return lines - - # results.txt (check if failed) - for line in _read_file('results.txt'): - line = line.strip() - if re.search(r'(error|fail)', line, re.IGNORECASE): - warning_lines[line] = None - - # print.log (check if passed) - for line in _read_file('prime.log'): - line = line.strip() - match = re.search( - r'(completed.*(\d+) errors, (\d+) warnings)', line, re.IGNORECASE) - if match: - if int(match.group(2)) + int(match.group(3)) > 0: - # Errors and/or warnings encountered - warning_lines[match.group(1).capitalize()] = None - else: - # No errors/warnings - passing_lines[match.group(1).capitalize()] = None - - # Update status - if warning_lines: - test_obj.failed = True - test_obj.set_status('Failed') - elif passing_lines and 'Aborted' not in test_obj.status: - test_obj.passed = True - test_obj.set_status('Passed') - else: - test_obj.set_status('Unknown') - - # Update report - for line in passing_lines: - test_obj.report.append(f' {line}') - for line in warning_lines: - test_obj.report.append(std.color_string(f' {line}', 'YELLOW')) - if not (passing_lines or warning_lines): - test_obj.report.append(std.color_string(' Unknown result', 'YELLOW')) - - def check_self_test_results(test_obj, aborted=False): """Check SMART self-test results.""" test_obj.report.append(std.color_string('Self-Test', 'BLUE')) @@ -726,8 +521,8 @@ def cpu_stress_tests(state, test_objects): # Stress CPU std.print_info('Running stress test') - set_apple_fan_speed('max') - proc_mprime = start_mprime(state.log_dir, prime_log) + hw_cpu.set_apple_fan_speed('max') + proc_mprime = hw_cpu.start_mprime(state.log_dir, prime_log) # Show countdown print('') @@ -737,7 +532,7 @@ def cpu_stress_tests(state, test_objects): aborted = True # Stop Prime95 - stop_mprime(proc_mprime) + hw_cpu.stop_mprime(proc_mprime) # Update progress if necessary if sensors.cpu_reached_critical_temp() or aborted: @@ -754,7 +549,9 @@ def cpu_stress_tests(state, test_objects): # Check Prime95 results test_mprime_obj.report.append(std.color_string('Prime95', 'BLUE')) - check_mprime_results(test_obj=test_mprime_obj, working_dir=state.log_dir) + hw_cpu.check_mprime_results( + test_obj=test_mprime_obj, working_dir=state.log_dir, + ) # Run Sysbench test if necessary run_sysbench = ( @@ -767,7 +564,7 @@ def cpu_stress_tests(state, test_objects): std.clear_screen() std.print_info('Running alternate stress test') print('') - proc_sysbench, filehandle_sysbench = start_sysbench( + proc_sysbench, filehandle_sysbench = hw_cpu.start_sysbench( sensors, sensors_out, log_path=prime_log.with_name('sysbench.log'), @@ -780,7 +577,7 @@ def cpu_stress_tests(state, test_objects): LOG.error('Failed to find sysbench process', exc_info=True) except KeyboardInterrupt: aborted = True - stop_sysbench(proc_sysbench, filehandle_sysbench) + hw_cpu.stop_sysbench(proc_sysbench, filehandle_sysbench) # Update progress # NOTE: CPU critical temp check isn't really necessary @@ -792,7 +589,7 @@ def cpu_stress_tests(state, test_objects): # Check Cooling results test_cooling_obj.report.append(std.color_string('Temps', 'BLUE')) - check_cooling_results(test_cooling_obj, sensors, run_sysbench) + hw_cpu.check_cooling_results(test_cooling_obj, sensors, run_sysbench) # Cleanup state.update_progress_pane() @@ -832,77 +629,6 @@ def disk_io_benchmark(state, test_objects, skip_usb=True): LOG.info('Disk I/O Benchmark (dd)') aborted = False - def _run_io_benchmark(test_obj, log_path): - """Run I/O benchmark and handle exceptions.""" - dev_path = test_obj.dev.path - if PLATFORM == 'Darwin': - # Use "RAW" disks under macOS - dev_path = dev_path.with_name(f'r{dev_path.name}') - LOG.info('Using %s for better performance', dev_path) - offset = 0 - read_rates = [] - test_obj.report.append(std.color_string('I/O Benchmark', 'BLUE')) - - # Get dd values or bail - try: - dd_values = calc_io_dd_values(test_obj.dev.size) - except DeviceTooSmallError: - test_obj.set_status('N/A') - test_obj.report.append( - std.color_string('Disk too small to test', 'YELLOW'), - ) - return - - # Run dd read tests - for _i in range(dd_values['Read Chunks']): - _i += 1 - - # Build cmd - skip = dd_values['Skip Blocks'] - if dd_values['Skip Extra'] and _i % dd_values['Skip Extra'] == 0: - skip += 1 - cmd = [ - 'sudo', 'dd', - f'bs={IO_BLOCK_SIZE}', - f'skip={offset+skip}', - f'count={dd_values["Read Blocks"]}', - f'if={dev_path}', - 'of=/dev/null', - ] - if PLATFORM == 'Linux': - cmd.append('iflag=direct') - - # Run and get read rate - try: - proc = exe.run_program( - cmd, - pipe=False, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ) - except PermissionError as err: - # Since we're using sudo we can't kill dd - # Assuming this happened during a CTRL+c - raise KeyboardInterrupt from err - match = IO_RATE_REGEX.search(proc.stdout) - if match: - read_rates.append( - int(match.group('bytes')) / float(match.group('seconds')), - ) - match.group(1) - - # Show progress - with open(log_path, 'a', encoding='utf-8') as _f: - if _i % 5 == 0: - percent = (_i / dd_values['Read Chunks']) * 100 - _f.write(f' {graph.vertical_graph_line(percent, read_rates[-1])}\n') - - # Update offset - offset += dd_values['Read Blocks'] + skip - - # Check results - check_io_benchmark_results(test_obj, read_rates, IO_GRAPH_WIDTH) - # Run benchmarks state.update_top_pane( f'Disk I/O Benchmark{"s" if len(test_objects) > 1 else ""}', @@ -935,7 +661,7 @@ def disk_io_benchmark(state, test_objects, skip_usb=True): ) state.update_progress_pane() try: - _run_io_benchmark(test, test_log) + hw_benchmark.run_io_test(test, test_log) except KeyboardInterrupt: aborted = True except (subprocess.CalledProcessError, TypeError, ValueError) as err: @@ -1052,65 +778,6 @@ def disk_surface_scan(state, test_objects): threads = [] state.panes['badblocks'] = [] - def _run_surface_scan(test_obj, log_path): - """Run surface scan and handle exceptions.""" - block_size = '1024' - dev = test_obj.dev - dev_path = test_obj.dev.path - if PLATFORM == 'Darwin': - # Use "RAW" disks under macOS - dev_path = dev_path.with_name(f'r{dev_path.name}') - LOG.info('Using %s for better performance', dev_path) - test_obj.report.append(std.color_string('badblocks', 'BLUE')) - test_obj.set_status('Working') - - # Increase block size if necessary - if (dev.phy_sec == 4096 - or dev.size >= cfg.hw.BADBLOCKS_LARGE_DISK): - block_size = '4096' - - # Start scan - cmd = ['sudo', 'badblocks', '-sv', '-b', block_size, '-e', '1', dev_path] - with open(log_path, 'a', encoding='utf-8') as _f: - size_str = std.bytes_to_string(dev.size, use_binary=False) - _f.write( - std.color_string( - ['[', dev.path.name, ' ', size_str, ']\n'], - [None, 'BLUE', None, 'CYAN', None], - sep='', - ), - ) - _f.flush() - exe.run_program( - cmd, - check=False, - pipe=False, - stderr=subprocess.STDOUT, - stdout=_f, - ) - - # Check results - with open(log_path, 'r', encoding='utf-8') as _f: - for line in _f.readlines(): - line = std.strip_colors(line.strip()) - if not line or line.startswith('Checking') or line.startswith('['): - # Skip - continue - match = BADBLOCKS_REGEX.search(line) - if match: - if all(s == '0' for s in match.groups()): - test_obj.passed = True - test_obj.report.append(f' {line}') - test_obj.set_status('Passed') - else: - test_obj.failed = True - test_obj.report.append(f' {std.color_string(line, "YELLOW")}') - test_obj.set_status('Failed') - else: - test_obj.report.append(f' {std.color_string(line, "YELLOW")}') - if not (test_obj.passed or test_obj.failed): - test_obj.set_status('Unknown') - # Update panes state.update_top_pane( f'Disk Surface Scan{"s" if len(test_objects) > 1 else ""}', @@ -1141,7 +808,9 @@ def disk_surface_scan(state, test_objects): # Start thread test_log = f'{state.log_dir}/{test.dev.path.name}_badblocks.log' - threads.append(exe.start_thread(_run_surface_scan, args=(test, test_log))) + threads.append(exe.start_thread( + hw_surface_scan.run_scan, args=(test, test_log), + )) # Show progress if threads[-1].is_alive(): @@ -1182,13 +851,6 @@ def disk_surface_scan(state, test_objects): raise std.GenericAbort('Aborted') -def keyboard_test(): - """Test keyboard using xev.""" - LOG.info('Keyboard Test (xev)') - cmd = ['xev', '-event', 'keyboard'] - exe.run_program(cmd, check=False, pipe=False) - - def main(): # pylint: disable=too-many-branches """Main function for hardware diagnostics.""" @@ -1262,39 +924,6 @@ def main(): state.update_top_pane('Main Menu') -def network_test(): - """Run network tests.""" - LOG.info('Network Test') - try_and_print = std.TryAndPrint() - result = try_and_print.run( - message='Network connection...', - function=net.connected_to_private_network, - msg_good='OK', - raise_on_error=True, - ) - - # Bail if not connected - if result['Failed']: - std.print_warning('Please connect to a network and try again') - std.pause('Press Enter to return to main menu...') - return - - # Show IP address(es) - net.show_valid_addresses() - - # Ping tests - try_and_print.run( - 'Internet connection...', net.ping, msg_good='OK', addr='8.8.8.8') - try_and_print.run( - 'DNS resolution...', net.ping, msg_good='OK', addr='google.com') - - # Speedtest - try_and_print.run('Speedtest...', net.speedtest) - - # Done - std.pause('Press Enter to return to main menu...') - - def print_countdown(proc, seconds): """Print countdown to screen while proc is alive.""" for i in range(seconds): @@ -1376,51 +1005,6 @@ def run_diags(state, menu, quick_mode=False): std.pause('Press Enter to return to main menu...') -def screensaver(name): - """Show screensaver""" - LOG.info('Screensaver (%s)', name) - if name == 'matrix': - cmd = ['cmatrix', '-abs'] - elif name == 'pipes': - cmd = [ - 'pipes.sh', - '-t', '0', - '-t', '1', - '-t', '2', - '-t', '3', - '-t', '5', - '-R', '-r', '4000', - ] - - # Switch pane to fullscreen and start screensaver - tmux.zoom_pane() - exe.run_program(cmd, check=False, pipe=False, stderr=subprocess.PIPE) - tmux.zoom_pane() - - -def set_apple_fan_speed(speed): - """Set Apple fan speed.""" - cmd = None - - # Check - if speed not in ('auto', 'max'): - raise RuntimeError(f'Invalid speed {speed}') - - # Set cmd - if PLATFORM == 'Darwin': - try: - wk_os.mac.set_fans(speed) - except (RuntimeError, ValueError, subprocess.CalledProcessError) as err: - LOG.error('Failed to set fans to %s', speed) - LOG.error('Error: %s', err) - std.print_error(f'Failed to set fans to {speed}') - for line in str(err).splitlines(): - std.print_warning(f' {line.strip()}') - elif PLATFORM == 'Linux': - cmd = ['apple-fans', speed] - exe.run_program(cmd, check=False) - - def show_results(state): """Show test results by device.""" std.sleep(0.5) @@ -1450,84 +1034,6 @@ def show_results(state): std.print_standard(' ') -def start_mprime(working_dir, log_path): - """Start mprime and save filtered output to log, returns Popen object.""" - set_apple_fan_speed('max') - proc_mprime = subprocess.Popen( # pylint: disable=consider-using-with - ['mprime', '-t'], - cwd=working_dir, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ) - proc_grep = subprocess.Popen( # pylint: disable=consider-using-with - 'grep --ignore-case --invert-match --line-buffered stress.txt'.split(), - stdin=proc_mprime.stdout, - stdout=subprocess.PIPE, - ) - proc_mprime.stdout.close() - save_nsbr = exe.NonBlockingStreamReader(proc_grep.stdout) - exe.start_thread( - save_nsbr.save_to_file, - args=(proc_grep, log_path), - ) - - # Return objects - return proc_mprime - - -def start_sysbench(sensors, sensors_out, log_path, pane): - """Start sysbench, returns tuple with Popen object and file handle.""" - set_apple_fan_speed('max') - sysbench_cmd = [ - 'sysbench', - f'--threads={exe.psutil.cpu_count()}', - '--cpu-max-prime=1000000000', - 'cpu', - 'run', - ] - - # Restart background monitor for Sysbench - sensors.stop_background_monitor() - sensors.start_background_monitor( - sensors_out, - alt_max='Sysbench', - thermal_action=('killall', 'sysbench', '-INT'), - ) - - # Update bottom pane - tmux.respawn_pane(pane, watch_file=log_path, watch_cmd='tail') - - # Start sysbench - filehandle_sysbench = open( # pylint: disable=consider-using-with - log_path, 'a', encoding='utf-8', - ) - proc_sysbench = exe.popen_program(sysbench_cmd, stdout=filehandle_sysbench) - - # Done - return (proc_sysbench, filehandle_sysbench) - -def stop_mprime(proc_mprime): - """Stop mprime gracefully, then forcefully as needed.""" - proc_mprime.terminate() - try: - proc_mprime.wait(timeout=5) - except subprocess.TimeoutExpired: - proc_mprime.kill() - set_apple_fan_speed('auto') - - -def stop_sysbench(proc_sysbench, filehandle_sysbench): - """Stop sysbench.""" - proc_sysbench.terminate() - try: - proc_sysbench.wait(timeout=5) - except subprocess.TimeoutExpired: - proc_sysbench.kill() - filehandle_sysbench.flush() - filehandle_sysbench.close() - set_apple_fan_speed('auto') - - def sync_clock(): """Sync clock under macOS using sntp.""" cmd = ['sudo', 'sntp', '-Ss', 'us.pool.ntp.org'] diff --git a/scripts/wk/hw/keyboard.py b/scripts/wk/hw/keyboard.py new file mode 100644 index 00000000..20a3db0a --- /dev/null +++ b/scripts/wk/hw/keyboard.py @@ -0,0 +1,31 @@ +"""WizardKit: Keyboard test functions""" +# vim: sts=2 sw=2 ts=2 + +import logging + +from wk.exe import run_program +from wk.std import PLATFORM, print_warning + + +# STATIC VARIABLES +LOG = logging.getLogger(__name__) + + +# Functions +def keyboard_test(): + """Test keyboard using OS specific functions.""" + if PLATFORM == 'Linux': + run_xev() + else: + print_warning(f'Not supported under this OS: {PLATFORM}') + + +def run_xev(): + """Test keyboard using xev.""" + LOG.info('Keyboard Test (xev)') + cmd = ['xev', '-event', 'keyboard'] + run_program(cmd, check=False, pipe=False) + + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/scripts/wk/hw/network.py b/scripts/wk/hw/network.py new file mode 100644 index 00000000..68ab30d9 --- /dev/null +++ b/scripts/wk/hw/network.py @@ -0,0 +1,58 @@ +"""WizardKit: Network test functions""" +# vim: sts=2 sw=2 ts=2 + +import logging + +from wk.net import ( + connected_to_private_network, + ping, + show_valid_addresses, + speedtest, + ) +from wk.std import ( + TryAndPrint, + pause, + print_warning, + ) + + +# STATIC VARIABLES +LOG = logging.getLogger(__name__) + + +# Functions +def network_test(): + """Run network tests.""" + LOG.info('Network Test') + try_and_print = TryAndPrint() + result = try_and_print.run( + message='Network connection...', + function=connected_to_private_network, + msg_good='OK', + raise_on_error=True, + ) + + # Bail if not connected + if result['Failed']: + print_warning('Please connect to a network and try again') + pause('Press Enter to return to main menu...') + return + + # Show IP address(es) + show_valid_addresses() + + # Ping tests + try_and_print.run( + 'Internet connection...', ping, msg_good='OK', addr='8.8.8.8') + try_and_print.run( + 'DNS resolution...', ping, msg_good='OK', addr='google.com') + + # Speedtest + try_and_print.run('Speedtest...', speedtest) + + # Done + pause('Press Enter to return to main menu...') + + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/scripts/wk/hw/screensavers.py b/scripts/wk/hw/screensavers.py new file mode 100644 index 00000000..9bdfb719 --- /dev/null +++ b/scripts/wk/hw/screensavers.py @@ -0,0 +1,40 @@ +"""WizardKit: Screensaver functions""" +# vim: sts=2 sw=2 ts=2 + +import logging + +from subprocess import PIPE + +from wk.exe import run_program +from wk.tmux import zoom_pane as tmux_zoom_pane + + +# STATIC VARIABLES +LOG = logging.getLogger(__name__) + + +# Functions +def screensaver(name): + """Show screensaver""" + LOG.info('Screensaver (%s)', name) + if name == 'matrix': + cmd = ['cmatrix', '-abs'] + elif name == 'pipes': + cmd = [ + 'pipes.sh', + '-t', '0', + '-t', '1', + '-t', '2', + '-t', '3', + '-t', '5', + '-R', '-r', '4000', + ] + + # Switch pane to fullscreen and start screensaver + tmux_zoom_pane() + run_program(cmd, check=False, pipe=False, stderr=PIPE) + tmux_zoom_pane() + + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/scripts/wk/hw/surface_scan.py b/scripts/wk/hw/surface_scan.py new file mode 100644 index 00000000..085bc52c --- /dev/null +++ b/scripts/wk/hw/surface_scan.py @@ -0,0 +1,85 @@ +"""WizardKit: Surface scan test functions""" +# vim: sts=2 sw=2 ts=2 + +import logging + +from subprocess import STDOUT + +from wk.cfg.hw import BADBLOCKS_LARGE_DISK, BADBLOCKS_REGEX +from wk.exe import run_program +from wk.std import ( + PLATFORM, + bytes_to_string, + color_string, + strip_colors, + ) + + +# STATIC VARIABLES +LOG = logging.getLogger(__name__) + + +# Functions +def run_scan(test_obj, log_path): + """Run surface scan and handle exceptions.""" + block_size = '1024' + dev = test_obj.dev + dev_path = test_obj.dev.path + if PLATFORM == 'Darwin': + # Use "RAW" disks under macOS + dev_path = dev_path.with_name(f'r{dev_path.name}') + LOG.info('Using %s for better performance', dev_path) + test_obj.report.append(color_string('badblocks', 'BLUE')) + test_obj.set_status('Working') + + # Increase block size if necessary + if (dev.phy_sec == 4096 + or dev.size >= BADBLOCKS_LARGE_DISK): + block_size = '4096' + + # Start scan + cmd = ['sudo', 'badblocks', '-sv', '-b', block_size, '-e', '1', dev_path] + with open(log_path, 'a', encoding='utf-8') as _f: + size_str = bytes_to_string(dev.size, use_binary=False) + _f.write( + color_string( + ['[', dev.path.name, ' ', size_str, ']\n'], + [None, 'BLUE', None, 'CYAN', None], + sep='', + ), + ) + _f.flush() + run_program( + cmd, + check=False, + pipe=False, + stderr=STDOUT, + stdout=_f, + ) + + # Check results + with open(log_path, 'r', encoding='utf-8') as _f: + for line in _f.readlines(): + line = strip_colors(line.strip()) + if not line or line.startswith('Checking') or line.startswith('['): + # Skip + continue + match = BADBLOCKS_REGEX.search(line) + if match: + if all(s == '0' for s in match.groups()): + test_obj.passed = True + test_obj.report.append(f' {line}') + test_obj.set_status('Passed') + else: + test_obj.failed = True + test_obj.report.append(f' {color_string(line, "YELLOW")}') + test_obj.set_status('Failed') + else: + test_obj.report.append(f' {color_string(line, "YELLOW")}') + if not (test_obj.passed or test_obj.failed): + test_obj.set_status('Unknown') + + + +if __name__ == '__main__': + print("This file is not meant to be called directly.")