Update function docstrings

This commit is contained in:
2Shirt 2018-01-30 16:50:28 -07:00
parent 89c343943f
commit 96c6997a44
9 changed files with 68 additions and 3 deletions

View file

@ -11,6 +11,7 @@ REGEX_BAD_PATH_NAMES = re.compile(
re.IGNORECASE)
def backup_partition(disk, par):
"""Create a backup image of a partition."""
if par.get('Image Exists', False) or par['Number'] in disk['Bad Partitions']:
raise GenericAbort
@ -28,9 +29,15 @@ def backup_partition(disk, par):
run_program(cmd)
def fix_path(path):
"""Replace invalid filename characters with underscores."""
return REGEX_BAD_PATH_NAMES.sub('_', path)
def prep_disk_for_backup(destination, disk, ticket_number):
"""Gather details about the disk and its partitions.
This includes partitions that can't be backed up,
whether backups already exist on the BACKUP_SERVER,
partition names/sizes/used space, etc."""
disk['Clobber Risk'] = []
width = len(str(len(disk['Partitions'])))
@ -102,7 +109,7 @@ def prep_disk_for_backup(destination, disk, ticket_number):
disk['Backup Warnings'] = warnings
def select_backup_destination(auto_select=True):
# Build menu
"""Select a backup destination from a menu, returns server dict."""
destinations = [s for s in BACKUP_SERVERS if s['Mounted']]
actions = [
{'Name': 'Main Menu', 'Letter': 'M'},
@ -136,6 +143,7 @@ def select_backup_destination(auto_select=True):
return destinations[int(selection)-1]
def verify_wim_backup(partition):
"""Verify WIM integrity."""
if not os.path.exists(partition['Image Path']):
raise PathNotFoundError
cmd = [

View file

@ -399,6 +399,7 @@ def print_warning(*args, **kwargs):
print_standard(*args, color=COLORS['YELLOW'], **kwargs)
def print_log(message='', end='\n', timestamp=True):
"""Writes message to a log if LogFile is set."""
time_str = time.strftime("%Y-%m-%d %H%M%z: ") if timestamp else ''
if 'LogFile' in global_vars and global_vars['LogFile']:
with open(global_vars['LogFile'], 'a', encoding='utf-8') as f:
@ -526,6 +527,9 @@ def try_and_print(message='Trying...',
return {'CS': not bool(err), 'Error': err, 'Out': out}
def upload_crash_details():
"""Upload log and runtime data to the CRASH_SERVER.
Intended for uploading to a public Nextcloud share."""
if not ENABLED_UPLOAD_DATA:
raise GenericError
@ -762,6 +766,9 @@ def set_common_vars():
**global_vars)
def set_linux_vars():
"""Set common variables in a Linux environment.
These assume we're running under a WK-Linux build."""
result = run_program(['mktemp', '-d'])
global_vars['TmpDir'] = result.stdout.decode().strip()
global_vars['Date'] = time.strftime("%Y-%m-%d")

View file

@ -353,6 +353,7 @@ def run_wimextract(source, items, dest):
run_program(cmd)
def list_source_items(source_obj, rel_path=None):
"""List items in a dir or WIM, returns a list of SourceItem objects."""
items = []
rel_path = '{}{}'.format(os.sep, rel_path) if rel_path else ''
if source_obj.is_dir():
@ -470,6 +471,7 @@ def scan_source(source_obj, dest_path, rel_path='', interactive=True):
return selected_items
def get_source_item_obj(source_obj, rel_path, item_path):
"""Check if the item exists and return a SourceItem object if it does."""
item_obj = None
item_path = re.sub(r'(\\|/)', os.sep, item_path)
if source_obj.is_dir():
@ -481,6 +483,8 @@ def get_source_item_obj(source_obj, rel_path, item_path):
rel_path,
os.sep if rel_path else '',
item_path))
if not os.path.exists(item_obj.path):
item_obj = None
else:
# Assuming WIM file
if psutil.WINDOWS:
@ -734,12 +738,12 @@ def transfer_source(source_obj, dest_path, selected_items):
raise GenericError
def umount_backup_shares():
"""Unnount the backup shares regardless of current status."""
"""Unmount the backup shares regardless of current status."""
for server in BACKUP_SERVERS:
umount_network_share(server)
def umount_network_share(server):
"""Unnount a network share defined by server."""
"""Unmount a network share defined by server."""
cmd = r'net use \\{IP}\{Share} /delete'.format(**server)
cmd = cmd.split(' ')
try:

