WizardKit/scripts/wk/hw/benchmark.py

220 lines
6.2 KiB
Python

"""WizardKit: Benchmark test functions"""
# vim: sts=2 sw=2 ts=2
import logging
from subprocess import PIPE, STDOUT
from wk import graph
from wk.cfg.hw import (
IO_ALT_TEST_SIZE_FACTOR,
IO_BLOCK_SIZE,
IO_CHUNK_SIZE,
IO_GRAPH_WIDTH,
IO_MINIMUM_TEST_SIZE,
IO_RATE_REGEX,
THRESH_HDD_AVG_HIGH,
THRESH_HDD_AVG_LOW,
THRESH_HDD_MIN,
THRESH_SSD_AVG_HIGH,
THRESH_SSD_AVG_LOW,
THRESH_SSD_MIN,
)
from wk.exe import run_program
from wk.std import (
PLATFORM,
strip_colors,
color_string,
)
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
# Error Classes
class DeviceTooSmallError(RuntimeError):
  """Signals that a device does not hold enough data to be benchmarked."""
# Functions
def calc_io_dd_values(dev_size, test_mode=False) -> dict[str, int]:
  """Work out the dd parameters for an I/O benchmark, returns dict.

  Sizing:
    The area to read is the smaller of IO_MINIMUM_TEST_SIZE and dev_size,
    raised to dev_size * IO_ALT_TEST_SIZE_FACTOR when that is larger.
    It is converted to whole IO_CHUNK_SIZE chunks and rounded down to a
    multiple of IO_GRAPH_WIDTH so the results condense cleanly.

  Returned keys:
    'Read Chunks'  number of IO_CHUNK_SIZE chunks to read
    'Read Blocks'  size of one chunk in IO_BLOCK_SIZE blocks
                   (e.g. 64 if block size is 512KB and chunk size is 32MB)
    'Skip Blocks'  whole number of untested blocks per chunk
    'Skip Extra'   how often (every Nth chunk) one additional block is
                   skipped to cover the fractional part of the per-chunk
                   skip; 0 means never

  test_mode cuts the chunk count to roughly a tenth (rounded down to a
  multiple of IO_GRAPH_WIDTH, never below IO_GRAPH_WIDTH). The skip
  values are still the ones calculated for the full chunk count.

  Raises DeviceTooSmallError if fewer than IO_GRAPH_WIDTH chunks fit.
  """
  # Area to be read, in bytes
  wanted_bytes = max(
    min(IO_MINIMUM_TEST_SIZE, dev_size),
    dev_size * IO_ALT_TEST_SIZE_FACTOR,
  )

  # Whole chunks, trimmed to a multiple of the graph width
  chunk_count = int(wanted_bytes // IO_CHUNK_SIZE)
  chunk_count = (chunk_count // IO_GRAPH_WIDTH) * IO_GRAPH_WIDTH
  if chunk_count < IO_GRAPH_WIDTH:
    raise DeviceTooSmallError

  # Blocks per chunk and blocks left untested
  blocks_per_chunk = int(IO_CHUNK_SIZE / IO_BLOCK_SIZE)
  tested_bytes = chunk_count * IO_CHUNK_SIZE
  untested_blocks = int((dev_size - tested_bytes) // IO_BLOCK_SIZE)

  # Spread the untested blocks between the chunks
  skip_ratio = untested_blocks / chunk_count
  whole_skip = int(skip_ratio // 1)
  leftover = skip_ratio % 1
  # A leftover of zero means no extra skip is ever needed
  extra_every = (1 + int(1 / leftover)) if leftover else 0

  # Test mode
  if test_mode:
    reduced = (int(chunk_count * 0.1) // IO_GRAPH_WIDTH) * IO_GRAPH_WIDTH
    chunk_count = max(IO_GRAPH_WIDTH, reduced)

  # Done
  return {
    'Read Chunks': chunk_count,
    'Read Blocks': blocks_per_chunk,
    'Skip Blocks': whole_skip,
    'Skip Extra': extra_every,
  }
def check_io_results(test_obj, rate_list, graph_width) -> None:
  """Check I/O results and generate report using rate_list.

  Appends the horizontal graph (empty lines skipped) and a summary of
  the average, minimum and maximum read rates to test_obj.report. The
  rates are then compared to the SSD or HDD thresholds, selected by
  test_obj.dev.ssd, and the passed/failed flag and status are set on
  test_obj.

  The summary divides the rates by 1000**2 for display (presumably
  bytes/s to MB/s; the report line itself carries no unit).

  If rate_list is empty there is nothing to evaluate: a note is added to
  the report and the status is set to 'Unknown' instead of raising.
  """
  if not rate_list:
    # sum()/len() and min()/max() below would raise on an empty list
    test_obj.report.append('No read rates were collected')
    test_obj.set_status('Unknown')
    return

  avg_read = sum(rate_list) / len(rate_list)
  min_read = min(rate_list)
  max_read = max(rate_list)
  if test_obj.dev.ssd:
    thresh_min = THRESH_SSD_MIN
    thresh_avg_high = THRESH_SSD_AVG_HIGH
    thresh_avg_low = THRESH_SSD_AVG_LOW
  else:
    thresh_min = THRESH_HDD_MIN
    thresh_avg_high = THRESH_HDD_AVG_HIGH
    thresh_avg_low = THRESH_HDD_AVG_LOW

  # Add horizontal graph to report
  for line in graph.generate_horizontal_graph(rate_list, graph_width):
    if not strip_colors(line).strip():
      # Skip empty lines
      continue
    test_obj.report.append(line)

  # Add read rates to report
  test_obj.report.append(
    f'Read speeds avg: {avg_read/(1000**2):3.1f}'
    f' min: {min_read/(1000**2):3.1f}'
    f' max: {max_read/(1000**2):3.1f}'
    )

  # Compare against thresholds
  # Fails on a low minimum combined with a mediocre average,
  # or on a low average regardless of the minimum
  too_slow = (
    (min_read <= thresh_min and avg_read <= thresh_avg_high)
    or avg_read <= thresh_avg_low
    )
  if too_slow:
    test_obj.failed = True
  else:
    test_obj.passed = True

  # Set status
  if test_obj.failed:
    test_obj.set_status('Failed')
  elif test_obj.passed:
    test_obj.set_status('Passed')
  else:
    test_obj.set_status('Unknown')
def run_io_test(test_obj, log_path, test_mode=False) -> None:
  """Run I/O benchmark and handle exceptions.

  Reads chunks of test_obj.dev with dd (via sudo) using the values from
  calc_io_dd_values(), parses each run's output with IO_RATE_REGEX to
  get a read rate, appends a progress line to log_path after every
  fifth chunk and finally hands the collected rates to
  check_io_results().

  If the device is too small the status is set to 'N/A', a note is added
  to the report and nothing is read.

  Raises KeyboardInterrupt if run_program() raises PermissionError.
  """
  dev_path = test_obj.dev.path
  if PLATFORM == 'Darwin':
    # Use "RAW" disks under macOS
    dev_path = dev_path.with_name(f'r{dev_path.name}')
    LOG.info('Using %s for better performance', dev_path)
  offset = 0
  read_rates = []
  test_obj.report.append(color_string('I/O Benchmark', 'BLUE'))

  # Get dd values or bail
  try:
    dd_values = calc_io_dd_values(test_obj.dev.size, test_mode=test_mode)
  except DeviceTooSmallError:
    test_obj.set_status('N/A')
    test_obj.report.append(
      color_string('Disk too small to test', 'YELLOW'),
      )
    return
  read_chunks = dd_values['Read Chunks']
  read_blocks = dd_values['Read Blocks']
  skip_extra = dd_values['Skip Extra']

  # Run dd read tests (chunks are counted from 1)
  for chunk_num in range(1, read_chunks + 1):
    # Build cmd
    skip = dd_values['Skip Blocks']
    if skip_extra and chunk_num % skip_extra == 0:
      skip += 1
    cmd = [
      'sudo', 'dd',
      f'bs={IO_BLOCK_SIZE}',
      f'skip={offset+skip}',
      f'count={read_blocks}',
      f'if={dev_path}',
      'of=/dev/null',
      ]
    if PLATFORM == 'Linux':
      cmd.append('iflag=direct')

    # Run and get read rate
    try:
      proc = run_program(
        cmd,
        pipe=False,
        stdout=PIPE,
        stderr=STDOUT,
        )
    except PermissionError as err:
      # Since we're using sudo we can't kill dd
      # Assuming this happened during a CTRL+c
      raise KeyboardInterrupt from err
    match = IO_RATE_REGEX.search(proc.stdout)
    if match:
      read_rates.append(
        int(match.group('bytes')) / float(match.group('seconds')),
        )

    # Show progress
    # The log is opened on every pass, so it is created on the first one
    with open(log_path, 'a', encoding='utf-8') as _f:
      # Without any parsed rate there is nothing to plot yet;
      # read_rates[-1] would raise IndexError
      if chunk_num % 5 == 0 and read_rates:
        percent = (chunk_num / read_chunks) * 100
        _f.write(f' {graph.vertical_graph_line(percent, read_rates[-1])}\n')

    # Update offset
    offset += read_blocks + skip

  # Check results
  check_io_results(test_obj, read_rates, IO_GRAPH_WIDTH)
if __name__ == '__main__':
  # This module is only meant to be imported; just say so when run directly
  print("This file is not meant to be called directly.")