Merge remote-tracking branch 'upstream/dev' into dev

This commit is contained in:
2Shirt 2023-09-23 16:34:02 -07:00
commit 8cb9e6c840
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
6 changed files with 89 additions and 22 deletions

View file

@ -5,9 +5,17 @@ python.exe -i embedded_python_env.py
"""
# vim: sts=2 sw=2 ts=2
import pickle
import wk
# Functions
def load_state():
with open('debug/state.pickle', 'rb') as f:
return pickle.load(f)
# Main
wk.ui.cli.print_colored(
(wk.cfg.main.KIT_NAME_FULL, ': ', 'Debug Console'),
('GREEN', None, 'YELLOW'),

View file

@ -47,8 +47,8 @@ SOURCES = {
'Fluent-Metro': 'https://github.com/bonzibudd/Fluent-Metro/releases/download/v1.5.3/Fluent-Metro_1.5.3.zip',
'FurMark': 'https://geeks3d.com/dl/get/696',
'HWiNFO': 'https://www.sac.sk/download/utildiag/hwi_746.zip',
'LibreOffice32': 'https://download.documentfoundation.org/libreoffice/stable/7.5.4/win/x86/LibreOffice_7.5.4_Win_x86.msi',
'LibreOffice64': 'https://download.documentfoundation.org/libreoffice/stable/7.5.4/win/x86_64/LibreOffice_7.5.4_Win_x64.msi',
'LibreOffice32': 'https://download.documentfoundation.org/libreoffice/stable/7.5.5/win/x86/LibreOffice_7.5.5_Win_x86.msi',
'LibreOffice64': 'https://download.documentfoundation.org/libreoffice/stable/7.5.5/win/x86_64/LibreOffice_7.5.5_Win_x86-64.msi',
'Linux Reader': 'https://www.diskinternals.com/download/Linux_Reader.exe',
'Macs Fan Control': 'https://www.crystalidea.com/downloads/macsfancontrol_setup.exe',
'Neutron': 'http://keir.net/download/neutron.zip',

View file

@ -53,8 +53,8 @@ class BlockPair():
self.view_proc: subprocess.Popen | None = None
# Set map path
# e.g. '(Clone|Image)_Model[_p#]_Size[_Label].map'
map_name = source_dev.model
# e.g. '(Clone|Image)_Model_Serial[_p#]_Size[_Label].map'
map_name = f'{source_dev.model}_{source_dev.serial}'
if source_dev.bus == 'Image':
map_name = 'Image'
if source_dev.parent:

View file

@ -68,16 +68,20 @@ def check_cooling_results(sensors, test_object) -> None:
# Build report
report_labels = ['Idle']
average_labels = []
if 'Sysbench' in sensors.temp_labels:
average_labels.append('Sysbench')
report_labels.extend(['Sysbench', 'Cooldown'])
if 'Prime95' in sensors.temp_labels:
average_labels.append('Prime95')
report_labels.append('Prime95')
if 'Cooldown' not in report_labels:
report_labels.append('Cooldown')
if len(sensors.temp_labels.intersection(['Prime95', 'Sysbench'])) < 1:
# Include overall max temp if needed
report_labels.append('Max')
for line in sensors.generate_report(*report_labels, only_cpu=True):
for line in sensors.generate_report(
*report_labels, only_cpu=True, include_avg_for=average_labels):
test_object.report.append(f' {line}')

View file

@ -370,7 +370,7 @@ def build_menu(cli_mode=False, quick_mode=False) -> cli.Menu:
return menu
def cpu_tests_init(state) -> None:
def cpu_tests_init(state: State) -> None:
"""Initialize CPU tests."""
sensors_out = pathlib.Path(f'{state.log_dir}/sensors.out')
state.update_title_text(state.system.cpu_description)
@ -391,19 +391,20 @@ def cpu_tests_init(state) -> None:
# Save idle temps
cli.print_standard('Saving idle temps...')
state.sensors.save_average_temps(temp_label='Idle', seconds=5)
state.sensors.save_average_temps(temp_label='Idle', seconds=5, save_history=False)
def cpu_tests_end(state) -> None:
def cpu_tests_end(state: State) -> None:
"""End CPU tests."""
# Cleanup
state.sensors.clear_temps(next_label='Done')
state.sensors.stop_background_monitor()
state.ui.clear_current_pane_height()
state.ui.remove_all_info_panes()
state.ui.remove_all_worker_panes()
def cpu_test_cooling(state, test_object, test_mode=False) -> None:
def cpu_test_cooling(state: State, test_object, test_mode=False) -> None:
"""CPU cooling test via sensor data assessment."""
_ = test_mode
LOG.info('CPU Test (Cooling)')
@ -416,7 +417,7 @@ def cpu_test_cooling(state, test_object, test_mode=False) -> None:
state.update_progress_file()
def cpu_test_mprime(state, test_object, test_mode=False) -> None:
def cpu_test_mprime(state: State, test_object, test_mode=False) -> None:
"""CPU stress test using mprime."""
LOG.info('CPU Test (Prime95)')
aborted = False
@ -442,6 +443,7 @@ def cpu_test_mprime(state, test_object, test_mode=False) -> None:
print('')
# Start sensors monitor
state.sensors.clear_temps(next_label='Prime95')
state.sensors.stop_background_monitor()
state.sensors.start_background_monitor(
sensors_out,
@ -465,6 +467,7 @@ def cpu_test_mprime(state, test_object, test_mode=False) -> None:
if 'Cooldown' in state.sensors.temp_labels:
# Give Prime95 time to save the results
std.sleep(1)
state.sensors.clear_temps(next_label='Cooldown')
else:
# Save cooldown temp
state.ui.clear_current_pane()
@ -489,7 +492,7 @@ def cpu_test_mprime(state, test_object, test_mode=False) -> None:
raise std.GenericAbort('Aborted')
def cpu_test_sysbench(state, test_object, test_mode=False) -> None:
def cpu_test_sysbench(state: State, test_object, test_mode=False) -> None:
"""CPU stress test using Sysbench."""
LOG.info('CPU Test (Sysbench)')
aborted = False
@ -511,6 +514,7 @@ def cpu_test_sysbench(state, test_object, test_mode=False) -> None:
print('')
# Start sensors monitor
state.sensors.clear_temps(next_label='Sysbench')
state.sensors.stop_background_monitor()
state.sensors.start_background_monitor(
sensors_out,
@ -531,7 +535,9 @@ def cpu_test_sysbench(state, test_object, test_mode=False) -> None:
hw_cpu.stop_sysbench(proc, filehandle)
# Get cooldown temp
if 'Cooldown' not in state.sensors.temp_labels:
if 'Cooldown' in state.sensors.temp_labels:
state.sensors.clear_temps(next_label='Cooldown')
else:
state.ui.clear_current_pane()
cli.print_standard('Letting CPU cooldown...')
std.sleep(5)
@ -568,7 +574,7 @@ def cpu_test_sysbench(state, test_object, test_mode=False) -> None:
raise std.GenericAbort('Aborted')
def disk_attribute_check(state, test_objects, test_mode=False) -> None:
def disk_attribute_check(state: State, test_objects, test_mode=False) -> None:
"""Disk attribute check."""
_ = test_mode
LOG.info('Disk Attribute Check')
@ -651,7 +657,7 @@ def disk_io_benchmark(
raise std.GenericAbort('Aborted')
def disk_self_test(state, test_objects, test_mode=False) -> None:
def disk_self_test(state: State, test_objects, test_mode=False) -> None:
"""Disk self-test if available."""
_ = test_mode
LOG.info('Disk Self-Test(s)')
@ -744,7 +750,7 @@ def disk_smart_status_check(dev, mid_run=True) -> None:
dev.disable_disk_tests()
def disk_surface_scan(state, test_objects, test_mode=False) -> None:
def disk_surface_scan(state: State, test_objects, test_mode=False) -> None:
"""Read-only disk surface scan using badblocks."""
LOG.info('Disk Surface Scan (badblocks)')
aborted = False
@ -932,7 +938,7 @@ def print_countdown(proc, seconds) -> None:
# Done
print('')
def run_cpu_tests(state, test_objects, test_mode=False) -> None:
def run_cpu_tests(state: State, test_objects, test_mode=False) -> None:
"""Run selected CPU test(s)."""
state.update_progress_file()
cpu_tests_init(state)
@ -1043,7 +1049,7 @@ def run_diags(
cli.pause('Press Enter to return to main menu...')
def show_failed_attributes(state) -> None:
def show_failed_attributes(state: State) -> None:
"""Show failed attributes for all disks."""
for dev in state.disks:
cli.print_colored([dev.name, dev.description], ['CYAN', None])
@ -1053,7 +1059,7 @@ def show_failed_attributes(state) -> None:
cli.print_standard('')
def show_results(state) -> None:
def show_results(state: State) -> None:
"""Show test results by device."""
std.sleep(0.5)
state.ui.clear_current_pane()

View file

@ -6,6 +6,7 @@ import logging
import pathlib
import re
from copy import deepcopy
from subprocess import CalledProcessError
from threading import Thread
from typing import Any
@ -40,11 +41,37 @@ class Sensors():
def __init__(self):
self.background_thread: Thread | None = None
self.data: dict[Any, Any] = get_sensor_data()
self.history: list[tuple[str, dict]] = []
self.history_index: dict[str, int] = {}
self.history_next_label: str = 'Idle'
self.out_path: pathlib.Path | str | None = None
self.temp_labels: set = set(['Current', 'Max'])
def clear_temps(self) -> None:
def clear_temps(self, next_label: str, save_history: bool = True) -> None:
"""Clear saved temps but keep structure"""
prev_label = self.history_next_label
self.history_next_label = next_label
# Save history
if save_history:
cur_data = deepcopy(self.data)
# Calculate averages
for adapters in cur_data.values():
for sources in adapters.values():
for name in sources:
temp_list = sources[name]['Temps']
try:
sources[name]['Average'] = sum(temp_list) / len(temp_list)
except ZeroDivisionError:
LOG.error('Failed to calculate averate temp for %s', name)
sources[name]['Average'] = 0
# Add to history
self.history.append((prev_label, cur_data))
self.history_index[prev_label] = len(self.history) - 1
# Clear data
for adapters in self.data.values():
for sources in adapters.values():
for source_data in sources.values():
@ -67,9 +94,15 @@ class Sensors():
return False
def generate_report(
self, *temp_labels, colored=True, only_cpu=False) -> list[str]:
self,
*temp_labels: str,
colored: bool = True,
only_cpu: bool = False,
include_avg_for: list[str] | None = None,
) -> list[str]:
"""Generate report based on given temp_labels, returns list."""
report = []
include_avg_for = include_avg_for if include_avg_for else []
for section, adapters in sorted(self.data.items()):
if only_cpu and not section.startswith('CPU'):
@ -83,6 +116,10 @@ class Sensors():
for label in temp_labels:
if label != 'Current':
line += f' {label.lower()}: '
if label in include_avg_for:
avg_temp = self.get_avg_temp(
label, section, adapter, source, colored)
line += f'{avg_temp} / '
line += get_temp_str(
source_data.get(label, '???'),
colored=colored,
@ -102,6 +139,13 @@ class Sensors():
# Done
return report
def get_avg_temp(self, label, section, adapter, source, colored) -> str:
"""Get average temp from history, return str."""
# NOTE: This is Super-ugly
label_index = self.history_index[label]
avg_temp = self.history[label_index][1][section][adapter][source]['Average']
return get_temp_str(avg_temp, colored=colored)
def get_cpu_temp(self, label) -> float:
"""Get temp for label from any CPU source, returns float.
@ -158,9 +202,14 @@ class Sensors():
# Sleep before next loop
sleep(0.5)
def save_average_temps(self, temp_label, seconds=10) -> None:
def save_average_temps(
self,
temp_label: str,
seconds: int = 10,
save_history: bool = True,
) -> None:
"""Save average temps under temp_label over provided seconds.."""
self.clear_temps()
self.clear_temps(next_label=temp_label, save_history=save_history)
self.temp_labels.add(temp_label)
# Get temps