From 6880a353cca1a82c0ad9dc649caf494209a9c8e7 Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Sat, 8 Oct 2022 14:15:32 -0700 Subject: [PATCH] Set known_attributes when intializing Disk() This new design uses copy.deepcopy() to avoid erroneous thresholds being applied to drives during diags. This also reduces the number of lookups to one per Disk. --- scripts/wk/hw/disk.py | 37 ++++++++++++++++++++----------------- scripts/wk/hw/smart.py | 42 +++++++++++++++++++++--------------------- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/scripts/wk/hw/disk.py b/scripts/wk/hw/disk.py index 00d2f85e..a663cd6b 100644 --- a/scripts/wk/hw/disk.py +++ b/scripts/wk/hw/disk.py @@ -17,6 +17,7 @@ from wk.hw.test import Test from wk.hw.smart import ( enable_smart, generate_attribute_report, + get_known_disk_attributes, update_smart_details, ) from wk.std import PLATFORM, bytes_to_string, color_string, strip_colors @@ -35,28 +36,30 @@ WK_LABEL_REGEX = re.compile( class Disk: # pylint: disable=too-many-instance-attributes """Object for tracking disk specific data.""" - attributes: dict[Any, dict] = field(init=False, default_factory=dict) - bus: str = field(init=False) - children: list[dict] = field(init=False, default_factory=list) - filesystem: str = field(init=False) - log_sec: int = field(init=False) - model: str = field(init=False) - name: str = field(init=False) - notes: list[str] = field(init=False, default_factory=list) + attributes: dict[Any, dict] = field(init=False, default_factory=dict) + bus: str = field(init=False) + children: list[dict] = field(init=False, default_factory=list) + filesystem: str = field(init=False) + known_attributes: dict[Any, dict] = field(init=False, default_factory=dict) + log_sec: int = field(init=False) + model: str = field(init=False) + name: str = field(init=False) + notes: list[str] = field(init=False, default_factory=list) path: Union[pathlib.Path, str] - parent: str = field(init=False) - phy_sec: int = field(init=False) - raw_details: dict[str, Any] = field(init=False) - raw_smartctl: dict[str, Any] = field(init=False) - serial: str = field(init=False) - size: int = field(init=False) - ssd: bool = field(init=False) - tests: list[Test] = field(init=False, default_factory=list) - use_sat: bool = field(init=False, default=False) + parent: str = field(init=False) + phy_sec: int = field(init=False) + raw_details: dict[str, Any] = field(init=False) + raw_smartctl: dict[str, Any] = field(init=False) + serial: str = field(init=False) + size: int = field(init=False) + ssd: bool = field(init=False) + tests: list[Test] = field(init=False, default_factory=list) + use_sat: bool = field(init=False, default=False) def __post_init__(self) -> None: self.path = pathlib.Path(self.path).resolve() self.update_details() + self.known_attributes = get_known_disk_attributes(self.model) enable_smart(self) update_smart_details(self) if not self.attributes and self.bus == 'USB': diff --git a/scripts/wk/hw/smart.py b/scripts/wk/hw/smart.py index 927668d7..248959f0 100644 --- a/scripts/wk/hw/smart.py +++ b/scripts/wk/hw/smart.py @@ -1,6 +1,7 @@ """WizardKit: SMART test functions""" # vim: sts=2 sw=2 ts=2 +import copy import logging import re @@ -67,16 +68,15 @@ def build_self_test_report(test_obj, aborted=False) -> None: def check_attributes(dev, only_blocking=False) -> bool: """Check if any known attributes are failing, returns bool.""" attributes_ok = True - known_attributes = get_known_disk_attributes(dev.model) for attr, value in dev.attributes.items(): # Skip unknown attributes - if attr not in known_attributes: + if attr not in dev.known_attributes: continue # Get thresholds - blocking_attribute = known_attributes[attr].get('Blocking', False) - err_thresh = known_attributes[attr].get('Error', None) - max_thresh = known_attributes[attr].get('Maximum', None) + blocking_attribute = dev.known_attributes[attr].get('Blocking', False) + err_thresh = dev.known_attributes[attr].get('Error', None) + max_thresh = dev.known_attributes[attr].get('Maximum', None) if not max_thresh: max_thresh = float('inf') @@ -89,7 +89,7 @@ def check_attributes(dev, only_blocking=False) -> bool: continue # Check attribute - if known_attributes[attr].get('PercentageLife', False): + if dev.known_attributes[attr].get('PercentageLife', False): if 0 <= value['raw'] <= err_thresh: attributes_ok = False elif err_thresh <= value['raw'] < max_thresh: @@ -114,18 +114,17 @@ def enable_smart(dev) -> None: def generate_attribute_report(dev) -> list[str]: """Generate attribute report, returns list.""" - known_attributes = get_known_disk_attributes(dev.model) report = [] for attr, value in sorted(dev.attributes.items()): note = '' value_color = 'GREEN' # Skip attributes not in our list - if attr not in known_attributes: + if attr not in dev.known_attributes: continue # Check for attribute note - note = known_attributes[attr].get('Note', '') + note = dev.known_attributes[attr].get('Note', '') # ID / Name label = f'{attr:>3}' @@ -135,9 +134,9 @@ def generate_attribute_report(dev) -> list[str]: label = f' {label.replace("_", " "):38}' # Value color - if known_attributes[attr].get('PercentageLife', False): + if dev.known_attributes[attr].get('PercentageLife', False): # PercentageLife values - if 0 <= value['raw'] <= known_attributes[attr]['Error']: + if 0 <= value['raw'] <= dev.known_attributes[attr]['Error']: value_color = 'RED' note = '(failed, % life remaining)' elif value['raw'] < 0 or value['raw'] > 100: @@ -145,7 +144,7 @@ def generate_attribute_report(dev) -> list[str]: note = '(invalid?)' else: for threshold, color in ATTRIBUTE_COLORS: - threshold_val = known_attributes[attr].get(threshold, None) + threshold_val = dev.known_attributes[attr].get(threshold, None) if threshold_val and value['raw'] >= threshold_val: value_color = color if threshold == 'Error': @@ -168,18 +167,19 @@ def generate_attribute_report(dev) -> list[str]: return report -def get_known_disk_attributes(model) -> dict[Any, dict]: - """Get known NVMe/SMART attributes (model specific), returns dict.""" - known_attributes = KNOWN_DISK_ATTRIBUTES.copy() +def get_known_disk_attributes(model) -> None: + """Get known disk attributes based on the device model.""" + known_attributes = copy.deepcopy(KNOWN_DISK_ATTRIBUTES) # Apply model-specific data for regex, data in KNOWN_DISK_MODELS.items(): - if re.search(regex, model): - for attr, thresholds in data.items(): - if attr in known_attributes: - known_attributes[attr].update(thresholds) - else: - known_attributes[attr] = thresholds + if not re.search(regex, model): + continue + for attr, thresholds in data.items(): + if attr in known_attributes: + known_attributes[attr].update(thresholds) + else: + known_attributes[attr] = copy.deepcopy(thresholds) # Done return known_attributes