Added initial version of wk.hw.sensors
* Supports Linux and macOS * Only initial temp, no updates yet
This commit is contained in:
parent
49c0ce9a62
commit
dc030ab076
5 changed files with 306 additions and 0 deletions
|
|
@ -54,6 +54,62 @@ KNOWN_RAM_VENDOR_IDS = {
|
||||||
REGEX_POWER_ON_TIME = re.compile(
|
REGEX_POWER_ON_TIME = re.compile(
|
||||||
r'^(\d+)([Hh].*|\s+\(\d+\s+\d+\s+\d+\).*)'
|
r'^(\d+)([Hh].*|\s+\(\d+\s+\d+\s+\d+\).*)'
|
||||||
)
|
)
|
||||||
|
SMC_IDS = {
|
||||||
|
# Sources: https://github.com/beltex/SMCKit/blob/master/SMCKit/SMC.swift
|
||||||
|
# http://www.opensource.apple.com/source/net_snmp/
|
||||||
|
# https://github.com/jedda/OSX-Monitoring-Tools
|
||||||
|
'TA0P': {'CPU Temp': False, 'Source': 'Ambient temp'},
|
||||||
|
'TA0S': {'CPU Temp': False, 'Source': 'PCIE Slot 1 Ambient'},
|
||||||
|
'TA1P': {'CPU Temp': False, 'Source': 'Ambient temp'},
|
||||||
|
'TA1S': {'CPU Temp': False, 'Source': 'PCIE Slot 1 PCB'},
|
||||||
|
'TA2S': {'CPU Temp': False, 'Source': 'PCIE Slot 2 Ambient'},
|
||||||
|
'TA3S': {'CPU Temp': False, 'Source': 'PCIE Slot 2 PCB'},
|
||||||
|
'TC0C': {'CPU Temp': True, 'Source': 'CPU Core 0'},
|
||||||
|
'TC0D': {'CPU Temp': True, 'Source': 'CPU die temp'},
|
||||||
|
'TC0H': {'CPU Temp': True, 'Source': 'CPU heatsink temp'},
|
||||||
|
'TC0P': {'CPU Temp': True, 'Source': 'CPU Ambient 1'},
|
||||||
|
'TC1C': {'CPU Temp': True, 'Source': 'CPU Core 1'},
|
||||||
|
'TC1P': {'CPU Temp': True, 'Source': 'CPU Ambient 2'},
|
||||||
|
'TC2C': {'CPU Temp': True, 'Source': 'CPU B Core 0'},
|
||||||
|
'TC2P': {'CPU Temp': True, 'Source': 'CPU B Ambient 1'},
|
||||||
|
'TC3C': {'CPU Temp': True, 'Source': 'CPU B Core 1'},
|
||||||
|
'TC3P': {'CPU Temp': True, 'Source': 'CPU B Ambient 2'},
|
||||||
|
'TCAC': {'CPU Temp': True, 'Source': 'CPU core from PCECI'},
|
||||||
|
'TCAH': {'CPU Temp': True, 'Source': 'CPU HeatSink'},
|
||||||
|
'TCBC': {'CPU Temp': True, 'Source': 'CPU B core from PCECI'},
|
||||||
|
'TCBH': {'CPU Temp': True, 'Source': 'CPU HeatSink'},
|
||||||
|
'Te1P': {'CPU Temp': False, 'Source': 'PCIE ambient temp'},
|
||||||
|
'Te1S': {'CPU Temp': False, 'Source': 'PCIE slot 1'},
|
||||||
|
'Te2S': {'CPU Temp': False, 'Source': 'PCIE slot 2'},
|
||||||
|
'Te3S': {'CPU Temp': False, 'Source': 'PCIE slot 3'},
|
||||||
|
'Te4S': {'CPU Temp': False, 'Source': 'PCIE slot 4'},
|
||||||
|
'TG0C': {'CPU Temp': False, 'Source': 'Mezzanine GPU Core'},
|
||||||
|
'TG0P': {'CPU Temp': False, 'Source': 'Mezzanine GPU Exhaust'},
|
||||||
|
'TH0P': {'CPU Temp': False, 'Source': 'Drive Bay 0'},
|
||||||
|
'TH1P': {'CPU Temp': False, 'Source': 'Drive Bay 1'},
|
||||||
|
'TH2P': {'CPU Temp': False, 'Source': 'Drive Bay 2'},
|
||||||
|
'TH3P': {'CPU Temp': False, 'Source': 'Drive Bay 3'},
|
||||||
|
'TH4P': {'CPU Temp': False, 'Source': 'Drive Bay 4'},
|
||||||
|
'TM0P': {'CPU Temp': False, 'Source': 'CPU DIMM Exit Ambient'},
|
||||||
|
'Tp0C': {'CPU Temp': False, 'Source': 'PSU1 Inlet Ambient'},
|
||||||
|
'Tp0P': {'CPU Temp': False, 'Source': 'PSU1 Inlet Ambient'},
|
||||||
|
'Tp1C': {'CPU Temp': False, 'Source': 'PSU1 Secondary Component'},
|
||||||
|
'Tp1P': {'CPU Temp': False, 'Source': 'PSU1 Primary Component'},
|
||||||
|
'Tp2P': {'CPU Temp': False, 'Source': 'PSU1 Secondary Component'},
|
||||||
|
'Tp3P': {'CPU Temp': False, 'Source': 'PSU2 Inlet Ambient'},
|
||||||
|
'Tp4P': {'CPU Temp': False, 'Source': 'PSU2 Primary Component'},
|
||||||
|
'Tp5P': {'CPU Temp': False, 'Source': 'PSU2 Secondary Component'},
|
||||||
|
'TS0C': {'CPU Temp': False, 'Source': 'CPU B DIMM Exit Ambient'},
|
||||||
|
}
|
||||||
|
TEMP_COLORS = {
|
||||||
|
float('-inf'): 'CYAN',
|
||||||
|
00: 'BLUE',
|
||||||
|
60: 'GREEN',
|
||||||
|
70: 'YELLOW',
|
||||||
|
80: 'ORANGE',
|
||||||
|
90: 'RED',
|
||||||
|
100: 'ORANGE_RED',
|
||||||
|
}
|
||||||
TMUX_SIDE_WIDTH = 20
|
TMUX_SIDE_WIDTH = 20
|
||||||
TMUX_LAYOUT = OrderedDict({
|
TMUX_LAYOUT = OrderedDict({
|
||||||
'Top': {'height': 2, 'Check': True},
|
'Top': {'height': 2, 'Check': True},
|
||||||
|
|
|
||||||
|
|
@ -2,3 +2,4 @@
|
||||||
|
|
||||||
from wk.hw import diags
|
from wk.hw import diags
|
||||||
from wk.hw import obj
|
from wk.hw import obj
|
||||||
|
from wk.hw import sensors
|
||||||
|
|
|
||||||
|
|
@ -273,6 +273,7 @@ def keyboard_test():
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
# pylint: disable=too-many-branches
|
||||||
"""Main function for hardware diagnostics."""
|
"""Main function for hardware diagnostics."""
|
||||||
args = docopt(DOCSTRING)
|
args = docopt(DOCSTRING)
|
||||||
log.update_log_path(dest_name='Hardware-Diagnostics', timestamp=True)
|
log.update_log_path(dest_name='Hardware-Diagnostics', timestamp=True)
|
||||||
|
|
|
||||||
247
scripts/wk/hw/sensors.py
Normal file
247
scripts/wk/hw/sensors.py
Normal file
|
|
@ -0,0 +1,247 @@
|
||||||
|
"""WizardKit: Hardware sensors"""
|
||||||
|
# vim: sts=2 sw=2 ts=2
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import platform
|
||||||
|
import re
|
||||||
|
|
||||||
|
from subprocess import CalledProcessError
|
||||||
|
|
||||||
|
from wk.cfg.hw import SMC_IDS, TEMP_COLORS
|
||||||
|
from wk.exe import run_program
|
||||||
|
from wk.std import color_string
|
||||||
|
|
||||||
|
|
||||||
|
# STATIC VARIABLES
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
LM_SENSORS_CPU_REGEX = re.compile(r'(core|k\d+)temp', re.IGNORECASE)
|
||||||
|
SMC_REGEX = re.compile(
|
||||||
|
r'^\s*(?P<ID>\w{4})'
|
||||||
|
r'\s+\[(?P<Type>.*)\]'
|
||||||
|
r'\s+(?P<Value>.*?)'
|
||||||
|
r'\s*\(bytes (?P<Bytes>.*)\)$'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# Error Classes
|
||||||
|
class ThermalLimitReachedError(RuntimeError):
|
||||||
|
"""Raised when the thermal threshold is reached."""
|
||||||
|
|
||||||
|
|
||||||
|
# Classes
|
||||||
|
class Sensors():
|
||||||
|
"""Class for holding sensor specific data."""
|
||||||
|
def __init__(self):
|
||||||
|
self.data = get_sensor_data()
|
||||||
|
|
||||||
|
def clear_temps(self):
|
||||||
|
"""Clear saved temps but keep structure"""
|
||||||
|
for adapters in self.data.values():
|
||||||
|
for sources in adapters.values():
|
||||||
|
for source_data in sources.values():
|
||||||
|
source_data['Temps'] = []
|
||||||
|
|
||||||
|
def generate_report(self, *temp_labels, colored=True, only_cpu=False):
|
||||||
|
"""Generate report based on given temp_labels, returns list."""
|
||||||
|
report = []
|
||||||
|
|
||||||
|
for section, adapters in sorted(self.data.items()):
|
||||||
|
if only_cpu and not section.startswith('CPU'):
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Ugly section
|
||||||
|
for adapter, sources in sorted(adapters.items()):
|
||||||
|
report.append(fix_sensor_name(adapter))
|
||||||
|
for source, source_data in sorted(sources.items()):
|
||||||
|
line = f'{fix_sensor_name(source):18} '
|
||||||
|
for label in temp_labels:
|
||||||
|
if label != 'Current':
|
||||||
|
line += f' {label.lower()}: '
|
||||||
|
line += get_temp_str(
|
||||||
|
source_data.get(label, '???'),
|
||||||
|
colored=colored,
|
||||||
|
)
|
||||||
|
report.append(line)
|
||||||
|
if not only_cpu:
|
||||||
|
report.append('')
|
||||||
|
|
||||||
|
# Handle empty reports
|
||||||
|
if not report:
|
||||||
|
report = [
|
||||||
|
color_string('WARNING: No sensors found', 'YELLOW'),
|
||||||
|
'',
|
||||||
|
'Please monitor temps manually',
|
||||||
|
]
|
||||||
|
|
||||||
|
# Done
|
||||||
|
return report
|
||||||
|
|
||||||
|
|
||||||
|
# Functions
|
||||||
|
def fix_sensor_name(name):
|
||||||
|
"""Cleanup sensor name, returns str."""
|
||||||
|
name = re.sub(r'^(\w+)-(\w+)-(\w+)', r'\1 (\2 \3)', name, re.IGNORECASE)
|
||||||
|
name = name.title()
|
||||||
|
name = name.replace('Coretemp', 'CoreTemp')
|
||||||
|
name = name.replace('Acpi', 'ACPI')
|
||||||
|
name = name.replace('ACPItz', 'ACPI TZ')
|
||||||
|
name = name.replace('Isa ', 'ISA ')
|
||||||
|
name = name.replace('Pci ', 'PCI ')
|
||||||
|
name = name.replace('Id ', 'ID ')
|
||||||
|
name = re.sub(r'(\D+)(\d+)', r'\1 \2', name, re.IGNORECASE)
|
||||||
|
name = re.sub(r'^K (\d+)Temp', r'AMD K\1 Temps', name, re.IGNORECASE)
|
||||||
|
name = re.sub(r'T(ctl|die)', r'CPU (T\1)', name, re.IGNORECASE)
|
||||||
|
return name
|
||||||
|
|
||||||
|
|
||||||
|
def get_lm_sensor_data():
|
||||||
|
"""Get sensor data via lm_sensors, returns dict."""
|
||||||
|
raw_lm_sensor_data = get_raw_lm_sensor_data()
|
||||||
|
sensor_data = {'CPUTemps': {}, 'Others': {}}
|
||||||
|
|
||||||
|
# Parse lm_sensor data
|
||||||
|
for adapter, sources in raw_lm_sensor_data.items():
|
||||||
|
section = 'Others'
|
||||||
|
if LM_SENSORS_CPU_REGEX.search(adapter):
|
||||||
|
section = 'CPUTemps'
|
||||||
|
sensor_data[section][adapter] = {}
|
||||||
|
sources.pop('Adapter', None)
|
||||||
|
|
||||||
|
# Find current temp and add to dict
|
||||||
|
## current temp is labeled xxxx_input
|
||||||
|
for source, labels in sources.items():
|
||||||
|
for label, temp in labels.items():
|
||||||
|
if label.startswith('fan') or label.startswith('in'):
|
||||||
|
# Skip fan RPMs and voltages
|
||||||
|
continue
|
||||||
|
if 'input' in label:
|
||||||
|
sensor_data[section][adapter][source] = {
|
||||||
|
'Current': temp,
|
||||||
|
'Label': label,
|
||||||
|
'Max': temp,
|
||||||
|
'Temps': [temp],
|
||||||
|
}
|
||||||
|
|
||||||
|
# Remove empty adapters
|
||||||
|
if not sensor_data[section][adapter]:
|
||||||
|
sensor_data[section].pop(adapter)
|
||||||
|
|
||||||
|
# Remove empty sections
|
||||||
|
for adapters in sensor_data.values():
|
||||||
|
adapters = {source: source_data for source, source_data in adapters.items()
|
||||||
|
if source_data}
|
||||||
|
|
||||||
|
# Done
|
||||||
|
return sensor_data
|
||||||
|
|
||||||
|
|
||||||
|
def get_raw_lm_sensor_data():
|
||||||
|
"""Get raw sensor data via lm_sensors, returns dict."""
|
||||||
|
raw_lm_sensor_data = {}
|
||||||
|
cmd = ['sensors', '-j']
|
||||||
|
|
||||||
|
# Get raw data
|
||||||
|
try:
|
||||||
|
proc = run_program(cmd)
|
||||||
|
except CalledProcessError:
|
||||||
|
# Assuming no sensors available, return empty dict
|
||||||
|
return {}
|
||||||
|
|
||||||
|
# Workaround for bad sensors
|
||||||
|
raw_data = []
|
||||||
|
for line in proc.stdout.splitlines():
|
||||||
|
if line.strip() == ',':
|
||||||
|
# Assuming malformatted line caused by missing data
|
||||||
|
continue
|
||||||
|
raw_data.append(line)
|
||||||
|
|
||||||
|
# Parse JSON data
|
||||||
|
try:
|
||||||
|
raw_lm_sensor_data = json.loads('\n'.join(raw_data))
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
# Still broken, just return the empty dict
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Done
|
||||||
|
return raw_lm_sensor_data
|
||||||
|
|
||||||
|
|
||||||
|
def get_sensor_data():
|
||||||
|
"""Get sensor data via OS-specific means, returns dict."""
|
||||||
|
sensor_data = {}
|
||||||
|
if platform.system() == 'Darwin':
|
||||||
|
sensor_data = get_smc_sensor_data()
|
||||||
|
elif platform.system() == 'Linux':
|
||||||
|
sensor_data = get_lm_sensor_data()
|
||||||
|
|
||||||
|
return sensor_data
|
||||||
|
|
||||||
|
|
||||||
|
def get_smc_sensor_data():
|
||||||
|
"""Get sensor data via SMC, returns dict.
|
||||||
|
|
||||||
|
NOTE: The data is structured like the lm_sensor data.
|
||||||
|
"""
|
||||||
|
cmd = ['smc', '-l']
|
||||||
|
sensor_data = {'CPUTemps': {'smc': {}}, 'Others': {'smc': {}}}
|
||||||
|
|
||||||
|
# Parse SMC data
|
||||||
|
proc = run_program(cmd)
|
||||||
|
for line in proc.stdout.splitlines():
|
||||||
|
tmp = SMC_REGEX.match(line.strip())
|
||||||
|
if tmp:
|
||||||
|
value = tmp.group('Value')
|
||||||
|
try:
|
||||||
|
LOG.debug('Invalid sensor: %s', tmp.group('ID'))
|
||||||
|
value = float(value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
# Skip this sensor
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Only add known sensor IDs
|
||||||
|
sensor_id = tmp.group('ID')
|
||||||
|
if sensor_id not in SMC_IDS:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Add to dict
|
||||||
|
section = 'Others'
|
||||||
|
if SMC_IDS[sensor_id].get('CPUTemp', False):
|
||||||
|
section = 'CPUTemps'
|
||||||
|
source = SMC_IDS[sensor_id]['Source']
|
||||||
|
sensor_data[section]['smc'][source] = {
|
||||||
|
'Current': value,
|
||||||
|
'Label': sensor_id,
|
||||||
|
'Max': value,
|
||||||
|
'Temps': [value],
|
||||||
|
}
|
||||||
|
|
||||||
|
# Done
|
||||||
|
return sensor_data
|
||||||
|
|
||||||
|
|
||||||
|
def get_temp_str(temp, colored=True):
|
||||||
|
"""Get colored string based on temp, returns str."""
|
||||||
|
temp_color = None
|
||||||
|
|
||||||
|
# Safety check
|
||||||
|
try:
|
||||||
|
temp = float(temp)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
# Invalid temp?
|
||||||
|
return color_string(temp, 'PURPLE')
|
||||||
|
|
||||||
|
# Determine color
|
||||||
|
if colored:
|
||||||
|
for threshold, color in sorted(TEMP_COLORS.items(), reverse=True):
|
||||||
|
if temp >= threshold:
|
||||||
|
temp_color = color
|
||||||
|
break
|
||||||
|
|
||||||
|
# Done
|
||||||
|
return color_string(f'{"-" if temp < 0 else ""}{temp:2.0f}°C', temp_color)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
print("This file is not meant to be called directly.")
|
||||||
|
|
@ -37,6 +37,7 @@ COLORS = {
|
||||||
'RED': '\033[31m',
|
'RED': '\033[31m',
|
||||||
'RED_BLINK': '\033[31;5m',
|
'RED_BLINK': '\033[31;5m',
|
||||||
'ORANGE': '\033[31;1m',
|
'ORANGE': '\033[31;1m',
|
||||||
|
'ORANGE_RED': '\033[1;31;41m',
|
||||||
'GREEN': '\033[32m',
|
'GREEN': '\033[32m',
|
||||||
'YELLOW': '\033[33m',
|
'YELLOW': '\033[33m',
|
||||||
'YELLOW_BLINK': '\033[33;5m',
|
'YELLOW_BLINK': '\033[33;5m',
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue