# Wizard Kit: Functions - HW Diagnostics import base64 import Gnuplot import json import math import mysql.connector as mariadb import requests import time from functions.common import * from numpy import * # Database connection ost_db = { 'Connection': None, 'Cursor': None, 'Errors': False, 'Tunnel': None, } # STATIC VARIABLES ATTRIBUTES = { 'NVMe': { 'critical_warning': {'Error': 1}, 'media_errors': {'Error': 1}, 'power_on_hours': {'Warning': 12000, 'Error': 18000, 'Ignore': True}, 'unsafe_shutdowns': {'Warning': 1}, }, 'SMART': { 5: {'Error': 1}, 9: {'Warning': 12000, 'Error': 18000, 'Ignore': True}, 10: {'Warning': 1}, 184: {'Error': 1}, 187: {'Warning': 1}, 188: {'Warning': 1}, 196: {'Warning': 1, 'Error': 10, 'Ignore': True}, 197: {'Error': 1}, 198: {'Error': 1}, 199: {'Error': 1, 'Ignore': True}, 201: {'Warning': 1}, }, } IO_VARS = { 'Block Size': 512*1024, 'Chunk Size': 32*1024**2, 'Minimum Dev Size': 8*1024**3, 'Minimum Test Size': 10*1024**3, 'Alt Test Size Factor': 0.01, 'Progress Refresh Rate': 5, 'Scale 8': [2**(0.56*(x+1))+(16*(x+1)) for x in range(8)], 'Scale 16': [2**(0.56*(x+1))+(16*(x+1)) for x in range(16)], 'Scale 32': [2**(0.56*(x+1)/2)+(16*(x+1)/2) for x in range(32)], 'Threshold Fail': 65*1024**2, 'Threshold Warn': 135*1024**2, 'Threshold Great': 750*1024**2, 'Graph Horizontal': ('▁', '▂', '▃', '▄', '▅', '▆', '▇', '█'), 'Graph Horizontal Width': 40, 'Graph Vertical': ( '▏', '▎', '▍', '▌', '▋', '▊', '▉', '█', '█▏', '█▎', '█▍', '█▌', '█▋', '█▊', '█▉', '██', '██▏', '██▎', '██▍', '██▌', '██▋', '██▊', '██▉', '███', '███▏', '███▎', '███▍', '███▌', '███▋', '███▊', '███▉', '████'), } OST_STAFF_ID = '23' OST_STAFF_NAME = 'Wizard Kit' OST_SQL_SET_HOLD = "UPDATE `{db_name}`.`ost_ticket` SET `hold` = '{hold_type}' WHERE `ost_ticket`.`ticket_id` = {ticket_id};" OST_SQL_SET_FLAG = "UPDATE `{db_name}`.`ost_ticket` SET `{flag}` = '{value}' WHERE `ost_ticket`.`ticket_id` = {ticket_id};" OST_SQL_POST_REPLY = ("INSERT INTO `{db_name}`.`ost_ticket_response` (ticket_id, staff_id, staff_name, response, created) " "VALUES ('{ticket_id}', '{staff_id}', '{staff_name}', '{response}', '{created}');") OST_DRIVE_FLAG = 'zHDTune' OST_DRIVE_PASSED = 1 OST_DRIVE_FAILED = 2 OST_NEEDS_ATTENTION = 4 TESTS = { 'Prime95': { 'Enabled': False, 'Status': 'Pending', }, 'NVMe/SMART': { 'Enabled': False, 'Quick': False, 'Status': {}, }, 'badblocks': { 'Enabled': False, 'Results': {}, 'Status': {}, }, 'iobenchmark': { 'Data': {}, 'Enabled': False, 'Results': {}, 'Status': {}, }, } def connect_to_db(): """Connect to osTicket database via SSH tunnel.""" cmd = [ 'ssh', '-N', '-p', SSH_PORT, '-L3306:127.0.0.1:3306', '{user}@{host}'.format(user=SSH_USER, host=DB_HOST), ] # Establish SSH tunnel unless one already exists if not ost_db['Tunnel']: ost_db['Tunnel'] = popen_program(cmd) # Establish SQL connection (try a few times in case SSH is slow) for x in range(5): sleep(3) try: ost_db['Connection'] = mariadb.connect( user=DB_USER, password=DB_PASS, database=DB_NAME) except: # Just try again pass else: break ost_db['Cursor'] = ost_db['Connection'].cursor() ost_db['Errors'] = False def disconnect_from_db(): """Disconnect from SQL DB and close SSH tunnel.""" if ost_db['Cursor']: ost_db['Cursor'].close() if ost_db['Connection']: ost_db['Connection'].close() if ost_db['Tunnel']: ost_db['Tunnel'].kill() def export_png_graph(name, dev): """Exports PNG graph using gnuplot, returns file path as str.""" max_rate = max(TESTS['iobenchmark']['Data'][name]['Read Rates']) max_rate /= (1024**2) max_rate = max(800, max_rate) out_path = '{}/iobenchmark-{}.png'.format(global_vars['LogDir'], name) plot_data = '{}/iobenchmark-{}-raw.log'.format(global_vars['LogDir'], name) # Adjust Y-axis range to either 1000 or roughly max rate + 200 ## Round up to the nearest 100 and then add 200 y_range = (math.ceil(max_rate/100)*100) + 200 # Run gnuplot commands g = Gnuplot.Gnuplot() g('reset') g('set output "{}"'.format(out_path)) g('set terminal png large size 660,300 truecolor font "Noto Sans,11"') g('set title "I/O Benchmark"') g('set yrange [0:{}]'.format(y_range)) g('set style data lines') g('plot "{data}" title "{size} ({tran}) {model} {serial}"'.format( data=plot_data, size=dev['lsblk'].get('size', '???b'), tran=dev['lsblk'].get('tran', '???'), model=dev['lsblk'].get('model', 'Unknown Model'), serial=dev['lsblk'].get('serial', 'Unknown Serial'), )) # Cleanup g.close() del(g) return out_path def generate_horizontal_graph(rates, oneline=False): """Generate two-line horizontal graph from rates, returns str.""" line_1 = '' line_2 = '' line_3 = '' line_4 = '' for r in rates: step = get_graph_step(r, scale=32) if oneline: step = get_graph_step(r, scale=8) # Set color r_color = COLORS['CLEAR'] if r < IO_VARS['Threshold Fail']: r_color = COLORS['RED'] elif r < IO_VARS['Threshold Warn']: r_color = COLORS['YELLOW'] elif r > IO_VARS['Threshold Great']: r_color = COLORS['GREEN'] # Build graph full_block = '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][-1]) if step >= 24: line_1 += '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][step-24]) line_2 += full_block line_3 += full_block line_4 += full_block elif step >= 16: line_1 += ' ' line_2 += '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][step-16]) line_3 += full_block line_4 += full_block elif step >= 8: line_1 += ' ' line_2 += ' ' line_3 += '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][step-8]) line_4 += full_block else: line_1 += ' ' line_2 += ' ' line_3 += ' ' line_4 += '{}{}'.format(r_color, IO_VARS['Graph Horizontal'][step]) line_1 += COLORS['CLEAR'] line_2 += COLORS['CLEAR'] line_3 += COLORS['CLEAR'] line_4 += COLORS['CLEAR'] if oneline: return line_4 else: return '\n'.join([line_1, line_2, line_3, line_4]) def get_graph_step(rate, scale=16): """Get graph step based on rate and scale, returns int.""" m_rate = rate / (1024**2) step = 0 scale_name = 'Scale {}'.format(scale) for x in range(scale-1, -1, -1): # Iterate over scale backwards if m_rate >= IO_VARS[scale_name][x]: step = x break return step def get_osticket_number(): """Get ticket number and confirm with name from osTicket DB.""" ticket_number = None if not ost_db['Cursor']: # No DB access, return None to disable integration return None while ticket_number is None: print_standard(' ') _input = input('Enter ticket number (or leave blank to disable): ') if re.match(r'^\s*$', _input): if ask('Disable osTicket integration for this run?'): return None else: continue if not re.match(r'^([0-9]+)$', _input): continue _name = osticket_get_ticket_name(_input) if _name: print_standard('You have selected ticket #{} {}'.format( _input, _name)) if ask('Is this correct?'): ticket_number = _input return ticket_number def get_read_rate(s): """Get read rate in bytes/s from dd progress output.""" real_rate = None if re.search(r'[KMGT]B/s', s): human_rate = re.sub(r'^.*\s+(\d+\.?\d*)\s+(.B)/s\s*$', r'\1 \2', s) real_rate = convert_to_bytes(human_rate) return real_rate def get_smart_details(dev): """Get SMART data for dev if possible, returns dict.""" cmd = 'sudo smartctl --all --json {}{}'.format( '' if '/dev/' in dev else '/dev/', dev).split() result = run_program(cmd, check=False) try: return json.loads(result.stdout.decode()) except Exception: # Let other sections deal with the missing data return {} def get_smart_value(smart_data, smart_id): """Get SMART value from table, returns int or None.""" value = None table = smart_data.get('ata_smart_attributes', {}).get('table', []) for row in table: if str(row.get('id', '?')) == str(smart_id): value = row.get('raw', {}).get('value', None) return value def get_status_color(s): """Get color based on status, returns str.""" color = COLORS['CLEAR'] if s in ['Denied', 'ERROR', 'NS', 'OVERRIDE']: color = COLORS['RED'] elif s in ['Aborted', 'Unknown', 'Working', 'Skipped']: color = COLORS['YELLOW'] elif s in ['CS']: color = COLORS['GREEN'] return color def menu_diags(*args): """Main HW-Diagnostic menu.""" diag_modes = [ {'Name': 'All tests', 'Tests': ['Prime95', 'NVMe/SMART', 'badblocks', 'iobenchmark']}, {'Name': 'Prime95', 'Tests': ['Prime95']}, {'Name': 'All drive tests', 'Tests': ['NVMe/SMART', 'badblocks', 'iobenchmark']}, {'Name': 'NVMe/SMART', 'Tests': ['NVMe/SMART']}, {'Name': 'badblocks', 'Tests': ['badblocks']}, {'Name': 'I/O Benchmark', 'Tests': ['iobenchmark']}, {'Name': 'Quick drive test', 'Tests': ['Quick', 'NVMe/SMART']}, ] actions = [ {'Letter': 'A', 'Name': 'Audio test'}, {'Letter': 'K', 'Name': 'Keyboard test'}, {'Letter': 'N', 'Name': 'Network test'}, {'Letter': 'M', 'Name': 'Screen Saver - Matrix', 'CRLF': True}, {'Letter': 'P', 'Name': 'Screen Saver - Pipes'}, {'Letter': 'Q', 'Name': 'Quit', 'CRLF': True}, ] # CLI-mode actions if 'DISPLAY' not in global_vars['Env']: actions.extend([ {'Letter': 'R', 'Name': 'Reboot', 'CRLF': True}, {'Letter': 'S', 'Name': 'Shutdown'}, ]) # Quick disk check if 'quick' in args: run_tests(['Quick', 'NVMe/SMART']) exit_script() # Show menu while True: selection = menu_select( title = 'Hardware Diagnostics: Menu', main_entries = diag_modes, action_entries = actions, spacer = '──────────────────────────') if selection.isnumeric(): ticket_number = None if diag_modes[int(selection)-1]['Name'] != 'Quick drive test': clear_screen() print_standard(' ') try_and_print( message='Connecting to osTicket database...', function=connect_to_db, width=40) # Save log for non-quick tests ticket_number = get_osticket_number() global_vars['LogDir'] = '{}/Logs/{}_{}'.format( global_vars['Env']['HOME'], ticket_number, global_vars['Date-Time']) os.makedirs(global_vars['LogDir'], exist_ok=True) global_vars['LogFile'] = '{}/Hardware Diagnostics.log'.format( global_vars['LogDir']) run_tests(diag_modes[int(selection)-1]['Tests'], ticket_number) elif selection == 'A': run_program(['hw-diags-audio'], check=False, pipe=False) pause('Press Enter to return to main menu... ') elif selection == 'K': run_program(['xev', '-event', 'keyboard'], check=False, pipe=False) elif selection == 'N': run_program(['hw-diags-network'], check=False, pipe=False) pause('Press Enter to return to main menu... ') elif selection == 'M': run_program(['cmatrix', '-abs'], check=False, pipe=False) elif selection == 'P': run_program( 'pipes -t 0 -t 1 -t 2 -t 3 -p 5 -R -r 4000'.split(), check=False, pipe=False) elif selection == 'R': run_program(['systemctl', 'reboot']) elif selection == 'S': run_program(['systemctl', 'poweroff']) elif selection == 'Q': break # Done disconnect_from_db() def osticket_get_ticket_name(ticket_id): """Lookup ticket and return name as str.""" ticket_name = 'Unknown' if not ticket_id: raise GenericError if not ost_db['Cursor']: # Skip section return # Set command sql_cmd = "SELECT name FROM `ost_ticket` WHERE `ticket_id` = {}".format( ticket_id) # Run command try: ost_db['Cursor'].execute(sql_cmd) for name in ost_db['Cursor']: ticket_name = name[0] return ticket_name except: ost_db['Errors'] = True def osticket_needs_attention(ticket_id): """[DISABLED] Marks the ticket as "NEEDS ATTENTION" in osTicket.""" return # This function has been DISABLED due to a repurposing of that flag if not ticket_id: raise GenericError if not ost_db['Cursor']: # Skip section return # Set command sql_cmd = OST_SQL_SET_HOLD.format( db_name=DB_NAME, hold_type=OST_NEEDS_ATTENTION, ticket_id=ticket_id) # Run command try: ost_db['Cursor'].execute(sql_cmd) except: ost_db['Errors'] = True def osticket_post_reply(ticket_id, response): """Post a reply to a ticket in osTicket.""" if not ticket_id: raise GenericError if not ost_db['Cursor']: # Skip section return # Set command sql_cmd = OST_SQL_POST_REPLY.format( db_name=DB_NAME, ticket_id=ticket_id, staff_id=OST_STAFF_ID, staff_name=OST_STAFF_NAME, response=response, created=time.strftime("%Y-%m-%d %H:%M:%S")) # Run command try: ost_db['Cursor'].execute(sql_cmd) except: ost_db['Errors'] = True def osticket_set_drive_result(ticket_id, passed): """Marks the pass/fail box for the drive(s) in osTicket.""" if not ticket_id: raise GenericError if not ost_db['Cursor']: # Skip section return # Set command sql_cmd = OST_SQL_SET_FLAG.format( db_name=DB_NAME, flag=OST_DRIVE_FLAG, value=OST_DRIVE_PASSED if passed else OST_DRIVE_FAILED, ticket_id=ticket_id) # Run command try: ost_db['Cursor'].execute(sql_cmd) except: ost_db['Errors'] = True def pad_with_dots(s, left_pad=True): """Replace ' ' padding with '..' for osTicket posts.""" s = str(s).replace(' ', '..') if '.' in s: if left_pad: s = '.' + s else: s = s + '.' return s def post_drive_results(ticket_number): """Post drive test results to osTicket.""" tested = False # Check if test(s) were run for t in ['NVMe/SMART', 'badblocks', 'iobenchmark']: tested |= TESTS[t]['Enabled'] if not tested or TESTS['NVMe/SMART']['Quick']: # No tests were run so no post necessary return # Build reports for all tested devices for name, dev in sorted(TESTS['NVMe/SMART']['Devices'].items()): dev_failed = False dev_passed = True dev_unknown = False report = [] # Check all test results for dev for t in ['NVMe/SMART', 'badblocks', 'iobenchmark']: if not TESTS[t]['Enabled']: continue status = TESTS[t]['Status'].get(name, 'Unknown') dev_failed |= status == 'NS' dev_passed &= status == 'CS' dev_unknown |= status in ('Working', 'Unknown') if not dev_failed and not dev_passed and not dev_unknown: # Assuming drive was skipped so no reply is needed continue # Start drive report if dev_failed: report.append('Drive hardware diagnostics tests: FAILED') elif dev_unknown: report.append('Drive hardware diagnostics tests: UNKNOWN') elif dev_passed: report.append('Drive hardware diagnostics tests: Passed') report.append('') # Drive description report.append('{size} ({tran}) {model} {serial}'.format( size=dev['lsblk'].get('size', '???b'), tran=dev['lsblk'].get('tran', '???'), model=dev['lsblk'].get('model', 'Unknown Model'), serial=dev['lsblk'].get('serial', 'Unknown Serial'), )) report.append('') # Warnings (if any) if dev.get('NVMe Disk', False): if dev['Quick Health Ok']: report.append('WARNING: NVMe support is still experimental') else: report.append('ERROR: NVMe disk is reporting critical warnings') report.append('') elif not dev['SMART Support']: report.append('ERROR: Unable to retrieve SMART data') report.append('') elif not dev['SMART Pass']: report.append('ERROR: SMART overall-health assessment result: FAILED') report.append('') # NVMe/SMART Attributes if dev.get('NVMe Disk', False): report.append('NVMe Attributes ({}):'.format( TESTS['NVMe/SMART']['Status'][name])) for attrib in sorted(ATTRIBUTES['NVMe'].keys()): if attrib in dev['nvme-cli']: report.append('{attrib:30}{value}'.format( attrib=attrib, value=dev['nvme-cli'][attrib], )) report[-1] = report[-1].strip().replace(' ', '.') report[-1] = report[-1].replace('_', ' ') elif dev['smartctl'].get('ata_smart_attributes', None): report.append('SMART Attributes ({}):'.format( TESTS['NVMe/SMART']['Status'][name])) s_table = dev['smartctl'].get('ata_smart_attributes', {}).get( 'table', {}) s_table = {a.get('id', 'Unknown'): a for a in s_table} for attrib in sorted(ATTRIBUTES['SMART'].keys()): if attrib in s_table: # Pad attributewith dots for osTicket hex_str = str(hex(int(attrib))).upper()[2:] hex_str = pad_with_dots('{:>2}'.format(hex_str)) dec_str = pad_with_dots('{:>3}'.format(attrib)) val_str = '{:<20}'.format(s_table[attrib]['raw']['string']) val_str = pad_with_dots(val_str, left_pad=False) report.append('{:>2}/{:>3}: {} ({})'.format( hex_str, dec_str, val_str, s_table[attrib]['name'], )) report[-1] = report[-1].replace('_', ' ') report.append('') # badblocks bb_status = TESTS['badblocks']['Status'].get(name, None) if TESTS['badblocks']['Enabled'] and bb_status not in ['Denied', 'Skipped']: report.append('badblocks ({}):'.format( TESTS['badblocks']['Status'][name])) bb_result = TESTS['badblocks']['Results'].get( name, 'ERROR: Failed to read log.') for line in bb_result.splitlines(): line = line.strip() if not line: continue if re.search('Pass completed', line, re.IGNORECASE): line = re.sub( r'Pass completed,?\s+', r'', line, re.IGNORECASE) report.append(line) report.append('') # I/O Benchmark io_status = TESTS['iobenchmark']['Status'].get(name, None) if TESTS['iobenchmark']['Enabled'] and io_status not in ['Denied', 'ERROR', 'Skipped']: one_line_graph = generate_horizontal_graph( rates=TESTS['iobenchmark']['Data'][name]['Merged Rates'], oneline=True) for c in COLORS.values(): one_line_graph = one_line_graph.replace(c, '') report.append('I/O Benchmark ({}):'.format( TESTS['iobenchmark']['Status'][name])) report.append(one_line_graph) report.append('{}'.format( TESTS['iobenchmark']['Data'][name]['Avg/Min/Max'])) # Export PNG try: png_path = export_png_graph(name, dev) except: png_path = None # imgur try: url = upload_to_imgur(png_path) report.append('Imgur: {}'.format(url)) except: # Oh well pass # Nextcloud try: url = upload_to_nextcloud(png_path, ticket_number, name) report.append('Nextcloud: {}'.format(url)) except: # Oh well pass # Post reply for drive osticket_post_reply( ticket_id=ticket_number, response='\n'.join(report)) # Mark ticket HDD/SSD pass/fail checkbox (as needed) if dev_failed: osticket_set_drive_result( ticket_id=ticket_number, passed=False) elif dev_unknown: pass elif dev_passed: osticket_set_drive_result( ticket_id=ticket_number, passed=True) # Mark ticket as NEEDS ATTENTION osticket_needs_attention(ticket_id=ticket_number) def run_badblocks(ticket_number): """Run a read-only test for all detected disks.""" aborted = False clear_screen() print_log('\nStart badblocks test(s)\n') progress_file = '{}/badblocks_progress.out'.format(global_vars['LogDir']) update_progress() # Set Window layout and start test run_program('tmux split-window -dhl 15 watch -c -n1 -t cat {}'.format( TESTS['Progress Out']).split()) # Show disk details for name, dev in sorted(TESTS['badblocks']['Devices'].items()): show_disk_details(dev) print_standard(' ') update_progress() # Run print_standard('Running badblock test(s):') for name, dev in sorted(TESTS['badblocks']['Devices'].items()): cur_status = TESTS['badblocks']['Status'][name] nvme_smart_status = TESTS['NVMe/SMART']['Status'].get(name, None) if cur_status == 'Denied': # Skip denied disks continue if nvme_smart_status == 'NS': TESTS['badblocks']['Status'][name] = 'Skipped' else: # Not testing SMART, SMART CS, or SMART OVERRIDE TESTS['badblocks']['Status'][name] = 'Working' update_progress() print_standard(' /dev/{:11} '.format(name+'...'), end='', flush=True) run_program('tmux split-window -dl 5 {} {} {}'.format( 'hw-diags-badblocks', '/dev/{}'.format(name), progress_file).split()) wait_for_process('badblocks') print_standard('Done', timestamp=False) # Check results if os.path.exists(progress_file): with open(progress_file, 'r') as f: text = f.read() TESTS['badblocks']['Results'][name] = text r = re.search(r'Pass completed.*0/0/0 errors', text) if r: TESTS['badblocks']['Status'][name] = 'CS' else: TESTS['badblocks']['Status'][name] = 'NS' # Move temp file shutil.move(progress_file, '{}/badblocks-{}.log'.format( global_vars['LogDir'], name)) else: TESTS['badblocks']['Status'][name] = 'NS' update_progress() # Done run_program('tmux kill-pane -a'.split(), check=False) pass def run_iobenchmark(ticket_number): """Run a read-only test for all detected disks.""" aborted = False clear_screen() print_log('\nStart I/O Benchmark test(s)\n') progress_file = '{}/iobenchmark_progress.out'.format(global_vars['LogDir']) update_progress() # Set Window layout and start test run_program('tmux split-window -dhl 15 watch -c -n1 -t cat {}'.format( TESTS['Progress Out']).split()) # Show disk details for name, dev in sorted(TESTS['iobenchmark']['Devices'].items()): show_disk_details(dev) print_standard(' ') update_progress() # Run print_standard('Running benchmark test(s):') for name, dev in sorted(TESTS['iobenchmark']['Devices'].items()): cur_status = TESTS['iobenchmark']['Status'][name] nvme_smart_status = TESTS['NVMe/SMART']['Status'].get(name, None) bb_status = TESTS['badblocks']['Status'].get(name, None) if cur_status == 'Denied': # Skip denied disks continue if nvme_smart_status == 'NS': TESTS['iobenchmark']['Status'][name] = 'Skipped' elif bb_status in ['NS', 'Skipped']: TESTS['iobenchmark']['Status'][name] = 'Skipped' else: # (SMART tests not run or CS/OVERRIDE) # AND (BADBLOCKS tests not run or CS) TESTS['iobenchmark']['Status'][name] = 'Working' update_progress() print_standard(' /dev/{:11} '.format(name+'...'), end='', flush=True) # Get dev size cmd = 'sudo lsblk -bdno size /dev/{}'.format(name) try: result = run_program(cmd.split()) dev_size = result.stdout.decode().strip() dev_size = int(dev_size) except: # Failed to get dev size, requires manual testing instead TESTS['iobenchmark']['Status'][name] = 'ERROR' continue if dev_size < IO_VARS['Minimum Dev Size']: TESTS['iobenchmark']['Status'][name] = 'ERROR' continue # Calculate dd values ## test_size is the area to be read in bytes ## If the dev is < 10Gb then it's the whole dev ## Otherwise it's the larger of 10Gb or 1% of the dev ## ## test_chunks is the number of groups of "Chunk Size" in test_size ## This number is reduced to a multiple of the graph width in ## order to allow for the data to be condensed cleanly ## ## skip_blocks is the number of "Block Size" groups not tested ## skip_count is the number of blocks to skip per test_chunk ## skip_extra is how often to add an additional skip block ## This is needed to ensure an even testing across the dev ## This is calculated by using the fractional amount left off ## of the skip_count variable test_size = min(IO_VARS['Minimum Test Size'], dev_size) test_size = max( test_size, dev_size*IO_VARS['Alt Test Size Factor']) test_chunks = int(test_size // IO_VARS['Chunk Size']) test_chunks -= test_chunks % IO_VARS['Graph Horizontal Width'] test_size = test_chunks * IO_VARS['Chunk Size'] skip_blocks = int((dev_size - test_size) // IO_VARS['Block Size']) skip_count = int((skip_blocks / test_chunks) // 1) skip_extra = 0 try: skip_extra = 1 + int(1 / ((skip_blocks / test_chunks) % 1)) except ZeroDivisionError: # skip_extra == 0 is fine pass # Open dd progress pane after initializing file with open(progress_file, 'w') as f: f.write('') sleep(1) cmd = 'tmux split-window -dp 75 -PF #D tail -f {}'.format( progress_file) result = run_program(cmd.split()) bottom_pane = result.stdout.decode().strip() # Run dd read tests offset = 0 TESTS['iobenchmark']['Data'][name] = { 'Graph': [], 'Read Rates': []} for i in range(test_chunks): i += 1 s = skip_count c = int(IO_VARS['Chunk Size'] / IO_VARS['Block Size']) if skip_extra and i % skip_extra == 0: s += 1 cmd = 'sudo dd bs={b} skip={s} count={c} if=/dev/{n} of={o} iflag=direct'.format( b=IO_VARS['Block Size'], s=offset+s, c=c, n=name, o='/dev/null') result = run_program(cmd.split()) result_str = result.stderr.decode().replace('\n', '') cur_rate = get_read_rate(result_str) TESTS['iobenchmark']['Data'][name]['Read Rates'].append( cur_rate) TESTS['iobenchmark']['Data'][name]['Graph'].append( '{percent:0.1f} {rate}'.format( percent=i/test_chunks*100, rate=int(cur_rate/(1024**2)))) if i % IO_VARS['Progress Refresh Rate'] == 0: # Update vertical graph update_io_progress( percent=i/test_chunks*100, rate=cur_rate, progress_file=progress_file) # Update offset offset += s + c print_standard('Done', timestamp=False) # Close bottom pane run_program(['tmux', 'kill-pane', '-t', bottom_pane]) # Build report avg_min_max = 'Average read speed: {:3.1f} MB/s (Min: {:3.1f}, Max: {:3.1f})'.format( sum(TESTS['iobenchmark']['Data'][name]['Read Rates'])/len( TESTS['iobenchmark']['Data'][name]['Read Rates'])/(1024**2), min(TESTS['iobenchmark']['Data'][name]['Read Rates'])/(1024**2), max(TESTS['iobenchmark']['Data'][name]['Read Rates'])/(1024**2)) TESTS['iobenchmark']['Data'][name]['Avg/Min/Max'] = avg_min_max TESTS['iobenchmark']['Data'][name]['Merged Rates'] = [] pos = 0 width = int(test_chunks / IO_VARS['Graph Horizontal Width']) for i in range(IO_VARS['Graph Horizontal Width']): # Append average rate for WIDTH number of rates to new array TESTS['iobenchmark']['Data'][name]['Merged Rates'].append(sum( TESTS['iobenchmark']['Data'][name]['Read Rates'][pos:pos+width])/width) pos += width report = generate_horizontal_graph( TESTS['iobenchmark']['Data'][name]['Merged Rates']) report += '\n{}'.format(avg_min_max) TESTS['iobenchmark']['Results'][name] = report # Set CS/NS if min(TESTS['iobenchmark']['Data'][name]['Read Rates']) <= IO_VARS['Threshold Fail']: TESTS['iobenchmark']['Status'][name] = 'NS' elif min(TESTS['iobenchmark']['Data'][name]['Read Rates']) <= IO_VARS['Threshold Warn']: TESTS['iobenchmark']['Status'][name] = 'Unknown' else: TESTS['iobenchmark']['Status'][name] = 'CS' # Save logs dest_filename = '{}/iobenchmark-{}.log'.format(global_vars['LogDir'], name) shutil.move(progress_file, dest_filename) with open(dest_filename.replace('.', '-raw.'), 'a') as f: f.write('\n'.join(TESTS['iobenchmark']['Data'][name]['Graph'])) update_progress() # Done run_program('tmux kill-pane -a'.split(), check=False) pass def run_mprime(ticket_number): """Run Prime95 for MPRIME_LIMIT minutes while showing the temps.""" aborted = False print_log('\nStart Prime95 test') TESTS['Prime95']['Status'] = 'Working' update_progress() # Set Window layout and start test run_program('tmux split-window -dl 10 -c {wd} {cmd} {wd}'.format( wd=global_vars['TmpDir'], cmd='hw-diags-prime95').split()) run_program('tmux split-window -dhl 15 watch -c -n1 -t cat {}'.format( TESTS['Progress Out']).split()) run_program('tmux split-window -bd watch -c -n1 -t hw-sensors'.split()) run_program('tmux resize-pane -y 3'.split()) # Start test run_program(['apple-fans', 'max']) try: for i in range(int(MPRIME_LIMIT)): clear_screen() min_left = int(MPRIME_LIMIT) - i print_standard('Running Prime95 ({} minute{} left)'.format( min_left, 's' if min_left != 1 else '')) print_warning('If running too hot, press CTRL+c to abort the test') sleep(60) except KeyboardInterrupt: # Catch CTRL+C aborted = True # Save "final" temps run_program( cmd = 'hw-sensors >> "{}/Final Temps.out"'.format( global_vars['LogDir']).split(), check = False, pipe = False, shell = True) run_program( cmd = 'hw-sensors --nocolor >> "{}/Final Temps.log"'.format( global_vars['LogDir']).split(), check = False, pipe = False, shell = True) # Stop test run_program('killall -s INT mprime'.split(), check=False) run_program(['apple-fans', 'auto']) # Move logs to Ticket folder for item in os.scandir(global_vars['TmpDir']): try: shutil.move(item.path, global_vars['LogDir']) except Exception: print_error('ERROR: Failed to move "{}" to "{}"'.format( item.path, global_vars['LogDir'])) # Check logs TESTS['Prime95']['NS'] = False TESTS['Prime95']['CS'] = False log = '{}/results.txt'.format(global_vars['LogDir']) if os.path.exists(log): with open(log, 'r') as f: text = f.read() TESTS['Prime95']['results.txt'] = text r = re.search(r'(error|fail)', text) TESTS['Prime95']['NS'] = bool(r) log = '{}/prime.log'.format(global_vars['LogDir']) if os.path.exists(log): with open(log, 'r') as f: text = f.read() TESTS['Prime95']['prime.log'] = text r = re.search(r'completed.*0 errors, 0 warnings', text) TESTS['Prime95']['CS'] = bool(r) # Update status if aborted: TESTS['Prime95']['Status'] = 'Aborted' print_warning('\nAborted.') update_progress() if TESTS['NVMe/SMART']['Enabled'] or TESTS['badblocks']['Enabled']: if not ask('Proceed to next test?'): run_program('tmux kill-pane -a'.split()) raise GenericError else: if TESTS['Prime95']['NS']: TESTS['Prime95']['Status'] = 'NS' elif TESTS['Prime95']['CS']: TESTS['Prime95']['Status'] = 'CS' else: TESTS['Prime95']['Status'] = 'Unknown' update_progress() # Build osTicket report report = ['Prime95 ({}):'.format(TESTS['Prime95']['Status'])] log_path = '{}/prime.log'.format(global_vars['LogDir']) try: with open(log_path, 'r') as f: for line in f.readlines(): line = line.strip() r = re.search('(completed \d+ tests.*)', line, re.IGNORECASE) if r: report.append(r.group(1)) except: report.append('ERROR: Failed to read log.') report.append('') report.append('Final temps:') log_path = '{}/Final Temps.log'.format(global_vars['LogDir']) try: with open(log_path, 'r') as f: for line in f.readlines(): line = line.strip() if not line: # Stop after CPU temp(s) break report.append(line) except: report.append('ERROR: Failed to read log.') # Upload osTicket report osticket_post_reply( ticket_id=ticket_number, response='\n'.join(report)) # Done run_program('tmux kill-pane -a'.split()) def run_nvme_smart(ticket_number): """Run the built-in NVMe or SMART test for all detected disks.""" aborted = False clear_screen() print_log('\nStart NVMe/SMART test(s)\n') progress_file = '{}/selftest_progress.out'.format(global_vars['LogDir']) update_progress() # Set Window layout and start test run_program('tmux split-window -dl 3 watch -c -n1 -t cat {}'.format( progress_file).split()) run_program('tmux split-window -dhl 15 watch -c -n1 -t cat {}'.format( TESTS['Progress Out']).split()) # Show disk details for name, dev in sorted(TESTS['NVMe/SMART']['Devices'].items()): show_disk_details(dev) print_standard(' ') update_progress() # Run for name, dev in sorted(TESTS['NVMe/SMART']['Devices'].items()): cur_status = TESTS['NVMe/SMART']['Status'][name] if cur_status == 'OVERRIDE': # Skipping test per user request continue if TESTS['NVMe/SMART']['Quick'] or dev.get('NVMe Disk', False): # Skip SMART self-tests for quick checks and NVMe disks if dev['Quick Health OK']: TESTS['NVMe/SMART']['Status'][name] = 'CS' else: TESTS['NVMe/SMART']['Status'][name] = 'NS' elif not dev['Quick Health OK']: # SMART overall == Failed or attributes bad, avoid self-test TESTS['NVMe/SMART']['Status'][name] = 'NS' else: # Start SMART short self-test test_length = dev['smartctl'].get( 'ata_smart_data', {}).get( 'self_test', {}).get( 'polling_minutes', {}).get( 'short', 5) test_length = int(test_length) + 5 TESTS['NVMe/SMART']['Status'][name] = 'Working' update_progress() print_standard('Running SMART short self-test(s):') print_standard( ' /dev/{:8}({} minutes)... '.format(name, test_length), end='', flush=True) run_program( 'sudo smartctl -t short /dev/{}'.format(name).split(), check=False) # Wait and show progress (in 10 second increments) for iteration in range(int(test_length*60/10)): # Update SMART data dev['smartctl'] = get_smart_details(name) # Check if test is complete if iteration >= 6: done = dev['smartctl'].get( 'ata_smart_data', {}).get( 'self_test', {}).get( 'status', {}).get( 'passed', False) if done: break # Update progress_file with open(progress_file, 'w') as f: f.write('SMART self-test status:\n {}'.format( dev['smartctl'].get( 'ata_smart_data', {}).get( 'self_test', {}).get( 'status', {}).get( 'string', 'unknown'))) sleep(10) os.remove(progress_file) # Check result test_passed = dev['smartctl'].get( 'ata_smart_data', {}).get( 'self_test', {}).get( 'status', {}).get( 'passed', False) if test_passed: TESTS['NVMe/SMART']['Status'][name] = 'CS' else: TESTS['NVMe/SMART']['Status'][name] = 'NS' update_progress() print_standard('Done', timestamp=False) # Done run_program('tmux kill-pane -a'.split(), check=False) def run_tests(tests, ticket_number=None): """Run selected hardware test(s).""" clear_screen() print_standard('Starting Hardware Diagnostics') print_standard(' ') print_standard('Running tests: {}'.format(', '.join(tests))) # Enable selected tests for t in ['Prime95', 'NVMe/SMART', 'badblocks', 'iobenchmark']: TESTS[t]['Enabled'] = t in tests TESTS['NVMe/SMART']['Quick'] = 'Quick' in tests # Initialize if TESTS['NVMe/SMART']['Enabled'] or TESTS['badblocks']['Enabled'] or TESTS['iobenchmark']['Enabled']: print_standard(' ') scan_disks() update_progress() # Run mprime_aborted = False if TESTS['Prime95']['Enabled']: try: run_mprime(ticket_number) except GenericError: mprime_aborted = True if not mprime_aborted: if TESTS['NVMe/SMART']['Enabled']: run_nvme_smart(ticket_number) if TESTS['badblocks']['Enabled']: run_badblocks(ticket_number) if TESTS['iobenchmark']['Enabled']: run_iobenchmark(ticket_number) # Show results post_drive_results(ticket_number) show_results() # Open log if not TESTS['NVMe/SMART']['Quick']: try: popen_program(['nohup', 'leafpad', global_vars['LogFile']], pipe=True) except Exception: print_error('ERROR: Failed to open log: {}'.format( global_vars['LogFile'])) pause('Press Enter to exit...') def scan_disks(full_paths=False, only_path=None): """Scan for disks eligible for hardware testing.""" # Get eligible disk list cmd = ['lsblk', '-J', '-O'] if full_paths: cmd.append('-p') if only_path: cmd.append(only_path) result = run_program(cmd) json_data = json.loads(result.stdout.decode()) devs = {} for d in json_data.get('blockdevices', []): if d['type'] == 'disk': if d['hotplug'] == '0': devs[d['name']] = {'lsblk': d} TESTS['NVMe/SMART']['Status'][d['name']] = 'Pending' TESTS['badblocks']['Status'][d['name']] = 'Pending' TESTS['iobenchmark']['Status'][d['name']] = 'Pending' else: # Skip WizardKit devices wk_label = '{}_LINUX'.format(KIT_NAME_SHORT) if wk_label not in [c.get('label', '') for c in d.get('children', [])]: devs[d['name']] = {'lsblk': d} TESTS['NVMe/SMART']['Status'][d['name']] = 'Pending' TESTS['badblocks']['Status'][d['name']] = 'Pending' TESTS['iobenchmark']['Status'][d['name']] = 'Pending' for dev, data in devs.items(): # Get SMART attributes run_program( cmd = 'sudo smartctl -s on {}{}'.format( '' if full_paths else '/dev/', dev).split(), check = False) data['smartctl'] = get_smart_details(dev) # Get NVMe attributes if data['lsblk']['tran'] == 'nvme': cmd = 'sudo nvme smart-log /dev/{} -o json'.format(dev).split() cmd = 'sudo nvme smart-log {}{} -o json'.format( '' if full_paths else '/dev/', dev).split() result = run_program(cmd, check=False) try: data['nvme-cli'] = json.loads(result.stdout.decode()) except Exception: # Let other sections deal with the missing data data['nvme-cli'] = {} data['NVMe Disk'] = True # Set "Quick Health OK" value ## NOTE: If False then require override for badblocks test wanted_smart_list = [ 'ata_smart_attributes', 'ata_smart_data', 'smart_status', ] if data.get('NVMe Disk', False): crit_warn = data['nvme-cli'].get('critical_warning', 1) data['Quick Health OK'] = True if crit_warn == 0 else False elif set(wanted_smart_list).issubset(data['smartctl'].keys()): data['SMART Pass'] = data['smartctl'].get('smart_status', {}).get( 'passed', False) data['Quick Health OK'] = data['SMART Pass'] data['SMART Support'] = True else: data['Quick Health OK'] = False data['SMART Support'] = False # Ask for manual overrides if necessary if TESTS['badblocks']['Enabled'] or TESTS['iobenchmark']['Enabled']: show_disk_details(data) needs_override = False if not data['Quick Health OK']: needs_override = True print_warning( "WARNING: Health can't be confirmed for: /dev/{}".format(dev)) if get_smart_value(data['smartctl'], '199'): # SMART attribute present and it's value is non-zero needs_override = True print_warning( 'WARNING: SMART 199/C7 error detected on /dev/{}'.format(dev)) print_standard(' (Have you tried swapping the drive cable?)') if needs_override: dev_name = data['lsblk']['name'] print_standard(' ') if ask('Run tests on this device anyway?'): TESTS['NVMe/SMART']['Status'][dev_name] = 'OVERRIDE' else: TESTS['NVMe/SMART']['Status'][dev_name] = 'Skipped' TESTS['badblocks']['Status'][dev_name] = 'Denied' TESTS['iobenchmark']['Status'][dev_name] = 'Denied' print_standard(' ') # In case there's more than one "OVERRIDE" disk TESTS['NVMe/SMART']['Devices'] = devs TESTS['badblocks']['Devices'] = devs TESTS['iobenchmark']['Devices'] = devs return devs def show_disk_details(dev, only_attributes=False): """Display disk details.""" dev_name = dev['lsblk']['name'] if not only_attributes: # Device description print_info('Device: {}{}'.format( '' if '/dev/' in dev['lsblk']['name'] else '/dev/', dev['lsblk']['name'])) print_standard(' {:>4} ({}) {} {}'.format( str(dev['lsblk'].get('size', '???b')).strip(), str(dev['lsblk'].get('tran', '???')).strip().upper().replace( 'NVME', 'NVMe'), str(dev['lsblk'].get('model', 'Unknown Model')).strip(), str(dev['lsblk'].get('serial', 'Unknown Serial')).strip(), )) # Warnings if dev.get('NVMe Disk', False): if dev['Quick Health OK']: print_warning('WARNING: NVMe support is still experimental') else: print_error('ERROR: NVMe disk is reporting critical warnings') elif not dev['SMART Support']: print_error('ERROR: Unable to retrieve SMART data') elif not dev['SMART Pass']: print_error('ERROR: SMART overall-health assessment result: FAILED') # Attributes if dev.get('NVMe Disk', False): if only_attributes: print_info('SMART Attributes:', end='') print_warning(' Updated: {}'.format( time.strftime('%Y-%m-%d %H:%M %Z'))) else: print_info('Attributes:') for attrib, threshold in sorted(ATTRIBUTES['NVMe'].items()): if attrib in dev['nvme-cli']: print_standard( ' {:37}'.format(attrib.replace('_', ' ').title()), end='', flush=True) raw_num = dev['nvme-cli'][attrib] raw_str = str(raw_num) if (threshold.get('Error', False) and raw_num >= threshold.get('Error', -1)): print_error(raw_str, timestamp=False) if not threshold.get('Ignore', False): dev['Quick Health OK'] = False TESTS['NVMe/SMART']['Status'][dev_name] = 'NS' elif (threshold.get('Warning', False) and raw_num >= threshold.get('Warning', -1)): print_warning(raw_str, timestamp=False) else: print_success(raw_str, timestamp=False) elif dev['smartctl'].get('ata_smart_attributes', None): # SMART attributes if only_attributes: print_info('SMART Attributes:', end='') print_warning(' Updated: {}'.format( time.strftime('%Y-%m-%d %H:%M %Z'))) else: print_info('Attributes:') s_table = dev['smartctl'].get('ata_smart_attributes', {}).get( 'table', {}) s_table = {a.get('id', 'Unknown'): a for a in s_table} for attrib, threshold in sorted(ATTRIBUTES['SMART'].items()): if attrib in s_table: print_standard( ' {:>3} {:32}'.format( attrib, s_table[attrib]['name']).replace('_', ' ').title(), end='', flush=True) raw_str = s_table[attrib]['raw']['string'] raw_num = re.sub(r'^(\d+).*$', r'\1', raw_str) try: raw_num = float(raw_num) except ValueError: # Not sure about this one, print raw_str without color? print_standard(raw_str, timestamp=False) continue if (threshold.get('Error', False) and raw_num >= threshold.get('Error', -1)): print_error(raw_str, timestamp=False) if not threshold.get('Ignore', False): dev['Quick Health OK'] = False TESTS['NVMe/SMART']['Status'][dev_name] = 'NS' elif (threshold.get('Warning', False) and raw_num >= threshold.get('Warning', -1)): print_warning(raw_str, timestamp=False) else: print_success(raw_str, timestamp=False) def show_results(): """Show results for selected test(s).""" clear_screen() print_log('\n───────────────────────────') print_standard('Hardware Diagnostic Results') update_progress() # Set Window layout and show progress run_program('tmux split-window -dhl 15 watch -c -n1 -t cat {}'.format( TESTS['Progress Out']).split()) # Prime95 if TESTS['Prime95']['Enabled']: print_success('\nPrime95:') for log, regex in [ ['results.txt', r'(error|fail)'], ['prime.log', r'completed.*0 errors, 0 warnings']]: if log in TESTS['Prime95']: print_info('Log: {}'.format(log)) lines = [line.strip() for line in TESTS['Prime95'][log].splitlines() if re.search(regex, line, re.IGNORECASE)] for line in lines[-4:]: line = re.sub(r'^.*Worker #\d.*Torture Test (.*)', r'\1', line, re.IGNORECASE) if TESTS['Prime95'].get('NS', False): print_error(' {}'.format(line)) else: print_standard(' {}'.format(line)) print_info('Final temps') print_log(' See Final Temps.log') with open('{}/Final Temps.out'.format(global_vars['LogDir']), 'r') as f: for line in f.readlines(): if re.search(r'^\s*$', line.strip()): # Stop after coretemps (which should be first) break print(' {}'.format(line.strip())) print_standard(' ') # NVMe/SMART / badblocks / iobenchmark if TESTS['NVMe/SMART']['Enabled'] or TESTS['badblocks']['Enabled'] or TESTS['iobenchmark']['Enabled']: print_success('Disks:') for name, dev in sorted(TESTS['NVMe/SMART']['Devices'].items()): show_disk_details(dev) bb_status = TESTS['badblocks']['Status'].get(name, None) if (TESTS['badblocks']['Enabled'] and bb_status not in ['Denied', 'OVERRIDE', 'Skipped']): print_info('badblocks:') result = TESTS['badblocks']['Results'].get(name, '') for line in result.splitlines(): if re.search(r'Pass completed', line, re.IGNORECASE): line = re.sub( r'Pass completed,?\s+', r'', line.strip(), re.IGNORECASE) if TESTS['badblocks']['Status'][name] == 'CS': print_standard(' {}'.format(line)) else: print_error(' {}'.format(line)) io_status = TESTS['iobenchmark']['Status'].get(name, None) if (TESTS['iobenchmark']['Enabled'] and io_status not in ['Denied', 'OVERRIDE', 'Skipped']): print_info('Benchmark:') result = TESTS['iobenchmark']['Results'].get(name, '') for line in result.split('\n'): print_standard(' {}'.format(line)) print_standard(' ') # Done pause('Press Enter to return to main menu... ') run_program('tmux kill-pane -a'.split()) def update_io_progress(percent, rate, progress_file): """Update I/O progress file.""" bar_color = COLORS['CLEAR'] rate_color = COLORS['CLEAR'] step = get_graph_step(rate, scale=32) if rate < IO_VARS['Threshold Fail']: bar_color = COLORS['RED'] rate_color = COLORS['YELLOW'] elif rate < IO_VARS['Threshold Warn']: bar_color = COLORS['YELLOW'] rate_color = COLORS['YELLOW'] elif rate > IO_VARS['Threshold Great']: bar_color = COLORS['GREEN'] rate_color = COLORS['GREEN'] line = ' {p:5.1f}% {b_color}{b:<4} {r_color}{r:6.1f} Mb/s{c}\n'.format( p=percent, b_color=bar_color, b=IO_VARS['Graph Vertical'][step], r_color=rate_color, r=rate/(1024**2), c=COLORS['CLEAR']) with open(progress_file, 'a') as f: f.write(line) def update_progress(): """Update progress file.""" if 'Progress Out' not in TESTS: TESTS['Progress Out'] = '{}/progress.out'.format(global_vars['LogDir']) output = [] output.append('{BLUE}HW Diagnostics{CLEAR}'.format(**COLORS)) output.append('───────────────') if TESTS['Prime95']['Enabled']: output.append(' ') output.append('{BLUE}Prime95{s_color}{status:>8}{CLEAR}'.format( s_color = get_status_color(TESTS['Prime95']['Status']), status = TESTS['Prime95']['Status'], **COLORS)) if TESTS['NVMe/SMART']['Enabled']: output.append(' ') output.append('{BLUE}NVMe / SMART{CLEAR}'.format(**COLORS)) if TESTS['NVMe/SMART']['Quick']: output.append('{YELLOW} (Quick Check){CLEAR}'.format(**COLORS)) for dev, status in sorted(TESTS['NVMe/SMART']['Status'].items()): output.append('{dev}{s_color}{status:>{pad}}{CLEAR}'.format( dev = dev, pad = 15-len(dev), s_color = get_status_color(status), status = status, **COLORS)) if TESTS['badblocks']['Enabled']: output.append(' ') output.append('{BLUE}badblocks{CLEAR}'.format(**COLORS)) for dev, status in sorted(TESTS['badblocks']['Status'].items()): output.append('{dev}{s_color}{status:>{pad}}{CLEAR}'.format( dev = dev, pad = 15-len(dev), s_color = get_status_color(status), status = status, **COLORS)) if TESTS['iobenchmark']['Enabled']: output.append(' ') output.append('{BLUE}I/O Benchmark{CLEAR}'.format(**COLORS)) for dev, status in sorted(TESTS['iobenchmark']['Status'].items()): output.append('{dev}{s_color}{status:>{pad}}{CLEAR}'.format( dev = dev, pad = 15-len(dev), s_color = get_status_color(status), status = status, **COLORS)) # Add line-endings output = ['{}\n'.format(line) for line in output] with open(TESTS['Progress Out'], 'w') as f: f.writelines(output) def upload_to_imgur(image_path): """Upload image to Imgur and return image url as str.""" image_data = None image_link = None # Bail early if not image_path: raise GenericError # Read image file and convert to base64 then convert to str with open(image_path, 'rb') as f: image_data = base64.b64encode(f.read()).decode() # POST image url = "https://api.imgur.com/3/image" boundary = '----WebKitFormBoundary7MA4YWxkTrZu0gW' payload = ('--{boundary}\r\nContent-Disposition: form-data; ' 'name="image"\r\n\r\n{data}\r\n--{boundary}--') payload = payload.format(boundary=boundary, data=image_data) headers = { 'content-type': 'multipart/form-data; boundary={}'.format(boundary), 'Authorization': 'Client-ID {}'.format(IMGUR_CLIENT_ID), } response = requests.request("POST", url, data=payload, headers=headers) # Return image link if response.ok: json_data = json.loads(response.text) image_link = json_data['data']['link'] return image_link def upload_to_nextcloud(image_path, ticket_number, dev_name): """Upload image to Nextcloud server and return folder url as str.""" image_data = None # Bail early if not image_path: raise GenericError # Read image file and convert to base64 with open(image_path, 'rb') as f: image_data = f.read() # PUT image url = '{base_url}/{ticket_number}_iobenchmark_{dev_name}_{date}.png'.format( base_url=BENCHMARK_SERVER['Url'], ticket_number=ticket_number, dev_name=dev_name, date=global_vars.get('Date-Time', 'Unknown Date-Time')) requests.put( url, data=image_data, headers = {'X-Requested-With': 'XMLHttpRequest'}, auth = (BENCHMARK_SERVER['User'], BENCHMARK_SERVER['Pass'])) # Return folder link return BENCHMARK_SERVER['Short Url'] if __name__ == '__main__': print("This file is not meant to be called directly.") # vim: sts=4 sw=4 ts=4