Added CpuObj and renamed dev names to disk
* This should make the code more clear * The CpuObj is similar to DiskObj to abstract the device/tests calls * New calls will be like: run_test(state, dev)
This commit is contained in:
parent
12ff99eb32
commit
6a3ef60881
1 changed files with 90 additions and 114 deletions
|
|
@ -67,32 +67,43 @@ SIDE_PANE_WIDTH = 20
|
||||||
TOP_PANE_TEXT = '{GREEN}Hardware Diagnostics{CLEAR}'.format(**COLORS)
|
TOP_PANE_TEXT = '{GREEN}Hardware Diagnostics{CLEAR}'.format(**COLORS)
|
||||||
|
|
||||||
# Classes
|
# Classes
|
||||||
class DevObj():
|
class CpuObj():
|
||||||
"""Device object for tracking device specific data."""
|
"""Object for tracking CPU specific data."""
|
||||||
def __init__(self, state, dev_path):
|
def __init__(self):
|
||||||
self.failing = False
|
self.lscpu = {}
|
||||||
|
self.tests = {}
|
||||||
|
self.get_details()
|
||||||
|
|
||||||
|
def get_details(self):
|
||||||
|
"""Get CPU details from lscpu."""
|
||||||
|
cmd = ['lscpu', '--json']
|
||||||
|
try:
|
||||||
|
result = run_program(cmd, check=False)
|
||||||
|
json_data = json.loads(result.stdout.decode())
|
||||||
|
except Exception:
|
||||||
|
# Ignore and leave self.lscpu empty
|
||||||
|
return
|
||||||
|
for line in json_data.get('lscpu', []):
|
||||||
|
_field = line.get('field', None).replace(':', '')
|
||||||
|
_data = line.get('data', None)
|
||||||
|
if not _field and not _data:
|
||||||
|
# Skip
|
||||||
|
print_warning(_field, _data)
|
||||||
|
pause()
|
||||||
|
continue
|
||||||
|
self.lscpu[_field] = _data
|
||||||
|
|
||||||
|
class DiskObj():
|
||||||
|
"""Object for tracking disk specific data."""
|
||||||
|
def __init__(self, disk_path):
|
||||||
self.labels = []
|
self.labels = []
|
||||||
self.lsblk = {}
|
self.lsblk = {}
|
||||||
self.name = re.sub(r'^.*/(.*)', r'\1', dev_path)
|
self.name = re.sub(r'^.*/(.*)', r'\1', disk_path)
|
||||||
self.nvme_attributes = {}
|
self.nvme_attributes = {}
|
||||||
self.override = False
|
self.path = disk_path
|
||||||
self.path = dev_path
|
|
||||||
self.smart_attributes = {}
|
self.smart_attributes = {}
|
||||||
self.smartctl = {}
|
self.smartctl = {}
|
||||||
self.state = state
|
self.tests = {}
|
||||||
self.tests = {
|
|
||||||
'NVMe / SMART': {
|
|
||||||
'Result': '', 'Started': False, 'Status': '', 'Order': 1},
|
|
||||||
'badblocks': {
|
|
||||||
'Result': '', 'Started': False, 'Status': '', 'Order': 2},
|
|
||||||
'I/O Benchmark': {
|
|
||||||
'Result': '',
|
|
||||||
'Started': False,
|
|
||||||
'Status': '',
|
|
||||||
'Read Rates': [],
|
|
||||||
'Graph Data': [],
|
|
||||||
'Order': 3},
|
|
||||||
}
|
|
||||||
self.get_details()
|
self.get_details()
|
||||||
self.get_smart_details()
|
self.get_smart_details()
|
||||||
|
|
||||||
|
|
@ -122,9 +133,9 @@ class DevObj():
|
||||||
self.lsblk['tran'] = self.lsblk['tran'].upper().replace('NVME', 'NVMe')
|
self.lsblk['tran'] = self.lsblk['tran'].upper().replace('NVME', 'NVMe')
|
||||||
|
|
||||||
# Build list of labels
|
# Build list of labels
|
||||||
for dev in [self.lsblk, *self.lsblk.get('children', [])]:
|
for disk in [self.lsblk, *self.lsblk.get('children', [])]:
|
||||||
self.labels.append(dev.get('label', ''))
|
self.labels.append(disk.get('label', ''))
|
||||||
self.labels.append(dev.get('partlabel', ''))
|
self.labels.append(disk.get('partlabel', ''))
|
||||||
self.labels = [str(label) for label in self.labels if label]
|
self.labels = [str(label) for label in self.labels if label]
|
||||||
|
|
||||||
def get_smart_details(self):
|
def get_smart_details(self):
|
||||||
|
|
@ -158,31 +169,15 @@ class DevObj():
|
||||||
self.smart_attributes[_id] = {
|
self.smart_attributes[_id] = {
|
||||||
'name': _name, 'raw': _raw, 'raw_str': _raw_str}
|
'name': _name, 'raw': _raw, 'raw_str': _raw_str}
|
||||||
|
|
||||||
def update_progress(self):
|
|
||||||
"""Update status strings."""
|
|
||||||
for k, v in self.tests.items():
|
|
||||||
if self.state.tests[k]['Enabled']:
|
|
||||||
_status = ''
|
|
||||||
if not v['Status']:
|
|
||||||
_status = 'Pending'
|
|
||||||
if v['Started']:
|
|
||||||
if v['Result']:
|
|
||||||
_status = v['Result']
|
|
||||||
else:
|
|
||||||
_status = 'Working'
|
|
||||||
if _status:
|
|
||||||
v['Status'] = build_status_string(self.name, _status)
|
|
||||||
|
|
||||||
class State():
|
class State():
|
||||||
"""Object to track device objects and overall state."""
|
"""Object to track device objects and overall state."""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.lscpu = {}
|
self.cpu = None
|
||||||
self.devs = []
|
self.disks = []
|
||||||
self.finished = False
|
|
||||||
self.panes = {}
|
self.panes = {}
|
||||||
self.progress_out = '{}/progress.out'.format(global_vars['LogDir'])
|
self.progress_out = '{}/progress.out'.format(global_vars['LogDir'])
|
||||||
self.quick_mode = False
|
self.quick_mode = False
|
||||||
self.started = False
|
|
||||||
self.tests = {
|
self.tests = {
|
||||||
'Prime95 & Temps': {'Enabled': False, 'Order': 1,
|
'Prime95 & Temps': {'Enabled': False, 'Order': 1,
|
||||||
'Result': '', 'Sensor Data': get_sensor_data(),
|
'Result': '', 'Sensor Data': get_sensor_data(),
|
||||||
|
|
@ -191,32 +186,10 @@ class State():
|
||||||
'badblocks': {'Enabled': False, 'Order': 3},
|
'badblocks': {'Enabled': False, 'Order': 3},
|
||||||
'I/O Benchmark': {'Enabled': False, 'Order': 4},
|
'I/O Benchmark': {'Enabled': False, 'Order': 4},
|
||||||
}
|
}
|
||||||
self.get_cpu_details()
|
|
||||||
|
|
||||||
def get_cpu_details(self):
|
|
||||||
"""Get CPU details from lscpu."""
|
|
||||||
cmd = ['lscpu', '--json']
|
|
||||||
try:
|
|
||||||
result = run_program(cmd, check=False)
|
|
||||||
json_data = json.loads(result.stdout.decode())
|
|
||||||
except Exception as err:
|
|
||||||
# Ignore and leave self.cpu empty
|
|
||||||
print_error(err)
|
|
||||||
pause()
|
|
||||||
return
|
|
||||||
for line in json_data.get('lscpu', []):
|
|
||||||
_field = line.get('field', None).replace(':', '')
|
|
||||||
_data = line.get('data', None)
|
|
||||||
if not _field and not _data:
|
|
||||||
# Skip
|
|
||||||
print_warning(_field, _data)
|
|
||||||
pause()
|
|
||||||
continue
|
|
||||||
self.lscpu[_field] = _data
|
|
||||||
|
|
||||||
def init(self):
|
def init(self):
|
||||||
"""Scan for block devices and reset all tests."""
|
"""Set log and add devices."""
|
||||||
self.devs = []
|
self.disks = []
|
||||||
for k in ['Result', 'Started', 'Status']:
|
for k in ['Result', 'Started', 'Status']:
|
||||||
self.tests['Prime95 & Temps'][k] = False if k == 'Started' else ''
|
self.tests['Prime95 & Temps'][k] = False if k == 'Started' else ''
|
||||||
|
|
||||||
|
|
@ -230,27 +203,30 @@ class State():
|
||||||
global_vars['LogFile'] = '{}/Hardware Diagnostics.log'.format(
|
global_vars['LogFile'] = '{}/Hardware Diagnostics.log'.format(
|
||||||
global_vars['LogDir'])
|
global_vars['LogDir'])
|
||||||
|
|
||||||
|
# Add CPU
|
||||||
|
self.cpu = CpuObj()
|
||||||
|
|
||||||
# Add block devices
|
# Add block devices
|
||||||
cmd = ['lsblk', '--json', '--nodeps', '--paths']
|
cmd = ['lsblk', '--json', '--nodeps', '--paths']
|
||||||
result = run_program(cmd, check=False)
|
result = run_program(cmd, check=False)
|
||||||
json_data = json.loads(result.stdout.decode())
|
json_data = json.loads(result.stdout.decode())
|
||||||
for dev in json_data['blockdevices']:
|
for disk in json_data['blockdevices']:
|
||||||
skip_dev = False
|
skip_disk = False
|
||||||
dev_obj = DevObj(self, dev['name'])
|
disk_obj = DiskObj(disk['name'])
|
||||||
|
|
||||||
# Skip loopback and optical devices
|
# Skip loopback devices, optical devices, etc
|
||||||
if dev_obj.lsblk['type'] in ['loop', 'rom']:
|
if disk_obj.lsblk['type'] != 'disk':
|
||||||
skip_dev = True
|
skip_disk = True
|
||||||
|
|
||||||
# Skip WK devices
|
# Skip WK disks
|
||||||
wk_label_regex = r'{}_(LINUX|UFD)'.format(KIT_NAME_SHORT)
|
wk_label_regex = r'{}_(LINUX|UFD)'.format(KIT_NAME_SHORT)
|
||||||
for label in dev_obj.labels:
|
for label in disk_obj.labels:
|
||||||
if re.search(wk_label_regex, label, re.IGNORECASE):
|
if re.search(wk_label_regex, label, re.IGNORECASE):
|
||||||
skip_dev = True
|
skip_disk = True
|
||||||
|
|
||||||
# Add device
|
# Add disk
|
||||||
if not skip_dev:
|
if not skip_disk:
|
||||||
self.devs.append(dev_obj)
|
self.disks.append(disk_obj)
|
||||||
|
|
||||||
def update_progress(self):
|
def update_progress(self):
|
||||||
"""Update status strings."""
|
"""Update status strings."""
|
||||||
|
|
@ -313,32 +289,32 @@ def build_status_string(label, status, info_label=False):
|
||||||
s_w=SIDE_PANE_WIDTH-len(label),
|
s_w=SIDE_PANE_WIDTH-len(label),
|
||||||
**COLORS)
|
**COLORS)
|
||||||
|
|
||||||
def check_dev_attributes(dev):
|
def check_disk_attributes(disk):
|
||||||
"""Check if device should be tested and allow overrides."""
|
"""Check if disk should be tested and allow overrides."""
|
||||||
needs_override = False
|
needs_override = False
|
||||||
print_standard(' {size:>6} ({tran}) {model} {serial}'.format(
|
print_standard(' {size:>6} ({tran}) {model} {serial}'.format(
|
||||||
**dev.lsblk))
|
**disk.lsblk))
|
||||||
|
|
||||||
# General checks
|
# General checks
|
||||||
if not dev.nvme_attributes and not dev.smart_attributes:
|
if not disk.nvme_attributes and not disk.smart_attributes:
|
||||||
needs_override = True
|
needs_override = True
|
||||||
print_warning(
|
print_warning(
|
||||||
' WARNING: No NVMe or SMART attributes available for: {}'.format(
|
' WARNING: No NVMe or SMART attributes available for: {}'.format(
|
||||||
dev.path))
|
disk.path))
|
||||||
|
|
||||||
# NVMe checks
|
# NVMe checks
|
||||||
# TODO check all tracked attributes and set dev.failing if needed
|
# TODO check all tracked attributes and set disk.failing if needed
|
||||||
|
|
||||||
# SMART checks
|
# SMART checks
|
||||||
# TODO check all tracked attributes and set dev.failing if needed
|
# TODO check all tracked attributes and set disk.failing if needed
|
||||||
|
|
||||||
# Ask for override if necessary
|
# Ask for override if necessary
|
||||||
if needs_override:
|
if needs_override:
|
||||||
if ask(' Run tests on this device anyway?'):
|
if ask(' Run tests on this device anyway?'):
|
||||||
# TODO Set override for this dev
|
# TODO Set override for this disk
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
for v in dev.tests.values():
|
for v in disk.tests.values():
|
||||||
# Started is set to True to fix the status string
|
# Started is set to True to fix the status string
|
||||||
v['Result'] = 'Skipped'
|
v['Result'] = 'Skipped'
|
||||||
v['Started'] = True
|
v['Started'] = True
|
||||||
|
|
@ -551,11 +527,11 @@ def run_badblocks_test(state):
|
||||||
state.panes['Top'], text='{}\n{}'.format(
|
state.panes['Top'], text='{}\n{}'.format(
|
||||||
TOP_PANE_TEXT, 'badblocks'))
|
TOP_PANE_TEXT, 'badblocks'))
|
||||||
print_standard('TODO: run_badblocks_test()')
|
print_standard('TODO: run_badblocks_test()')
|
||||||
for dev in state.devs:
|
for disk in state.disks:
|
||||||
dev.tests['badblocks']['Started'] = True
|
disk.tests['badblocks']['Started'] = True
|
||||||
update_progress_pane(state)
|
update_progress_pane(state)
|
||||||
sleep(3)
|
sleep(3)
|
||||||
dev.tests['badblocks']['Result'] = 'OVERRIDE'
|
disk.tests['badblocks']['Result'] = 'OVERRIDE'
|
||||||
update_progress_pane(state)
|
update_progress_pane(state)
|
||||||
|
|
||||||
def run_hw_tests(state):
|
def run_hw_tests(state):
|
||||||
|
|
@ -610,11 +586,11 @@ def run_io_benchmark(state):
|
||||||
state.panes['Top'], text='{}\n{}'.format(
|
state.panes['Top'], text='{}\n{}'.format(
|
||||||
TOP_PANE_TEXT, 'I/O Benchmark'))
|
TOP_PANE_TEXT, 'I/O Benchmark'))
|
||||||
print_standard('TODO: run_io_benchmark()')
|
print_standard('TODO: run_io_benchmark()')
|
||||||
for dev in state.devs:
|
for disk in state.disks:
|
||||||
dev.tests['I/O Benchmark']['Started'] = True
|
disk.tests['I/O Benchmark']['Started'] = True
|
||||||
update_progress_pane(state)
|
update_progress_pane(state)
|
||||||
sleep(3)
|
sleep(3)
|
||||||
dev.tests['I/O Benchmark']['Result'] = 'Unknown'
|
disk.tests['I/O Benchmark']['Result'] = 'Unknown'
|
||||||
update_progress_pane(state)
|
update_progress_pane(state)
|
||||||
|
|
||||||
def run_keyboard_test():
|
def run_keyboard_test():
|
||||||
|
|
@ -741,42 +717,42 @@ def run_network_test():
|
||||||
run_program(['hw-diags-network'], check=False, pipe=False)
|
run_program(['hw-diags-network'], check=False, pipe=False)
|
||||||
pause('Press Enter to return to main menu... ')
|
pause('Press Enter to return to main menu... ')
|
||||||
|
|
||||||
def run_nvme_smart(state):
|
def run_nvme_smart_tests(state):
|
||||||
"""TODO"""
|
"""TODO"""
|
||||||
for dev in state.devs:
|
for disk in state.disks:
|
||||||
tmux_update_pane(
|
tmux_update_pane(
|
||||||
state.panes['Top'],
|
state.panes['Top'],
|
||||||
text='{t}\nDisk Health: {size:>6} ({tran}) {model} {serial}'.format(
|
text='{t}\nDisk Health: {size:>6} ({tran}) {model} {serial}'.format(
|
||||||
t=TOP_PANE_TEXT, **dev.lsblk))
|
t=TOP_PANE_TEXT, **disk.lsblk))
|
||||||
dev.tests['NVMe / SMART']['Started'] = True
|
disk.tests['NVMe / SMART']['Started'] = True
|
||||||
update_progress_pane(state)
|
update_progress_pane(state)
|
||||||
if dev.nvme_attributes:
|
if disk.nvme_attributes:
|
||||||
run_nvme_tests(state, dev)
|
run_nvme_tests(state, disk)
|
||||||
elif dev.smart_attributes:
|
elif disk.smart_attributes:
|
||||||
run_smart_tests(state, dev)
|
run_smart_tests(state, disk)
|
||||||
else:
|
else:
|
||||||
print_standard('TODO: run_nvme_smart({})'.format(
|
print_standard('TODO: run_nvme_smart_tests({})'.format(
|
||||||
dev.path))
|
disk.path))
|
||||||
print_warning(
|
print_warning(
|
||||||
" WARNING: Device {} doesn't support NVMe or SMART test".format(
|
" WARNING: Device {} doesn't support NVMe or SMART test".format(
|
||||||
dev.path))
|
disk.path))
|
||||||
dev.tests['NVMe / SMART']['Status'] = 'N/A'
|
disk.tests['NVMe / SMART']['Status'] = 'N/A'
|
||||||
dev.tests['NVMe / SMART']['Result'] = 'N/A'
|
disk.tests['NVMe / SMART']['Result'] = 'N/A'
|
||||||
update_progress_pane(state)
|
update_progress_pane(state)
|
||||||
sleep(3)
|
sleep(3)
|
||||||
|
|
||||||
def run_nvme_tests(state, dev):
|
def run_nvme_tests(state, disk):
|
||||||
"""TODO"""
|
"""TODO"""
|
||||||
print_standard('TODO: run_nvme_test({})'.format(dev.path))
|
print_standard('TODO: run_nvme_test({})'.format(disk.path))
|
||||||
sleep(3)
|
sleep(3)
|
||||||
dev.tests['NVMe / SMART']['Result'] = 'CS'
|
disk.tests['NVMe / SMART']['Result'] = 'CS'
|
||||||
update_progress_pane(state)
|
update_progress_pane(state)
|
||||||
|
|
||||||
def run_smart_tests(state, dev):
|
def run_smart_tests(state, disk):
|
||||||
"""TODO"""
|
"""TODO"""
|
||||||
print_standard('TODO: run_smart_tests({})'.format(dev.path))
|
print_standard('TODO: run_smart_tests({})'.format(disk.path))
|
||||||
sleep(3)
|
sleep(3)
|
||||||
dev.tests['NVMe / SMART']['Result'] = 'CS'
|
disk.tests['NVMe / SMART']['Result'] = 'CS'
|
||||||
update_progress_pane(state)
|
update_progress_pane(state)
|
||||||
|
|
||||||
def secret_screensaver(screensaver=None):
|
def secret_screensaver(screensaver=None):
|
||||||
|
|
@ -872,8 +848,8 @@ def update_progress_pane(state):
|
||||||
if 'Prime95' not in k and v['Enabled']:
|
if 'Prime95' not in k and v['Enabled']:
|
||||||
output.append('{BLUE}{test_name}{CLEAR}'.format(
|
output.append('{BLUE}{test_name}{CLEAR}'.format(
|
||||||
test_name=k, **COLORS))
|
test_name=k, **COLORS))
|
||||||
for dev in state.devs:
|
for disk in state.disks:
|
||||||
output.append(dev.tests[k]['Status'])
|
output.append(disk.tests[k]['Status'])
|
||||||
output.append(' ')
|
output.append(' ')
|
||||||
|
|
||||||
# Add line-endings
|
# Add line-endings
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue