From 6a3ef60881ed43800605b4692e79836a2d978c03 Mon Sep 17 00:00:00 2001 From: 2Shirt <2xShirt@gmail.com> Date: Sat, 8 Dec 2018 17:41:29 -0700 Subject: [PATCH] Added CpuObj and renamed dev names to disk * This should make the code more clear * The CpuObj is similar to DiskObj to abstract the device/tests calls * New calls will be like: run_test(state, dev) --- .bin/Scripts/functions/hw_diags.py | 204 +++++++++++++---------------- 1 file changed, 90 insertions(+), 114 deletions(-) diff --git a/.bin/Scripts/functions/hw_diags.py b/.bin/Scripts/functions/hw_diags.py index e1b97df9..a2c8293d 100644 --- a/.bin/Scripts/functions/hw_diags.py +++ b/.bin/Scripts/functions/hw_diags.py @@ -67,32 +67,43 @@ SIDE_PANE_WIDTH = 20 TOP_PANE_TEXT = '{GREEN}Hardware Diagnostics{CLEAR}'.format(**COLORS) # Classes -class DevObj(): - """Device object for tracking device specific data.""" - def __init__(self, state, dev_path): - self.failing = False +class CpuObj(): + """Object for tracking CPU specific data.""" + def __init__(self): + self.lscpu = {} + self.tests = {} + self.get_details() + + def get_details(self): + """Get CPU details from lscpu.""" + cmd = ['lscpu', '--json'] + try: + result = run_program(cmd, check=False) + json_data = json.loads(result.stdout.decode()) + except Exception: + # Ignore and leave self.lscpu empty + return + for line in json_data.get('lscpu', []): + _field = line.get('field', None).replace(':', '') + _data = line.get('data', None) + if not _field and not _data: + # Skip + print_warning(_field, _data) + pause() + continue + self.lscpu[_field] = _data + +class DiskObj(): + """Object for tracking disk specific data.""" + def __init__(self, disk_path): self.labels = [] self.lsblk = {} - self.name = re.sub(r'^.*/(.*)', r'\1', dev_path) + self.name = re.sub(r'^.*/(.*)', r'\1', disk_path) self.nvme_attributes = {} - self.override = False - self.path = dev_path + self.path = disk_path self.smart_attributes = {} self.smartctl = {} - self.state = state - self.tests = { - 'NVMe / SMART': { - 'Result': '', 'Started': False, 'Status': '', 'Order': 1}, - 'badblocks': { - 'Result': '', 'Started': False, 'Status': '', 'Order': 2}, - 'I/O Benchmark': { - 'Result': '', - 'Started': False, - 'Status': '', - 'Read Rates': [], - 'Graph Data': [], - 'Order': 3}, - } + self.tests = {} self.get_details() self.get_smart_details() @@ -122,9 +133,9 @@ class DevObj(): self.lsblk['tran'] = self.lsblk['tran'].upper().replace('NVME', 'NVMe') # Build list of labels - for dev in [self.lsblk, *self.lsblk.get('children', [])]: - self.labels.append(dev.get('label', '')) - self.labels.append(dev.get('partlabel', '')) + for disk in [self.lsblk, *self.lsblk.get('children', [])]: + self.labels.append(disk.get('label', '')) + self.labels.append(disk.get('partlabel', '')) self.labels = [str(label) for label in self.labels if label] def get_smart_details(self): @@ -158,31 +169,15 @@ class DevObj(): self.smart_attributes[_id] = { 'name': _name, 'raw': _raw, 'raw_str': _raw_str} - def update_progress(self): - """Update status strings.""" - for k, v in self.tests.items(): - if self.state.tests[k]['Enabled']: - _status = '' - if not v['Status']: - _status = 'Pending' - if v['Started']: - if v['Result']: - _status = v['Result'] - else: - _status = 'Working' - if _status: - v['Status'] = build_status_string(self.name, _status) class State(): """Object to track device objects and overall state.""" def __init__(self): - self.lscpu = {} - self.devs = [] - self.finished = False + self.cpu = None + self.disks = [] self.panes = {} self.progress_out = '{}/progress.out'.format(global_vars['LogDir']) self.quick_mode = False - self.started = False self.tests = { 'Prime95 & Temps': {'Enabled': False, 'Order': 1, 'Result': '', 'Sensor Data': get_sensor_data(), @@ -191,32 +186,10 @@ class State(): 'badblocks': {'Enabled': False, 'Order': 3}, 'I/O Benchmark': {'Enabled': False, 'Order': 4}, } - self.get_cpu_details() - - def get_cpu_details(self): - """Get CPU details from lscpu.""" - cmd = ['lscpu', '--json'] - try: - result = run_program(cmd, check=False) - json_data = json.loads(result.stdout.decode()) - except Exception as err: - # Ignore and leave self.cpu empty - print_error(err) - pause() - return - for line in json_data.get('lscpu', []): - _field = line.get('field', None).replace(':', '') - _data = line.get('data', None) - if not _field and not _data: - # Skip - print_warning(_field, _data) - pause() - continue - self.lscpu[_field] = _data def init(self): - """Scan for block devices and reset all tests.""" - self.devs = [] + """Set log and add devices.""" + self.disks = [] for k in ['Result', 'Started', 'Status']: self.tests['Prime95 & Temps'][k] = False if k == 'Started' else '' @@ -230,27 +203,30 @@ class State(): global_vars['LogFile'] = '{}/Hardware Diagnostics.log'.format( global_vars['LogDir']) + # Add CPU + self.cpu = CpuObj() + # Add block devices cmd = ['lsblk', '--json', '--nodeps', '--paths'] result = run_program(cmd, check=False) json_data = json.loads(result.stdout.decode()) - for dev in json_data['blockdevices']: - skip_dev = False - dev_obj = DevObj(self, dev['name']) + for disk in json_data['blockdevices']: + skip_disk = False + disk_obj = DiskObj(disk['name']) - # Skip loopback and optical devices - if dev_obj.lsblk['type'] in ['loop', 'rom']: - skip_dev = True + # Skip loopback devices, optical devices, etc + if disk_obj.lsblk['type'] != 'disk': + skip_disk = True - # Skip WK devices + # Skip WK disks wk_label_regex = r'{}_(LINUX|UFD)'.format(KIT_NAME_SHORT) - for label in dev_obj.labels: + for label in disk_obj.labels: if re.search(wk_label_regex, label, re.IGNORECASE): - skip_dev = True + skip_disk = True - # Add device - if not skip_dev: - self.devs.append(dev_obj) + # Add disk + if not skip_disk: + self.disks.append(disk_obj) def update_progress(self): """Update status strings.""" @@ -313,32 +289,32 @@ def build_status_string(label, status, info_label=False): s_w=SIDE_PANE_WIDTH-len(label), **COLORS) -def check_dev_attributes(dev): - """Check if device should be tested and allow overrides.""" +def check_disk_attributes(disk): + """Check if disk should be tested and allow overrides.""" needs_override = False print_standard(' {size:>6} ({tran}) {model} {serial}'.format( - **dev.lsblk)) + **disk.lsblk)) # General checks - if not dev.nvme_attributes and not dev.smart_attributes: + if not disk.nvme_attributes and not disk.smart_attributes: needs_override = True print_warning( ' WARNING: No NVMe or SMART attributes available for: {}'.format( - dev.path)) + disk.path)) # NVMe checks - # TODO check all tracked attributes and set dev.failing if needed + # TODO check all tracked attributes and set disk.failing if needed # SMART checks - # TODO check all tracked attributes and set dev.failing if needed + # TODO check all tracked attributes and set disk.failing if needed # Ask for override if necessary if needs_override: if ask(' Run tests on this device anyway?'): - # TODO Set override for this dev + # TODO Set override for this disk pass else: - for v in dev.tests.values(): + for v in disk.tests.values(): # Started is set to True to fix the status string v['Result'] = 'Skipped' v['Started'] = True @@ -551,11 +527,11 @@ def run_badblocks_test(state): state.panes['Top'], text='{}\n{}'.format( TOP_PANE_TEXT, 'badblocks')) print_standard('TODO: run_badblocks_test()') - for dev in state.devs: - dev.tests['badblocks']['Started'] = True + for disk in state.disks: + disk.tests['badblocks']['Started'] = True update_progress_pane(state) sleep(3) - dev.tests['badblocks']['Result'] = 'OVERRIDE' + disk.tests['badblocks']['Result'] = 'OVERRIDE' update_progress_pane(state) def run_hw_tests(state): @@ -610,11 +586,11 @@ def run_io_benchmark(state): state.panes['Top'], text='{}\n{}'.format( TOP_PANE_TEXT, 'I/O Benchmark')) print_standard('TODO: run_io_benchmark()') - for dev in state.devs: - dev.tests['I/O Benchmark']['Started'] = True + for disk in state.disks: + disk.tests['I/O Benchmark']['Started'] = True update_progress_pane(state) sleep(3) - dev.tests['I/O Benchmark']['Result'] = 'Unknown' + disk.tests['I/O Benchmark']['Result'] = 'Unknown' update_progress_pane(state) def run_keyboard_test(): @@ -741,42 +717,42 @@ def run_network_test(): run_program(['hw-diags-network'], check=False, pipe=False) pause('Press Enter to return to main menu... ') -def run_nvme_smart(state): +def run_nvme_smart_tests(state): """TODO""" - for dev in state.devs: + for disk in state.disks: tmux_update_pane( state.panes['Top'], text='{t}\nDisk Health: {size:>6} ({tran}) {model} {serial}'.format( - t=TOP_PANE_TEXT, **dev.lsblk)) - dev.tests['NVMe / SMART']['Started'] = True + t=TOP_PANE_TEXT, **disk.lsblk)) + disk.tests['NVMe / SMART']['Started'] = True update_progress_pane(state) - if dev.nvme_attributes: - run_nvme_tests(state, dev) - elif dev.smart_attributes: - run_smart_tests(state, dev) + if disk.nvme_attributes: + run_nvme_tests(state, disk) + elif disk.smart_attributes: + run_smart_tests(state, disk) else: - print_standard('TODO: run_nvme_smart({})'.format( - dev.path)) + print_standard('TODO: run_nvme_smart_tests({})'.format( + disk.path)) print_warning( " WARNING: Device {} doesn't support NVMe or SMART test".format( - dev.path)) - dev.tests['NVMe / SMART']['Status'] = 'N/A' - dev.tests['NVMe / SMART']['Result'] = 'N/A' + disk.path)) + disk.tests['NVMe / SMART']['Status'] = 'N/A' + disk.tests['NVMe / SMART']['Result'] = 'N/A' update_progress_pane(state) sleep(3) -def run_nvme_tests(state, dev): +def run_nvme_tests(state, disk): """TODO""" - print_standard('TODO: run_nvme_test({})'.format(dev.path)) + print_standard('TODO: run_nvme_test({})'.format(disk.path)) sleep(3) - dev.tests['NVMe / SMART']['Result'] = 'CS' + disk.tests['NVMe / SMART']['Result'] = 'CS' update_progress_pane(state) -def run_smart_tests(state, dev): +def run_smart_tests(state, disk): """TODO""" - print_standard('TODO: run_smart_tests({})'.format(dev.path)) + print_standard('TODO: run_smart_tests({})'.format(disk.path)) sleep(3) - dev.tests['NVMe / SMART']['Result'] = 'CS' + disk.tests['NVMe / SMART']['Result'] = 'CS' update_progress_pane(state) def secret_screensaver(screensaver=None): @@ -872,8 +848,8 @@ def update_progress_pane(state): if 'Prime95' not in k and v['Enabled']: output.append('{BLUE}{test_name}{CLEAR}'.format( test_name=k, **COLORS)) - for dev in state.devs: - output.append(dev.tests[k]['Status']) + for disk in state.disks: + output.append(disk.tests[k]['Status']) output.append(' ') # Add line-endings