Split hardware diagnostics into multiple files

This commit is contained in:
2Shirt 2022-04-05 18:11:06 -06:00
parent fc2bb07d11
commit 99dd7661d4
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
9 changed files with 697 additions and 516 deletions

View file

@ -1,8 +1,15 @@
"""WizardKit: hw module init"""
from . import audio
from . import benchmark
from . import cpu
from . import ddrescue
from . import diags
from . import disk
from . import keyboard
from . import network
from . import screensavers
from . import sensors
from . import surface_scan
from . import system
from . import test

37
scripts/wk/hw/audio.py Normal file
View file

@ -0,0 +1,37 @@
"""WizardKit: Audio test functions"""
# vim: sts=2 sw=2 ts=2
import logging
from wk.exe import run_program
from wk.std import PLATFORM
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Functions
def audio_test():
"""Run an OS-specific audio test."""
if PLATFORM == 'Linux':
audio_test_linux()
def audio_test_linux():
"""Run an audio test using amixer and speaker-test."""
LOG.info('Audio Test')
# Set volume
for source in ('Master', 'PCM'):
cmd = f'amixer -q set "{source}" 80% unmute'.split()
run_program(cmd, check=False)
# Run audio tests
for mode in ('pink', 'wav'):
cmd = f'speaker-test -c 2 -l 1 -t {mode}'.split()
run_program(cmd, check=False, pipe=False)
if __name__ == '__main__':
print("This file is not meant to be called directly.")

212
scripts/wk/hw/benchmark.py Normal file
View file