View file

@ -12,6 +12,7 @@ REGEX_DISK_MBR = re.compile(r'Disk ID: [A-Z0-9]+', re.IGNORECASE)
REGEX_DISK_RAW = re.compile(r'Disk ID: 00000000', re.IGNORECASE)
def assign_volume_letters():
"""Assign a volume letter to all available volumes."""
remove_volume_letters()
# Write script
@ -24,6 +25,7 @@ def assign_volume_letters():
run_diskpart(script)
def get_boot_mode():
"""Check if the boot mode was UEFI or legacy."""
boot_mode = 'Legacy'
try:
reg_key = winreg.OpenKey(
@ -37,6 +39,7 @@ def get_boot_mode():
return boot_mode
def get_disk_details(disk):
"""Get disk details using DiskPart."""
details = {}
script = [
'select disk {}'.format(disk['Number']),
@ -61,6 +64,7 @@ def get_disk_details(disk):
return details
def get_disks():
"""Get list of attached disks using DiskPart."""
disks = []
try:
@ -79,6 +83,7 @@ def get_disks():
return disks
def get_partition_details(disk, partition):
"""Get partition details using DiskPart and fsutil."""
details = {}
script = [
'select disk {}'.format(disk['Number']),
@ -157,6 +162,7 @@ def get_partition_details(disk, partition):
return details
def get_partitions(disk):
"""Get list of partition using DiskPart."""
partitions = []
script = [
'select disk {}'.format(disk['Number']),
@ -179,6 +185,7 @@ def get_partitions(disk):
return partitions
def get_table_type(disk):
"""Get disk partition table type using DiskPart."""
part_type = 'Unknown'
script = [
'select disk {}'.format(disk['Number']),
@ -200,6 +207,7 @@ def get_table_type(disk):
return part_type
def get_volumes():
"""Get list of volumes using DiskPart."""
vols = []
try:
result = run_diskpart(['list volume'])
@ -214,9 +222,11 @@ def get_volumes():
return vols
def is_bad_partition(par):
"""Check if the partition is accessible."""
return 'Letter' not in par or REGEX_BAD_PARTITION.search(par['FileSystem'])
def prep_disk_for_formatting(disk=None):
"""Gather details about the disk and its partitions."""
disk['Format Warnings'] = '\n'
width = len(str(len(disk['Partitions'])))
@ -261,6 +271,7 @@ def prep_disk_for_formatting(disk=None):
partition['Display String'] = display
def reassign_volume_letter(letter, new_letter='I'):
"""Assign a new letter to a volume using DiskPart."""
if not letter:
# Ignore
return None
@ -276,6 +287,7 @@ def reassign_volume_letter(letter, new_letter='I'):
return new_letter
def remove_volume_letters(keep=None):
"""Remove all assigned volume letters using DiskPart."""
if not keep:
keep = ''
@ -292,6 +304,7 @@ def remove_volume_letters(keep=None):
pass
def run_diskpart(script):
"""Run DiskPart script."""
tempfile = r'{}\diskpart.script'.format(global_vars['Env']['TMP'])
# Write script

View file

@ -42,6 +42,7 @@ TESTS = {
}
def get_smart_details(dev):
"""Get SMART data for dev if possible, returns dict."""
cmd = 'sudo smartctl --all --json /dev/{}'.format(dev).split()
result = run_program(cmd, check=False)
try:
@ -51,6 +52,7 @@ def get_smart_details(dev):
return {}
def get_status_color(s):
"""Get color based on status, returns str."""
color = COLORS['CLEAR']
if s in ['Denied', 'NS', 'OVERRIDE', 'Unknown']:
color = COLORS['RED']
@ -61,6 +63,7 @@ def get_status_color(s):
return color
def menu_diags(*args):
"""Main HW-Diagnostic menu."""
diag_modes = [
{'Name': 'All tests',
'Tests': ['Prime95', 'NVMe/SMART', 'badblocks']},
@ -133,6 +136,7 @@ def menu_diags(*args):
break
def run_badblocks():
"""Run a read-only test for all detected disks."""
aborted = False
clear_screen()
print_log('\nStart badblocks test(s)\n')
@ -191,6 +195,7 @@ def run_badblocks():
pass
def run_mprime():
"""Run Prime95 for MPRIME_LIMIT minutes while showing the temps."""
aborted = False
clear_screen()
print_log('\nStart Prime95 test')
@ -282,6 +287,7 @@ def run_mprime():
run_program('tmux kill-pane -a'.split())
def run_nvme_smart():
"""Run the built-in NVMe or SMART test for all detected disks."""
aborted = False
clear_screen()
print_log('\nStart NVMe/SMART test(s)\n')
@ -376,6 +382,7 @@ def run_nvme_smart():
run_program('tmux kill-pane -a'.split(), check=False)
def run_tests(tests):
"""Run selected hardware test(s)."""
print_log('Starting Hardware Diagnostics')
print_log('\nRunning tests: {}'.format(', '.join(tests)))
# Enable selected tests
@ -414,6 +421,7 @@ def run_tests(tests):
pause('Press Enter to exit...')
def scan_disks():
"""Scan for disks eligible for hardware testing."""
clear_screen()
# Get eligible disk list
@ -489,6 +497,7 @@ def scan_disks():
TESTS['badblocks']['Devices'] = devs
def show_disk_details(dev):
"""Display disk details."""
dev_name = dev['lsblk']['name']
# Device description
print_info('Device: /dev/{}'.format(dev['lsblk']['name']))
@ -566,6 +575,7 @@ def show_disk_details(dev):
print_success(raw_str, timestamp=False)
def show_results():
"""Show results for selected test(s)."""
clear_screen()
print_log('\n───────────────────────────')
print_standard('Hardware Diagnostic Results')
@ -629,6 +639,7 @@ def show_results():
run_program('tmux kill-pane -a'.split())
def update_progress():
"""Update progress file."""
if 'Progress Out' not in TESTS:
TESTS['Progress Out'] = '{}/progress.out'.format(global_vars['LogDir'])
output = []

View file

@ -54,6 +54,7 @@ def is_connected():
return False
def show_valid_addresses():
"""Show all valid private IP addresses assigned to the system."""
devs = psutil.net_if_addrs()
for dev, families in sorted(devs.items()):
for family in families:
@ -62,6 +63,7 @@ def show_valid_addresses():
show_data(message=dev, data=family.address)
def speedtest():
"""Run a network speedtest using speedtest-cli."""
result = run_program(['speedtest-cli', '--simple'])
output = [line.strip() for line in result.stdout.decode().splitlines()
if line.strip()]

View file

@ -9,6 +9,7 @@ from settings.music import *
from settings.sources import *
def compress_and_remove_item(item):
"""Compress and delete an item unless an error is encountered."""
try:
compress_item(item)
except:
@ -17,6 +18,7 @@ def compress_and_remove_item(item):
remove_item(item.path)
def compress_item(item):
"""Compress an item in a 7-Zip archive using the ARCHIVE_PASSWORD."""
# Prep
prev_dir = os.getcwd()
dest = '{}.7z'.format(item.path)
@ -58,9 +60,11 @@ def download_generic(out_dir, out_name, source_url):
raise GenericError('Failed to download file.')
def download_to_temp(out_name, source_url):
"""Download a file to the TmpDir."""
download_generic(global_vars['TmpDir'], out_name, source_url)
def extract_generic(source, dest, mode='x', sz_args=[]):
"""Extract a file to a destination."""
cmd = [
global_vars['Tools']['SevenZip'],
mode, source, r'-o{}'.format(dest),
@ -70,11 +74,13 @@ def extract_generic(source, dest, mode='x', sz_args=[]):
run_program(cmd)
def extract_temp_to_bin(source, item, mode='x', sz_args=[]):
"""Extract a file to the .bin folder."""
source = r'{}\{}'.format(global_vars['TmpDir'], source)
dest = r'{}\{}'.format(global_vars['BinDir'], item)
extract_generic(source, dest, mode, sz_args)
def extract_temp_to_cbin(source, item, mode='x', sz_args=[]):
"""Extract a file to the .cbin folder."""
source = r'{}\{}'.format(global_vars['TmpDir'], source)
dest = r'{}\{}'.format(global_vars['CBinDir'], item)
include_path = r'{}\_include\{}'.format(global_vars['CBinDir'], item)
@ -83,6 +89,7 @@ def extract_temp_to_cbin(source, item, mode='x', sz_args=[]):
extract_generic(source, dest, mode, sz_args)
def generate_launcher(section, name, options):
"""Generate a launcher script."""
# Prep
dest = r'{}\{}'.format(global_vars['BaseDir'], section)
if section == '(Root)':
@ -119,6 +126,7 @@ def generate_launcher(section, name, options):
f.write('\n'.join(out_text))
def remove_item(item_path):
"""Delete a file or folder."""
if os.path.exists(item_path):
if os.path.isdir(item_path):
shutil.rmtree(item_path, ignore_errors=True)
@ -126,6 +134,7 @@ def remove_item(item_path):
os.remove(item_path)
def remove_from_kit(item):
"""Delete a file or folder from the .bin/.cbin folders."""
item_locations = []
for p in [global_vars['BinDir'], global_vars['CBinDir']]:
item_locations.append(r'{}\{}'.format(p, item))
@ -134,6 +143,7 @@ def remove_from_kit(item):
remove_item(item_path)
def remove_from_temp(item):
"""Delete a file or folder from the TmpDir folder."""
item_path = r'{}\{}'.format(global_vars['TmpDir'], item)
remove_item(item_path)
@ -159,6 +169,7 @@ def resolve_dynamic_url(source_url, regex):
return url
def scan_for_net_installers(server, family_name, min_year):
"""Scan network shares for installers."""
if not server['Mounted']:
mount_network_share(server)

View file

@ -162,6 +162,7 @@ def mount_windows_share():
mount_network_share(WINDOWS_SERVER, read_write=True)
def select_windows_version():
"""Select Windows version from a menu, returns dict."""
actions = [
{'Name': 'Main Menu', 'Letter': 'M'},
]
@ -178,6 +179,7 @@ def select_windows_version():
raise GenericAbort
def setup_windows(windows_image, windows_version):
"""Apply a Windows image to W:\"""
cmd = [
global_vars['Tools']['wimlib-imagex'],
'apply',
@ -189,6 +191,7 @@ def setup_windows(windows_image, windows_version):
run_program(cmd)
def setup_windows_re(windows_version, windows_letter='W', tools_letter='T'):
"""Setup the WinRE partition."""
win = r'{}:\Windows'.format(windows_letter)
winre = r'{}\System32\Recovery\WinRE.wim'.format(win)
dest = r'{}:\Recovery\WindowsRE'.format(tools_letter)
@ -206,6 +209,7 @@ def setup_windows_re(windows_version, windows_letter='W', tools_letter='T'):
run_program(cmd)
def update_boot_partition(system_letter='S', windows_letter='W', mode='ALL'):
"""Setup the Windows boot partition."""
cmd = [
r'{}\Windows\System32\bcdboot.exe'.format(
global_vars['Env']['SYSTEMDRIVE']),
@ -215,6 +219,7 @@ def update_boot_partition(system_letter='S', windows_letter='W', mode='ALL'):
run_program(cmd)
def wim_contains_image(filename, imagename):
"""Check if an ESD/WIM contains the specified image, returns bool."""
cmd = [
global_vars['Tools']['wimlib-imagex'],
'info',

View file

@ -51,6 +51,7 @@ PE_TOOLS = {
}
def check_pe_tools():
"""Fix tool paths for WinPE layout."""
for k in PE_TOOLS.keys():
PE_TOOLS[k]['Path'] = r'{}\{}'.format(
global_vars['BinDir'], PE_TOOLS[k]['Path'])
@ -203,6 +204,7 @@ def menu_backup():
pause('\nPress Enter to return to main menu... ')
def menu_root():
"""Main WinPE menu."""
check_pe_tools()
menus = [
{'Name': 'Create Backups', 'Menu': menu_backup},
@ -381,6 +383,7 @@ def menu_setup():
pause('\nPress Enter to return to main menu... ')
def menu_tools():
"""Tool launcher menu."""
tools = [{'Name': k} for k in sorted(PE_TOOLS.keys())]
actions = [{'Name': 'Main Menu', 'Letter': 'M'},]
set_title(KIT_NAME_FULL)
@ -409,6 +412,7 @@ def menu_tools():
break
def select_minidump_path():
"""Select BSOD minidump path from a menu."""
dumps = []
# Assign volume letters first