"""WizardKit: Hardware sensors"""
# vim: sts=2 sw=2 ts=2

import json
import logging
import pathlib
import re

from copy import deepcopy
from subprocess import CalledProcessError
from threading import Thread
from typing import Any

from wk.cfg.hw import CPU_TEMPS, SMC_IDS, TEMP_COLORS
from wk.exe import run_program, start_thread
from wk.io import non_clobber_path
from wk.std import PLATFORM, sleep
from wk.ui import ansi


# STATIC VARIABLES
LOG = logging.getLogger(__name__)
LM_SENSORS_CPU_REGEX = re.compile(r'(core|k\d+)temp', re.IGNORECASE)
# Parses a single line of `smc` output into named fields.
# NOTE(review): only the 'ID' and 'Value' groups are read in this module;
# the 'Type' and 'Bytes' group names are presumed -- confirm against real
# `smc` output.
SMC_REGEX = re.compile(
  r'^\s*(?P<ID>\w{4})'
  r'\s+\[(?P<Type>.*)\]'
  r'\s+(?P<Value>.*?)'
  r'\s*\(bytes (?P<Bytes>.*)\)$'
  )
SENSOR_SOURCE_WIDTH = 25 if PLATFORM == 'Darwin' else 20


# Error Classes
class ThermalLimitReachedError(RuntimeError):
  """Raised when the thermal threshold is reached."""


