Merge branch 'dev' of gitea.2shirt.work:2Shirt/WizardKit into dev

This commit is contained in:
2Shirt 2023-09-18 11:32:50 -07:00
commit 17a62d6f36
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
3 changed files with 78 additions and 19 deletions

View file

@ -68,16 +68,20 @@ def check_cooling_results(sensors, test_object) -> None:
# Build report
report_labels = ['Idle']
average_labels = []
if 'Sysbench' in sensors.temp_labels:
average_labels.append('Sysbench')
report_labels.extend(['Sysbench', 'Cooldown'])
if 'Prime95' in sensors.temp_labels:
average_labels.append('Prime95')
report_labels.append('Prime95')
if 'Cooldown' not in report_labels:
report_labels.append('Cooldown')
if len(sensors.temp_labels.intersection(['Prime95', 'Sysbench'])) < 1:
# Include overall max temp if needed
report_labels.append('Max')
for line in sensors.generate_report(*report_labels, only_cpu=True):
for line in sensors.generate_report(
*report_labels, only_cpu=True, include_avg_for=average_labels):
test_object.report.append(f' {line}')

View file

@ -317,7 +317,7 @@ def build_menu(cli_mode=False, quick_mode=False) -> cli.Menu:
return menu
def cpu_tests_init(state) -> None:
def cpu_tests_init(state: State) -> None:
"""Initialize CPU tests."""
sensors_out = pathlib.Path(f'{state.log_dir}/sensors.out')
state.update_title_text(state.system.cpu_description)
@ -338,19 +338,20 @@ def cpu_tests_init(state) -> None:
# Save idle temps
cli.print_standard('Saving idle temps...')
state.sensors.save_average_temps(temp_label='Idle', seconds=5)
state.sensors.save_average_temps(temp_label='Idle', seconds=5, save_history=False)
def cpu_tests_end(state) -> None:
def cpu_tests_end(state: State) -> None:
"""End CPU tests."""
# Cleanup
state.sensors.clear_temps(next_label='Done')
state.sensors.stop_background_monitor()
state.ui.clear_current_pane_height()
state.ui.remove_all_info_panes()
state.ui.remove_all_worker_panes()
def cpu_test_cooling(state, test_object, test_mode=False) -> None:
def cpu_test_cooling(state: State, test_object, test_mode=False) -> None:
"""CPU cooling test via sensor data assessment."""
_ = test_mode
LOG.info('CPU Test (Cooling)')
@ -363,7 +364,7 @@ def cpu_test_cooling(state, test_object, test_mode=False) -> None:
state.update_progress_file()
def cpu_test_mprime(state, test_object, test_mode=False) -> None:
def cpu_test_mprime(state: State, test_object, test_mode=False) -> None:
"""CPU stress test using mprime."""
LOG.info('CPU Test (Prime95)')
aborted = False
@ -389,6 +390,7 @@ def cpu_test_mprime(state, test_object, test_mode=False) -> None:
print('')
# Start sensors monitor
state.sensors.clear_temps(next_label='Prime95')
state.sensors.stop_background_monitor()
state.sensors.start_background_monitor(
sensors_out,
@ -412,6 +414,7 @@ def cpu_test_mprime(state, test_object, test_mode=False) -> None:
if 'Cooldown' in state.sensors.temp_labels:
# Give Prime95 time to save the results
std.sleep(1)
state.sensors.clear_temps(next_label='Cooldown')
else:
# Save cooldown temp
state.ui.clear_current_pane()
@ -436,7 +439,7 @@ def cpu_test_mprime(state, test_object, test_mode=False) -> None:
raise std.GenericAbort('Aborted')
def cpu_test_sysbench(state, test_object, test_mode=False) -> None:
def cpu_test_sysbench(state: State, test_object, test_mode=False) -> None:
"""CPU stress test using Sysbench."""
LOG.info('CPU Test (Sysbench)')
aborted = False
@ -458,6 +461,7 @@ def cpu_test_sysbench(state, test_object, test_mode=False) -> None:
print('')
# Start sensors monitor
state.sensors.clear_temps(next_label='Sysbench')
state.sensors.stop_background_monitor()
state.sensors.start_background_monitor(
sensors_out,
@ -478,7 +482,9 @@ def cpu_test_sysbench(state, test_object, test_mode=False) -> None:
hw_cpu.stop_sysbench(proc, filehandle)
# Get cooldown temp
if 'Cooldown' not in state.sensors.temp_labels:
if 'Cooldown' in state.sensors.temp_labels:
state.sensors.clear_temps(next_label='Cooldown')
else:
state.ui.clear_current_pane()
cli.print_standard('Letting CPU cooldown...')
std.sleep(5)
@ -515,7 +521,7 @@ def cpu_test_sysbench(state, test_object, test_mode=False) -> None:
raise std.GenericAbort('Aborted')
def disk_attribute_check(state, test_objects, test_mode=False) -> None:
def disk_attribute_check(state: State, test_objects, test_mode=False) -> None:
"""Disk attribute check."""
_ = test_mode
LOG.info('Disk Attribute Check')
@ -593,7 +599,7 @@ def disk_io_benchmark(
raise std.GenericAbort('Aborted')
def disk_self_test(state, test_objects, test_mode=False) -> None:
def disk_self_test(state: State, test_objects, test_mode=False) -> None:
"""Disk self-test if available."""
_ = test_mode
LOG.info('Disk Self-Test(s)')
@ -686,7 +692,7 @@ def disk_smart_status_check(dev, mid_run=True) -> None:
dev.disable_disk_tests()
def disk_surface_scan(state, test_objects, test_mode=False) -> None:
def disk_surface_scan(state: State, test_objects, test_mode=False) -> None:
"""Read-only disk surface scan using badblocks."""
LOG.info('Disk Surface Scan (badblocks)')
aborted = False
@ -838,7 +844,7 @@ def print_countdown(proc, seconds) -> None:
# Done
print('')
def run_cpu_tests(state, test_objects, test_mode=False) -> None:
def run_cpu_tests(state: State, test_objects, test_mode=False) -> None:
"""Run selected CPU test(s)."""
state.update_progress_file()
cpu_tests_init(state)
@ -849,7 +855,7 @@ def run_cpu_tests(state, test_objects, test_mode=False) -> None:
state.update_progress_file()
def run_diags(state, menu, quick_mode=False, test_mode=False) -> None:
def run_diags(state: State, menu, quick_mode=False, test_mode=False) -> None:
"""Run selected diagnostics."""
aborted = False
atexit.register(state.save_debug_reports)
@ -902,7 +908,7 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None:
cli.pause('Press Enter to return to main menu...')
def show_failed_attributes(state) -> None:
def show_failed_attributes(state: State) -> None:
"""Show failed attributes for all disks."""
for dev in state.disks:
cli.print_colored([dev.name, dev.description], ['CYAN', None])
@ -912,7 +918,7 @@ def show_failed_attributes(state) -> None:
cli.print_standard('')
def show_results(state) -> None:
def show_results(state: State) -> None:
"""Show test results by device."""
std.sleep(0.5)
state.ui.clear_current_pane()

View file

@ -6,6 +6,7 @@ import logging
import pathlib
import re
from copy import deepcopy
from subprocess import CalledProcessError
from threading import Thread
from typing import Any
@ -40,11 +41,37 @@ class Sensors():
def __init__(self):
self.background_thread: Thread | None = None
self.data: dict[Any, Any] = get_sensor_data()
self.history: list[tuple[str, dict]] = []
self.history_index: dict[str, int] = {}
self.history_next_label: str = 'Idle'
self.out_path: pathlib.Path | str | None = None
self.temp_labels: set = set(['Current', 'Max'])
def clear_temps(self) -> None:
def clear_temps(self, next_label: str, save_history: bool = True) -> None:
"""Clear saved temps but keep structure"""
prev_label = self.history_next_label
self.history_next_label = next_label
# Save history
if save_history:
cur_data = deepcopy(self.data)
# Calculate averages
for adapters in cur_data.values():
for sources in adapters.values():
for name in sources:
temp_list = sources[name]['Temps']
try:
sources[name]['Average'] = sum(temp_list) / len(temp_list)
except ZeroDivisionError:
LOG.error('Failed to calculate averate temp for %s', name)
sources[name]['Average'] = 0
# Add to history
self.history.append((prev_label, cur_data))
self.history_index[prev_label] = len(self.history) - 1
# Clear data
for adapters in self.data.values():
for sources in adapters.values():
for source_data in sources.values():
@ -67,9 +94,15 @@ class Sensors():
return False
def generate_report(
self, *temp_labels, colored=True, only_cpu=False) -> list[str]:
self,
*temp_labels: str,
colored: bool = True,
only_cpu: bool = False,
include_avg_for: list[str] | None = None,
) -> list[str]:
"""Generate report based on given temp_labels, returns list."""
report = []
include_avg_for = include_avg_for if include_avg_for else []
for section, adapters in sorted(self.data.items()):
if only_cpu and not section.startswith('CPU'):
@ -83,6 +116,10 @@ class Sensors():
for label in temp_labels:
if label != 'Current':
line += f' {label.lower()}: '
if label in include_avg_for:
avg_temp = self.get_avg_temp(
label, section, adapter, source, colored)
line += f'{avg_temp} / '
line += get_temp_str(
source_data.get(label, '???'),
colored=colored,
@ -102,6 +139,13 @@ class Sensors():
# Done
return report
def get_avg_temp(self, label, section, adapter, source, colored) -> str:
"""Get average temp from history, return str."""
# NOTE: This is Super-ugly
label_index = self.history_index[label]
avg_temp = self.history[label_index][1][section][adapter][source]['Average']
return get_temp_str(avg_temp, colored=colored)
def get_cpu_temp(self, label) -> float:
"""Get temp for label from any CPU source, returns float.
@ -158,9 +202,14 @@ class Sensors():
# Sleep before next loop
sleep(0.5)
def save_average_temps(self, temp_label, seconds=10) -> None:
def save_average_temps(
self,
temp_label: str,
seconds: int = 10,
save_history: bool = True,
) -> None:
"""Save average temps under temp_label over provided seconds.."""
self.clear_temps()
self.clear_temps(next_label=temp_label, save_history=save_history)
self.temp_labels.add(temp_label)
# Get temps