130 lines
3.3 KiB
Python
130 lines
3.3 KiB
Python
"""WizardKit: Surface scan test functions"""
|
|
# vim: sts=2 sw=2 ts=2
|
|
|
|
import logging
|
|
|
|
from subprocess import STDOUT
|
|
|
|
from wk.cfg.hw import (
|
|
BADBLOCKS_EXTRA_LARGE_DISK,
|
|
BADBLOCKS_LARGE_DISK,
|
|
BADBLOCKS_REGEX,
|
|
BADBLOCKS_RESULTS_REGEX,
|
|
#BADBLOCKS_SKIP_REGEX,
|
|
TEST_MODE_BADBLOCKS_LIMIT,
|
|
)
|
|
from wk.exe import run_program
|
|
from wk.std import PLATFORM, bytes_to_string
|
|
from wk.ui import ansi
|
|
|
|
|
|
# STATIC VARIABLES
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
# Functions
|
|
def check_surface_scan_results(test_obj, log_path) -> None:
|
|
"""Check results and set test status."""
|
|
report = []
|
|
report_color = None
|
|
|
|
# Read result
|
|
with open(log_path, 'r', encoding='utf-8') as _f:
|
|
for line in _f.readlines():
|
|
line = ansi.strip_colors(line.strip())
|
|
if not line:
|
|
# Skip
|
|
continue
|
|
|
|
# Clean line by removing backspaces/etc
|
|
match = BADBLOCKS_RESULTS_REGEX.match(line)
|
|
if match:
|
|
if match.group(2) == ' done':
|
|
line = BADBLOCKS_RESULTS_REGEX.sub(r'\1done', line)
|
|
else:
|
|
line = BADBLOCKS_RESULTS_REGEX.sub(r'\1\2', line)
|
|
line = line.replace(r'\x08', '')
|
|
line = line.strip()
|
|
|
|
# Add to report
|
|
report.append(line)
|
|
match = BADBLOCKS_REGEX.search(line)
|
|
if match:
|
|
if all(s == '0' for s in match.groups()):
|
|
test_obj.passed = True
|
|
test_obj.set_status('Passed')
|
|
else:
|
|
test_obj.failed = True
|
|
test_obj.set_status('Failed')
|
|
|
|
# Set report color and save to test_obj
|
|
if test_obj.failed:
|
|
report_color = 'RED'
|
|
elif not test_obj.passed:
|
|
report_color = 'YELLOW'
|
|
for line in report:
|
|
test_obj.report.append(f' {ansi.color_string(line, report_color)}')
|
|
|
|
# Handle undefined result status
|
|
if not (test_obj.passed or test_obj.failed):
|
|
test_obj.set_status('Unknown')
|
|
|
|
|
|
def run_scan(test_obj, log_path, test_mode=False, max_errors=1) -> None:
|
|
"""Run surface scan and handle exceptions."""
|
|
block_size = '1024'
|
|
dev = test_obj.dev
|
|
dev_path = test_obj.dev.path
|
|
if PLATFORM == 'Darwin':
|
|
# Use "RAW" disks under macOS
|
|
dev_path = dev_path.with_name(f'r{dev_path.name}')
|
|
LOG.info('Using %s for better performance', dev_path)
|
|
test_obj.report.append(ansi.color_string('badblocks', 'BLUE'))
|
|
test_obj.set_status('Working')
|
|
|
|
# Increase block size if necessary
|
|
if dev.size >= BADBLOCKS_EXTRA_LARGE_DISK:
|
|
block_size = '8192'
|
|
if (dev.phy_sec == 4096
|
|
or dev.size >= BADBLOCKS_LARGE_DISK):
|
|
block_size = '4096'
|
|
|
|
# Max errors
|
|
if int(max_errors) <= 0:
|
|
max_errors = ''
|
|
else:
|
|
max_errors = f'-e{max_errors}'
|
|
|
|
# Start scan
|
|
cmd = ['sudo', 'badblocks', '-sv', '-b', block_size]
|
|
if max_errors:
|
|
cmd.append(max_errors)
|
|
cmd.append(dev_path)
|
|
if test_mode:
|
|
# Only test a limited scope instead of the whole device
|
|
cmd.append(TEST_MODE_BADBLOCKS_LIMIT)
|
|
|
|
with open(log_path, 'a', encoding='utf-8') as _f:
|
|
size_str = bytes_to_string(dev.size, use_binary=False)
|
|
_f.write(
|
|
ansi.color_string(
|
|
['[', dev.path.name, ' ', size_str, ']\n'],
|
|
[None, 'BLUE', None, 'CYAN', None],
|
|
sep='',
|
|
),
|
|
)
|
|
_f.flush()
|
|
run_program(
|
|
cmd,
|
|
check=False,
|
|
pipe=False,
|
|
stderr=STDOUT,
|
|
stdout=_f,
|
|
)
|
|
|
|
# Check results
|
|
check_surface_scan_results(test_obj, log_path)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
print("This file is not meant to be called directly.")
|