"""WizardKit: Hardware diagnostics""" # pylint: disable=too-many-lines # vim: sts=2 sw=2 ts=2 import atexit import logging import os import pathlib import re import subprocess import time from docopt import docopt from wk import cfg, debug, exe, log, osticket, std, tmux from wk import os as wk_os from wk.cfg.hw import ( IO_SMALL_DISK, REGEX_BLOCK_GRAPH, REGEX_SMART_ATTRIBUTES, REGEX_VOLUME, STATUS_COLORS, ) from wk.hw import benchmark as hw_benchmark from wk.hw import cpu as hw_cpu from wk.hw import disk as hw_disk from wk.hw import sensors as hw_sensors from wk.hw import smart as hw_smart from wk.hw import surface_scan as hw_surface_scan from wk.hw import system as hw_system from wk.hw.audio import audio_test from wk.hw.keyboard import keyboard_test from wk.hw.network import network_test from wk.hw.screensavers import screensaver from wk.hw.test import Test, TestGroup # STATIC VARIABLES DOCSTRING = f'''{cfg.main.KIT_NAME_FULL}: Hardware Diagnostics Usage: hw-diags [options] hw-diags (-h | --help) Options: -c --cli Force CLI mode -h --help Show this page -q --quick Skip menu and perform a quick check -t --test-mode Run diags in test mode --ignore-smart-errors NOT RECOMMENDED! Only use if you have RTFM, know what you're doing, understand the risks, and accept responsibililty. ''' LOG = logging.getLogger(__name__) IO_SIZE_SKIP_NAME = ( 'Skip USB Benchmarks ' f'(< {std.bytes_to_string(IO_SMALL_DISK, use_binary=False)})' ) TEST_GROUPS = { # Also used to build the menu options ## NOTE: This needs to be above MENU_SETS 'CPU & Cooling': 'cpu_stress_tests', 'Disk Attributes': 'disk_attribute_check', 'Disk Self-Test': 'disk_self_test', 'Disk Surface Scan': 'disk_surface_scan', 'Disk I/O Benchmark': 'disk_io_benchmark', } MENU_ACTIONS = ( 'Audio Test', 'Keyboard Test', 'Network Test', 'Clock Sync', 'Start', 'Quit') MENU_ACTIONS_SECRET = ( 'Matrix', 'Tubes', ) MENU_OPTIONS_QUICK = ('Disk Attributes',) MENU_SETS = { 'Full Diagnostic': (*TEST_GROUPS,), 'Disk Diagnostic': ( 'Disk Attributes', 'Disk Self-Test', 'Disk Surface Scan', 'Disk I/O Benchmark', ), 'Disk Diagnostic (Quick)': ('Disk Attributes',), } MENU_TOGGLES = ( 'osTicket Integration', 'osTicket Tech Note', IO_SIZE_SKIP_NAME, ) NUM_DISK_TESTS = len([s for s in TEST_GROUPS if s.startswith('Disk')]) PLATFORM = std.PLATFORM # Classes class State(): # pylint: disable=too-many-instance-attributes """Object for tracking hardware diagnostic data.""" def __init__(self): self.cpu_max_temp = -1 self.disks = [] self.layout = cfg.hw.TMUX_LAYOUT.copy() self.log_dir = None self.ost = osticket.osTicket() self.panes = {} self.system = None self.test_groups = [] self.top_text = std.color_string('Hardware Diagnostics', 'GREEN') # Init tmux and start a background process to maintain layout self.init_tmux() exe.start_thread(self.fix_tmux_layout_loop) def abort_testing(self) -> None: """Set unfinished tests as aborted and cleanup tmux panes.""" for group in self.test_groups: for test in group.test_objects: if test.status in ('Pending', 'Working'): test.set_status('Aborted') # Cleanup tmux self.panes.pop('Current', None) for key, pane_ids in self.panes.copy().items(): if key in ('Top', 'Started', 'Progress'): continue if isinstance(pane_ids, str): tmux.kill_pane(self.panes.pop(key)) else: for _id in pane_ids: tmux.kill_pane(_id) self.panes.pop(key) def disk_safety_checks(self) -> None: """Check for mid-run SMART failures and failed test(s).""" for dev in self.disks: disk_smart_status_check(dev, mid_run=True) for test in dev.tests: if test.failed and 'Attributes' not in test.name: dev.disable_disk_tests() break def fix_tmux_layout(self, forced=True) -> None: """Fix tmux layout based on cfg.hw.TMUX_LAYOUT.""" try: tmux.fix_layout(self.panes, self.layout, forced=forced) except RuntimeError: # Assuming self.panes changed while running pass def fix_tmux_layout_loop(self) -> None: """Fix tmux layout on a loop. NOTE: This should be called as a thread. """ while True: self.fix_tmux_layout(forced=False) std.sleep(1) def init_diags(self, menu) -> None: """Initialize diagnostic pass.""" # Reset objects self.disks.clear() self.layout.clear() self.layout.update(cfg.hw.TMUX_LAYOUT) self.test_groups.clear() # osTicket self.top_text = std.color_string('Hardware Diagnostics', 'GREEN') self.ost.init() self.ost.disabled = not menu.toggles['osTicket Integration']['Selected'] # Set log self.log_dir = log.format_log_path() self.log_dir = pathlib.Path( f'{self.log_dir.parent}/' f'Hardware-Diagnostics_{time.strftime("%Y-%m-%d_%H%M%S%z")}/' ) log.update_log_path( dest_dir=self.log_dir, dest_name='main', keep_history=False, timestamp=False, ) std.clear_screen() std.print_info('Initializing...') # Progress Pane self.update_progress_pane() tmux.respawn_pane( pane_id=self.panes['Progress'], watch_file=f'{self.log_dir}/progress.out', ) # Add HW Objects self.system = hw_system.System() self.disks = hw_disk.get_disks(skip_kits=True) # Add test objects for name, details in menu.options.items(): if not details['Selected']: # Only add selected options continue if 'CPU' in name: # Create two Test objects which will both be used by cpu_stress_tests # NOTE: Prime95 should be added first self.system.tests.append( Test(dev=self.system, label='Prime95', name=name), ) self.system.tests.append( Test(dev=self.system, label='Cooling', name=name), ) self.test_groups.append( TestGroup( name=name, function=globals()[TEST_GROUPS[name]], test_objects=self.system.tests, ), ) if 'Disk' in name: test_group = TestGroup( name=name, function=globals()[TEST_GROUPS[name]], ) for disk in self.disks: test_obj = Test(dev=disk, label=disk.path.name, name=name) disk.tests.append(test_obj) test_group.test_objects.append(test_obj) self.test_groups.append(test_group) def init_tmux(self) -> None: """Initialize tmux layout.""" tmux.kill_all_panes() # Top self.panes['Top'] = tmux.split_window( behind=True, lines=2, vertical=True, text=f'{self.top_text}\nMain Menu', ) # Started self.panes['Started'] = tmux.split_window( lines=cfg.hw.TMUX_SIDE_WIDTH, target_id=self.panes['Top'], text=std.color_string( ['Started', time.strftime("%Y-%m-%d %H:%M %Z")], ['BLUE', None], sep='\n', ), ) # Progress self.panes['Progress'] = tmux.split_window( lines=cfg.hw.TMUX_SIDE_WIDTH, text=' ', ) def save_debug_reports(self) -> None: """Save debug reports to disk.""" LOG.info('Saving debug reports') debug_dir = pathlib.Path(f'{self.log_dir}/debug') if not debug_dir.exists(): debug_dir.mkdir() # State (self) std.save_pickles({'state': self}, debug_dir) with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f: _f.write('\n'.join(debug.generate_object_report(self))) # Disks for disk in self.disks: with open( f'{debug_dir}/disk_{disk.path.name}.report', 'a', encoding='utf-8') as _f: _f.write('\n'.join(debug.generate_object_report(disk))) _f.write('\n\n[Tests]') for test in disk.tests: _f.write(f'\n{test.name}:\n') _f.write('\n'.join(debug.generate_object_report(test, indent=1))) cmd = [( f'sudo gpt -r show "{disk.path}"' f' >> {debug_dir}/gpt_{disk.path.name}.info' )] exe.run_program(cmd, check=False, shell=True) # osTicket with open(f'{debug_dir}/osTicket.report', 'a', encoding='utf-8') as _f: _f.write('\n'.join(debug.generate_object_report(self.ost))) # SMC if os.path.exists('/.wk-live-macos'): data = [] try: proc = exe.run_program(['smc', '-f']) data.extend(proc.stdout.splitlines()) data.append('----') proc = exe.run_program(['smc', '-l']) data.extend(proc.stdout.splitlines()) except Exception: # pylint: disable=broad-except LOG.ERROR('Error(s) encountered while exporting SMC data') data = [line.strip() for line in data] with open(f'{debug_dir}/smc.data', 'a', encoding='utf-8') as _f: _f.write('\n'.join(data)) # System with open(f'{debug_dir}/system.report', 'a', encoding='utf-8') as _f: _f.write('\n'.join(debug.generate_object_report(self.system))) _f.write('\n\n[Tests]') for test in self.system.tests: _f.write(f'\n{test.name}:\n') _f.write('\n'.join(debug.generate_object_report(test, indent=1))) def update_clock(self) -> None: """Update 'Started' pane following clock sync.""" tmux.respawn_pane( pane_id=self.panes['Started'], text=std.color_string( ['Started', time.strftime("%Y-%m-%d %H:%M %Z")], ['BLUE', None], sep='\n', ), ) def update_progress_pane(self) -> None: """Update progress pane.""" report = [] width = cfg.hw.TMUX_SIDE_WIDTH for group in self.test_groups: report.append(std.color_string(group.name, 'BLUE')) for test in group.test_objects: report.append(std.color_string( [test.label, f'{test.status:>{width-len(test.label)}}'], [None, STATUS_COLORS.get(test.status, None)], sep='', )) # Add spacer report.append(' ') # Write to progress file out_path = pathlib.Path(f'{self.log_dir}/progress.out') with open(out_path, 'w', encoding='utf-8') as _f: _f.write('\n'.join(report)) def update_top_pane(self, text) -> None: """Update top pane with text.""" tmux.respawn_pane(self.panes['Top'], text=f'{self.top_text}\n{text}') # Functions def build_menu(cli_mode=False, quick_mode=False) -> std.Menu: # pylint: disable=too-many-branches """Build main menu, returns wk.std.Menu.""" menu = std.Menu(title=None) # Add actions, options, etc for action in MENU_ACTIONS: menu.add_action(action) for action in MENU_ACTIONS_SECRET: menu.add_action(action, {'Hidden': True}) for option in TEST_GROUPS: menu.add_option(option, {'Selected': True}) for toggle in MENU_TOGGLES: menu.add_toggle(toggle, {'Selected': True}) for name, targets in MENU_SETS.items(): menu.add_set(name, {'Targets': targets}) menu.actions['Start']['Separator'] = True # osTicket menu.toggles['osTicket Tech Note']['Selected'] = False # Update default selections for quick mode if necessary if quick_mode: for name in menu.options: # Only select quick option(s) menu.options[name]['Selected'] = name in MENU_OPTIONS_QUICK # Skip CPU tests for TestStations if os.path.exists(cfg.hw.TESTSTATION_FILE): menu.options['CPU & Cooling']['Selected'] = False # Add CLI actions if necessary if cli_mode or 'DISPLAY' not in os.environ: menu.add_action('Reboot') menu.add_action('Power Off') # Compatibility checks if PLATFORM != 'Linux': for name in ('Audio Test', 'Keyboard Test'): menu.actions[name]['Disabled'] = True if PLATFORM not in ('Darwin', 'Linux'): for name in ('Matrix', 'Network Test', 'Tubes'): menu.actions[name]['Disabled'] = True # Live macOS actions if os.path.exists('/.wk-live-macos'): menu.actions['Clock Sync']['Separator'] = True else: menu.actions['Clock Sync']['Disabled'] = True menu.actions['Clock Sync']['Hidden'] = True # Done return menu def cpu_stress_tests(state, test_objects, test_mode=False) -> None: # pylint: disable=too-many-statements """CPU & cooling check using Prime95 and Sysbench.""" LOG.info('CPU Test (Prime95)') aborted = False prime_log = pathlib.Path(f'{state.log_dir}/prime.log') run_sysbench = False sensors_out = pathlib.Path(f'{state.log_dir}/sensors.out') test_minutes = cfg.hw.CPU_TEST_MINUTES if test_mode: test_minutes = cfg.hw.TEST_MODE_CPU_LIMIT test_mprime_obj, test_cooling_obj = test_objects # Bail early if test_cooling_obj.disabled or test_mprime_obj.disabled: return # Prep state.update_top_pane(test_mprime_obj.dev.cpu_description) test_cooling_obj.set_status('Working') test_mprime_obj.set_status('Working') # Start sensors monitor sensors = hw_sensors.Sensors() sensors.start_background_monitor( sensors_out, thermal_action=('killall', 'mprime', '-INT'), ) # Create monitor and worker panes state.update_progress_pane() state.panes['Prime95'] = tmux.split_window( lines=10, vertical=True, watch_file=prime_log, watch_cmd='tail') if PLATFORM == 'Darwin': state.panes['Temps'] = tmux.split_window( behind=True, percent=80, vertical=True, cmd='./hw-sensors') elif PLATFORM == 'Linux': state.panes['Temps'] = tmux.split_window( behind=True, percent=80, vertical=True, watch_file=sensors_out) tmux.resize_pane(height=3) state.panes['Current'] = '' state.layout['Current'] = {'height': 3, 'Check': True} # Get idle temps std.print_standard('Saving idle temps...') sensors.save_average_temps(temp_label='Idle', seconds=5) # Stress CPU std.print_info('Running stress test') hw_cpu.set_apple_fan_speed('max') proc_mprime = hw_cpu.start_mprime(state.log_dir, prime_log) # Show countdown print('') try: print_countdown(proc=proc_mprime, seconds=test_minutes*60) except KeyboardInterrupt: aborted = True # Stop Prime95 hw_cpu.stop_mprime(proc_mprime) # Update progress if necessary if sensors.cpu_reached_critical_temp() or aborted: test_cooling_obj.set_status('Aborted') test_mprime_obj.set_status('Aborted') state.update_progress_pane() # Get cooldown temp std.clear_screen() std.print_standard('Letting CPU cooldown...') std.sleep(5) std.print_standard('Saving cooldown temps...') sensors.save_average_temps(temp_label='Cooldown', seconds=5) # Check Prime95 results test_mprime_obj.report.append(std.color_string('Prime95', 'BLUE')) hw_cpu.check_mprime_results( test_obj=test_mprime_obj, working_dir=state.log_dir, ) # Run Sysbench test if necessary run_sysbench = ( not aborted and sensors.cpu_max_temp() >= cfg.hw.CPU_FAILURE_TEMP ) if run_sysbench: LOG.info('CPU Test (Sysbench)') std.print_standard('Letting CPU cooldown more...') std.sleep(30) std.clear_screen() std.print_info('Running alternate stress test') print('') proc_sysbench, filehandle_sysbench = hw_cpu.start_sysbench( sensors, sensors_out, log_path=prime_log.with_name('sysbench.log'), pane=state.panes['Prime95'], ) try: print_countdown(proc=proc_sysbench, seconds=test_minutes*60) except AttributeError: # Assuming the sysbench process wasn't found and proc was set to None LOG.error('Failed to find sysbench process', exc_info=True) except KeyboardInterrupt: aborted = True hw_cpu.stop_sysbench(proc_sysbench, filehandle_sysbench) # Update progress # NOTE: CPU critical temp check isn't really necessary # Hard to imagine it wasn't hit during Prime95 but was in sysbench if sensors.cpu_reached_critical_temp() or aborted: test_cooling_obj.set_status('Aborted') test_mprime_obj.set_status('Aborted') state.update_progress_pane() # Check Cooling results test_cooling_obj.report.append(std.color_string('Temps', 'BLUE')) hw_cpu.check_cooling_results(test_cooling_obj, sensors, run_sysbench) # Post results to osTicket if not state.ost.disabled: _failed = test_cooling_obj.failed or test_mprime_obj.failed std.print_info('Posting results to osTicket...') state.cpu_max_temp = sensors.cpu_max_temp() state.ost.post_response( ost_build_report(state.system, 'CPU'), color='Diags FAIL' if _failed else 'Diags', ) # Cleanup state.update_progress_pane() sensors.stop_background_monitor() state.panes.pop('Current', None) tmux.kill_pane(state.panes.pop('Prime95', None)) tmux.kill_pane(state.panes.pop('Temps', None)) # Done if aborted: raise std.GenericAbort('Aborted') def disk_attribute_check(state, test_objects, test_mode=False) -> None: # pylint: disable=unused-argument """Disk attribute check.""" LOG.info('Disk Attribute Check') for test in test_objects: disk_smart_status_check(test.dev, mid_run=False) if not test.dev.attributes: # No NVMe/SMART data test.set_status('N/A') continue # Done state.update_progress_pane() def disk_io_benchmark( state, test_objects, skip_usb=True, test_mode=False) -> None: """Disk I/O benchmark using dd.""" LOG.info('Disk I/O Benchmark (dd)') aborted = False # Run benchmarks state.update_top_pane( f'Disk I/O Benchmark{"s" if len(test_objects) > 1 else ""}', ) state.panes['I/O Benchmark'] = tmux.split_window( percent=50, vertical=True, text=' ', ) # Skip USB devices if requested for test in test_objects: if ( skip_usb and test.dev.bus == 'USB' and test.dev.size < cfg.hw.IO_SMALL_DISK ): test.set_status('Skipped') test.disabled = True continue # Start benchmark for test in test_objects: if test.disabled: continue # Start benchmark std.clear_screen() std.print_report(test.dev.generate_report()) test.set_status('Working') test_log = f'{state.log_dir}/{test.dev.path.name}_benchmark.out' tmux.respawn_pane( state.panes['I/O Benchmark'], watch_cmd='tail', watch_file=test_log, ) state.update_progress_pane() try: hw_benchmark.run_io_test(test, test_log, test_mode=test_mode) except KeyboardInterrupt: aborted = True except (subprocess.CalledProcessError, TypeError, ValueError) as err: # Something went wrong LOG.error('%s', err) test.set_status('ERROR') test.report.append(std.color_string(' Unknown Error', 'RED')) # Mark test(s) aborted if necessary if aborted: test.set_status('Aborted') test.report.append(std.color_string(' Aborted', 'YELLOW')) break # Update progress after each test state.update_progress_pane() # Cleanup state.update_progress_pane() tmux.kill_pane(state.panes.pop('I/O Benchmark', None)) # Done if aborted: raise std.GenericAbort('Aborted') def disk_self_test(state, test_objects, test_mode=False) -> None: # pylint: disable=unused-argument """Disk self-test if available.""" LOG.info('Disk Self-Test(s)') aborted = False threads = [] state.panes['SMART'] = [] # Run self-tests state.update_top_pane( f'Disk self-test{"s" if len(test_objects) > 1 else ""}', ) std.print_info(f'Starting self-test{"s" if len(test_objects) > 1 else ""}') for test in reversed(test_objects): if test.disabled: # Skip continue # Start thread test.set_status('Working') test_log = f'{state.log_dir}/{test.dev.path.name}_selftest.log' threads.append(exe.start_thread(hw_smart.run_self_test, args=(test, test_log))) # Show progress if threads[-1].is_alive(): state.panes['SMART'].append( tmux.split_window(lines=4, vertical=True, watch_file=test_log), ) # Wait for all tests to complete state.update_progress_pane() try: while True: if any(t.is_alive() for t in threads): std.sleep(1) else: break except KeyboardInterrupt: aborted = True for test in test_objects: hw_smart.abort_self_test(test.dev) std.sleep(0.5) hw_smart.build_self_test_report(test, aborted=True) # Cleanup state.update_progress_pane() for pane in state.panes['SMART']: tmux.kill_pane(pane) state.panes.pop('SMART', None) # Done if aborted: raise std.GenericAbort('Aborted') def disk_smart_status_check(dev, mid_run=True) -> None: """Check SMART status.""" msg = None color = None disable_tests = False # Check SMART status and attributes if not hw_smart.smart_status_ok(dev): msg = 'Critical SMART error detected' color = 'RED' disable_tests = True elif not hw_smart.check_attributes(dev, only_blocking=False): # Non-blocking errors msg = 'SMART attribute failure(s) detected' color = 'YELLOW' # Log errors if detected if msg and not dev.contains_note(msg): msg = f'{msg}{" during diagnostics" if mid_run else ""}' LOG.warning(msg) dev.add_note(msg, color) # Set Disk Attributes test result for test in dev.tests: if test.name == 'Disk Attributes': test.failed = bool(test.failed or msg) test.passed = not test.failed if test.failed: test.set_status('Failed') elif 'N/A' not in test.status: test.set_status('Passed') # Disable further testing if needed if disable_tests: dev.disable_disk_tests() def disk_surface_scan(state, test_objects, test_mode=False) -> None: """Read-only disk surface scan using badblocks.""" LOG.info('Disk Surface Scan (badblocks)') aborted = False threads = [] state.panes['badblocks'] = [] # Update panes state.update_top_pane( f'Disk Surface Scan{"s" if len(test_objects) > 1 else ""}', ) std.print_info( f'Starting disk surface scan{"s" if len(test_objects) > 1 else ""}', ) for disk in state.disks: failed_attributes = [ line for line in hw_smart.generate_attribute_report(disk) if 'failed' in line ] if failed_attributes: size_str = std.bytes_to_string(disk.size, use_binary=False) std.print_colored( ['[', disk.path.name, ' ', size_str, ']'], [None, 'BLUE', None, 'CYAN', None], sep='', ) std.print_report(failed_attributes) std.print_standard('') # Run surface scans for test in reversed([test for test in test_objects if not test.disabled]): # Start thread test_log = f'{state.log_dir}/{test.dev.path.name}_badblocks.log' threads.append(exe.start_thread( hw_surface_scan.run_scan, args=(test, test_log, test_mode), )) # Show progress if threads[-1].is_alive(): state.panes['badblocks'].append( tmux.split_window( lines=5, vertical=True, watch_cmd='tail', watch_file=test_log, ), ) # Wait for all tests to complete try: while True: if any(t.is_alive() for t in threads): state.update_progress_pane() std.sleep(5) else: break except KeyboardInterrupt: aborted = True std.sleep(0.5) # Handle aborts for test in test_objects: if not (test.disabled or test.passed or test.failed): test.set_status('Aborted') test.report.append(std.color_string(' Aborted', 'YELLOW')) # Cleanup state.update_progress_pane() for pane in state.panes['badblocks']: tmux.kill_pane(pane) state.panes.pop('badblocks', None) # Done if aborted: raise std.GenericAbort('Aborted') def main() -> None: # pylint: disable=too-many-branches """Main function for hardware diagnostics.""" args = docopt(DOCSTRING) log.update_log_path(dest_name='Hardware-Diagnostics', timestamp=True) # Safety check if 'TMUX' not in os.environ: LOG.error('tmux session not found') raise RuntimeError('tmux session not found') # Init atexit.register(tmux.kill_all_panes) menu = build_menu(cli_mode=args['--cli'], quick_mode=args['--quick']) state = State() state.override_all_smart_errors = args['--ignore-smart-errors'] # pylint: disable=attribute-defined-outside-init # Quick Mode if args['--quick']: menu.toggles['osTicket Integration']['Selected'] = False run_diags(state, menu, quick_mode=True, test_mode=args['--test-mode']) return # Show menu while True: action = None selection = menu.advanced_select() # Set action if 'Audio Test' in selection: action = audio_test elif 'Keyboard Test' in selection: action = keyboard_test elif 'Network Test' in selection: action = network_test elif 'Clock Sync' in selection: action = sync_clock # Run simple test if action: state.update_top_pane(selection[0]) try: action() except KeyboardInterrupt: std.print_warning('Aborted.') std.print_standard('') std.pause('Press Enter to return to main menu...') if 'Clock Sync' in selection: state.update_clock() # Secrets if 'Matrix' in selection: screensaver('matrix') elif 'Tubes' in selection: # Tubes ≈≈ Pipes? screensaver('pipes') # Quit if 'Reboot' in selection: cmd = ['/usr/local/bin/wk-power-command', 'reboot'] exe.run_program(cmd, check=False) elif 'Power Off' in selection: cmd = ['/usr/local/bin/wk-power-command', 'poweroff'] exe.run_program(cmd, check=False) elif 'Quit' in selection: break # Start diagnostics if 'Start' in selection: run_diags(state, menu, quick_mode=False, test_mode=args['--test-mode']) # Reset top pane state.update_top_pane('Main Menu') def ost_build_report(dev, dev_type): # pylint: disable=too-many-branches """Build report for posting to osTicket, returns str.""" report = [] # Combined result if dev_type == 'CPU' or len(dev.tests) == NUM_DISK_TESTS: # Build list of failed tests (if any) failed_tests = [t.name for t in dev.tests if t.failed] failed_tests = [name.replace('Disk ', '') for name in failed_tests] if len(failed_tests) > 2: failed_tests = f'{", ".join(failed_tests[:-1])}, & {failed_tests[-1]}' else: failed_tests = ' & '.join(failed_tests) # Get overall result result = 'UNKNOWN' if any(t.failed for t in dev.tests): result = 'FAILED' elif all(t.passed for t in dev.tests): result = 'PASSED' # Add to report report.append( f'{dev_type} hardware diagnostic tests: {result}' f'{" ("+failed_tests+")" if failed_tests else ""}' ) report.append('') # Description if hasattr(dev, 'cpu_description'): report.append(dev.cpu_description) else: report.append(dev.description) if hasattr(dev, 'ram_total'): if len(dev.ram_dimms) == 1 and 'justTotalRAM' in dev.ram_dimms[0]: report.append(f'{dev.ram_total} (Total - no DIMM info available)') else: report.append(f'{dev.ram_total} ({", ".join(dev.ram_dimms)})') if hasattr(dev, 'serial') and dev.serial: report.append(f'Serial Number: {dev.serial}') report.append('') # Notes if hasattr(dev, 'notes') and dev.notes: report.append('Notes') report.extend([f'... {note}' for note in dev.notes]) report.append('') # Tests for test in dev.tests: report.append(f'{test.name} ({test.status})') # Report if test.name == 'Disk Attributes' and dev.attributes: report.extend( ost_convert_report( hw_smart.generate_attribute_report(dev), start_index=0, ), ) else: report.extend(ost_convert_report(test.report, start_index=1)) # I/O graph upload report report.extend(getattr(test, 'upload_report', [])) # Spacer report.append('') # Volume report if dev_type == 'Disk' and len(dev.tests) == NUM_DISK_TESTS: report.append('Volumes:') report.extend(ost_generate_volume_report(dev)) # Remove last line if empty if not report[-1].strip(): report.pop(-1) # Done return std.strip_colors('\n'.join(report)) def ost_convert_report(original_report, start_index): """Convert report to an osTicket compatible type, returns list.""" report = [] # Convert report for line in original_report[start_index:]: # Remove colors and leading spaces line = std.strip_colors(line) line = re.sub(r'^\s+', '', line) # Disk I/O Benchmark if REGEX_BLOCK_GRAPH.search(line): line = REGEX_BLOCK_GRAPH.sub('', line) line = line.strip() # SMART attributes match = REGEX_SMART_ATTRIBUTES.search(line) if match: # Switch decimal and hex labels _dec = f'{match.group("decimal"):>3}' _dec = osticket.pad_with_dots(_dec) _hex = match.group('hex') _data = match.group('data') line = f'{_hex}/{_dec}: {_data}' line = line.replace('failed', 'FAILED') # Skip empty lines if not line.strip(): continue # Fix inner spacing for spacing in re.findall(r'\s\s+', line): new_padding = osticket.pad_with_dots(spacing) new_padding += ' ' line = line.replace(spacing, new_padding) # Indent line line = f'... {line}' # Add to (converted) report report.append(line) # Done return report def ost_generate_volume_report(dev): """Generate volume report for dev, returns list.""" report = [] vol_report = None # OS Check if PLATFORM == 'Darwin': vol_report = wk_os.mac.mount_disk(device_path=dev.path) elif PLATFORM == 'Linux': vol_report = wk_os.linux.mount_volumes( device_path=dev.path, read_write=False, scan_corestorage=not any(t.failed for t in dev.tests), ) else: # Volume report unavailable return report # Convert mount_volume report for line in vol_report: line = std.strip_colors(line) match = REGEX_VOLUME.match(line) if match: if match.group('result') == 'Mounted on': report.append( f'... {match.group("dev")}' f'... Mounted on {match.group("path")}' f'... ({match.group("details")})' ) else: # Assuming either failed to mount or info line about a skipped dev report.append(f'... {match.group("dev")}... {match.group("result")}') else: # Unknown result, just print the whole line report.append(f'... {line}') # Done return report def ost_post_disk_results(state): """Post disk test results for all disks.""" disk_tests = [] for group in state.test_groups: if group.name.startswith('Disk'): disk_tests.extend(group.test_objects) # Bail if no disk tests were run if not disk_tests or state.ost.disabled: return # Post disk results std.print_info('Posting results to osTicket...') for disk in state.disks: state.ost.post_response( ost_build_report(disk, 'Disk'), color='Diags FAIL' if any(t.failed for t in disk.tests) else 'Diags', ) def ost_update_checkboxes(state): # pylint: disable=too-many-branches """Update osTicket checkboxes after confirmation.""" cpu_tests = [] disk_tests = [] num_disk_tests_run = len(state.test_groups) # Build list of tests for group in state.test_groups: if group.name.startswith('CPU'): cpu_tests.extend(group.test_objects) num_disk_tests_run -= 1 elif group.name.startswith('Disk'): disk_tests.extend(group.test_objects) # Bail if osTicket integration disabled if state.ost.disabled: return # Bail if values not confirmed if not std.ask('Update osTicket checkboxes using the data above?'): return # CPU max temp and pass/fail if cpu_tests: state.ost.set_cpu_max_temp( state.cpu_max_temp, ) if any(t.failed for t in cpu_tests): state.ost.set_flag_failed('CPU') elif all(t.passed for t in cpu_tests): state.ost.set_flag_passed('CPU') # Check results for all disks if state.disks: all_disks_passed = True for disk in state.disks: if any(t.failed for t in disk.tests): # Mark failed disk in osTicket and stop checking results all_disks_passed = False state.ost.set_flag_failed('Disk') break if not all(t.passed for t in disk.tests): all_disks_passed = False break # All disks passed if all_disks_passed and num_disk_tests_run == NUM_DISK_TESTS: # Only mark as passed if a full disk diagnostic passed state.ost.set_flag_passed('Disk') def print_countdown(proc, seconds) -> None: """Print countdown to screen while proc is alive.""" seconds = int(seconds) for i in range(seconds): sec_left = (seconds - i) % 60 min_left = int((seconds - i) / 60) out_str = '\r ' if min_left: out_str += f'{min_left} minute{"s" if min_left != 1 else ""}, ' out_str += f'{sec_left} second{"s" if sec_left != 1 else ""}' out_str += ' remaining' print(f'{out_str:<42}', end='', flush=True) try: proc.wait(1) except subprocess.TimeoutExpired: # proc still going, continue pass if ((hasattr(proc, 'poll') and proc.poll() is not None) or (hasattr(proc, 'is_running') and not proc.is_running())): # proc exited, stop countdown break # Done print('') def run_diags(state, menu, quick_mode=False, test_mode=False) -> None: """Run selected diagnostics.""" aborted = False atexit.register(state.save_debug_reports) state.init_diags(menu) def _init_osticket(): """Dumb private function to avoid pylint error.""" if not state.ost.disabled: # Select Ticket state.ost.select_ticket() # Update top_text if state.ost.ticket_id: state.top_text += std.color_string( [f' #{state.ost.ticket_id}', state.ost.ticket_name], [None, 'CYAN'], ) # Add note if (state.ost.ticket_id and menu.toggles['osTicket Tech Note']['Selected']): state.ost.add_note() # Just return if no tests were selected if not state.test_groups: std.print_warning('No tests selected?') std.pause() return # osTicket _init_osticket() # Run tests for group in state.test_groups: # Run test(s) function = group.function args = [group.test_objects] if group.name == 'Disk I/O Benchmark': args.append(menu.toggles[IO_SIZE_SKIP_NAME]['Selected']) std.clear_screen() try: function(state, *args, test_mode=test_mode) except (KeyboardInterrupt, std.GenericAbort): aborted = True state.abort_testing() state.update_progress_pane() break else: # Run safety checks after disk tests if group.name.startswith('Disk'): state.disk_safety_checks() # Handle aborts if aborted: for group in state.test_groups: for test in group.test_objects: if test.status == 'Pending': test.set_status('Aborted') # Post disk results ost_post_disk_results(state) # Show results show_results(state) # Update checkboxes ost_update_checkboxes(state) # Done state.save_debug_reports() atexit.unregister(state.save_debug_reports) if quick_mode: std.pause('Press Enter to exit...') else: std.pause('Press Enter to return to main menu...') # osTicket state.top_text = std.color_string('Hardware Diagnostics', 'GREEN') def show_results(state) -> None: """Show test results by device.""" std.sleep(0.5) std.clear_screen() state.update_top_pane('Results') # CPU Tests cpu_tests_enabled = [ group.name for group in state.test_groups if 'CPU' in group.name ] if cpu_tests_enabled: std.print_success('CPU:') std.print_report(state.system.generate_report()) std.print_standard(' ') # Disk Tests disk_tests_enabled = [ group.name for group in state.test_groups if 'Disk' in group.name ] if disk_tests_enabled: std.print_success(f'Disk{"s" if len(state.disks) > 1 else ""}:') for disk in state.disks: std.print_report(disk.generate_report()) std.print_standard(' ') if not state.disks: std.print_warning('No devices') std.print_standard(' ') def sync_clock() -> None: """Sync clock under macOS using sntp.""" cmd = ['sudo', 'sntp', '-Ss', 'us.pool.ntp.org'] proc = exe.run_program(cmd, check=False) if proc.returncode: # Assuming we're running under an older version of macOS cmd[2] = '-s' exe.run_program(cmd, check=False) if __name__ == '__main__': print("This file is not meant to be called directly.")