diff --git a/.bin/Scripts/functions/hw_diags.py b/.bin/Scripts/functions/hw_diags.py index 169eb337..830d948a 100644 --- a/.bin/Scripts/functions/hw_diags.py +++ b/.bin/Scripts/functions/hw_diags.py @@ -1,1114 +1,246 @@ # Wizard Kit: Functions - HW Diagnostics import json +import re import time from functions.common import * # STATIC VARIABLES ATTRIBUTES = { - 'NVMe': { - 'critical_warning': {'Error': 1}, - 'media_errors': {'Error': 1}, - 'power_on_hours': {'Warning': 12000, 'Error': 18000, 'Ignore': True}, - 'unsafe_shutdowns': {'Warning': 1}, - }, - 'SMART': { - 5: {'Hex': '05', 'Error': 1}, - 9: {'Hex': '09', 'Warning': 12000, 'Error': 18000, 'Ignore': True}, - 10: {'Hex': '0A', 'Error': 1}, - 184: {'Hex': 'B8', 'Error': 1}, - 187: {'Hex': 'BB', 'Error': 1}, - 188: {'Hex': 'BC', 'Error': 1}, - 196: {'Hex': 'C4', 'Error': 1}, - 197: {'Hex': 'C5', 'Error': 1}, - 198: {'Hex': 'C6', 'Error': 1}, - 199: {'Hex': 'C7', 'Error': 1, 'Ignore': True}, - 201: {'Hex': 'C9', 'Error': 1}, - }, - } + 'NVMe': { + 'critical_warning': {'Error': 1}, + 'media_errors': {'Error': 1}, + 'power_on_hours': {'Warning': 12000, 'Error': 26298, 'Ignore': True}, + 'unsafe_shutdowns': {'Warning': 1}, + }, + 'SMART': { + 5: {'Hex': '05', 'Error': 1}, + 9: {'Hex': '09', 'Warning': 12000, 'Error': 26298, 'Ignore': True}, + 10: {'Hex': '0A', 'Error': 1}, + 184: {'Hex': 'B8', 'Error': 1}, + 187: {'Hex': 'BB', 'Error': 1}, + 188: {'Hex': 'BC', 'Error': 1}, + 196: {'Hex': 'C4', 'Error': 1}, + 197: {'Hex': 'C5', 'Error': 1}, + 198: {'Hex': 'C6', 'Error': 1}, + 199: {'Hex': 'C7', 'Error': 1, 'Ignore': True}, + 201: {'Hex': 'C9', 'Error': 1}, + }, + } IO_VARS = { - 'Block Size': 512*1024, - 'Chunk Size': 32*1024**2, - 'Minimum Dev Size': 8*1024**3, - 'Minimum Test Size': 10*1024**3, - 'Alt Test Size Factor': 0.01, - 'Progress Refresh Rate': 5, - 'Scale 8': [2**(0.56*(x+1))+(16*(x+1)) for x in range(8)], - 'Scale 16': [2**(0.56*(x+1))+(16*(x+1)) for x in range(16)], - 'Scale 32': [2**(0.56*(x+1)/2)+(16*(x+1)/2) for x in range(32)], - 'Threshold Graph Fail': 65*1024**2, - 'Threshold Graph Warn': 135*1024**2, - 'Threshold Graph Great': 750*1024**2, - 'Threshold HDD Min': 50*1024**2, - 'Threshold HDD High Avg': 75*1024**2, - 'Threshold HDD Low Avg': 65*1024**2, - 'Threshold SSD Min': 90*1024**2, - 'Threshold SSD High Avg': 135*1024**2, - 'Threshold SSD Low Avg': 100*1024**2, - 'Graph Horizontal': ('▁', '▂', '▃', '▄', '▅', '▆', '▇', '█'), - 'Graph Horizontal Width': 40, - 'Graph Vertical': ( - '▏', '▎', '▍', '▌', - '▋', '▊', '▉', '█', - '█▏', '█▎', '█▍', '█▌', - '█▋', '█▊', '█▉', '██', - '██▏', '██▎', '██▍', '██▌', - '██▋', '██▊', '██▉', '███', - '███▏', '███▎', '███▍', '███▌', - '███▋', '███▊', '███▉', '████'), - } -TESTS = { - 'Prime95': { - 'Enabled': False, - 'Status': 'Pending', - }, - 'NVMe/SMART': { - 'Enabled': False, - 'Quick': False, - 'Short Test': {}, - 'Status': {}, - }, - 'badblocks': { - 'Enabled': False, - 'Results': {}, - 'Status': {}, - }, - 'iobenchmark': { - 'Data': {}, - 'Enabled': False, - 'Results': {}, - 'Status': {}, - }, - } + 'Block Size': 512*1024, + 'Chunk Size': 32*1024**2, + 'Minimum Dev Size': 8*1024**3, + 'Minimum Test Size': 10*1024**3, + 'Alt Test Size Factor': 0.01, + 'Progress Refresh Rate': 5, + 'Scale 8': [2**(0.56*(x+1))+(16*(x+1)) for x in range(8)], + 'Scale 16': [2**(0.56*(x+1))+(16*(x+1)) for x in range(16)], + 'Scale 32': [2**(0.56*(x+1)/2)+(16*(x+1)/2) for x in range(32)], + 'Threshold Graph Fail': 65*1024**2, + 'Threshold Graph Warn': 135*1024**2, + 'Threshold Graph Great': 750*1024**2, + 'Threshold HDD Min': 50*1024**2, + 'Threshold HDD High Avg': 75*1024**2, + 'Threshold HDD Low Avg': 65*1024**2, + 'Threshold SSD Min': 90*1024**2, + 'Threshold SSD High Avg': 135*1024**2, + 'Threshold SSD Low Avg': 100*1024**2, + 'Graph Horizontal': ('▁', '▂', '▃', '▄', '▅', '▆', '▇', '█'), + 'Graph Horizontal Width': 40, + 'Graph Vertical': ( + '▏', '▎', '▍', '▌', + '▋', '▊', '▉', '█', + '█▏', '█▎', '█▍', '█▌', + '█▋', '█▊', '█▉', '██', + '██▏', '██▎', '██▍', '██▌', + '██▋', '██▊', '██▉', '███', + '███▏', '███▎', '███▍', '███▌', + '███▋', '███▊', '███▉', '████'), + } +KEY_NVME = 'nvme_smart_health_information_log' +KEY_SMART = 'ata_smart_attributes' +SIDE_PATH_WIDTH = 21 +# Classes +class DevObj(): + """Device object for tracking device specific data.""" + def __init__(self, dev_path): + self.failing = False + self.nvme_attributes = {} + self.override = False + self.path = dev_path + self.smart_attributes = {} + self.tests = { + 'NVMe / SMART': {'Result': None, 'Status': None}, + 'badblocks': {'Result': None, 'Status': None}, + 'I/O Benchmark': { + 'Result': None, + 'Status': None, + 'Read Rates': [], + 'Graph Data': []}, + } + self.get_details() + + def get_details(self): + """Get data from smartctl.""" + cmd = ['sudo', 'smartctl', '--all', '--json', self.path] + result = run_program(cmd, check=False) + self.data = json.loads(result.stdout.decode()) + + # Check for attributes + if KEY_NVME in self.data: + self.nvme_attributes.update(self.data[KEY_NVME]) + elif KEY_SMART in self.data: + for a in self.data[KEY_SMART].get('table', {}): + _id = str(a.get('id', 'UNKNOWN')) + _name = str(a.get('name', 'UNKNOWN')) + _raw = a.get('raw', {}).get('value', -1) + _raw_str = a.get('raw', {}).get('string', 'UNKNOWN') + + # Fix power-on time + _r = re.match(r'^(\d+)[Hh].*', _raw_str) + if _id == '9' and _r: + try: + _raw = int(_r.group(1)) + except ValueError: + # That's fine + pass + self.smart_attributes[_id] = { + 'name': _name, 'raw': _raw, 'raw_str': _raw_str} + +class State(): + """Object to track device objects and overall state.""" + def __init__(self): + self.devs = [] + self.finished = False + self.progress_out = '{}/progress.out'.format(global_vars['LogDir']) + self.started = False + self.tests = { + 'Prime95': {'Enabled': False, 'Result': None, 'Status': None}, + 'NVMe / SMART': {'Enabled': False}, + 'badblocks': {'Enabled': False}, + 'I/O Benchmark': {'Enabled': False}, + } + self.add_devs() + + def add_devs(self): + """Add all block devices listed by lsblk.""" + cmd = ['lsblk', '--json', '--nodeps', '--paths'] + result = run_program(cmd, check=False) + json_data = json.loads(result.stdout.decode()) + for dev in json_data['blockdevices']: + self.devs.append(DevObj(dev['name'])) + +# Functions def generate_horizontal_graph(rates, oneline=False): - """Generate two-line horizontal graph from rates, returns str.""" - line_1 = '' - line_2 = '' - line_3 = '' - line_4 = '' - for r in rates: - step = get_graph_step(r, scale=32) - if oneline: - step = get_graph_step(r, scale=8) - - # Set color - r_color = COLORS['CLEAR'] - if r < IO_VARS['Threshold Graph Fail']: - r_color = COLORS['RED'] - elif r < IO_VARS['Threshold Graph Warn']: - r_color = COLORS['YELLOW'] - elif r > IO_VARS['Threshold Graph Great']: - r_color = COLORS['GREEN'] - - # Build graph - full_block = '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][-1]) - if step >= 24: - line_1 += '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][step-24]) - line_2 += full_block - line_3 += full_block - line_4 += full_block - elif step >= 16: - line_1 += ' ' - line_2 += '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][step-16]) - line_3 += full_block - line_4 += full_block - elif step >= 8: - line_1 += ' ' - line_2 += ' ' - line_3 += '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][step-8]) - line_4 += full_block - else: - line_1 += ' ' - line_2 += ' ' - line_3 += ' ' - line_4 += '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][step]) - line_1 += COLORS['CLEAR'] - line_2 += COLORS['CLEAR'] - line_3 += COLORS['CLEAR'] - line_4 += COLORS['CLEAR'] + """Generate two-line horizontal graph from rates, returns str.""" + line_1 = '' + line_2 = '' + line_3 = '' + line_4 = '' + for r in rates: + step = get_graph_step(r, scale=32) if oneline: - return line_4 + step = get_graph_step(r, scale=8) + + # Set color + r_color = COLORS['CLEAR'] + if r < IO_VARS['Threshold Graph Fail']: + r_color = COLORS['RED'] + elif r < IO_VARS['Threshold Graph Warn']: + r_color = COLORS['YELLOW'] + elif r > IO_VARS['Threshold Graph Great']: + r_color = COLORS['GREEN'] + + # Build graph + full_block = '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][-1]) + if step >= 24: + line_1 += '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][step-24]) + line_2 += full_block + line_3 += full_block + line_4 += full_block + elif step >= 16: + line_1 += ' ' + line_2 += '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][step-16]) + line_3 += full_block + line_4 += full_block + elif step >= 8: + line_1 += ' ' + line_2 += ' ' + line_3 += '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][step-8]) + line_4 += full_block else: - return '\n'.join([line_1, line_2, line_3, line_4]) + line_1 += ' ' + line_2 += ' ' + line_3 += ' ' + line_4 += '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][step]) + line_1 += COLORS['CLEAR'] + line_2 += COLORS['CLEAR'] + line_3 += COLORS['CLEAR'] + line_4 += COLORS['CLEAR'] + if oneline: + return line_4 + else: + return '\n'.join([line_1, line_2, line_3, line_4]) def get_graph_step(rate, scale=16): - """Get graph step based on rate and scale, returns int.""" - m_rate = rate / (1024**2) - step = 0 - scale_name = 'Scale {}'.format(scale) - for x in range(scale-1, -1, -1): - # Iterate over scale backwards - if m_rate >= IO_VARS[scale_name][x]: - step = x - break - return step + """Get graph step based on rate and scale, returns int.""" + m_rate = rate / (1024**2) + step = 0 + scale_name = 'Scale {}'.format(scale) + for x in range(scale-1, -1, -1): + # Iterate over scale backwards + if m_rate >= IO_VARS[scale_name][x]: + step = x + break + return step def get_read_rate(s): - """Get read rate in bytes/s from dd progress output.""" - real_rate = None - if re.search(r'[KMGT]B/s', s): - human_rate = re.sub(r'^.*\s+(\d+\.?\d*)\s+(.B)/s\s*$', r'\1 \2', s) - real_rate = convert_to_bytes(human_rate) - return real_rate - -def get_smart_details(dev): - """Get SMART data for dev if possible, returns dict.""" - cmd = 'sudo smartctl --all --json {}{}'.format( - '' if '/dev/' in dev else '/dev/', - dev).split() - result = run_program(cmd, check=False) - try: - return json.loads(result.stdout.decode()) - except Exception: - # Let other sections deal with the missing data - return {} - -def get_smart_value(smart_data, smart_id): - """Get SMART value from table, returns int or None.""" - value = None - table = smart_data.get('ata_smart_attributes', {}).get('table', []) - for row in table: - if str(row.get('id', '?')) == str(smart_id): - value = row.get('raw', {}).get('value', None) - return value + """Get read rate in bytes/s from dd progress output.""" + real_rate = None + if re.search(r'[KMGT]B/s', s): + human_rate = re.sub(r'^.*\s+(\d+\.?\d*)\s+(.B)/s\s*$', r'\1 \2', s) + real_rate = convert_to_bytes(human_rate) + return real_rate def get_status_color(s): - """Get color based on status, returns str.""" - color = COLORS['CLEAR'] - if s in ['Denied', 'ERROR', 'NS', 'OVERRIDE']: - color = COLORS['RED'] - elif s in ['Aborted', 'Unknown', 'Working', 'Skipped']: - color = COLORS['YELLOW'] - elif s in ['CS']: - color = COLORS['GREEN'] - return color - -def menu_diags(*args): - """Main HW-Diagnostic menu.""" - diag_modes = [ - {'Name': 'All tests', - 'Tests': ['Prime95', 'NVMe/SMART', 'badblocks', 'iobenchmark']}, - {'Name': 'Prime95', - 'Tests': ['Prime95']}, - {'Name': 'All drive tests', - 'Tests': ['NVMe/SMART', 'badblocks', 'iobenchmark']}, - {'Name': 'NVMe/SMART', - 'Tests': ['NVMe/SMART']}, - {'Name': 'badblocks', - 'Tests': ['badblocks']}, - {'Name': 'I/O Benchmark', - 'Tests': ['iobenchmark']}, - {'Name': 'Quick drive test', - 'Tests': ['Quick', 'NVMe/SMART']}, - ] - actions = [ - {'Letter': 'A', 'Name': 'Audio test'}, - {'Letter': 'K', 'Name': 'Keyboard test'}, - {'Letter': 'N', 'Name': 'Network test'}, - {'Letter': 'M', 'Name': 'Screen Saver - Matrix', 'CRLF': True}, - {'Letter': 'P', 'Name': 'Screen Saver - Pipes'}, - {'Letter': 'Q', 'Name': 'Quit', 'CRLF': True}, - ] - - # CLI-mode actions - if 'DISPLAY' not in global_vars['Env']: - actions.extend([ - {'Letter': 'R', 'Name': 'Reboot', 'CRLF': True}, - {'Letter': 'S', 'Name': 'Shutdown'}, - ]) - - # Quick disk check - if 'quick' in args: - run_tests(['Quick', 'NVMe/SMART']) - exit_script() - - # Show menu - while True: - selection = menu_select( - title = 'Hardware Diagnostics: Menu', - main_entries = diag_modes, - action_entries = actions, - spacer = '──────────────────────────') - if selection.isnumeric(): - ticket_number = None - if diag_modes[int(selection)-1]['Name'] != 'Quick drive test': - clear_screen() - print_standard(' ') - ticket_number = get_ticket_number() - # Save log for non-quick tests - global_vars['Date-Time'] = time.strftime("%Y-%m-%d_%H%M_%z") - global_vars['LogDir'] = '{}/Logs/{}_{}'.format( - global_vars['Env']['HOME'], - ticket_number, - global_vars['Date-Time']) - os.makedirs(global_vars['LogDir'], exist_ok=True) - global_vars['LogFile'] = '{}/Hardware Diagnostics.log'.format( - global_vars['LogDir']) - run_tests(diag_modes[int(selection)-1]['Tests'], ticket_number) - elif selection == 'A': - run_program(['hw-diags-audio'], check=False, pipe=False) - pause('Press Enter to return to main menu... ') - elif selection == 'K': - run_program(['xev', '-event', 'keyboard'], check=False, pipe=False) - elif selection == 'N': - run_program(['hw-diags-network'], check=False, pipe=False) - pause('Press Enter to return to main menu... ') - elif selection == 'M': - run_program(['cmatrix', '-abs'], check=False, pipe=False) - elif selection == 'P': - run_program( - 'pipes -t 0 -t 1 -t 2 -t 3 -p 5 -R -r 4000'.split(), - check=False, pipe=False) - elif selection == 'R': - run_program(['systemctl', 'reboot']) - elif selection == 'S': - run_program(['systemctl', 'poweroff']) - elif selection == 'Q': - break - -def run_badblocks(ticket_number): - """Run a read-only test for all detected disks.""" - aborted = False - clear_screen() - print_log('\nStart badblocks test(s)\n') - progress_file = '{}/badblocks_progress.out'.format(global_vars['LogDir']) - update_progress() - - # Set Window layout and start test - run_program('tmux split-window -dhl 15 watch -c -n1 -t cat {}'.format( - TESTS['Progress Out']).split()) - - # Show disk details - for name, dev in sorted(TESTS['badblocks']['Devices'].items()): - show_disk_details(dev) - print_standard(' ') - update_progress() - - # Run - print_standard('Running badblock test(s):') - for name, dev in sorted(TESTS['badblocks']['Devices'].items()): - cur_status = TESTS['badblocks']['Status'][name] - nvme_smart_status = TESTS['NVMe/SMART']['Status'].get(name, None) - if cur_status == 'Denied': - # Skip denied disks - continue - if nvme_smart_status == 'NS': - TESTS['badblocks']['Status'][name] = 'Skipped' - else: - # Not testing SMART, SMART CS, or SMART OVERRIDE - TESTS['badblocks']['Status'][name] = 'Working' - update_progress() - print_standard(' /dev/{:11} '.format(name+'...'), end='', flush=True) - run_program('tmux split-window -dl 5 {} {} {}'.format( - 'hw-diags-badblocks', - '/dev/{}'.format(name), - progress_file).split()) - wait_for_process('badblocks') - print_standard('Done', timestamp=False) - - # Check results - if os.path.exists(progress_file): - with open(progress_file, 'r') as f: - text = f.read() - TESTS['badblocks']['Results'][name] = text - r = re.search(r'Pass completed.*0/0/0 errors', text) - if r: - TESTS['badblocks']['Status'][name] = 'CS' - else: - TESTS['badblocks']['Status'][name] = 'NS' - - # Move temp file - shutil.move(progress_file, '{}/badblocks-{}.log'.format( - global_vars['LogDir'], name)) - else: - TESTS['badblocks']['Status'][name] = 'NS' - update_progress() - - # Done - run_program('tmux kill-pane -a'.split(), check=False) - pass - -def run_iobenchmark(ticket_number): - """Run a read-only test for all detected disks.""" - aborted = False - clear_screen() - print_log('\nStart I/O Benchmark test(s)\n') - progress_file = '{}/iobenchmark_progress.out'.format(global_vars['LogDir']) - update_progress() - - # Set Window layout and start test - run_program('tmux split-window -dhl 15 watch -c -n1 -t cat {}'.format( - TESTS['Progress Out']).split()) - - # Show disk details - for name, dev in sorted(TESTS['iobenchmark']['Devices'].items()): - show_disk_details(dev) - print_standard(' ') - update_progress() - - # Run - print_standard('Running benchmark test(s):') - for name, dev in sorted(TESTS['iobenchmark']['Devices'].items()): - cur_status = TESTS['iobenchmark']['Status'][name] - nvme_smart_status = TESTS['NVMe/SMART']['Status'].get(name, None) - bb_status = TESTS['badblocks']['Status'].get(name, None) - if cur_status == 'Denied': - # Skip denied disks - continue - if nvme_smart_status == 'NS': - TESTS['iobenchmark']['Status'][name] = 'Skipped' - elif bb_status in ['NS', 'Skipped']: - TESTS['iobenchmark']['Status'][name] = 'Skipped' - else: - # (SMART tests not run or CS/OVERRIDE) - # AND (BADBLOCKS tests not run or CS) - TESTS['iobenchmark']['Status'][name] = 'Working' - update_progress() - print_standard(' /dev/{:11} '.format(name+'...'), end='', flush=True) - - # Get dev size - cmd = 'sudo lsblk -bdno size /dev/{}'.format(name) - try: - result = run_program(cmd.split()) - dev_size = result.stdout.decode().strip() - dev_size = int(dev_size) - except: - # Failed to get dev size, requires manual testing instead - TESTS['iobenchmark']['Status'][name] = 'ERROR' - continue - if dev_size < IO_VARS['Minimum Dev Size']: - TESTS['iobenchmark']['Status'][name] = 'ERROR' - continue - - # Calculate dd values - ## test_size is the area to be read in bytes - ## If the dev is < 10Gb then it's the whole dev - ## Otherwise it's the larger of 10Gb or 1% of the dev - ## - ## test_chunks is the number of groups of "Chunk Size" in test_size - ## This number is reduced to a multiple of the graph width in - ## order to allow for the data to be condensed cleanly - ## - ## skip_blocks is the number of "Block Size" groups not tested - ## skip_count is the number of blocks to skip per test_chunk - ## skip_extra is how often to add an additional skip block - ## This is needed to ensure an even testing across the dev - ## This is calculated by using the fractional amount left off - ## of the skip_count variable - test_size = min(IO_VARS['Minimum Test Size'], dev_size) - test_size = max( - test_size, dev_size*IO_VARS['Alt Test Size Factor']) - test_chunks = int(test_size // IO_VARS['Chunk Size']) - test_chunks -= test_chunks % IO_VARS['Graph Horizontal Width'] - test_size = test_chunks * IO_VARS['Chunk Size'] - skip_blocks = int((dev_size - test_size) // IO_VARS['Block Size']) - skip_count = int((skip_blocks / test_chunks) // 1) - skip_extra = 0 - try: - skip_extra = 1 + int(1 / ((skip_blocks / test_chunks) % 1)) - except ZeroDivisionError: - # skip_extra == 0 is fine - pass - - # Open dd progress pane after initializing file - with open(progress_file, 'w') as f: - f.write('') - sleep(1) - cmd = 'tmux split-window -dp 75 -PF #D tail -f {}'.format( - progress_file) - result = run_program(cmd.split()) - bottom_pane = result.stdout.decode().strip() - - # Run dd read tests - offset = 0 - TESTS['iobenchmark']['Data'][name] = { - 'Graph': [], - 'Read Rates': []} - for i in range(test_chunks): - i += 1 - s = skip_count - c = int(IO_VARS['Chunk Size'] / IO_VARS['Block Size']) - if skip_extra and i % skip_extra == 0: - s += 1 - cmd = 'sudo dd bs={b} skip={s} count={c} if=/dev/{n} of={o} iflag=direct'.format( - b=IO_VARS['Block Size'], - s=offset+s, - c=c, - n=name, - o='/dev/null') - result = run_program(cmd.split()) - result_str = result.stderr.decode().replace('\n', '') - cur_rate = get_read_rate(result_str) - TESTS['iobenchmark']['Data'][name]['Read Rates'].append( - cur_rate) - TESTS['iobenchmark']['Data'][name]['Graph'].append( - '{percent:0.1f} {rate}'.format( - percent=i/test_chunks*100, - rate=int(cur_rate/(1024**2)))) - if i % IO_VARS['Progress Refresh Rate'] == 0: - # Update vertical graph - update_io_progress( - percent=i/test_chunks*100, - rate=cur_rate, - progress_file=progress_file) - # Update offset - offset += s + c - print_standard('Done', timestamp=False) - - # Close bottom pane - run_program(['tmux', 'kill-pane', '-t', bottom_pane]) - - # Build report - avg_min_max = 'Average read speed: {:3.1f} MB/s (Min: {:3.1f}, Max: {:3.1f})'.format( - sum(TESTS['iobenchmark']['Data'][name]['Read Rates'])/len( - TESTS['iobenchmark']['Data'][name]['Read Rates'])/(1024**2), - min(TESTS['iobenchmark']['Data'][name]['Read Rates'])/(1024**2), - max(TESTS['iobenchmark']['Data'][name]['Read Rates'])/(1024**2)) - TESTS['iobenchmark']['Data'][name]['Avg/Min/Max'] = avg_min_max - TESTS['iobenchmark']['Data'][name]['Merged Rates'] = [] - pos = 0 - width = int(test_chunks / IO_VARS['Graph Horizontal Width']) - for i in range(IO_VARS['Graph Horizontal Width']): - # Append average rate for WIDTH number of rates to new array - TESTS['iobenchmark']['Data'][name]['Merged Rates'].append(sum( - TESTS['iobenchmark']['Data'][name]['Read Rates'][pos:pos+width])/width) - pos += width - report = generate_horizontal_graph( - TESTS['iobenchmark']['Data'][name]['Merged Rates']) - report += '\n{}'.format(avg_min_max) - TESTS['iobenchmark']['Results'][name] = report - - # Set CS/NS - min_read = min(TESTS['iobenchmark']['Data'][name]['Read Rates']) - avg_read = sum( - TESTS['iobenchmark']['Data'][name]['Read Rates'])/len( - TESTS['iobenchmark']['Data'][name]['Read Rates']) - dev_rotational = dev['lsblk'].get('rota', None) - if dev_rotational == "0": - # Use SSD scale - thresh_min = IO_VARS['Threshold SSD Min'] - thresh_high_avg = IO_VARS['Threshold SSD High Avg'] - thresh_low_avg = IO_VARS['Threshold SSD Low Avg'] - else: - # Use HDD scale - thresh_min = IO_VARS['Threshold HDD Min'] - thresh_high_avg = IO_VARS['Threshold HDD High Avg'] - thresh_low_avg = IO_VARS['Threshold HDD Low Avg'] - if min_read <= thresh_min and avg_read <= thresh_high_avg: - TESTS['iobenchmark']['Status'][name] = 'NS' - elif avg_read <= thresh_low_avg: - TESTS['iobenchmark']['Status'][name] = 'NS' - else: - TESTS['iobenchmark']['Status'][name] = 'CS' - - # Save logs - dest_filename = '{}/iobenchmark-{}.log'.format(global_vars['LogDir'], name) - shutil.move(progress_file, dest_filename) - with open(dest_filename.replace('.', '-raw.'), 'a') as f: - f.write('\n'.join(TESTS['iobenchmark']['Data'][name]['Graph'])) - update_progress() - - # Done - run_program('tmux kill-pane -a'.split(), check=False) - pass - -def run_mprime(ticket_number): - """Run Prime95 for MPRIME_LIMIT minutes while showing the temps.""" - aborted = False - print_log('\nStart Prime95 test') - TESTS['Prime95']['Status'] = 'Working' - update_progress() - - # Set Window layout and start test - run_program('tmux split-window -dl 10 -c {wd} {cmd} {wd}'.format( - wd=global_vars['TmpDir'], cmd='hw-diags-prime95').split()) - run_program('tmux split-window -dhl 15 watch -c -n1 -t cat {}'.format( - TESTS['Progress Out']).split()) - run_program('tmux split-window -bd watch -c -n1 -t hw-sensors'.split()) - run_program('tmux resize-pane -y 3'.split()) - - # Start test - run_program(['apple-fans', 'max']) - try: - for i in range(int(MPRIME_LIMIT)): - clear_screen() - min_left = int(MPRIME_LIMIT) - i - print_standard('Running Prime95 ({} minute{} left)'.format( - min_left, - 's' if min_left != 1 else '')) - print_warning('If running too hot, press CTRL+c to abort the test') - sleep(60) - except KeyboardInterrupt: - # Catch CTRL+C - aborted = True - TESTS['Prime95']['Status'] = 'Aborted' - print_warning('\nAborted.') - update_progress() - - # Save "final" temps - run_program( - cmd = 'hw-sensors >> "{}/Final Temps.out"'.format( - global_vars['LogDir']).split(), - check = False, - pipe = False, - shell = True) - run_program( - cmd = 'hw-sensors --nocolor >> "{}/Final Temps.log"'.format( - global_vars['LogDir']).split(), - check = False, - pipe = False, - shell = True) - - # Stop test - run_program('killall -s INT mprime'.split(), check=False) - run_program(['apple-fans', 'auto']) - - # Move logs to Ticket folder - for item in os.scandir(global_vars['TmpDir']): - try: - shutil.move(item.path, global_vars['LogDir']) - except Exception: - print_error('ERROR: Failed to move "{}" to "{}"'.format( - item.path, - global_vars['LogDir'])) - - # Check logs - TESTS['Prime95']['NS'] = False - TESTS['Prime95']['CS'] = False - log = '{}/results.txt'.format(global_vars['LogDir']) - if os.path.exists(log): - with open(log, 'r') as f: - text = f.read() - TESTS['Prime95']['results.txt'] = text - r = re.search(r'(error|fail)', text) - TESTS['Prime95']['NS'] = bool(r) - log = '{}/prime.log'.format(global_vars['LogDir']) - if os.path.exists(log): - with open(log, 'r') as f: - text = f.read() - TESTS['Prime95']['prime.log'] = text - r = re.search(r'completed.*0 errors, 0 warnings', text) - TESTS['Prime95']['CS'] = bool(r) - - # Update status - if not aborted: - if TESTS['Prime95']['NS']: - TESTS['Prime95']['Status'] = 'NS' - elif TESTS['Prime95']['CS']: - TESTS['Prime95']['Status'] = 'CS' - else: - TESTS['Prime95']['Status'] = 'Unknown' - update_progress() - - if aborted: - if TESTS['NVMe/SMART']['Enabled'] or TESTS['badblocks']['Enabled']: - if not ask('Proceed to next test?'): - for name in TESTS['NVMe/SMART']['Devices'].keys(): - for t in ['NVMe/SMART', 'badblocks', 'iobenchmark']: - cur_status = TESTS[t]['Status'][name] - if cur_status not in ['CS', 'Denied', 'NS']: - TESTS[t]['Status'][name] = 'Aborted' - run_program('tmux kill-pane -a'.split()) - raise GenericError - - # Done - run_program('tmux kill-pane -a'.split()) - -def run_nvme_smart(ticket_number): - """Run the built-in NVMe or SMART test for all detected disks.""" - aborted = False - clear_screen() - print_log('\nStart NVMe/SMART test(s)\n') - progress_file = '{}/selftest_progress.out'.format(global_vars['LogDir']) - update_progress() - - # Set Window layout and start test - run_program('tmux split-window -dl 3 watch -c -n1 -t cat {}'.format( - progress_file).split()) - run_program('tmux split-window -dhl 15 watch -c -n1 -t cat {}'.format( - TESTS['Progress Out']).split()) - - # Show disk details - for name, dev in sorted(TESTS['NVMe/SMART']['Devices'].items()): - show_disk_details(dev) - print_standard(' ') - update_progress() - - # Run - for name, dev in sorted(TESTS['NVMe/SMART']['Devices'].items()): - TESTS['NVMe/SMART']['Short Test'][name] = None - cur_status = TESTS['NVMe/SMART']['Status'][name] - if cur_status == 'OVERRIDE': - # Skipping test per user request - continue - if TESTS['NVMe/SMART']['Quick'] or dev.get('NVMe Disk', False): - # Skip SMART self-tests for quick checks and NVMe disks - if dev['Quick Health OK']: - TESTS['NVMe/SMART']['Status'][name] = 'CS' - else: - TESTS['NVMe/SMART']['Status'][name] = 'NS' - elif not dev['Quick Health OK']: - # SMART overall == Failed or attributes bad, avoid self-test - TESTS['NVMe/SMART']['Status'][name] = 'NS' - else: - # Start SMART short self-test - test_length = dev['smartctl'].get( - 'ata_smart_data', {}).get( - 'self_test', {}).get( - 'polling_minutes', {}).get( - 'short', 5) - test_length = int(test_length) + 5 - TESTS['NVMe/SMART']['Status'][name] = 'Working' - update_progress() - print_standard('Running SMART short self-test(s):') - print_standard( - ' /dev/{:8}({} minutes)... '.format(name, test_length), - end='', flush=True) - run_program( - 'sudo smartctl -t short /dev/{}'.format(name).split(), - check=False) - - # Wait and show progress (in 10 second increments) - for iteration in range(int(test_length*60/10)): - # Update SMART data - dev['smartctl'] = get_smart_details(name) - - # Check if test is complete - if iteration >= 6: - done = dev['smartctl'].get( - 'ata_smart_data', {}).get( - 'self_test', {}).get( - 'status', {}).get( - 'passed', False) - if done: - break - - # Update progress_file - with open(progress_file, 'w') as f: - f.write('SMART self-test status:\n {}'.format( - dev['smartctl'].get( - 'ata_smart_data', {}).get( - 'self_test', {}).get( - 'status', {}).get( - 'string', 'unknown'))) - sleep(10) - os.remove(progress_file) - - # Check result - test_passed = dev['smartctl'].get( - 'ata_smart_data', {}).get( - 'self_test', {}).get( - 'status', {}).get( - 'passed', False) - if test_passed: - TESTS['NVMe/SMART']['Status'][name] = 'CS' - TESTS['NVMe/SMART']['Short Test'][name] = 'CS' - else: - TESTS['NVMe/SMART']['Status'][name] = 'NS' - TESTS['NVMe/SMART']['Short Test'][name] = 'NS' - update_progress() - print_standard('Done', timestamp=False) - - # Done - run_program('tmux kill-pane -a'.split(), check=False) - -def run_tests(tests, ticket_number=None): - """Run selected hardware test(s).""" - clear_screen() - print_standard('Starting Hardware Diagnostics') - if ticket_number: - print_standard(' For Ticket #{}'.format(ticket_number)) - print_standard(' ') - print_standard('Running tests: {}'.format(', '.join(tests))) - # Enable selected tests - for t in ['Prime95', 'NVMe/SMART', 'badblocks', 'iobenchmark']: - TESTS[t]['Enabled'] = t in tests - TESTS['NVMe/SMART']['Quick'] = 'Quick' in tests - - # Initialize - if TESTS['NVMe/SMART']['Enabled'] or TESTS['badblocks']['Enabled'] or TESTS['iobenchmark']['Enabled']: - print_standard(' ') - scan_disks() - update_progress() - - # Run - mprime_aborted = False - if TESTS['Prime95']['Enabled']: - try: - run_mprime(ticket_number) - except GenericError: - mprime_aborted = True - if not mprime_aborted: - if TESTS['NVMe/SMART']['Enabled']: - run_nvme_smart(ticket_number) - if TESTS['badblocks']['Enabled']: - run_badblocks(ticket_number) - if TESTS['iobenchmark']['Enabled']: - run_iobenchmark(ticket_number) - - # Show results - show_results() - - # Open log - if not TESTS['NVMe/SMART']['Quick'] and ENABLED_OPEN_LOGS: - try: - popen_program(['nohup', 'leafpad', global_vars['LogFile']], pipe=True) - except Exception: - print_error('ERROR: Failed to open log: {}'.format( - global_vars['LogFile'])) - pause('Press Enter to exit...') - -def scan_disks(full_paths=False, only_path=None): - """Scan for disks eligible for hardware testing.""" - - # Get eligible disk list - cmd = ['lsblk', '-J', '-O'] - if full_paths: - cmd.append('-p') - if only_path: - cmd.append(only_path) - result = run_program(cmd) - json_data = json.loads(result.stdout.decode()) - devs = {} - for d in json_data.get('blockdevices', []): - if d['type'] == 'disk': - if d['hotplug'] == '0': - devs[d['name']] = {'lsblk': d} - TESTS['NVMe/SMART']['Status'][d['name']] = 'Pending' - TESTS['badblocks']['Status'][d['name']] = 'Pending' - TESTS['iobenchmark']['Status'][d['name']] = 'Pending' - else: - # Skip WizardKit devices - skip_dev=False - wk_label_regex = r'{}_(LINUX|UFD)'.format(KIT_NAME_SHORT) - for c in d.get('children', []): - r = re.search( - wk_label_regex, c.get('label', ''), re.IGNORECASE) - skip_dev = bool(r) - if not skip_dev: - devs[d['name']] = {'lsblk': d} - TESTS['NVMe/SMART']['Status'][d['name']] = 'Pending' - TESTS['badblocks']['Status'][d['name']] = 'Pending' - TESTS['iobenchmark']['Status'][d['name']] = 'Pending' - - for dev, data in devs.items(): - # Get SMART attributes - run_program( - cmd = 'sudo smartctl -s on {}{}'.format( - '' if full_paths else '/dev/', - dev).split(), - check = False) - data['smartctl'] = get_smart_details(dev) - - # Get NVMe attributes - if data['lsblk']['tran'] == 'nvme': - cmd = 'sudo nvme smart-log /dev/{} -o json'.format(dev).split() - cmd = 'sudo nvme smart-log {}{} -o json'.format( - '' if full_paths else '/dev/', - dev).split() - result = run_program(cmd, check=False) - try: - data['nvme-cli'] = json.loads(result.stdout.decode()) - except Exception: - # Let other sections deal with the missing data - data['nvme-cli'] = {} - data['NVMe Disk'] = True - - # Set "Quick Health OK" value - ## NOTE: If False then require override for badblocks test - wanted_smart_list = [ - 'ata_smart_attributes', - 'ata_smart_data', - 'smart_status', - ] - if data.get('NVMe Disk', False): - crit_warn = data['nvme-cli'].get('critical_warning', 1) - if crit_warn == 0: - dev_name = data['lsblk']['name'] - data['Quick Health OK'] = True - TESTS['NVMe/SMART']['Status'][dev_name] = 'CS' - else: - data['Quick Health OK'] = False - elif set(wanted_smart_list).issubset(data['smartctl'].keys()): - data['SMART Pass'] = data['smartctl'].get('smart_status', {}).get( - 'passed', False) - data['Quick Health OK'] = data['SMART Pass'] - data['SMART Support'] = True - else: - data['Quick Health OK'] = False - data['SMART Support'] = False - - # Ask for manual overrides if necessary - if TESTS['badblocks']['Enabled'] or TESTS['iobenchmark']['Enabled']: - show_disk_details(data) - needs_override = False - if not data['Quick Health OK']: - needs_override = True - print_warning( - "WARNING: Health can't be confirmed for: /dev/{}".format(dev)) - if get_smart_value(data['smartctl'], '199'): - # SMART attribute present and it's value is non-zero - needs_override = True - print_warning( - 'WARNING: SMART 199/C7 error detected on /dev/{}'.format(dev)) - print_standard(' (Have you tried swapping the drive cable?)') - if needs_override: - dev_name = data['lsblk']['name'] - print_standard(' ') - if ask('Run tests on this device anyway?'): - TESTS['NVMe/SMART']['Status'][dev_name] = 'OVERRIDE' - else: - TESTS['NVMe/SMART']['Status'][dev_name] = 'Skipped' - TESTS['badblocks']['Status'][dev_name] = 'Denied' - TESTS['iobenchmark']['Status'][dev_name] = 'Denied' - print_standard(' ') # In case there's more than one "OVERRIDE" disk - - TESTS['NVMe/SMART']['Devices'] = devs - TESTS['badblocks']['Devices'] = devs - TESTS['iobenchmark']['Devices'] = devs - return devs - -def show_disk_details(dev, only_attributes=False): - """Display disk details.""" - dev_name = dev['lsblk']['name'] - if not only_attributes: - # Device description - print_info('Device: {}{}'.format( - '' if '/dev/' in dev['lsblk']['name'] else '/dev/', - dev['lsblk']['name'])) - print_standard(' {:>4} ({}) {} {}'.format( - str(dev['lsblk'].get('size', '???b')).strip(), - str(dev['lsblk'].get('tran', '???')).strip().upper().replace( - 'NVME', 'NVMe'), - str(dev['lsblk'].get('model', 'Unknown Model')).strip(), - str(dev['lsblk'].get('serial', 'Unknown Serial')).strip(), - )) - - # Warnings - if dev.get('NVMe Disk', False): - if dev['Quick Health OK']: - print_warning('WARNING: NVMe support is still experimental') - else: - print_error('ERROR: NVMe disk is reporting critical warnings') - elif not dev['SMART Support']: - print_error('ERROR: Unable to retrieve SMART data') - elif not dev['SMART Pass']: - print_error('ERROR: SMART overall-health assessment result: FAILED') - - # Attributes - if dev.get('NVMe Disk', False): - if only_attributes: - print_info('SMART Attributes:', end='') - print_warning(' Updated: {}'.format( - time.strftime('%Y-%m-%d %H:%M %Z'))) - else: - print_info('Attributes:') - for attrib, threshold in sorted(ATTRIBUTES['NVMe'].items()): - if attrib in dev['nvme-cli']: - print_standard( - ' {:37}'.format(attrib.replace('_', ' ').title()), - end='', flush=True) - raw_num = dev['nvme-cli'][attrib] - raw_str = str(raw_num) - if (threshold.get('Error', False) and - raw_num >= threshold.get('Error', -1)): - print_error(raw_str, timestamp=False) - if not threshold.get('Ignore', False): - dev['Quick Health OK'] = False - TESTS['NVMe/SMART']['Status'][dev_name] = 'NS' - elif (threshold.get('Warning', False) and - raw_num >= threshold.get('Warning', -1)): - print_warning(raw_str, timestamp=False) - else: - print_success(raw_str, timestamp=False) - elif dev['smartctl'].get('ata_smart_attributes', None): - # SMART attributes - if only_attributes: - print_info('SMART Attributes:', end='') - print_warning(' Updated: {}'.format( - time.strftime('%Y-%m-%d %H:%M %Z'))) - else: - print_info('Attributes:') - s_table = dev['smartctl'].get('ata_smart_attributes', {}).get( - 'table', {}) - s_table = {a.get('id', 'Unknown'): a for a in s_table} - for attrib, threshold in sorted(ATTRIBUTES['SMART'].items()): - if attrib in s_table: - print_standard( - ' {:>3} {:32}'.format( - attrib, - s_table[attrib]['name']).replace('_', ' ').title(), - end='', flush=True) - raw_str = s_table[attrib]['raw']['string'] - raw_num = re.sub(r'^(\d+).*$', r'\1', raw_str) - try: - raw_num = float(raw_num) - except ValueError: - # Not sure about this one, print raw_str without color? - print_standard(raw_str, timestamp=False) - continue - if (threshold.get('Error', False) and - raw_num >= threshold.get('Error', -1)): - print_error(raw_str, timestamp=False) - if not threshold.get('Ignore', False): - dev['Quick Health OK'] = False - TESTS['NVMe/SMART']['Status'][dev_name] = 'NS' - elif (threshold.get('Warning', False) and - raw_num >= threshold.get('Warning', -1)): - print_warning(raw_str, timestamp=False) - else: - print_success(raw_str, timestamp=False) - -def show_results(): - """Show results for selected test(s).""" - clear_screen() - print_log('\n───────────────────────────') - print_standard('Hardware Diagnostic Results') - update_progress() - - # Set Window layout and show progress - run_program('tmux split-window -dhl 15 watch -c -n1 -t cat {}'.format( - TESTS['Progress Out']).split()) - - # Prime95 - if TESTS['Prime95']['Enabled']: - print_success('\nPrime95:') - for log, regex in [ - ['results.txt', r'(error|fail)'], - ['prime.log', r'completed.*0 errors, 0 warnings']]: - if log in TESTS['Prime95']: - print_info('Log: {}'.format(log)) - lines = [line.strip() for line - in TESTS['Prime95'][log].splitlines() - if re.search(regex, line, re.IGNORECASE)] - for line in lines[-4:]: - line = re.sub(r'^.*Worker #\d.*Torture Test (.*)', r'\1', - line, re.IGNORECASE) - if TESTS['Prime95'].get('NS', False): - print_error(' {}'.format(line)) - else: - print_standard(' {}'.format(line)) - print_info('Final temps') - print_log(' See Final Temps.log') - with open('{}/Final Temps.out'.format(global_vars['LogDir']), 'r') as f: - for line in f.readlines(): - if re.search(r'^\s*$', line.strip()): - # Stop after coretemps (which should be first) - break - print(' {}'.format(line.strip())) - print_standard(' ') - - # NVMe/SMART / badblocks / iobenchmark - if TESTS['NVMe/SMART']['Enabled'] or TESTS['badblocks']['Enabled'] or TESTS['iobenchmark']['Enabled']: - print_success('Disks:') - for name, dev in sorted(TESTS['NVMe/SMART']['Devices'].items()): - show_disk_details(dev) - bb_status = TESTS['badblocks']['Status'].get(name, None) - if (TESTS['badblocks']['Enabled'] - and bb_status not in ['Denied', 'OVERRIDE', 'Skipped']): - print_info('badblocks:') - result = TESTS['badblocks']['Results'].get(name, '') - for line in result.splitlines(): - if re.search(r'Pass completed', line, re.IGNORECASE): - line = re.sub( - r'Pass completed,?\s+', r'', - line.strip(), re.IGNORECASE) - if TESTS['badblocks']['Status'][name] == 'CS': - print_standard(' {}'.format(line)) - else: - print_error(' {}'.format(line)) - io_status = TESTS['iobenchmark']['Status'].get(name, None) - if (TESTS['iobenchmark']['Enabled'] - and io_status not in ['Denied', 'OVERRIDE', 'Skipped']): - print_info('Benchmark:') - result = TESTS['iobenchmark']['Results'].get(name, '') - for line in result.split('\n'): - print_standard(' {}'.format(line)) - print_standard(' ') - - # Done - pause('Press Enter to return to main menu... ') - run_program('tmux kill-pane -a'.split()) + """Get color based on status, returns str.""" + color = COLORS['CLEAR'] + if s in ['Denied', 'ERROR', 'NS', 'OVERRIDE']: + color = COLORS['RED'] + elif s in ['Aborted', 'Unknown', 'Working', 'Skipped']: + color = COLORS['YELLOW'] + elif s in ['CS']: + color = COLORS['GREEN'] + return color def update_io_progress(percent, rate, progress_file): - """Update I/O progress file.""" - bar_color = COLORS['CLEAR'] - rate_color = COLORS['CLEAR'] - step = get_graph_step(rate, scale=32) - if rate < IO_VARS['Threshold Graph Fail']: - bar_color = COLORS['RED'] - rate_color = COLORS['YELLOW'] - elif rate < IO_VARS['Threshold Graph Warn']: - bar_color = COLORS['YELLOW'] - rate_color = COLORS['YELLOW'] - elif rate > IO_VARS['Threshold Graph Great']: - bar_color = COLORS['GREEN'] - rate_color = COLORS['GREEN'] - line = ' {p:5.1f}% {b_color}{b:<4} {r_color}{r:6.1f} Mb/s{c}\n'.format( - p=percent, - b_color=bar_color, - b=IO_VARS['Graph Vertical'][step], - r_color=rate_color, - r=rate/(1024**2), - c=COLORS['CLEAR']) - with open(progress_file, 'a') as f: - f.write(line) - -def update_progress(): - """Update progress file.""" - if 'Progress Out' not in TESTS: - TESTS['Progress Out'] = '{}/progress.out'.format(global_vars['LogDir']) - output = [] - output.append('{BLUE}HW Diagnostics{CLEAR}'.format(**COLORS)) - output.append('───────────────') - if TESTS['Prime95']['Enabled']: - output.append(' ') - output.append('{BLUE}Prime95{s_color}{status:>8}{CLEAR}'.format( - s_color = get_status_color(TESTS['Prime95']['Status']), - status = TESTS['Prime95']['Status'], - **COLORS)) - if TESTS['NVMe/SMART']['Enabled']: - output.append(' ') - output.append('{BLUE}NVMe / SMART{CLEAR}'.format(**COLORS)) - if TESTS['NVMe/SMART']['Quick']: - output.append('{YELLOW} (Quick Check){CLEAR}'.format(**COLORS)) - for dev, status in sorted(TESTS['NVMe/SMART']['Status'].items()): - output.append('{dev}{s_color}{status:>{pad}}{CLEAR}'.format( - dev = dev, - pad = 15-len(dev), - s_color = get_status_color(status), - status = status, - **COLORS)) - if TESTS['badblocks']['Enabled']: - output.append(' ') - output.append('{BLUE}badblocks{CLEAR}'.format(**COLORS)) - for dev, status in sorted(TESTS['badblocks']['Status'].items()): - output.append('{dev}{s_color}{status:>{pad}}{CLEAR}'.format( - dev = dev, - pad = 15-len(dev), - s_color = get_status_color(status), - status = status, - **COLORS)) - if TESTS['iobenchmark']['Enabled']: - output.append(' ') - output.append('{BLUE}I/O Benchmark{CLEAR}'.format(**COLORS)) - for dev, status in sorted(TESTS['iobenchmark']['Status'].items()): - output.append('{dev}{s_color}{status:>{pad}}{CLEAR}'.format( - dev = dev, - pad = 15-len(dev), - s_color = get_status_color(status), - status = status, - **COLORS)) - - # Add line-endings - output = ['{}\n'.format(line) for line in output] - - with open(TESTS['Progress Out'], 'w') as f: - f.writelines(output) + """Update I/O progress file.""" + bar_color = COLORS['CLEAR'] + rate_color = COLORS['CLEAR'] + step = get_graph_step(rate, scale=32) + if rate < IO_VARS['Threshold Graph Fail']: + bar_color = COLORS['RED'] + rate_color = COLORS['YELLOW'] + elif rate < IO_VARS['Threshold Graph Warn']: + bar_color = COLORS['YELLOW'] + rate_color = COLORS['YELLOW'] + elif rate > IO_VARS['Threshold Graph Great']: + bar_color = COLORS['GREEN'] + rate_color = COLORS['GREEN'] + line = ' {p:5.1f}% {b_color}{b:<4} {r_color}{r:6.1f} Mb/s{c}\n'.format( + p=percent, + b_color=bar_color, + b=IO_VARS['Graph Vertical'][step], + r_color=rate_color, + r=rate/(1024**2), + c=COLORS['CLEAR']) + with open(progress_file, 'a') as f: + f.write(line) if __name__ == '__main__': - print("This file is not meant to be called directly.") + print("This file is not meant to be called directly.") -# vim: sts=4 sw=4 ts=4 +# vim: sts=2 sw=2 ts=2