Merge remote-tracking branch 'upstream/dev' into dev
This commit is contained in:
commit
f0d0ea8164
6 changed files with 185 additions and 51 deletions
5
scripts/wk-debug
Executable file
5
scripts/wk-debug
Executable file
|
|
@ -0,0 +1,5 @@
|
||||||
|
#!/bin/bash
|
||||||
|
#
|
||||||
|
## WizardKit: Debug Launcher
|
||||||
|
|
||||||
|
python3 -i wk_debug.py
|
||||||
|
|
@ -148,6 +148,7 @@ class BlockPair():
|
||||||
'scrape': 'Pending',
|
'scrape': 'Pending',
|
||||||
})
|
})
|
||||||
self.view_map = 'DISPLAY' in os.environ or 'WAYLAND_DISPLAY' in os.environ
|
self.view_map = 'DISPLAY' in os.environ or 'WAYLAND_DISPLAY' in os.environ
|
||||||
|
self.view_proc = None
|
||||||
|
|
||||||
# Set map path
|
# Set map path
|
||||||
# e.g. '(Clone|Image)_Model[_p#]_Size[_Label].map'
|
# e.g. '(Clone|Image)_Model[_p#]_Size[_Label].map'
|
||||||
|
|
@ -2229,8 +2230,8 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
|
||||||
|
|
||||||
# Start ddrescue and ddrescueview (if enabled)
|
# Start ddrescue and ddrescueview (if enabled)
|
||||||
proc = exe.popen_program(cmd)
|
proc = exe.popen_program(cmd)
|
||||||
if block_pair.view_map:
|
if block_pair.view_map and not block_pair.view_proc:
|
||||||
exe.popen_program(
|
block_pair.view_proc = exe.popen_program(
|
||||||
['ddrescueview', '-r', '5s', block_pair.map_path],
|
['ddrescueview', '-r', '5s', block_pair.map_path],
|
||||||
pipe=True,
|
pipe=True,
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,7 @@ def generate_object_report(obj, indent=0):
|
||||||
# Add attribute to report (expanded if necessary)
|
# Add attribute to report (expanded if necessary)
|
||||||
if isinstance(attr, dict):
|
if isinstance(attr, dict):
|
||||||
report.append(f'{name}:')
|
report.append(f'{name}:')
|
||||||
for key, value in sorted(attr.items()):
|
for key, value in attr.items():
|
||||||
report.append(f'{" "*(indent+1)}{key}: {str(value)}')
|
report.append(f'{" "*(indent+1)}{key}: {str(value)}')
|
||||||
else:
|
else:
|
||||||
report.append(f'{" "*indent}{name}: {str(attr)}')
|
report.append(f'{" "*indent}{name}: {str(attr)}')
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
"""WizardKit: Disk object and functions"""
|
"""WizardKit: Disk object and functions"""
|
||||||
# vim: sts=2 sw=2 ts=2
|
# vim: sts=2 sw=2 ts=2
|
||||||
|
|
||||||
|
import copy
|
||||||
import logging
|
import logging
|
||||||
import pathlib
|
import pathlib
|
||||||
import platform
|
import platform
|
||||||
|
|
@ -17,9 +18,10 @@ from wk.hw.test import Test
|
||||||
from wk.hw.smart import (
|
from wk.hw.smart import (
|
||||||
enable_smart,
|
enable_smart,
|
||||||
generate_attribute_report,
|
generate_attribute_report,
|
||||||
|
get_known_disk_attributes,
|
||||||
update_smart_details,
|
update_smart_details,
|
||||||
)
|
)
|
||||||
from wk.std import PLATFORM, bytes_to_string, color_string, strip_colors
|
from wk.std import PLATFORM, color_string, strip_colors
|
||||||
|
|
||||||
|
|
||||||
# STATIC VARIABLES
|
# STATIC VARIABLES
|
||||||
|
|
@ -35,28 +37,33 @@ WK_LABEL_REGEX = re.compile(
|
||||||
class Disk:
|
class Disk:
|
||||||
# pylint: disable=too-many-instance-attributes
|
# pylint: disable=too-many-instance-attributes
|
||||||
"""Object for tracking disk specific data."""
|
"""Object for tracking disk specific data."""
|
||||||
attributes: dict[Any, dict] = field(init=False, default_factory=dict)
|
attributes: dict[Any, dict] = field(init=False, default_factory=dict)
|
||||||
bus: str = field(init=False)
|
bus: str = field(init=False)
|
||||||
children: list[dict] = field(init=False, default_factory=list)
|
children: list[dict] = field(init=False, default_factory=list)
|
||||||
filesystem: str = field(init=False)
|
description: str = field(init=False)
|
||||||
log_sec: int = field(init=False)
|
filesystem: str = field(init=False)
|
||||||
model: str = field(init=False)
|
initial_attributes: dict[Any, dict] = field(init=False)
|
||||||
name: str = field(init=False)
|
known_attributes: dict[Any, dict] = field(init=False, default_factory=dict)
|
||||||
notes: list[str] = field(init=False, default_factory=list)
|
log_sec: int = field(init=False)
|
||||||
|
model: str = field(init=False)
|
||||||
|
name: str = field(init=False)
|
||||||
|
notes: list[str] = field(init=False, default_factory=list)
|
||||||
path: Union[pathlib.Path, str]
|
path: Union[pathlib.Path, str]
|
||||||
parent: str = field(init=False)
|
parent: str = field(init=False)
|
||||||
phy_sec: int = field(init=False)
|
phy_sec: int = field(init=False)
|
||||||
raw_details: dict[str, Any] = field(init=False)
|
raw_details: dict[str, Any] = field(init=False)
|
||||||
raw_smartctl: dict[str, Any] = field(init=False)
|
raw_smartctl: dict[str, Any] = field(init=False)
|
||||||
serial: str = field(init=False)
|
serial: str = field(init=False)
|
||||||
size: int = field(init=False)
|
size: int = field(init=False)
|
||||||
ssd: bool = field(init=False)
|
ssd: bool = field(init=False)
|
||||||
tests: list[Test] = field(init=False, default_factory=list)
|
tests: list[Test] = field(init=False, default_factory=list)
|
||||||
use_sat: bool = field(init=False, default=False)
|
use_sat: bool = field(init=False, default=False)
|
||||||
|
|
||||||
def __post_init__(self) -> None:
|
def __post_init__(self) -> None:
|
||||||
self.path = pathlib.Path(self.path).resolve()
|
self.path = pathlib.Path(self.path).resolve()
|
||||||
self.update_details()
|
self.update_details()
|
||||||
|
self.set_description()
|
||||||
|
self.known_attributes = get_known_disk_attributes(self.model)
|
||||||
enable_smart(self)
|
enable_smart(self)
|
||||||
update_smart_details(self)
|
update_smart_details(self)
|
||||||
if not self.attributes and self.bus == 'USB':
|
if not self.attributes and self.bus == 'USB':
|
||||||
|
|
@ -66,6 +73,7 @@ class Disk:
|
||||||
self.use_sat = True
|
self.use_sat = True
|
||||||
enable_smart(self)
|
enable_smart(self)
|
||||||
update_smart_details(self)
|
update_smart_details(self)
|
||||||
|
self.initial_attributes = copy.deepcopy(self.attributes)
|
||||||
if not self.is_4k_aligned():
|
if not self.is_4k_aligned():
|
||||||
self.add_note('One or more partitions are not 4K aligned', 'YELLOW')
|
self.add_note('One or more partitions are not 4K aligned', 'YELLOW')
|
||||||
|
|
||||||
|
|
@ -85,14 +93,6 @@ class Disk:
|
||||||
present = True
|
present = True
|
||||||
return present
|
return present
|
||||||
|
|
||||||
@property
|
|
||||||
def description(self) -> str:
|
|
||||||
"""Get disk description from details."""
|
|
||||||
return (
|
|
||||||
f'{bytes_to_string(self.size, use_binary=False)}'
|
|
||||||
f' ({self.bus}) {self.model} {self.serial}'
|
|
||||||
)
|
|
||||||
|
|
||||||
def disable_disk_tests(self) -> None:
|
def disable_disk_tests(self) -> None:
|
||||||
"""Disable all tests."""
|
"""Disable all tests."""
|
||||||
LOG.warning('Disabling all tests for: %s', self.path)
|
LOG.warning('Disabling all tests for: %s', self.path)
|
||||||
|
|
@ -159,6 +159,33 @@ class Disk:
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def set_description(self) -> None:
|
||||||
|
"""Set disk description from details."""
|
||||||
|
decimals = 1
|
||||||
|
suffix = ' '
|
||||||
|
|
||||||
|
# Set size_str (try binary scale first)
|
||||||
|
for scale in (1024, 1000):
|
||||||
|
size = float(self.size)
|
||||||
|
units = list('KMGTPEZY')
|
||||||
|
while units:
|
||||||
|
if abs(size) < scale:
|
||||||
|
break
|
||||||
|
size /= scale
|
||||||
|
suffix = units.pop(0)
|
||||||
|
size = ((size * 10) // 1) / 10
|
||||||
|
if size % 1 == 0:
|
||||||
|
# Found an exact whole number, drop the decimal
|
||||||
|
decimals = 0
|
||||||
|
break
|
||||||
|
if size % 1 == 0.5:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Done
|
||||||
|
self.description = (
|
||||||
|
f'{size:0.{decimals}f} {suffix}B ({self.bus}) {self.model} {self.serial}'
|
||||||
|
)
|
||||||
|
|
||||||
def update_details(self, skip_children=True) -> None:
|
def update_details(self, skip_children=True) -> None:
|
||||||
"""Update disk details using OS specific methods.
|
"""Update disk details using OS specific methods.
|
||||||
|
|
||||||
|
|
@ -322,6 +349,8 @@ def get_disks(skip_kits=False) -> list[Disk]:
|
||||||
|
|
||||||
# Skip WK disks
|
# Skip WK disks
|
||||||
if skip_kits:
|
if skip_kits:
|
||||||
|
for disk in disks:
|
||||||
|
disk.update_details(skip_children=False)
|
||||||
disks = [
|
disks = [
|
||||||
disk_obj for disk_obj in disks
|
disk_obj for disk_obj in disks
|
||||||
if not any(
|
if not any(
|
||||||
|
|
@ -347,6 +376,10 @@ def get_disks_linux() -> list[Disk]:
|
||||||
if disk_obj.raw_details.get('type', '???') != 'disk':
|
if disk_obj.raw_details.get('type', '???') != 'disk':
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Skip empty devices (usually card readers)
|
||||||
|
if disk_obj.size <= 0:
|
||||||
|
continue
|
||||||
|
|
||||||
# Add disk
|
# Add disk
|
||||||
disks.append(disk_obj)
|
disks.append(disk_obj)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
"""WizardKit: SMART test functions"""
|
"""WizardKit: SMART test functions"""
|
||||||
# vim: sts=2 sw=2 ts=2
|
# vim: sts=2 sw=2 ts=2
|
||||||
|
|
||||||
|
import copy
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
|
|
||||||
|
|
@ -67,16 +68,15 @@ def build_self_test_report(test_obj, aborted=False) -> None:
|
||||||
def check_attributes(dev, only_blocking=False) -> bool:
|
def check_attributes(dev, only_blocking=False) -> bool:
|
||||||
"""Check if any known attributes are failing, returns bool."""
|
"""Check if any known attributes are failing, returns bool."""
|
||||||
attributes_ok = True
|
attributes_ok = True
|
||||||
known_attributes = get_known_disk_attributes(dev.model)
|
|
||||||
for attr, value in dev.attributes.items():
|
for attr, value in dev.attributes.items():
|
||||||
# Skip unknown attributes
|
# Skip unknown attributes
|
||||||
if attr not in known_attributes:
|
if attr not in dev.known_attributes:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Get thresholds
|
# Get thresholds
|
||||||
blocking_attribute = known_attributes[attr].get('Blocking', False)
|
blocking_attribute = dev.known_attributes[attr].get('Blocking', False)
|
||||||
err_thresh = known_attributes[attr].get('Error', None)
|
err_thresh = dev.known_attributes[attr].get('Error', None)
|
||||||
max_thresh = known_attributes[attr].get('Maximum', None)
|
max_thresh = dev.known_attributes[attr].get('Maximum', None)
|
||||||
if not max_thresh:
|
if not max_thresh:
|
||||||
max_thresh = float('inf')
|
max_thresh = float('inf')
|
||||||
|
|
||||||
|
|
@ -89,7 +89,7 @@ def check_attributes(dev, only_blocking=False) -> bool:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Check attribute
|
# Check attribute
|
||||||
if known_attributes[attr].get('PercentageLife', False):
|
if dev.known_attributes[attr].get('PercentageLife', False):
|
||||||
if 0 <= value['raw'] <= err_thresh:
|
if 0 <= value['raw'] <= err_thresh:
|
||||||
attributes_ok = False
|
attributes_ok = False
|
||||||
elif err_thresh <= value['raw'] < max_thresh:
|
elif err_thresh <= value['raw'] < max_thresh:
|
||||||
|
|
@ -114,18 +114,17 @@ def enable_smart(dev) -> None:
|
||||||
|
|
||||||
def generate_attribute_report(dev) -> list[str]:
|
def generate_attribute_report(dev) -> list[str]:
|
||||||
"""Generate attribute report, returns list."""
|
"""Generate attribute report, returns list."""
|
||||||
known_attributes = get_known_disk_attributes(dev.model)
|
|
||||||
report = []
|
report = []
|
||||||
for attr, value in sorted(dev.attributes.items()):
|
for attr, value in sorted(dev.attributes.items()):
|
||||||
note = ''
|
note = ''
|
||||||
value_color = 'GREEN'
|
value_color = 'GREEN'
|
||||||
|
|
||||||
# Skip attributes not in our list
|
# Skip attributes not in our list
|
||||||
if attr not in known_attributes:
|
if attr not in dev.known_attributes:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Check for attribute note
|
# Check for attribute note
|
||||||
note = known_attributes[attr].get('Note', '')
|
note = dev.known_attributes[attr].get('Note', '')
|
||||||
|
|
||||||
# ID / Name
|
# ID / Name
|
||||||
label = f'{attr:>3}'
|
label = f'{attr:>3}'
|
||||||
|
|
@ -135,9 +134,9 @@ def generate_attribute_report(dev) -> list[str]:
|
||||||
label = f' {label.replace("_", " "):38}'
|
label = f' {label.replace("_", " "):38}'
|
||||||
|
|
||||||
# Value color
|
# Value color
|
||||||
if known_attributes[attr].get('PercentageLife', False):
|
if dev.known_attributes[attr].get('PercentageLife', False):
|
||||||
# PercentageLife values
|
# PercentageLife values
|
||||||
if 0 <= value['raw'] <= known_attributes[attr]['Error']:
|
if 0 <= value['raw'] <= dev.known_attributes[attr]['Error']:
|
||||||
value_color = 'RED'
|
value_color = 'RED'
|
||||||
note = '(failed, % life remaining)'
|
note = '(failed, % life remaining)'
|
||||||
elif value['raw'] < 0 or value['raw'] > 100:
|
elif value['raw'] < 0 or value['raw'] > 100:
|
||||||
|
|
@ -145,7 +144,7 @@ def generate_attribute_report(dev) -> list[str]:
|
||||||
note = '(invalid?)'
|
note = '(invalid?)'
|
||||||
else:
|
else:
|
||||||
for threshold, color in ATTRIBUTE_COLORS:
|
for threshold, color in ATTRIBUTE_COLORS:
|
||||||
threshold_val = known_attributes[attr].get(threshold, None)
|
threshold_val = dev.known_attributes[attr].get(threshold, None)
|
||||||
if threshold_val and value['raw'] >= threshold_val:
|
if threshold_val and value['raw'] >= threshold_val:
|
||||||
value_color = color
|
value_color = color
|
||||||
if threshold == 'Error':
|
if threshold == 'Error':
|
||||||
|
|
@ -159,7 +158,7 @@ def generate_attribute_report(dev) -> list[str]:
|
||||||
|
|
||||||
# Build colored string and append to report
|
# Build colored string and append to report
|
||||||
line = color_string(
|
line = color_string(
|
||||||
[label, value['raw_str'], note],
|
[label, get_attribute_value_string(dev, attr), note],
|
||||||
[None, value_color, 'YELLOW'],
|
[None, value_color, 'YELLOW'],
|
||||||
)
|
)
|
||||||
report.append(line)
|
report.append(line)
|
||||||
|
|
@ -168,18 +167,41 @@ def generate_attribute_report(dev) -> list[str]:
|
||||||
return report
|
return report
|
||||||
|
|
||||||
|
|
||||||
def get_known_disk_attributes(model) -> dict[Any, dict]:
|
def get_attribute_value_string(dev, attr) -> str:
|
||||||
"""Get known NVMe/SMART attributes (model specific), returns dict."""
|
"""Get attribute value string and report if it has changed."""
|
||||||
known_attributes = KNOWN_DISK_ATTRIBUTES.copy()
|
current_value = dev.attributes.get(attr, {})
|
||||||
|
initial_value = dev.initial_attributes.get(attr, {})
|
||||||
|
value_str = current_value.get('raw_str', '')
|
||||||
|
|
||||||
|
# Compare current value against initial value
|
||||||
|
if (
|
||||||
|
current_value.get('raw', None) is None
|
||||||
|
or initial_value.get('raw', None) is None
|
||||||
|
):
|
||||||
|
return value_str
|
||||||
|
if current_value['raw'] != initial_value['raw']:
|
||||||
|
value_str = (
|
||||||
|
f'{initial_value.get("raw_str", "?")} --> '
|
||||||
|
f'{current_value.get("raw_str", "?")}'
|
||||||
|
)
|
||||||
|
|
||||||
|
# Done
|
||||||
|
return value_str
|
||||||
|
|
||||||
|
|
||||||
|
def get_known_disk_attributes(model) -> None:
|
||||||
|
"""Get known disk attributes based on the device model."""
|
||||||
|
known_attributes = copy.deepcopy(KNOWN_DISK_ATTRIBUTES)
|
||||||
|
|
||||||
# Apply model-specific data
|
# Apply model-specific data
|
||||||
for regex, data in KNOWN_DISK_MODELS.items():
|
for regex, data in KNOWN_DISK_MODELS.items():
|
||||||
if re.search(regex, model):
|
if not re.search(regex, model):
|
||||||
for attr, thresholds in data.items():
|
continue
|
||||||
if attr in known_attributes:
|
for attr, thresholds in data.items():
|
||||||
known_attributes[attr].update(thresholds)
|
if attr in known_attributes:
|
||||||
else:
|
known_attributes[attr].update(thresholds)
|
||||||
known_attributes[attr] = thresholds
|
else:
|
||||||
|
known_attributes[attr] = copy.deepcopy(thresholds)
|
||||||
|
|
||||||
# Done
|
# Done
|
||||||
return known_attributes
|
return known_attributes
|
||||||
|
|
|
||||||
73
scripts/wk_debug.py
Executable file
73
scripts/wk_debug.py
Executable file
|
|
@ -0,0 +1,73 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""WizardKit: Debug Launcher"""
|
||||||
|
# pylint: disable=invalid-name
|
||||||
|
# vim: sts=2 sw=2 ts=2
|
||||||
|
|
||||||
|
import os
|
||||||
|
import pathlib
|
||||||
|
import pickle
|
||||||
|
import sys
|
||||||
|
|
||||||
|
os.chdir(os.path.dirname(os.path.realpath(__file__)))
|
||||||
|
sys.path.append(os.getcwd())
|
||||||
|
import wk # pylint: disable=wrong-import-position
|
||||||
|
|
||||||
|
|
||||||
|
# STATIC VARIABLES
|
||||||
|
OPTIONS = {
|
||||||
|
'Cloning / Imaging': 'ddrescue-TUI',
|
||||||
|
'Hardware Diagnostics': 'Hardware-Diagnostics',
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def get_debug_prefix() -> str:
|
||||||
|
"""Ask what we're debugging, returns log dir prefix."""
|
||||||
|
menu = wk.std.Menu(title=f'{wk.cfg.main.KIT_NAME_FULL}: Debugging Menu\n')
|
||||||
|
for name, prefix in OPTIONS.items():
|
||||||
|
menu.add_option(name, {'Prefix': prefix})
|
||||||
|
selection = menu.simple_select()
|
||||||
|
return selection[1]['Prefix']
|
||||||
|
|
||||||
|
def get_debug_path() -> pathlib.Path:
|
||||||
|
"""Get debug path."""
|
||||||
|
log_dir = pathlib.Path('~/Logs').expanduser().resolve()
|
||||||
|
debug_paths = []
|
||||||
|
prefix = get_debug_prefix()
|
||||||
|
|
||||||
|
# Build list of options
|
||||||
|
for item in log_dir.iterdir():
|
||||||
|
if item.is_dir() and item.name.startswith(prefix):
|
||||||
|
debug_paths.append(item.joinpath('debug'))
|
||||||
|
debug_paths = [item for item in debug_paths if item.exists()]
|
||||||
|
debug_paths.sort()
|
||||||
|
|
||||||
|
# Safety check
|
||||||
|
if not debug_paths:
|
||||||
|
wk.std.abort('No logs found, aborting.')
|
||||||
|
|
||||||
|
# Use latest option
|
||||||
|
if wk.std.ask('Use latest session?'):
|
||||||
|
return debug_paths[-1]
|
||||||
|
|
||||||
|
# Select from list
|
||||||
|
menu = wk.std.Menu(title=f'{wk.cfg.main.KIT_NAME_FULL}: Debugging Menu\n')
|
||||||
|
for item in debug_paths:
|
||||||
|
menu.add_option(item.parent.name, {'Path': item})
|
||||||
|
selection = menu.simple_select()
|
||||||
|
|
||||||
|
# Done
|
||||||
|
return selection[1]['Path']
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
# Leaving this at the global level
|
||||||
|
try:
|
||||||
|
debug_path = get_debug_path()
|
||||||
|
except SystemExit:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
state = None
|
||||||
|
|
||||||
|
# Load pickle
|
||||||
|
with open(debug_path.joinpath('state.pickle'), 'rb') as f:
|
||||||
|
state = pickle.load(f)
|
||||||
Loading…
Reference in a new issue