Add disk volume utilization test

Addresses issue #19
This commit is contained in:
2Shirt 2022-05-21 19:43:47 -07:00
parent 7ffbcc83fa
commit 8c67830345
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
5 changed files with 172 additions and 48 deletions

View file

@ -14,3 +14,4 @@ from . import smart
from . import surface_scan
from . import system
from . import test
from . import volumes

View file

@ -20,6 +20,7 @@ from wk.hw import sensors as hw_sensors
from wk.hw import smart as hw_smart
from wk.hw import surface_scan as hw_surface_scan
from wk.hw import system as hw_system
from wk.hw import volumes as hw_volumes
from wk.hw.audio import audio_test
from wk.hw.keyboard import keyboard_test
from wk.hw.network import network_test
@ -59,6 +60,7 @@ TEST_GROUPS = {
'Disk Self-Test': 'disk_self_test',
'Disk Surface Scan': 'disk_surface_scan',
'Disk I/O Benchmark': 'disk_io_benchmark',
'Disk Utilization': 'disk_volume_utilization',
}
MENU_ACTIONS = (
'Audio Test',
@ -79,6 +81,7 @@ MENU_SETS = {
'Disk Self-Test',
'Disk Surface Scan',
'Disk I/O Benchmark',
'Disk Utilization',
),
'Disk Diagnostic (Quick)': ('Disk Attributes',),
}
@ -800,6 +803,17 @@ def disk_surface_scan(state, test_objects, test_mode=False) -> None:
raise std.GenericAbort('Aborted')
def disk_volume_utilization(state, test_objects, test_mode=False) -> None:
# pylint: disable=unused-argument
"""Check disk for full volumes."""
LOG.info('Disk Utilization')
for test in test_objects:
hw_volumes.check_volume_utilization(test)
# Done
state.update_progress_pane()
def main() -> None:
# pylint: disable=too-many-branches
"""Main function for hardware diagnostics."""
@ -967,6 +981,12 @@ def run_diags(state, menu, quick_mode=False, test_mode=False) -> None:
# Post disk results
hw_osticket.post_disk_results(state, NUM_DISK_TESTS)
# Drop Disk Utilization reports (only needed for OST)
for test_group in state.test_groups:
if test_group.name == 'Disk Utilization':
for test in test_group.test_objects:
test.report = []
# Show results
show_results(state)

View file

@ -5,13 +5,11 @@ import logging
import re
from wk import std, osticket
from wk import os as wk_os
from wk.cfg.hw import (
REGEX_BLOCK_GRAPH,
REGEX_SMART_ATTRIBUTES,
)
from wk.hw import smart as hw_smart
from wk.std import PLATFORM
# STATIC VARIABLES
@ -89,11 +87,6 @@ def build_report(dev, dev_type, num_disk_tests=None):
# Spacer
report.append('')
# Volume report
if dev_type == 'Disk' and len(dev.tests) == num_disk_tests:
report.append('Volumes:')
report.extend(generate_volume_report(dev))
# Remove last line if empty
if not report[-1].strip():
report.pop(-1)
@ -148,46 +141,6 @@ def convert_report(original_report, start_index):
return report
def generate_volume_report(dev):
"""Generate volume report for dev, returns list."""
report = []
vol_report = None
# OS Check
if PLATFORM == 'Darwin':
vol_report = wk_os.mac.mount_disk(device_path=dev.path)
elif PLATFORM == 'Linux':
vol_report = wk_os.linux.mount_volumes(
device_path=dev.path,
read_write=False,
scan_corestorage=not any(t.failed for t in dev.tests),
)
else:
# Volume report unavailable
return report
# Convert mount_volume report
for line in vol_report:
line = std.strip_colors(line)
match = REGEX_VOLUME.match(line)
if match:
if match.group('result') == 'Mounted on':
report.append(
f'... {match.group("dev")}'
f'... Mounted on {match.group("path")}'
f'... ({match.group("details")})'
)
else:
# Assuming either failed to mount or info line about a skipped dev
report.append(f'... {match.group("dev")}... {match.group("result")}')
else:
# Unknown result, just print the whole line
report.append(f'... {line}')
# Done
return report
def post_disk_results(state, num_disk_tests):
"""Post disk test results for all disks."""
disk_tests = []

150
scripts/wk/hw/volumes.py Normal file
View file

