From 52d61226a0385d92daef5ba9a3a82edc9d85444d Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Wed, 23 Oct 2019 15:23:50 -0700 Subject: [PATCH] Added Disk() obj --- scripts/wk/exe.py | 2 +- scripts/wk/obj.py | 128 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+), 1 deletion(-) diff --git a/scripts/wk/exe.py b/scripts/wk/exe.py index 9a747e4c..753df2a0 100644 --- a/scripts/wk/exe.py +++ b/scripts/wk/exe.py @@ -97,7 +97,7 @@ def get_json_from_command(cmd, check=True, encoding='utf-8', errors='ignore'): """Capture JSON content from cmd output, returns dict. If the data can't be decoded then either an exception is raised - or an empty dict is returned depending on ignore_errors. + or an empty dict is returned depending on errors. """ json_data = {} diff --git a/scripts/wk/obj.py b/scripts/wk/obj.py index 8507d6f0..46cc1cd8 100644 --- a/scripts/wk/obj.py +++ b/scripts/wk/obj.py @@ -1,13 +1,23 @@ """WizardKit: Objects.""" # vim: sts=2 sw=2 ts=2 +import logging import pathlib +import re from collections import OrderedDict from wk.exe import get_json_from_command, run_program from wk.std import bytes_to_string, color_string, string_to_bytes +# STATIC VARIABLES +KEY_NVME = 'nvme_smart_health_information_log' +KEY_SMART = 'ata_smart_attributes' +LOG = logging.getLogger(__name__) +REGEX_POWER_ON_TIME = re.compile( + r'^(\d+)([Hh].*|\s+\(\d+\s+\d+\s+\d+\).*)' + ) + # Classes class CpuRam(): """Object for tracking CPU & RAM specific data.""" @@ -87,5 +97,123 @@ class CpuRam(): return report +class Disk(): + """Object for tracking disk specific data.""" + def __init__(self, path): + self.attributes = {} + self.description = 'UNKNOWN' + self.lsblk = {} + self.nvme_smart_notes = {} + self.path = pathlib.Path(path).resolve() + self.smartctl = {} + self.tests = OrderedDict() + + # Update details + self.get_details() + self.enable_smart() + self.update_smart_details() + + def enable_smart(self): + """Try enabling SMART for this disk.""" + cmd = [ + 'sudo', + 'smartctl', + '--tolerance=permissive', + '--smart=on', + self.path, + ] + run_program(cmd, check=False) + + def get_details(self): + """Get details via lsblk. + + Required details default to generic descriptions and + are converted to the correct type. + """ + cmd = ['lsblk', '--bytes', '--json', '--output-all', '--paths', self.path] + json_data = get_json_from_command(cmd) + self.lsblk = json_data.get('blockdevices', [{}])[0] + + # Set necessary details + self.lsblk['model'] = self.lsblk.get('model', 'Unknown Model') + self.lsblk['name'] = self.lsblk.get('name', self.path) + self.lsblk['log-sec'] = self.lsblk.get('log-sec', 512) + self.lsblk['phy-sec'] = self.lsblk.get('phy-sec', 512) + self.lsblk['rota'] = self.lsblk.get('rota', True) + self.lsblk['serial'] = self.lsblk.get('serial', 'Unknown Serial') + self.lsblk['size'] = self.lsblk.get('size', -1) + self.lsblk['tran'] = self.lsblk.get('tran', '???') + self.lsblk['tran'] = self.lsblk['tran'].upper().replace('NVME', 'NVMe') + + # Ensure certain attributes types + for attr in ['model', 'name', 'serial', 'tran']: + if not isinstance(self.lsblk[attr], str): + self.lsblk[attr] = str(self.lsblk[attr]) + for attr in ['log-sec', 'phy-sec', 'size']: + if not isinstance(self.lsblk[attr], int): + self.lsblk[attr] = int(self.lsblk[attr]) + + def get_labels(self): + """Build list of labels for this disk, returns list.""" + labels = [] + + # Add all labels from lsblk + for disk in [self.lsblk, *self.lsblk.get('children', [])]: + labels.append(disk.get('label', '')) + labels.append(disk.get('partlabel', '')) + + # Remove empty labels + labels = [str(label) for label in labels if label] + + # Done + return labels + + def update_smart_details(self): + """Update SMART details via smartctl.""" + self.attributes = {} + cmd = [ + 'sudo', + 'smartctl', + '--tolerance=verypermissive', + '--all', + '--json', + self.path, + ] + self.smartctl = get_json_from_command(cmd) + + # Check for attributes + if KEY_NVME in self.smartctl: + for name, value in self.smartctl[KEY_NVME].items(): + try: + self.attributes[name] = { + 'name': name, + 'raw': int(value), + 'raw_str': str(value), + } + except ValueError: + # Ignoring invalid attribute + LOG.error('Invalid NVMe attribute: %s %s', name, value) + elif KEY_SMART in self.smartctl: + for attribute in self.smartctl[KEY_SMART].get('table', {}): + try: + _id = int(attribute['id']) + except (KeyError, ValueError): + # Ignoring invalid attribute + LOG.error('Invalid SMART attribute: %s', attribute) + continue + name = str(attribute.get('name', 'UNKNOWN')).replace('_', ' ').title() + raw = int(attribute.get('raw', {}).get('value', -1)) + raw_str = attribute.get('raw', {}).get('string', 'UNKNOWN') + + # Fix power-on time + match = REGEX_POWER_ON_TIME.match(raw_str) + if _id == 9 and match: + raw = int(match.group(1)) + + # Add to dict + self.attributes[_id] = { + 'name': name, 'raw': raw, 'raw_str': raw_str} + + if __name__ == '__main__': print("This file is not meant to be called directly.")