# Classes
class Sensors():
  """Class for holding sensor specific data.

  self.data is nested as section -> adapter -> source -> source_data, where
  source_data holds 'Current', 'Label', 'Max' and 'Temps' plus any extra
  temp labels added later (e.g. by save_average_temps).
  """
  def __init__(self):
    self.background_thread: Thread | None = None
    self.data: dict[Any, Any] = get_sensor_data()
    # Snapshots saved by clear_temps() as (label, data) tuples
    self.history: list[tuple[str, dict]] = []
    # Maps a label to the index of its latest snapshot in self.history
    self.history_index: dict[str, int] = {}
    self.history_next_label: str = 'Idle'
    self.out_path: pathlib.Path | str | None = None
    self.temp_labels: set = {'Current', 'Max'}

  def clear_temps(self, next_label: str, save_history: bool = True) -> None:
    """Clear saved temps but keep structure.

    If save_history is set, a deep copy of the current data (with an
    'Average' added to every source) is appended to the history under the
    label that was active until now.  next_label becomes the label used
    for the next snapshot.
    """
    prev_label = self.history_next_label
    self.history_next_label = next_label

    # Save history
    if save_history:
      cur_data = deepcopy(self.data)

      # Calculate averages
      for adapters in cur_data.values():
        for sources in adapters.values():
          for name, source_data in sources.items():
            temp_list = source_data['Temps']
            try:
              source_data['Average'] = sum(temp_list) / len(temp_list)
            except ZeroDivisionError:
              LOG.error('Failed to calculate average temp for %s', name)
              source_data['Average'] = 0

      # Add to history
      self.history.append((prev_label, cur_data))
      self.history_index[prev_label] = len(self.history) - 1

    # Clear data
    for adapters in self.data.values():
      for sources in adapters.values():
        for source_data in sources.values():
          source_data['Temps'] = []

  def cpu_reached_critical_temp(self) -> bool:
    """Check if CPU exceeded critical temp, returns bool."""
    for section, adapters in self.data.items():
      if not section.startswith('CPU'):
        # Limit to CPU temps
        continue

      # Ugly section
      for sources in adapters.values():
        for source_data in sources.values():
          if source_data.get('Max', -1) > CPU_TEMPS['Critical']:
            return True

    # Didn't return above so temps are within the threshold
    return False

  def generate_report(
      self, *temp_labels: str,
      colored: bool = True, only_cpu: bool = False,
      include_avg_for: list[str] | None = None,
      ) -> list[str]:
    """Generate report based on given temp_labels, returns list.

    Each adapter gets a heading line followed by one line per source.
    Labels listed in include_avg_for also show the average saved in the
    history for that label.  Missing temps are shown as '???'.
    """
    report = []
    include_avg_for = include_avg_for if include_avg_for else []

    for section, adapters in sorted(self.data.items()):
      if only_cpu and not section.startswith('CPU'):
        continue

      # Ugly section
      for adapter, sources in sorted(adapters.items()):
        report.append(fix_sensor_name(adapter))
        for source, source_data in sorted(sources.items()):
          line = f'{fix_sensor_name(source):{SENSOR_SOURCE_WIDTH}} '
          for label in temp_labels:
            if label != 'Current':
              line += f' {label.lower()}: '
            if label in include_avg_for:
              avg_temp = self.get_avg_temp(
                label, section, adapter, source, colored)
              line += f'{avg_temp} / '
            line += get_temp_str(
              source_data.get(label, '???'),
              colored=colored,
              )
          report.append(line)
        if not only_cpu:
          # Blank line between adapters
          report.append('')

    # Handle empty reports
    if not report:
      report = [
        ansi.color_string('WARNING: No sensors found', 'YELLOW'),
        '',
        'Please monitor temps manually',
        ]

    # Done
    return report

  def get_avg_temp(self, label, section, adapter, source, colored) -> str:
    """Get average temp from history, return str.

    Raises KeyError if no snapshot was saved under label.
    """
    # NOTE: This is Super-ugly
    label_index = self.history_index[label]
    avg_temp = self.history[label_index][1][section][adapter][source]['Average']
    return get_temp_str(avg_temp, colored=colored)

  def get_cpu_temp(self, label) -> float:
    """Get temp for label from any CPU source, returns float.

    NOTE: This returns the highest value for the label.
    NOTE 2: If no temps are found this returns zero.
    """
    max_temp = 0.0

    # Check all CPU Temps
    for section, adapters in self.data.items():
      if not section.startswith('CPU'):
        continue
      for sources in adapters.values():
        for source_data in sources.values():
          max_temp = max(max_temp, source_data.get(label, 0))

    # Done
    return float(max_temp)

  def monitor_to_file(
      self, out_path, alt_max=None,
      exit_on_thermal_limit=True, temp_labels=None,
      thermal_action=None) -> None:
    """Write report to path every half second until stopped.

    The loop stops once a file named like out_path, but with a '.stop'
    suffix, exists.

    thermal_action is a cmd to run if ThermalLimitReachedError is caught.
    """
    stop_path = pathlib.Path(out_path).resolve().with_suffix('.stop')
    if stop_path.exists():
      # Rename existing file to allow thread to start as expected
      # Yes this is excessive but safe
      stop_path.rename(non_clobber_path(stop_path))

    # Work on a copy so the caller's sequence is never modified (and so
    # tuples are accepted as well)
    labels = list(temp_labels) if temp_labels else ['Current', 'Max']
    if alt_max:
      if alt_max not in labels:
        labels.append(alt_max)
      self.temp_labels.add(alt_max)

    # Start loop
    while True:
      try:
        self.update_sensor_data(alt_max, exit_on_thermal_limit)
      except ThermalLimitReachedError:
        if thermal_action:
          run_program(thermal_action, check=False)
      report = self.generate_report(*labels)
      with open(out_path, 'w', encoding='utf-8') as _f:
        _f.write('\n'.join(report))

      # Check if we should stop
      if stop_path.exists():
        break

      # Sleep before next loop
      sleep(0.5)

  def save_average_temps(
      self,
      temp_label: str,
      seconds: int = 10,
      save_history: bool = True,
      ) -> None:
    """Save average temps under temp_label over provided seconds.

    The saved temps are cleared first (see clear_temps), then the sensors
    are read once per second and the mean of those readings is stored in
    every source under temp_label.
    """
    self.clear_temps(next_label=temp_label, save_history=save_history)
    self.temp_labels.add(temp_label)

    # Get temps
    for _ in range(seconds):
      self.update_sensor_data(exit_on_thermal_limit=False)
      sleep(1)

    # Calculate averages
    for adapters in self.data.values():
      for sources in adapters.values():
        for source_data in sources.values():
          temps = source_data['Temps']
          try:
            source_data[temp_label] = sum(temps) / len(temps)
          except ZeroDivisionError:
            # Going to use unrealistic 0°C instead
            LOG.error(
              'No temps saved for %s',
              source_data.get('Label', 'UNKNOWN'),
              )
            source_data[temp_label] = 0

  def start_background_monitor(
      self, out_path, alt_max=None,
      exit_on_thermal_limit=True, temp_labels=None,
      thermal_action=None) -> None:
    """Start background thread to save report to file.

    thermal_action is a cmd to run if ThermalLimitReachedError is caught.

    Raises RuntimeError if a background thread was already started.
    """
    if self.background_thread:
      raise RuntimeError('Background thread already running')

    self.out_path = pathlib.Path(out_path)
    self.background_thread = start_thread(
      self.monitor_to_file,
      args=(
        out_path, alt_max, exit_on_thermal_limit, temp_labels, thermal_action,
        ),
      )

  def stop_background_monitor(self) -> None:
    """Stop background thread."""
    # Bail early
    if self.background_thread is None:
      return

    # Build the stop path exactly like monitor_to_file() does, otherwise
    # the thread may never see the file and join() would block forever
    stop_path = pathlib.Path(self.out_path).resolve().with_suffix('.stop')
    stop_path.touch()
    self.background_thread.join()

    # Reset vars to None
    self.background_thread = None
    self.out_path = None

  def update_sensor_data(
      self, alt_max=None, exit_on_thermal_limit=True) -> None:
    """Update sensor data via OS-specific means.

    Does nothing on platforms other than Darwin and Linux.
    """
    if alt_max:
      self.temp_labels.add(alt_max)
    if PLATFORM == 'Darwin':
      self.update_sensor_data_macos(alt_max, exit_on_thermal_limit)
    elif PLATFORM == 'Linux':
      self.update_sensor_data_linux(alt_max, exit_on_thermal_limit)

  def update_sensor_data_linux(
      self, alt_max, exit_on_thermal_limit=True) -> None:
    """Update sensor data via lm_sensors.

    Raises ThermalLimitReachedError if exit_on_thermal_limit is set and a
    CPU source is above the critical temp.
    """
    lm_sensor_data = get_sensor_data_lm()
    for section, adapters in self.data.items():
      for adapter, sources in adapters.items():
        for source, source_data in sources.items():
          try:
            label = source_data['Label']
            temp = lm_sensor_data[adapter][source][label]
            source_data['Current'] = temp
            source_data['Max'] = max(temp, source_data['Max'])
            source_data['Temps'].append(temp)
            if alt_max:
              source_data[alt_max] = max(temp, source_data.get(alt_max, 0))
          except KeyError:
            # Dumb workaround for Dell sensors with changing source names
            pass

          # Raise exception if thermal limit reached
          # NOTE: If the lookup above failed this checks the last known temp
          if exit_on_thermal_limit and section == 'CPUTemps':
            if source_data['Current'] > CPU_TEMPS['Critical']:
              raise ThermalLimitReachedError('CPU temps reached limit')

  def update_sensor_data_macos(
      self, alt_max, exit_on_thermal_limit=True) -> None:
    """Update sensor data via SMC.

    Raises ThermalLimitReachedError if exit_on_thermal_limit is set and a
    CPU source is above the critical temp.
    """
    for section, adapters in self.data.items():
      for sources in adapters.values():
        for source_data in sources.values():
          cmd = ['smc', '-k', source_data['Label'], '-r']
          proc = run_program(cmd)
          match = SMC_REGEX.match(proc.stdout.strip())
          try:
            temp = float(match.group('Value'))
          except (AttributeError, TypeError, ValueError):
            # AttributeError: the output didn't match at all (match is None)
            LOG.error('Failed to update temp %s', source_data['Label'])
            continue

          # Update source
          source_data['Current'] = temp
          source_data['Max'] = max(temp, source_data['Max'])
          source_data['Temps'].append(temp)
          if alt_max:
            source_data[alt_max] = max(temp, source_data.get(alt_max, 0))

          # Raise exception if thermal limit reached
          if exit_on_thermal_limit and section == 'CPUTemps':
            if source_data['Current'] > CPU_TEMPS['Critical']:
              raise ThermalLimitReachedError('CPU temps reached limit')