@ -0,0 +1,150 @@
"""WizardKit: Volume functions"""
# vim: sts=2 sw=2 ts=2
import logging
from wk import os as wk_os
from wk.cfg.hw import VOLUME_FAILURE_THRESHOLD
from wk.std import PLATFORM, bytes_to_string
# STATIC VARIABLES
LOG = logging.getLogger(__name__)
DEV_INFO = '{dev}... {size}... {filesystem}'
MOUNT_INFO = '... Mounted on {mountpoint}... ({used} used, {free} free) {msg}'
# Functions
def add_child_dev_line(test_obj, child) -> None:
"""Add child device line to test report."""
dev_info = DEV_INFO.format(
dev=child['name'],
size=bytes_to_string(child['size']),
filesystem=child['fstype'],
)
vol_full = False
# Bail early
if not child['mountpoint']:
test_obj.report.append(dev_info)
return
# Get sizes
percent_used = -1
size = child['size']
try:
if PLATFORM == 'Darwin':
size = child['TotalSize']
free = child['FreeSpace']
used = size - free
elif PLATFORM == 'Linux':
free = int(child['fsavail'])
used = int(child['fsused'])
percent_used = (used / size) * 100
except TypeError:
# Bail early
test_obj.report.append(f'{dev_info}... Mounted on {child["mountpoint"]}')
return
# Check for failures
if percent_used >= VOLUME_FAILURE_THRESHOLD:
test_obj.failed = True
vol_full = True
# Mount / Size info
mount_size_info = MOUNT_INFO.format(
mountpoint=child['mountpoint'],
used=bytes_to_string(used),
free=bytes_to_string(free),
msg=f'[{percent_used:0.0f}% full]' if vol_full else '',
)
# Done
test_obj.report.append(f'{dev_info}{mount_size_info}')
def add_root_dev_line(test_obj) -> None:
"""Add root device line to test report."""
dev = test_obj.dev
dev_info= DEV_INFO.format(
dev=dev.name,
size=bytes_to_string(dev.size),
filesystem=dev.filesystem,
)
vol_full = False
# Get sizes
percent_used = 0
size = dev.size
try:
if PLATFORM == 'Darwin':
size = dev.raw_details['TotalSize']
free = dev.raw_details['FreeSpace']
used = size - free
elif PLATFORM == 'Linux':
free = int(dev.raw_details['fsavail'])
used = int(dev.raw_details['fsused'])
percent_used = (used / size) * 100
except TypeError:
# Bail early
test_obj.report.append(
f'{dev_info}... Mounted on {dev.raw_details["mountpoint"]}',
)
return
# Check for failures
if percent_used >= VOLUME_FAILURE_THRESHOLD:
test_obj.failed = True
vol_full = True
# Mount / Size info
mount_size_info = MOUNT_INFO.format(
mountpoint=dev.raw_details['mountpoint'],
used=bytes_to_string(used),
free=bytes_to_string(free),
msg=f'[{percent_used:0.0f}% full]' if vol_full else '',
)
# Done
test_obj.report.append(f'{dev_info}{mount_size_info}')
def check_volume_utilization(test_obj) -> None:
"""Check volume utilization using OS specific methods."""
dev = test_obj.dev
# Mount all volumes (read only if possible)
mount_all_volumes(test_obj.dev)
# Add root dev to report
add_root_dev_line(test_obj)
# Add children
dev.update_details(skip_children=False)
for child in dev.children:
add_child_dev_line(test_obj, child)
# Update test object
if test_obj.failed:
test_obj.dev.add_note('Full volume(s) detected', color='YELLOW')
test_obj.passed = False
test_obj.set_status('Failed')
else:
test_obj.passed = True
test_obj.set_status('Passed')
def mount_all_volumes(dev) -> None:
"""Mount all volumes for dev using."""
if PLATFORM == 'Darwin':
wk_os.mac.mount_disk(device_path=dev.path)
elif PLATFORM == 'Linux':
wk_os.linux.mount_volumes(
device_path=dev.path,
read_write=False,
scan_corestorage=not any(t.failed for t in dev.tests),
)
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -83,7 +83,7 @@ def get_core_storage_volumes(device_path):
for l_vg in plist_data['CoreStorageLogicalVolumeGroups']:
related = False
# Compare parent physical volumes againt device_path
# Compare parent physical volumes against device_path
for p_v in l_vg['CoreStoragePhysicalVolumes']:
uuid = p_v['CoreStorageUUID']
cmd = ['diskutil', 'coreStorage', 'info', '-plist', uuid]