@ -0,0 +1,212 @@
"""WizardKit: Benchmark test functions"""
# vim: sts=2 sw=2 ts=2
import logging
from subprocess import PIPE, STDOUT
from wk import graph
from wk.cfg.hw import (
IO_ALT_TEST_SIZE_FACTOR,
IO_BLOCK_SIZE,
IO_CHUNK_SIZE,
IO_GRAPH_WIDTH,
IO_MINIMUM_TEST_SIZE,
IO_RATE_REGEX,
THRESH_HDD_AVG_HIGH,
THRESH_HDD_AVG_LOW,
THRESH_HDD_MIN,
THRESH_SSD_AVG_HIGH,
THRESH_SSD_AVG_LOW,
THRESH_SSD_MIN,
)
from wk.exe import run_program
from wk.std import (
PLATFORM,
strip_colors,
color_string,
)
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Error Classes
class DeviceTooSmallError(RuntimeError):
"""Raised when a device is too small to test."""
# Functions
def calc_io_dd_values(dev_size):
"""Calculate I/O benchmark dd values, returns dict.
Calculations:
The minimum dev size is IO_GRAPH_WIDTH * IO_CHUNK_SIZE
(e.g. 1.25 GB for a width of 40 and a chunk size of 32MB)
read_total is the area to be read in bytes
If the dev is < IO_MINIMUM_TEST_SIZE then it's the whole dev
Else it's the larger of IO_MINIMUM_TEST_SIZE or the alt test size
(determined by dev * IO_ALT_TEST_SIZE_FACTOR)
read_chunks is the number of groups of IO_CHUNK_SIZE in test_obj.dev
This number is reduced to a multiple of IO_GRAPH_WIDTH in order
to allow for the data to be condensed cleanly
read_blocks is the chunk size in number of blocks
(e.g. 64 if block size is 512KB and chunk size is 32MB
skip_total is the number of IO_BLOCK_SIZE groups not tested
skip_blocks is the number of blocks to skip per IO_CHUNK_SIZE
skip_extra_rate is how often to add an additional skip block
This is needed to ensure an even testing across the dev
This is calculated by using the fractional amount left off
of the skip_blocks variable
"""
read_total = min(IO_MINIMUM_TEST_SIZE, dev_size)
read_total = max(read_total, dev_size*IO_ALT_TEST_SIZE_FACTOR)
read_chunks = int(read_total // IO_CHUNK_SIZE)
read_chunks -= read_chunks % IO_GRAPH_WIDTH
if read_chunks < IO_GRAPH_WIDTH:
raise DeviceTooSmallError
read_blocks = int(IO_CHUNK_SIZE / IO_BLOCK_SIZE)
read_total = read_chunks * IO_CHUNK_SIZE
skip_total = int((dev_size - read_total) // IO_BLOCK_SIZE)
skip_blocks = int((skip_total / read_chunks) // 1)
skip_extra_rate = 0
try:
skip_extra_rate = 1 + int(1 / ((skip_total / read_chunks) % 1))
except ZeroDivisionError:
# skip_extra_rate == 0 is fine
pass
# Done
return {
'Read Chunks': read_chunks,
'Read Blocks': read_blocks,
'Skip Blocks': skip_blocks,
'Skip Extra': skip_extra_rate,
}
def check_io_results(test_obj, rate_list, graph_width):
"""Generate colored report using rate_list, returns list of str."""
avg_read = sum(rate_list) / len(rate_list)
min_read = min(rate_list)
max_read = max(rate_list)
if test_obj.dev.ssd:
thresh_min = THRESH_SSD_MIN
thresh_avg_high = THRESH_SSD_AVG_HIGH
thresh_avg_low = THRESH_SSD_AVG_LOW
else:
thresh_min = THRESH_HDD_MIN
thresh_avg_high = THRESH_HDD_AVG_HIGH
thresh_avg_low = THRESH_HDD_AVG_LOW
# Add horizontal graph to report
for line in graph.generate_horizontal_graph(rate_list, graph_width):
if not strip_colors(line).strip():
# Skip empty lines
continue
test_obj.report.append(line)
# Add read rates to report
test_obj.report.append(
f'Read speeds avg: {avg_read/(1000**2):3.1f}'
f' min: {min_read/(1000**2):3.1f}'
f' max: {max_read/(1000**2):3.1f}'
)
# Compare against thresholds
if min_read <= thresh_min and avg_read <= thresh_avg_high:
test_obj.failed = True
elif avg_read <= thresh_avg_low:
test_obj.failed = True
else:
test_obj.passed = True
# Set status
if test_obj.failed:
test_obj.set_status('Failed')
elif test_obj.passed:
test_obj.set_status('Passed')
else:
test_obj.set_status('Unknown')
def run_io_test(test_obj, log_path):
"""Run I/O benchmark and handle exceptions."""
dev_path = test_obj.dev.path
if PLATFORM == 'Darwin':
# Use "RAW" disks under macOS
dev_path = dev_path.with_name(f'r{dev_path.name}')
LOG.info('Using %s for better performance', dev_path)
offset = 0
read_rates = []
test_obj.report.append(color_string('I/O Benchmark', 'BLUE'))
# Get dd values or bail
try:
dd_values = calc_io_dd_values(test_obj.dev.size)
except DeviceTooSmallError:
test_obj.set_status('N/A')
test_obj.report.append(
color_string('Disk too small to test', 'YELLOW'),
)
return
# Run dd read tests
for _i in range(dd_values['Read Chunks']):
_i += 1
# Build cmd
skip = dd_values['Skip Blocks']
if dd_values['Skip Extra'] and _i % dd_values['Skip Extra'] == 0:
skip += 1
cmd = [
'sudo', 'dd',
f'bs={IO_BLOCK_SIZE}',
f'skip={offset+skip}',
f'count={dd_values["Read Blocks"]}',
f'if={dev_path}',
'of=/dev/null',
]
if PLATFORM == 'Linux':
cmd.append('iflag=direct')
# Run and get read rate
try:
proc = run_program(
cmd,
pipe=False,
stdout=PIPE,
stderr=STDOUT,
)
except PermissionError as err:
# Since we're using sudo we can't kill dd
# Assuming this happened during a CTRL+c
raise KeyboardInterrupt from err
match = IO_RATE_REGEX.search(proc.stdout)
if match:
read_rates.append(
int(match.group('bytes')) / float(match.group('seconds')),
)
match.group(1)
# Show progress
with open(log_path, 'a', encoding='utf-8') as _f:
if _i % 5 == 0:
percent = (_i / dd_values['Read Chunks']) * 100
_f.write(f' {graph.vertical_graph_line(percent, read_rates[-1])}\n')
# Update offset
offset += dd_values['Read Blocks'] + skip
# Check results
check_io_results(test_obj, read_rates, IO_GRAPH_WIDTH)
if __name__ == '__main__':
print("This file is not meant to be called directly.")

205
scripts/wk/hw/cpu.py Normal file
View file

@ -0,0 +1,205 @@
"""WizardKit: CPU test functions"""
# vim: sts=2 sw=2 ts=2
import logging
import re
import subprocess
from wk import exe
from wk.cfg.hw import CPU_FAILURE_TEMP
from wk.os.mac import set_fans as macos_set_fans
from wk.std import (
PLATFORM,
color_string,
print_error,
print_warning,
)
from wk.tmux import respawn_pane as tmux_respawn_pane
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Functions
def check_cooling_results(test_obj, sensors, run_sysbench=False):
"""Check cooling results and update test_obj."""
max_temp = sensors.cpu_max_temp()
temp_labels = ['Idle', 'Max', 'Cooldown']
if run_sysbench:
temp_labels.append('Sysbench')
# Check temps
if not max_temp:
test_obj.set_status('Unknown')
elif max_temp >= CPU_FAILURE_TEMP:
test_obj.failed = True
test_obj.set_status('Failed')
elif 'Aborted' not in test_obj.status:
test_obj.passed = True
test_obj.set_status('Passed')
# Add temps to report
for line in sensors.generate_report(*temp_labels, only_cpu=True):
test_obj.report.append(f' {line}')
def check_mprime_results(test_obj, working_dir):
"""Check mprime log files and update test_obj."""
passing_lines = {}
warning_lines = {}
def _read_file(log_name):
"""Read file and split into lines, returns list."""
lines = []
try:
with open(f'{working_dir}/{log_name}', 'r', encoding='utf-8') as _f:
lines = _f.readlines()
except FileNotFoundError:
# File may be missing on older systems
lines = []
return lines
# results.txt (check if failed)
for line in _read_file('results.txt'):
line = line.strip()
if re.search(r'(error|fail)', line, re.IGNORECASE):
warning_lines[line] = None
# print.log (check if passed)
for line in _read_file('prime.log'):
line = line.strip()
match = re.search(
r'(completed.*(\d+) errors, (\d+) warnings)', line, re.IGNORECASE)
if match:
if int(match.group(2)) + int(match.group(3)) > 0:
# Errors and/or warnings encountered
warning_lines[match.group(1).capitalize()] = None
else:
# No errors/warnings
passing_lines[match.group(1).capitalize()] = None
# Update status
if warning_lines:
test_obj.failed = True
test_obj.set_status('Failed')
elif passing_lines and 'Aborted' not in test_obj.status:
test_obj.passed = True
test_obj.set_status('Passed')
else:
test_obj.set_status('Unknown')
# Update report
for line in passing_lines:
test_obj.report.append(f' {line}')
for line in warning_lines:
test_obj.report.append(color_string(f' {line}', 'YELLOW'))
if not (passing_lines or warning_lines):
test_obj.report.append(color_string(' Unknown result', 'YELLOW'))
def start_mprime(working_dir, log_path):
"""Start mprime and save filtered output to log, returns Popen object."""
set_apple_fan_speed('max')
proc_mprime = subprocess.Popen( # pylint: disable=consider-using-with
['mprime', '-t'],
cwd=working_dir,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
proc_grep = subprocess.Popen( # pylint: disable=consider-using-with
'grep --ignore-case --invert-match --line-buffered stress.txt'.split(),
stdin=proc_mprime.stdout,
stdout=subprocess.PIPE,
)
proc_mprime.stdout.close()
save_nsbr = exe.NonBlockingStreamReader(proc_grep.stdout)
exe.start_thread(
save_nsbr.save_to_file,
args=(proc_grep, log_path),
)
# Return objects
return proc_mprime
def start_sysbench(sensors, sensors_out, log_path, pane):
"""Start sysbench, returns tuple with Popen object and file handle."""
set_apple_fan_speed('max')
sysbench_cmd = [
'sysbench',
f'--threads={exe.psutil.cpu_count()}',
'--cpu-max-prime=1000000000',
'cpu',
'run',
]
# Restart background monitor for Sysbench
sensors.stop_background_monitor()
sensors.start_background_monitor(
sensors_out,
alt_max='Sysbench',
thermal_action=('killall', 'sysbench', '-INT'),
)
# Update bottom pane
tmux_respawn_pane(pane, watch_file=log_path, watch_cmd='tail')
# Start sysbench
filehandle_sysbench = open( # pylint: disable=consider-using-with
log_path, 'a', encoding='utf-8',
)
proc_sysbench = exe.popen_program(sysbench_cmd, stdout=filehandle_sysbench)
# Done
return (proc_sysbench, filehandle_sysbench)
def set_apple_fan_speed(speed):
"""Set Apple fan speed."""
cmd = None
# Check
if speed not in ('auto', 'max'):
raise RuntimeError(f'Invalid speed {speed}')
# Set cmd
if PLATFORM == 'Darwin':
try:
macos_set_fans(speed)
except (RuntimeError, ValueError, subprocess.CalledProcessError) as err:
LOG.error('Failed to set fans to %s', speed)
LOG.error('Error: %s', err)
print_error(f'Failed to set fans to {speed}')
for line in str(err).splitlines():
print_warning(f' {line.strip()}')
elif PLATFORM == 'Linux':
cmd = ['apple-fans', speed]
exe.run_program(cmd, check=False)
def stop_mprime(proc_mprime):
"""Stop mprime gracefully, then forcefully as needed."""
proc_mprime.terminate()
try:
proc_mprime.wait(timeout=5)
except subprocess.TimeoutExpired:
proc_mprime.kill()
set_apple_fan_speed('auto')
def stop_sysbench(proc_sysbench, filehandle_sysbench):
"""Stop sysbench."""
proc_sysbench.terminate()
try:
proc_sysbench.wait(timeout=5)
except subprocess.TimeoutExpired:
proc_sysbench.kill()
filehandle_sysbench.flush()
filehandle_sysbench.close()
set_apple_fan_speed('auto')
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -1,32 +1,27 @@
"""WizardKit: Hardware diagnostics"""
# pylint: disable=too-many-lines
# vim: sts=2 sw=2 ts=2
import atexit
import logging
import os
import pathlib
import re
import subprocess
import time
from docopt import docopt
from wk import cfg, debug, exe, graph, log, net, std, tmux
from wk import os as wk_os
from wk.cfg.hw import (
BADBLOCKS_REGEX,
IO_GRAPH_WIDTH,
IO_ALT_TEST_SIZE_FACTOR,
IO_BLOCK_SIZE,
IO_CHUNK_SIZE,
IO_MINIMUM_TEST_SIZE,
IO_RATE_REGEX,
STATUS_COLORS,
)
from wk import cfg, debug, exe, log, std, tmux
from wk.cfg.hw import STATUS_COLORS
from wk.hw import benchmark as hw_benchmark
from wk.hw import cpu as hw_cpu
from wk.hw import disk as hw_disk
from wk.hw import sensors as hw_sensors
from wk.hw import surface_scan as hw_surface_scan
from wk.hw import system as hw_system
from wk.hw.audio import audio_test
from wk.hw.keyboard import keyboard_test
from wk.hw.network import network_test
from wk.hw.screensavers import screensaver
from wk.hw.test import Test, TestGroup
@ -79,11 +74,6 @@ MENU_TOGGLES = (
)
PLATFORM = std.PLATFORM
# Error Classes
class DeviceTooSmallError(RuntimeError):
"""Raised when a device is too small to test."""
# Classes
class State():
"""Object for tracking hardware diagnostic data."""
@ -407,27 +397,6 @@ class State():
# Functions
def audio_test():
"""Run an OS-specific audio test."""
if PLATFORM == 'Linux':
audio_test_linux()
def audio_test_linux():
"""Run an audio test using amixer and speaker-test."""
LOG.info('Audio Test')
# Set volume
for source in ('Master', 'PCM'):
cmd = f'amixer -q set "{source}" 80% unmute'.split()
exe.run_program(cmd, check=False)
# Run audio tests
for mode in ('pink', 'wav'):
cmd = f'speaker-test -c 2 -l 1 -t {mode}'.split()
exe.run_program(cmd, check=False, pipe=False)
def build_menu(cli_mode=False, quick_mode=False):
# pylint: disable=too-many-branches
"""Build main menu, returns wk.std.Menu."""
@ -480,180 +449,6 @@ def build_menu(cli_mode=False, quick_mode=False):
return menu
def calc_io_dd_values(dev_size):
"""Calculate I/O benchmark dd values, returns dict.
Calculations:
The minimum dev size is IO_GRAPH_WIDTH * IO_CHUNK_SIZE
(e.g. 1.25 GB for a width of 40 and a chunk size of 32MB)
read_total is the area to be read in bytes
If the dev is < IO_MINIMUM_TEST_SIZE then it's the whole dev
Else it's the larger of IO_MINIMUM_TEST_SIZE or the alt test size
(determined by dev * IO_ALT_TEST_SIZE_FACTOR)
read_chunks is the number of groups of IO_CHUNK_SIZE in test_obj.dev
This number is reduced to a multiple of IO_GRAPH_WIDTH in order
to allow for the data to be condensed cleanly
read_blocks is the chunk size in number of blocks
(e.g. 64 if block size is 512KB and chunk size is 32MB
skip_total is the number of IO_BLOCK_SIZE groups not tested
skip_blocks is the number of blocks to skip per IO_CHUNK_SIZE
skip_extra_rate is how often to add an additional skip block
This is needed to ensure an even testing across the dev
This is calculated by using the fractional amount left off
of the skip_blocks variable
"""
read_total = min(IO_MINIMUM_TEST_SIZE, dev_size)
read_total = max(read_total, dev_size*IO_ALT_TEST_SIZE_FACTOR)
read_chunks = int(read_total // IO_CHUNK_SIZE)
read_chunks -= read_chunks % IO_GRAPH_WIDTH
if read_chunks < IO_GRAPH_WIDTH:
raise DeviceTooSmallError
read_blocks = int(IO_CHUNK_SIZE / IO_BLOCK_SIZE)
read_total = read_chunks * IO_CHUNK_SIZE
skip_total = int((dev_size - read_total) // IO_BLOCK_SIZE)
skip_blocks = int((skip_total / read_chunks) // 1)
skip_extra_rate = 0
try:
skip_extra_rate = 1 + int(1 / ((skip_total / read_chunks) % 1))
except ZeroDivisionError:
# skip_extra_rate == 0 is fine
pass
# Done
return {
'Read Chunks': read_chunks,
'Read Blocks': read_blocks,
'Skip Blocks': skip_blocks,
'Skip Extra': skip_extra_rate,
}
def check_cooling_results(test_obj, sensors, run_sysbench=False):
"""Check cooling results and update test_obj."""
max_temp = sensors.cpu_max_temp()
temp_labels = ['Idle', 'Max', 'Cooldown']
if run_sysbench:
temp_labels.append('Sysbench')
# Check temps
if not max_temp:
test_obj.set_status('Unknown')
elif max_temp >= cfg.hw.CPU_FAILURE_TEMP:
test_obj.failed = True
test_obj.set_status('Failed')
elif 'Aborted' not in test_obj.status:
test_obj.passed = True
test_obj.set_status('Passed')
# Add temps to report
for line in sensors.generate_report(*temp_labels, only_cpu=True):
test_obj.report.append(f' {line}')
def check_io_benchmark_results(test_obj, rate_list, graph_width):
"""Generate colored report using rate_list, returns list of str."""
avg_read = sum(rate_list) / len(rate_list)
min_read = min(rate_list)
max_read = max(rate_list)
if test_obj.dev.ssd:
thresh_min = cfg.hw.THRESH_SSD_MIN
thresh_avg_high = cfg.hw.THRESH_SSD_AVG_HIGH
thresh_avg_low = cfg.hw.THRESH_SSD_AVG_LOW
else:
thresh_min = cfg.hw.THRESH_HDD_MIN
thresh_avg_high = cfg.hw.THRESH_HDD_AVG_HIGH
thresh_avg_low = cfg.hw.THRESH_HDD_AVG_LOW
# Add horizontal graph to report
for line in graph.generate_horizontal_graph(rate_list, graph_width):
if not std.strip_colors(line).strip():
# Skip empty lines
continue
test_obj.report.append(line)
# Add read rates to report
test_obj.report.append(
f'Read speeds avg: {avg_read/(1000**2):3.1f}'
f' min: {min_read/(1000**2):3.1f}'
f' max: {max_read/(1000**2):3.1f}'
)
# Compare against thresholds
if min_read <= thresh_min and avg_read <= thresh_avg_high:
test_obj.failed = True
elif avg_read <= thresh_avg_low:
test_obj.failed = True
else:
test_obj.passed = True
# Set status
if test_obj.failed:
test_obj.set_status('Failed')
elif test_obj.passed:
test_obj.set_status('Passed')
else:
test_obj.set_status('Unknown')
def check_mprime_results(test_obj, working_dir):
"""Check mprime log files and update test_obj."""
passing_lines = {}
warning_lines = {}
def _read_file(log_name):
"""Read file and split into lines, returns list."""
lines = []
try:
with open(f'{working_dir}/{log_name}', 'r', encoding='utf-8') as _f:
lines = _f.readlines()
except FileNotFoundError:
# File may be missing on older systems
lines = []
return lines
# results.txt (check if failed)
for line in _read_file('results.txt'):
line = line.strip()
if re.search(r'(error|fail)', line, re.IGNORECASE):
warning_lines[line] = None
# print.log (check if passed)
for line in _read_file('prime.log'):
line = line.strip()
match = re.search(
r'(completed.*(\d+) errors, (\d+) warnings)', line, re.IGNORECASE)
if match:
if int(match.group(2)) + int(match.group(3)) > 0:
# Errors and/or warnings encountered
warning_lines[match.group(1).capitalize()] = None
else:
# No errors/warnings
passing_lines[match.group(1).capitalize()] = None
# Update status
if warning_lines:
test_obj.failed = True
test_obj.set_status('Failed')
elif passing_lines and 'Aborted' not in test_obj.status:
test_obj.passed = True
test_obj.set_status('Passed')
else:
test_obj.set_status('Unknown')
# Update report
for line in passing_lines:
test_obj.report.append(f' {line}')
for line in warning_lines:
test_obj.report.append(std.color_string(f' {line}', 'YELLOW'))
if not (passing_lines or warning_lines):
test_obj.report.append(std.color_string(' Unknown result', 'YELLOW'))
def check_self_test_results(test_obj, aborted=False):
"""Check SMART self-test results."""
test_obj.report.append(std.color_string('Self-Test', 'BLUE'))
@ -726,8 +521,8 @@ def cpu_stress_tests(state, test_objects):
# Stress CPU
std.print_info('Running stress test')
set_apple_fan_speed('max')
proc_mprime = start_mprime(state.log_dir, prime_log)
hw_cpu.set_apple_fan_speed('max')
proc_mprime = hw_cpu.start_mprime(state.log_dir, prime_log)
# Show countdown
print('')
@ -737,7 +532,7 @@ def cpu_stress_tests(state, test_objects):
aborted = True
# Stop Prime95
stop_mprime(proc_mprime)
hw_cpu.stop_mprime(proc_mprime)
# Update progress if necessary
if sensors.cpu_reached_critical_temp() or aborted:
@ -754,7 +549,9 @@ def cpu_stress_tests(state, test_objects):
# Check Prime95 results
test_mprime_obj.report.append(std.color_string('Prime95', 'BLUE'))
check_mprime_results(test_obj=test_mprime_obj, working_dir=state.log_dir)
hw_cpu.check_mprime_results(
test_obj=test_mprime_obj, working_dir=state.log_dir,
)
# Run Sysbench test if necessary
run_sysbench = (
@ -767,7 +564,7 @@ def cpu_stress_tests(state, test_objects):
std.clear_screen()
std.print_info('Running alternate stress test')
print('')
proc_sysbench, filehandle_sysbench = start_sysbench(
proc_sysbench, filehandle_sysbench = hw_cpu.start_sysbench(
sensors,
sensors_out,
log_path=prime_log.with_name('sysbench.log'),
@ -780,7 +577,7 @@ def cpu_stress_tests(state, test_objects):
LOG.error('Failed to find sysbench process', exc_info=True)
except KeyboardInterrupt:
aborted = True
stop_sysbench(proc_sysbench, filehandle_sysbench)
hw_cpu.stop_sysbench(proc_sysbench, filehandle_sysbench)
# Update progress
# NOTE: CPU critical temp check isn't really necessary
@ -792,7 +589,7 @@ def cpu_stress_tests(state, test_objects):
# Check Cooling results
test_cooling_obj.report.append(std.color_string('Temps', 'BLUE'))
check_cooling_results(test_cooling_obj, sensors, run_sysbench)
hw_cpu.check_cooling_results(test_cooling_obj, sensors, run_sysbench)
# Cleanup
state.update_progress_pane()
@ -832,77 +629,6 @@ def disk_io_benchmark(state, test_objects, skip_usb=True):
LOG.info('Disk I/O Benchmark (dd)')
aborted = False
def _run_io_benchmark(test_obj, log_path):
"""Run I/O benchmark and handle exceptions."""
dev_path = test_obj.dev.path
if PLATFORM == 'Darwin':
# Use "RAW" disks under macOS
dev_path = dev_path.with_name(f'r{dev_path.name}')
LOG.info('Using %s for better performance', dev_path)
offset = 0
read_rates = []
test_obj.report.append(std.color_string('I/O Benchmark', 'BLUE'))
# Get dd values or bail
try:
dd_values = calc_io_dd_values(test_obj.dev.size)
except DeviceTooSmallError:
test_obj.set_status('N/A')
test_obj.report.append(
std.color_string('Disk too small to test', 'YELLOW'),
)
return
# Run dd read tests
for _i in range(dd_values['Read Chunks']):
_i += 1
# Build cmd
skip = dd_values['Skip Blocks']
if dd_values['Skip Extra'] and _i % dd_values['Skip Extra'] == 0:
skip += 1
cmd = [
'sudo', 'dd',
f'bs={IO_BLOCK_SIZE}',
f'skip={offset+skip}',
f'count={dd_values["Read Blocks"]}',
f'if={dev_path}',
'of=/dev/null',
]
if PLATFORM == 'Linux':
cmd.append('iflag=direct')
# Run and get read rate
try:
proc = exe.run_program(
cmd,
pipe=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
except PermissionError as err:
# Since we're using sudo we can't kill dd
# Assuming this happened during a CTRL+c
raise KeyboardInterrupt from err
match = IO_RATE_REGEX.search(proc.stdout)
if match:
read_rates.append(
int(match.group('bytes')) / float(match.group('seconds')),
)
match.group(1)
# Show progress
with open(log_path, 'a', encoding='utf-8') as _f:
if _i % 5 == 0:
percent = (_i / dd_values['Read Chunks']) * 100
_f.write(f' {graph.vertical_graph_line(percent, read_rates[-1])}\n')
# Update offset
offset += dd_values['Read Blocks'] + skip
# Check results
check_io_benchmark_results(test_obj, read_rates, IO_GRAPH_WIDTH)
# Run benchmarks
state.update_top_pane(
f'Disk I/O Benchmark{"s" if len(test_objects) > 1 else ""}',
@ -935,7 +661,7 @@ def disk_io_benchmark(state, test_objects, skip_usb=True):
)
state.update_progress_pane()
try:
_run_io_benchmark(test, test_log)
hw_benchmark.run_io_test(test, test_log)
except KeyboardInterrupt:
aborted = True
except (subprocess.CalledProcessError, TypeError, ValueError) as err:
@ -1052,65 +778,6 @@ def disk_surface_scan(state, test_objects):
threads = []
state.panes['badblocks'] = []
def _run_surface_scan(test_obj, log_path):
"""Run surface scan and handle exceptions."""
block_size = '1024'
dev = test_obj.dev
dev_path = test_obj.dev.path
if PLATFORM == 'Darwin':
# Use "RAW" disks under macOS
dev_path = dev_path.with_name(f'r{dev_path.name}')
LOG.info('Using %s for better performance', dev_path)
test_obj.report.append(std.color_string('badblocks', 'BLUE'))
test_obj.set_status('Working')
# Increase block size if necessary
if (dev.phy_sec == 4096
or dev.size >= cfg.hw.BADBLOCKS_LARGE_DISK):
block_size = '4096'
# Start scan
cmd = ['sudo', 'badblocks', '-sv', '-b', block_size, '-e', '1', dev_path]
with open(log_path, 'a', encoding='utf-8') as _f:
size_str = std.bytes_to_string(dev.size, use_binary=False)
_f.write(
std.color_string(
['[', dev.path.name, ' ', size_str, ']\n'],
[None, 'BLUE', None, 'CYAN', None],
sep='',
),
)
_f.flush()
exe.run_program(
cmd,
check=False,
pipe=False,
stderr=subprocess.STDOUT,
stdout=_f,
)
# Check results
with open(log_path, 'r', encoding='utf-8') as _f:
for line in _f.readlines():
line = std.strip_colors(line.strip())
if not line or line.startswith('Checking') or line.startswith('['):
# Skip
continue
match = BADBLOCKS_REGEX.search(line)
if match:
if all(s == '0' for s in match.groups()):
test_obj.passed = True
test_obj.report.append(f' {line}')
test_obj.set_status('Passed')
else:
test_obj.failed = True
test_obj.report.append(f' {std.color_string(line, "YELLOW")}')
test_obj.set_status('Failed')
else:
test_obj.report.append(f' {std.color_string(line, "YELLOW")}')
if not (test_obj.passed or test_obj.failed):
test_obj.set_status('Unknown')
# Update panes
state.update_top_pane(
f'Disk Surface Scan{"s" if len(test_objects) > 1 else ""}',
@ -1141,7 +808,9 @@ def disk_surface_scan(state, test_objects):
# Start thread
test_log = f'{state.log_dir}/{test.dev.path.name}_badblocks.log'
threads.append(exe.start_thread(_run_surface_scan, args=(test, test_log)))
threads.append(exe.start_thread(
hw_surface_scan.run_scan, args=(test, test_log),
))
# Show progress
if threads[-1].is_alive():
@ -1182,13 +851,6 @@ def disk_surface_scan(state, test_objects):
raise std.GenericAbort('Aborted')
def keyboard_test():
"""Test keyboard using xev."""
LOG.info('Keyboard Test (xev)')
cmd = ['xev', '-event', 'keyboard']
exe.run_program(cmd, check=False, pipe=False)
def main():
# pylint: disable=too-many-branches
"""Main function for hardware diagnostics."""
@ -1262,39 +924,6 @@ def main():
state.update_top_pane('Main Menu')
def network_test():
"""Run network tests."""
LOG.info('Network Test')
try_and_print = std.TryAndPrint()
result = try_and_print.run(
message='Network connection...',
function=net.connected_to_private_network,
msg_good='OK',
raise_on_error=True,
)
# Bail if not connected
if result['Failed']:
std.print_warning('Please connect to a network and try again')
std.pause('Press Enter to return to main menu...')
return
# Show IP address(es)
net.show_valid_addresses()
# Ping tests
try_and_print.run(
'Internet connection...', net.ping, msg_good='OK', addr='8.8.8.8')
try_and_print.run(
'DNS resolution...', net.ping, msg_good='OK', addr='google.com')
# Speedtest
try_and_print.run('Speedtest...', net.speedtest)
# Done
std.pause('Press Enter to return to main menu...')
def print_countdown(proc, seconds):
"""Print countdown to screen while proc is alive."""
for i in range(seconds):
@ -1376,51 +1005,6 @@ def run_diags(state, menu, quick_mode=False):
std.pause('Press Enter to return to main menu...')
def screensaver(name):
"""Show screensaver"""
LOG.info('Screensaver (%s)', name)
if name == 'matrix':
cmd = ['cmatrix', '-abs']
elif name == 'pipes':
cmd = [
'pipes.sh',
'-t', '0',
'-t', '1',
'-t', '2',
'-t', '3',
'-t', '5',
'-R', '-r', '4000',
]
# Switch pane to fullscreen and start screensaver
tmux.zoom_pane()
exe.run_program(cmd, check=False, pipe=False, stderr=subprocess.PIPE)
tmux.zoom_pane()
def set_apple_fan_speed(speed):
"""Set Apple fan speed."""
cmd = None
# Check
if speed not in ('auto', 'max'):
raise RuntimeError(f'Invalid speed {speed}')
# Set cmd
if PLATFORM == 'Darwin':
try:
wk_os.mac.set_fans(speed)
except (RuntimeError, ValueError, subprocess.CalledProcessError) as err:
LOG.error('Failed to set fans to %s', speed)
LOG.error('Error: %s', err)
std.print_error(f'Failed to set fans to {speed}')
for line in str(err).splitlines():
std.print_warning(f' {line.strip()}')
elif PLATFORM == 'Linux':
cmd = ['apple-fans', speed]
exe.run_program(cmd, check=False)
def show_results(state):
"""Show test results by device."""
std.sleep(0.5)
@ -1450,84 +1034,6 @@ def show_results(state):
std.print_standard(' ')
def start_mprime(working_dir, log_path):
"""Start mprime and save filtered output to log, returns Popen object."""
set_apple_fan_speed('max')
proc_mprime = subprocess.Popen( # pylint: disable=consider-using-with
['mprime', '-t'],
cwd=working_dir,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
proc_grep = subprocess.Popen( # pylint: disable=consider-using-with
'grep --ignore-case --invert-match --line-buffered stress.txt'.split(),
stdin=proc_mprime.stdout,
stdout=subprocess.PIPE,
)
proc_mprime.stdout.close()
save_nsbr = exe.NonBlockingStreamReader(proc_grep.stdout)
exe.start_thread(
save_nsbr.save_to_file,
args=(proc_grep, log_path),
)
# Return objects
return proc_mprime
def start_sysbench(sensors, sensors_out, log_path, pane):
"""Start sysbench, returns tuple with Popen object and file handle."""
set_apple_fan_speed('max')
sysbench_cmd = [
'sysbench',
f'--threads={exe.psutil.cpu_count()}',
'--cpu-max-prime=1000000000',
'cpu',
'run',
]
# Restart background monitor for Sysbench
sensors.stop_background_monitor()
sensors.start_background_monitor(
sensors_out,
alt_max='Sysbench',
thermal_action=('killall', 'sysbench', '-INT'),
)
# Update bottom pane
tmux.respawn_pane(pane, watch_file=log_path, watch_cmd='tail')
# Start sysbench
filehandle_sysbench = open( # pylint: disable=consider-using-with
log_path, 'a', encoding='utf-8',
)
proc_sysbench = exe.popen_program(sysbench_cmd, stdout=filehandle_sysbench)
# Done
return (proc_sysbench, filehandle_sysbench)
def stop_mprime(proc_mprime):
"""Stop mprime gracefully, then forcefully as needed."""
proc_mprime.terminate()
try:
proc_mprime.wait(timeout=5)
except subprocess.TimeoutExpired:
proc_mprime.kill()
set_apple_fan_speed('auto')
def stop_sysbench(proc_sysbench, filehandle_sysbench):
"""Stop sysbench."""
proc_sysbench.terminate()
try:
proc_sysbench.wait(timeout=5)
except subprocess.TimeoutExpired:
proc_sysbench.kill()
filehandle_sysbench.flush()
filehandle_sysbench.close()
set_apple_fan_speed('auto')
def sync_clock():
"""Sync clock under macOS using sntp."""
cmd = ['sudo', 'sntp', '-Ss', 'us.pool.ntp.org']

31
scripts/wk/hw/keyboard.py Normal file
View file

@ -0,0 +1,31 @@
"""WizardKit: Keyboard test functions"""
# vim: sts=2 sw=2 ts=2
import logging
from wk.exe import run_program
from wk.std import PLATFORM, print_warning
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Functions
def keyboard_test():
"""Test keyboard using OS specific functions."""
if PLATFORM == 'Linux':
run_xev()
else:
print_warning(f'Not supported under this OS: {PLATFORM}')
def run_xev():
"""Test keyboard using xev."""
LOG.info('Keyboard Test (xev)')
cmd = ['xev', '-event', 'keyboard']
run_program(cmd, check=False, pipe=False)
if __name__ == '__main__':
print("This file is not meant to be called directly.")

58
scripts/wk/hw/network.py Normal file
View file

@ -0,0 +1,58 @@
"""WizardKit: Network test functions"""
# vim: sts=2 sw=2 ts=2
import logging
from wk.net import (
connected_to_private_network,
ping,
show_valid_addresses,
speedtest,
)
from wk.std import (
TryAndPrint,
pause,
print_warning,
)
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Functions
def network_test():
"""Run network tests."""
LOG.info('Network Test')
try_and_print = TryAndPrint()
result = try_and_print.run(
message='Network connection...',
function=connected_to_private_network,
msg_good='OK',
raise_on_error=True,
)
# Bail if not connected
if result['Failed']:
print_warning('Please connect to a network and try again')
pause('Press Enter to return to main menu...')
return
# Show IP address(es)
show_valid_addresses()
# Ping tests
try_and_print.run(
'Internet connection...', ping, msg_good='OK', addr='8.8.8.8')
try_and_print.run(
'DNS resolution...', ping, msg_good='OK', addr='google.com')
# Speedtest
try_and_print.run('Speedtest...', speedtest)
# Done
pause('Press Enter to return to main menu...')
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -0,0 +1,40 @@
"""WizardKit: Screensaver functions"""
# vim: sts=2 sw=2 ts=2
import logging
from subprocess import PIPE
from wk.exe import run_program
from wk.tmux import zoom_pane as tmux_zoom_pane
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Functions
def screensaver(name):
"""Show screensaver"""
LOG.info('Screensaver (%s)', name)
if name == 'matrix':
cmd = ['cmatrix', '-abs']
elif name == 'pipes':
cmd = [
'pipes.sh',
'-t', '0',
'-t', '1',
'-t', '2',
'-t', '3',
'-t', '5',
'-R', '-r', '4000',
]
# Switch pane to fullscreen and start screensaver
tmux_zoom_pane()
run_program(cmd, check=False, pipe=False, stderr=PIPE)
tmux_zoom_pane()
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -0,0 +1,85 @@
"""WizardKit: Surface scan test functions"""
# vim: sts=2 sw=2 ts=2
import logging
from subprocess import STDOUT
from wk.cfg.hw import BADBLOCKS_LARGE_DISK, BADBLOCKS_REGEX
from wk.exe import run_program
from wk.std import (
PLATFORM,
bytes_to_string,
color_string,
strip_colors,
)
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Functions
def run_scan(test_obj, log_path):
"""Run surface scan and handle exceptions."""
block_size = '1024'
dev = test_obj.dev
dev_path = test_obj.dev.path
if PLATFORM == 'Darwin':
# Use "RAW" disks under macOS
dev_path = dev_path.with_name(f'r{dev_path.name}')
LOG.info('Using %s for better performance', dev_path)
test_obj.report.append(color_string('badblocks', 'BLUE'))
test_obj.set_status('Working')
# Increase block size if necessary
if (dev.phy_sec == 4096
or dev.size >= BADBLOCKS_LARGE_DISK):
block_size = '4096'
# Start scan
cmd = ['sudo', 'badblocks', '-sv', '-b', block_size, '-e', '1', dev_path]
with open(log_path, 'a', encoding='utf-8') as _f:
size_str = bytes_to_string(dev.size, use_binary=False)
_f.write(
color_string(
['[', dev.path.name, ' ', size_str, ']\n'],
[None, 'BLUE', None, 'CYAN', None],
sep='',
),
)
_f.flush()
run_program(
cmd,
check=False,
pipe=False,
stderr=STDOUT,
stdout=_f,
)
# Check results
with open(log_path, 'r', encoding='utf-8') as _f:
for line in _f.readlines():
line = strip_colors(line.strip())
if not line or line.startswith('Checking') or line.startswith('['):
# Skip
continue
match = BADBLOCKS_REGEX.search(line)
if match:
if all(s == '0' for s in match.groups()):
test_obj.passed = True
test_obj.report.append(f' {line}')
test_obj.set_status('Passed')
else:
test_obj.failed = True
test_obj.report.append(f' {color_string(line, "YELLOW")}')
test_obj.set_status('Failed')
else:
test_obj.report.append(f' {color_string(line, "YELLOW")}')
if not (test_obj.passed or test_obj.failed):
test_obj.set_status('Unknown')
if __name__ == '__main__':
print("This file is not meant to be called directly.")