# Functions
def fix_sensor_name(name) -> str:
  """Cleanup sensor name, returns str."""
  # NOTE: flags must be passed by keyword, the fourth positional argument
  #       of re.sub() is count
  name = re.sub(
    r'^(\w+)-(\w+)-(\w+)', r'\1 (\2 \3)', name, flags=re.IGNORECASE)
  name = name.title()
  name = name.replace('Acpi', 'ACPI')
  name = name.replace('ACPItz', 'ACPI TZ')
  name = name.replace('Coretemp', 'CoreTemp')
  name = name.replace('Cpu', 'CPU')
  name = name.replace('Id ', 'ID ')
  name = name.replace('Isa ', 'ISA ')
  name = name.replace('Pci ', 'PCI ')
  name = name.replace('Smc', 'SMC')
  # Separate numbers from the text in front of them
  name = re.sub(r'(\D+)(\d+)', r'\1 \2', name, flags=re.IGNORECASE)
  name = re.sub(
    r'^K (\d+)Temp', r'AMD K\1 Temps', name, flags=re.IGNORECASE)
  name = re.sub(
    r'T(ccd\s+\d+|ctl|die)', r'CPU (T\1)', name, flags=re.IGNORECASE)
  # Collapse the extra whitespace added above
  name = re.sub(r'\s+', ' ', name)
  return name


