Set known_attributes when intializing Disk()

This new design uses copy.deepcopy() to avoid erroneous thresholds being
applied to drives during diags.  This also reduces the number of lookups
to one per Disk.
This commit is contained in:
2Shirt 2022-10-08 14:15:32 -07:00
parent c08ad2b1fb
commit 6880a353cc
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
2 changed files with 41 additions and 38 deletions

View file

@ -17,6 +17,7 @@ from wk.hw.test import Test
from wk.hw.smart import ( from wk.hw.smart import (
enable_smart, enable_smart,
generate_attribute_report, generate_attribute_report,
get_known_disk_attributes,
update_smart_details, update_smart_details,
) )
from wk.std import PLATFORM, bytes_to_string, color_string, strip_colors from wk.std import PLATFORM, bytes_to_string, color_string, strip_colors
@ -35,28 +36,30 @@ WK_LABEL_REGEX = re.compile(
class Disk: class Disk:
# pylint: disable=too-many-instance-attributes # pylint: disable=too-many-instance-attributes
"""Object for tracking disk specific data.""" """Object for tracking disk specific data."""
attributes: dict[Any, dict] = field(init=False, default_factory=dict) attributes: dict[Any, dict] = field(init=False, default_factory=dict)
bus: str = field(init=False) bus: str = field(init=False)
children: list[dict] = field(init=False, default_factory=list) children: list[dict] = field(init=False, default_factory=list)
filesystem: str = field(init=False) filesystem: str = field(init=False)
log_sec: int = field(init=False) known_attributes: dict[Any, dict] = field(init=False, default_factory=dict)
model: str = field(init=False) log_sec: int = field(init=False)
name: str = field(init=False) model: str = field(init=False)
notes: list[str] = field(init=False, default_factory=list) name: str = field(init=False)
notes: list[str] = field(init=False, default_factory=list)
path: Union[pathlib.Path, str] path: Union[pathlib.Path, str]
parent: str = field(init=False) parent: str = field(init=False)
phy_sec: int = field(init=False) phy_sec: int = field(init=False)
raw_details: dict[str, Any] = field(init=False) raw_details: dict[str, Any] = field(init=False)
raw_smartctl: dict[str, Any] = field(init=False) raw_smartctl: dict[str, Any] = field(init=False)
serial: str = field(init=False) serial: str = field(init=False)
size: int = field(init=False) size: int = field(init=False)
ssd: bool = field(init=False) ssd: bool = field(init=False)
tests: list[Test] = field(init=False, default_factory=list) tests: list[Test] = field(init=False, default_factory=list)
use_sat: bool = field(init=False, default=False) use_sat: bool = field(init=False, default=False)
def __post_init__(self) -> None: def __post_init__(self) -> None:
self.path = pathlib.Path(self.path).resolve() self.path = pathlib.Path(self.path).resolve()
self.update_details() self.update_details()
self.known_attributes = get_known_disk_attributes(self.model)
enable_smart(self) enable_smart(self)
update_smart_details(self) update_smart_details(self)
if not self.attributes and self.bus == 'USB': if not self.attributes and self.bus == 'USB':

View file

@ -1,6 +1,7 @@
"""WizardKit: SMART test functions""" """WizardKit: SMART test functions"""
# vim: sts=2 sw=2 ts=2 # vim: sts=2 sw=2 ts=2
import copy
import logging import logging
import re import re
@ -67,16 +68,15 @@ def build_self_test_report(test_obj, aborted=False) -> None:
def check_attributes(dev, only_blocking=False) -> bool: def check_attributes(dev, only_blocking=False) -> bool:
"""Check if any known attributes are failing, returns bool.""" """Check if any known attributes are failing, returns bool."""
attributes_ok = True attributes_ok = True
known_attributes = get_known_disk_attributes(dev.model)
for attr, value in dev.attributes.items(): for attr, value in dev.attributes.items():
# Skip unknown attributes # Skip unknown attributes
if attr not in known_attributes: if attr not in dev.known_attributes:
continue continue
# Get thresholds # Get thresholds
blocking_attribute = known_attributes[attr].get('Blocking', False) blocking_attribute = dev.known_attributes[attr].get('Blocking', False)
err_thresh = known_attributes[attr].get('Error', None) err_thresh = dev.known_attributes[attr].get('Error', None)
max_thresh = known_attributes[attr].get('Maximum', None) max_thresh = dev.known_attributes[attr].get('Maximum', None)
if not max_thresh: if not max_thresh:
max_thresh = float('inf') max_thresh = float('inf')
@ -89,7 +89,7 @@ def check_attributes(dev, only_blocking=False) -> bool:
continue continue
# Check attribute # Check attribute
if known_attributes[attr].get('PercentageLife', False): if dev.known_attributes[attr].get('PercentageLife', False):
if 0 <= value['raw'] <= err_thresh: if 0 <= value['raw'] <= err_thresh:
attributes_ok = False attributes_ok = False
elif err_thresh <= value['raw'] < max_thresh: elif err_thresh <= value['raw'] < max_thresh:
@ -114,18 +114,17 @@ def enable_smart(dev) -> None:
def generate_attribute_report(dev) -> list[str]: def generate_attribute_report(dev) -> list[str]:
"""Generate attribute report, returns list.""" """Generate attribute report, returns list."""
known_attributes = get_known_disk_attributes(dev.model)
report = [] report = []
for attr, value in sorted(dev.attributes.items()): for attr, value in sorted(dev.attributes.items()):
note = '' note = ''
value_color = 'GREEN' value_color = 'GREEN'
# Skip attributes not in our list # Skip attributes not in our list
if attr not in known_attributes: if attr not in dev.known_attributes:
continue continue
# Check for attribute note # Check for attribute note
note = known_attributes[attr].get('Note', '') note = dev.known_attributes[attr].get('Note', '')
# ID / Name # ID / Name
label = f'{attr:>3}' label = f'{attr:>3}'
@ -135,9 +134,9 @@ def generate_attribute_report(dev) -> list[str]:
label = f' {label.replace("_", " "):38}' label = f' {label.replace("_", " "):38}'
# Value color # Value color
if known_attributes[attr].get('PercentageLife', False): if dev.known_attributes[attr].get('PercentageLife', False):
# PercentageLife values # PercentageLife values
if 0 <= value['raw'] <= known_attributes[attr]['Error']: if 0 <= value['raw'] <= dev.known_attributes[attr]['Error']:
value_color = 'RED' value_color = 'RED'
note = '(failed, % life remaining)' note = '(failed, % life remaining)'
elif value['raw'] < 0 or value['raw'] > 100: elif value['raw'] < 0 or value['raw'] > 100:
@ -145,7 +144,7 @@ def generate_attribute_report(dev) -> list[str]:
note = '(invalid?)' note = '(invalid?)'
else: else:
for threshold, color in ATTRIBUTE_COLORS: for threshold, color in ATTRIBUTE_COLORS:
threshold_val = known_attributes[attr].get(threshold, None) threshold_val = dev.known_attributes[attr].get(threshold, None)
if threshold_val and value['raw'] >= threshold_val: if threshold_val and value['raw'] >= threshold_val:
value_color = color value_color = color
if threshold == 'Error': if threshold == 'Error':
@ -168,18 +167,19 @@ def generate_attribute_report(dev) -> list[str]:
return report return report
def get_known_disk_attributes(model) -> dict[Any, dict]: def get_known_disk_attributes(model) -> None:
"""Get known NVMe/SMART attributes (model specific), returns dict.""" """Get known disk attributes based on the device model."""
known_attributes = KNOWN_DISK_ATTRIBUTES.copy() known_attributes = copy.deepcopy(KNOWN_DISK_ATTRIBUTES)
# Apply model-specific data # Apply model-specific data
for regex, data in KNOWN_DISK_MODELS.items(): for regex, data in KNOWN_DISK_MODELS.items():
if re.search(regex, model): if not re.search(regex, model):
for attr, thresholds in data.items(): continue
if attr in known_attributes: for attr, thresholds in data.items():
known_attributes[attr].update(thresholds) if attr in known_attributes:
else: known_attributes[attr].update(thresholds)
known_attributes[attr] = thresholds else:
known_attributes[attr] = copy.deepcopy(thresholds)
# Done # Done
return known_attributes return known_attributes