def get_sensor_data() -> dict[Any, Any]:
  """Get sensor data via OS-specific means, returns dict.

  NOTE: An empty dict is returned on unsupported platforms.
  """
  sensor_data = {}
  if PLATFORM == 'Darwin':
    sensor_data = get_sensor_data_macos()
  elif PLATFORM == 'Linux':
    sensor_data = get_sensor_data_linux()

  return sensor_data


def get_sensor_data_linux() -> dict[Any, Any]:
  """Get sensor data via lm_sensors, returns dict.

  NOTE: Both sections are always present (like the macOS data), adapters
        without any usable source are dropped.
  """
  raw_lm_sensor_data = get_sensor_data_lm()
  sensor_data = {'CPUTemps': {}, 'Others': {}}

  # Parse lm_sensor data
  for adapter, sources in raw_lm_sensor_data.items():
    section = 'Others'
    if LM_SENSORS_CPU_REGEX.search(adapter):
      section = 'CPUTemps'
    sensor_data[section][adapter] = {}
    sources.pop('Adapter', None)

    # Find current temp and add to dict
    ## current temp is labeled xxxx_input
    for source, labels in sources.items():
      for label, temp in labels.items():
        if label.startswith(('fan', 'in', 'curr')):
          # Skip fan RPMs and voltages
          continue
        if 'input' in label:
          sensor_data[section][adapter][source] = {
            'Current': temp,
            'Label': label,
            'Max': temp,
            'Temps': [temp],
            }

    # Remove empty adapters
    if not sensor_data[section][adapter]:
      sensor_data[section].pop(adapter)

  # Done
  return sensor_data


def get_sensor_data_lm() -> dict[Any, Any]:
  """Get raw sensor data via lm_sensors, returns dict.

  NOTE: An empty dict is returned if the command fails or its output
        can't be parsed as JSON.
  """
  raw_lm_sensor_data = {}
  cmd = ['sensors', '-j']

  # Get raw data
  try:
    proc = run_program(cmd)
  except CalledProcessError:
    # Assuming no sensors available, return empty dict
    return {}

  # Workaround for bad sensors
  raw_data = []
  for line in proc.stdout.splitlines():
    if line.strip() == ',':
      # Assuming malformatted line caused by missing data
      continue
    raw_data.append(line)

  # Parse JSON data
  try:
    raw_lm_sensor_data = json.loads('\n'.join(raw_data))
  except json.JSONDecodeError:
    # Still broken, just return the empty dict
    pass

  # Done
  return raw_lm_sensor_data


def get_sensor_data_macos() -> dict[Any, Any]:
  """Get sensor data via SMC, returns dict.

  NOTE: The data is structured like the lm_sensor data.
  """
  cmd = ['smc', '-l']
  sensor_data = {'CPUTemps': {'SMC (CPU)': {}}, 'Others': {'SMC (Other)': {}}}

  # Parse SMC data
  proc = run_program(cmd)
  for line in proc.stdout.splitlines():
    tmp = SMC_REGEX.match(line.strip())
    if not tmp:
      continue
    sensor_id = tmp.group('ID')
    try:
      value = float(tmp.group('Value'))
    except (TypeError, ValueError):
      # Skip this sensor
      LOG.debug('Invalid sensor: %s', sensor_id)
      continue

    # Only add known sensor IDs
    if sensor_id not in SMC_IDS:
      continue

    # Add to dict
    section = 'Others'
    adapter = 'SMC (Other)'
    if SMC_IDS[sensor_id].get('CPU Temp', False):
      section = 'CPUTemps'
      adapter = 'SMC (CPU)'
    source = SMC_IDS[sensor_id]['Source']
    sensor_data[section][adapter][source] = {
      'Current': value,
      'Label': sensor_id,
      'Max': value,
      'Temps': [value],
      }

  # Done
  return sensor_data


def get_temp_str(temp, colored=True) -> str:
  """Get colored string based on temp, returns str.

  Values that can't be converted to float are passed to
  ansi.color_string() unchanged with the color 'PURPLE'.
  """
  temp_color = ''

  # Safety check
  try:
    temp = float(temp)
  except (TypeError, ValueError):
    # Invalid temp?
    return ansi.color_string(temp, 'PURPLE')

  # Determine color
  if colored:
    # Use the color of the highest threshold that was reached
    for threshold, color in sorted(TEMP_COLORS.items(), reverse=True):
      if temp >= threshold:
        temp_color = color
        break

  # Done
  # NOTE: The format spec already includes the sign for negative temps
  return ansi.color_string(f'{temp:2.0f}°C', temp_color)


if __name__ == '__main__':
  print("This file is not meant to be called directly.")