Started Python refactoring

# Plan
* Use current WizardKit scripts as the new base
* Split functions into multiple files under Scripts\functions\
* Review menus and menu-flow
* Log everything and open log at end of menu-flows
  * (before returning to the root/main menu)
This commit is contained in:
Alan Mason 2017-11-30 13:39:00 -08:00
parent 7c7008bdda
commit 446867b611
11 changed files with 2317 additions and 1094 deletions

3
.gitignore vendored
View file

@ -1,11 +1,10 @@
**/__pycache__/*
*.bak
*.iso
.bin/Scripts/__pycache__
.bin/tmp
Drivers
Logs
Mount
PEFiles
Scripts/__pycache__
WK/amd64/
WK/x86/

File diff suppressed because it is too large Load diff

486
Scripts/functions/backup.py Normal file
View file

@ -0,0 +1,486 @@
# Wizard Kit PE: Functions - Backup
from functions.common import *
import partition_uids
def assign_volume_letters():
try:
# Run script
with open(DISKPART_SCRIPT, 'w') as script:
for vol in get_volumes():
script.write('select volume {Number}\n'.format(**vol))
script.write('assign\n')
run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT))
except subprocess.CalledProcessError:
pass
def backup_partition(bin=None, disk=None, par=None):
# Bail early
if bin is None:
raise Exception('bin path not specified.')
if disk is None:
raise Exception('Disk not specified.')
if par is None:
raise Exception('Partition not specified.')
print(' Partition {Number} Backup...\t\t'.format(**par), end='', flush=True)
if par['Number'] in disk['Bad Partitions']:
print_warning('Skipped.')
else:
cmd = '{bin}\\wimlib\\wimlib-imagex capture {Letter}:\\ "{Image Path}\\{Image File}" "{Image Name}" "{Image Name}" --compress=none'.format(bin=bin, **par)
if par['Image Exists']:
print_warning('Skipped.')
else:
try:
os.makedirs('{Image Path}'.format(**par), exist_ok=True)
run_program(cmd)
print_success('Complete.')
except subprocess.CalledProcessError as err:
print_error('Failed.')
par['Error'] = err.stderr.decode().splitlines()
raise BackupError
def get_attached_disk_info():
"""Get details about the attached disks"""
disks = []
print_info('Getting drive info...')
# Assign all the letters
assign_volume_letters()
# Get disks
disks = get_disks()
# Get disk details
for disk in disks:
# Get partition style
disk['Table'] = get_table_type(disk)
# Get disk name/model and physical details
disk.update(get_disk_details(disk))
# Get partition info for disk
disk['Partitions'] = get_partitions(disk)
for par in disk['Partitions']:
# Get partition details
par.update(get_partition_details(disk, par))
# Done
return disks
def get_disk_details(disk=None):
details = {}
# Bail early
if disk is None:
raise Exception('Disk not specified.')
try:
# Run script
with open(DISKPART_SCRIPT, 'w') as script:
script.write('select disk {Number}\n'.format(**disk))
script.write('detail disk\n')
process_return = run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT))
process_return = process_return.stdout.decode().strip()
except subprocess.CalledProcessError:
pass
else:
# Remove empty lines
tmp = [s.strip() for s in process_return.splitlines() if s.strip() != '']
# Set disk name
details['Name'] = tmp[4]
# Remove lines without a ':' and split each remaining line at the ':' to form a key/value pair
tmp = [s.split(':') for s in tmp if ':' in s]
# Add key/value pairs to the details variable and return dict
details.update({key.strip(): value.strip() for (key, value) in tmp})
return details
def get_disks():
disks = []
try:
# Run script
with open(DISKPART_SCRIPT, 'w') as script:
script.write('list disk\n')
process_return = run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT))
process_return = process_return.stdout.decode().strip()
except subprocess.CalledProcessError:
pass
else:
# Append disk numbers
for tmp in re.findall(r'Disk (\d+)\s+\w+\s+(\d+\s+\w+)', process_return):
_num = tmp[0]
_size = human_readable_size(tmp[1])
disks.append({'Number': _num, 'Size': _size})
return disks
def get_partition_details(disk=None, par=None):
details = {}
# Bail early
if disk is None:
raise Exception('Disk not specified.')
if par is None:
raise Exception('Partition not specified.')
# Diskpart details
try:
# Run script
with open(DISKPART_SCRIPT, 'w') as script:
script.write('select disk {Number}\n'.format(**disk))
script.write('select partition {Number}\n'.format(**par))
script.write('detail partition\n')
process_return = run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT))
process_return = process_return.stdout.decode().strip()
except subprocess.CalledProcessError:
pass
else:
# Get volume letter or RAW status
tmp = re.search(r'Volume\s+\d+\s+(\w|RAW)\s+', process_return)
if tmp:
if tmp.group(1).upper() == 'RAW':
details['FileSystem'] = RAW
else:
details['Letter'] = tmp.group(1)
# Remove empty lines from process_return
tmp = [s.strip() for s in process_return.splitlines() if s.strip() != '']
# Remove lines without a ':' and split each remaining line at the ':' to form a key/value pair
tmp = [s.split(':') for s in tmp if ':' in s]
# Add key/value pairs to the details variable and return dict
details.update({key.strip(): value.strip() for (key, value) in tmp})
# Get MBR type / GPT GUID for extra details on "Unknown" partitions
guid = partition_uids.lookup_guid(details['Type'])
if guid is not None:
details.update({
'Description': guid.get('Description', ''),
'OS': guid.get('OS', '')})
if 'Letter' in details:
# Disk usage
tmp = shutil.disk_usage('{Letter}:\\'.format(**details))
details['Used Space'] = human_readable_size(tmp.used)
# fsutil details
try:
process_return = run_program('fsutil fsinfo volumeinfo {Letter}:'.format(**details))
process_return = process_return.stdout.decode().strip()
except subprocess.CalledProcessError:
pass
else:
# Remove empty lines from process_return
tmp = [s.strip() for s in process_return.splitlines() if s.strip() != '']
# Add "Feature" lines
details['File System Features'] = [s.strip() for s in tmp if ':' not in s]
# Remove lines without a ':' and split each remaining line at the ':' to form a key/value pair
tmp = [s.split(':') for s in tmp if ':' in s]
# Add key/value pairs to the details variable and return dict
details.update({key.strip(): value.strip() for (key, value) in tmp})
# Set Volume Name
details['Name'] = details.get('Volume Name', '')
# Set FileSystem Type
if details.get('FileSystem', '') != 'RAW':
details['FileSystem'] = details.get('File System Name', 'Unknown')
return details
def get_partitions(disk=None):
partitions = []
# Bail early
if disk is None:
raise Exception('Disk not specified.')
try:
# Run script
with open(DISKPART_SCRIPT, 'w') as script:
script.write('select disk {Number}\n'.format(**disk))
script.write('list partition\n')
process_return = run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT))
process_return = process_return.stdout.decode().strip()
except subprocess.CalledProcessError:
pass
else:
# Append partition numbers
for tmp in re.findall(r'Partition\s+(\d+)\s+\w+\s+(\d+\s+\w+)\s+', process_return, re.IGNORECASE):
_num = tmp[0]
_size = human_readable_size(tmp[1])
partitions.append({'Number': _num, 'Size': _size})
return partitions
def get_table_type(disk=None):
_type = 'Unknown'
# Bail early
if disk is None:
raise Exception('Disk not specified.')
try:
with open(DISKPART_SCRIPT, 'w') as script:
script.write('select disk {Number}\n'.format(**disk))
script.write('uniqueid disk\n')
process_return = run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT))
process_return = process_return.stdout.decode().strip()
except subprocess.CalledProcessError:
pass
else:
if re.findall(r'Disk ID: {[A-Z0-9]+-[A-Z0-9]+-[A-Z0-9]+-[A-Z0-9]+-[A-Z0-9]+}', process_return, re.IGNORECASE):
_type = 'GPT'
elif re.findall(r'Disk ID: 00000000', process_return, re.IGNORECASE):
_type = 'RAW'
elif re.findall(r'Disk ID: [A-Z0-9]+', process_return, re.IGNORECASE):
_type = 'MBR'
return _type
def get_volumes():
vols = []
try:
# Run script
with open(DISKPART_SCRIPT, 'w') as script:
script.write('list volume\n')
process_return = run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT))
process_return = process_return.stdout.decode().strip()
except subprocess.CalledProcessError:
pass
else:
# Append volume numbers
for tmp in re.findall(r'Volume (\d+)\s+([A-Za-z]?)\s+', process_return):
vols.append({'Number': tmp[0], 'Letter': tmp[1]})
return vols
def prep_disk_for_backup(dest=None, disk=None, ticket_id=None):
disk['Backup Warnings'] = '\n'
disk['Clobber Risk'] = []
width = len(str(len(disk['Partitions'])))
# Bail early
if dest is None:
raise Exception('Destination not provided.')
if disk is None:
raise Exception('Disk not provided.')
if ticket_id is None:
raise Exception('Ticket ID not provided.')
# Get partition totals
disk['Bad Partitions'] = [par['Number'] for par in disk['Partitions'] if 'Letter' not in par or re.search(r'(RAW|Unknown)', par['FileSystem'], re.IGNORECASE)]
disk['Valid Partitions'] = len(disk['Partitions']) - len(disk['Bad Partitions'])
# Bail if no valid partitions are found (those that can be imaged)
if disk['Valid Partitions'] <= 0:
abort_to_main_menu(' No partitions can be imaged for the selected drive')
# Prep partitions
for par in disk['Partitions']:
if par['Number'] in disk['Bad Partitions']:
par['Display String'] = '{YELLOW} * Partition {Number:>{width}}:\t{Size} {FileSystem}\t\t{q}{Name}{q}\t{Description} ({OS}){CLEAR}'.format(
width=width,
q='"' if par['Name'] != '' else '',
**par,
**COLORS)
else:
# Update info for WIM capturing
par['Image Name'] = str(par['Name'])
if par['Image Name'] == '':
par['Image Name'] = 'Unknown'
if 'IP' in dest:
par['Image Path'] = '\\\\{IP}\\{Share}\\{ticket}'.format(ticket=ticket_id, **dest)
else:
par['Image Path'] = '{Letter}:\\{ticket}'.format(ticket=ticket_id, **dest)
par['Image File'] = '{Number}_{Image Name}'.format(**par)
par['Image File'] = '{fixed_name}.wim'.format(fixed_name=re.sub(r'\W', '_', par['Image File']))
# Check for existing backups
par['Image Exists'] = False
if os.path.exists('{Image Path}\\{Image File}'.format(**par)):
par['Image Exists'] = True
disk['Clobber Risk'].append(par['Number'])
par['Display String'] = '{BLUE} + '.format(**COLORS)
else:
par['Display String'] = '{CLEAR} '.format(**COLORS)
# Append rest of Display String for valid/clobber partitions
par['Display String'] += 'Partition {Number:>{width}}:\t{Size} {FileSystem} (Used: {Used Space})\t{q}{Name}{q}{CLEAR}'.format(
width=width,
q='"' if par['Name'] != '' else '',
**par,
**COLORS)
# Set description for bad partitions
if len(disk['Bad Partitions']) > 1:
disk['Backup Warnings'] += '{YELLOW} * Unable to backup these partitions{CLEAR}\n'.format(**COLORS)
elif len(disk['Bad Partitions']) == 1:
print_warning(' * Unable to backup this partition')
disk['Backup Warnings'] += '{YELLOW} * Unable to backup this partition{CLEAR}\n'.format(**COLORS)
# Set description for partitions that would be clobbered
if len(disk['Clobber Risk']) > 1:
disk['Backup Warnings'] += '{BLUE} + These partitions already have backup images on {Name}{CLEAR}\n'.format(**dest, **COLORS)
elif len(disk['Clobber Risk']) == 1:
disk['Backup Warnings'] += '{BLUE} + This partition already has a backup image on {Name}{CLEAR}\n'.format(**dest, **COLORS)
# Set warning for skipped partitions
if len(disk['Clobber Risk']) + len(disk['Bad Partitions']) > 1:
disk['Backup Warnings'] += '\n{YELLOW}If you continue the partitions marked above will NOT be backed up.{CLEAR}\n'.format(**COLORS)
if len(disk['Clobber Risk']) + len(disk['Bad Partitions']) == 1:
disk['Backup Warnings'] += '\n{YELLOW}If you continue the partition marked above will NOT be backed up.{CLEAR}\n'.format(**COLORS)
def prep_disk_for_formatting(disk=None):
disk['Format Warnings'] = '\n'
width = len(str(len(disk['Partitions'])))
# Bail early
if disk is None:
raise Exception('Disk not provided.')
# Set boot method and partition table type
disk['Use GPT'] = True
if (get_boot_mode() == 'UEFI'):
if (not ask("Setup Windows to use UEFI booting?")):
disk['Use GPT'] = False
else:
if (ask("Setup Windows to use BIOS/Legacy booting?")):
disk['Use GPT'] = False
# Set Display and Warning Strings
if len(disk['Partitions']) == 0:
disk['Format Warnings'] += 'No partitions found\n'
for par in disk['Partitions']:
if 'Letter' not in par or re.search(r'(RAW|Unknown)', par['FileSystem'], re.IGNORECASE):
# FileSystem not accessible to WinPE. List partition type / OS info for technician
par['Display String'] = ' Partition {Number:>{width}}:\t{Size} {FileSystem}\t\t{q}{Name}{q}\t{Description} ({OS})'.format(
width=width,
q='"' if par['Name'] != '' else '',
**par)
else:
# FileSystem accessible to WinPE. List space used instead of partition type / OS info for technician
par['Display String'] = ' Partition {Number:>{width}}:\t{Size} {FileSystem} (Used: {Used Space})\t{q}{Name}{q}'.format(
width=width,
q='"' if par['Name'] != '' else '',
**par)
def remove_volume_letters(keep=''):
if keep is None:
keep = ''
try:
# Run script
with open(DISKPART_SCRIPT, 'w') as script:
for vol in get_volumes():
if vol['Letter'].upper() != keep.upper():
script.write('select volume {Number}\n'.format(**vol))
script.write('remove noerr\n')
run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT))
except subprocess.CalledProcessError:
pass
def select_backup_destination():
# Build menu
dests = []
for server in BACKUP_SERVERS:
if server['Mounted']:
dests.append(server)
actions = [
{'Name': 'Main Menu', 'Letter': 'M'},
]
# Size check
for dest in dests:
if 'IP' in dest:
dest['Usage'] = shutil.disk_usage('\\\\{IP}\\{Share}'.format(**dest))
else:
dest['Usage'] = shutil.disk_usage('{Letter}:\\'.format(**dest))
dest['Free Space'] = human_readable_size(dest['Usage'].free)
dest['Display Name'] = '{Name} ({Free Space} available)'.format(**dest)
# Show menu or bail
if len(dests) > 0:
selection = menu_select('Where are we backing up to?', dests, actions)
if selection == 'M':
return None
else:
return dests[int(selection)-1]
else:
print_warning('No backup destinations found.')
return None
def select_disk(prompt='Which disk?'):
"""Select a disk from the attached disks"""
disks = get_attached_disk_info()
# Build menu
disk_options = []
for disk in disks:
display_name = '{Size}\t[{Table}] ({Type}) {Name}'.format(**disk)
if len(disk['Partitions']) > 0:
pwidth=len(str(len(disk['Partitions'])))
for par in disk['Partitions']:
# Show unsupported partition(s) in RED
par_skip = False
if 'Letter' not in par or re.search(r'(RAW|Unknown)', par['FileSystem'], re.IGNORECASE):
par_skip = True
if par_skip:
display_name += COLORS['YELLOW']
# Main text
display_name += '\n\t\t\tPartition {Number:>{pwidth}}: {Size} ({FileSystem})'.format(pwidth=pwidth, **par)
if par['Name'] != '':
display_name += '\t"{Name}"'.format(**par)
# Clear color (if set above)
if par_skip:
display_name += COLORS['CLEAR']
else:
display_name += '{YELLOW}\n\t\t\tNo partitions found.{CLEAR}'.format(**COLORS)
disk_options.append({'Name': display_name, 'Disk': disk})
actions = [
{'Name': 'Main Menu', 'Letter': 'M'},
]
# Menu loop
selection = menu_select(prompt, disk_options, actions)
if (selection.isnumeric()):
return disk_options[int(selection)-1]['Disk']
elif (selection == 'M'):
abort_to_main_menu()
def verify_wim_backup(bin=None, par=None):
# Bail early
if bin is None:
raise Exception('bin path not specified.')
if par is None:
raise Exception('Partition not specified.')
# Verify hiding all output for quicker verification
print(' Partition {Number} Image...\t\t'.format(**par), end='', flush=True)
cmd = '{bin}\\wimlib\\wimlib-imagex verify "{Image Path}\\{Image File}" --nocheck'.format(bin=bin, **par)
if not os.path.exists('{Image Path}\\{Image File}'.format(**par)):
print_error('Missing.')
else:
try:
run_program(cmd)
print_success('OK.')
except subprocess.CalledProcessError as err:
print_error('Damaged.')
par['Error'] = par.get('Error', []) + err.stderr.decode().splitlines()
raise BackupError
if __name__ == '__main__':
print("This file is not meant to be called directly.")

682
Scripts/functions/common.py Normal file
View file

@ -0,0 +1,682 @@
# Wizard Kit PE: Functions - Common
import os
import psutil
import re
import shutil
import subprocess
import sys
import time
import traceback
import winreg
from subprocess import CalledProcessError
from settings.main import *
from settings.tools import *
# Global variables
global_vars = {}
# STATIC VARIABLES
COLORS = {
'CLEAR': '\033[0m',
'RED': '\033[31m',
'GREEN': '\033[32m',
'YELLOW': '\033[33m',
'BLUE': '\033[34m'
}
HKU = winreg.HKEY_USERS
HKCU = winreg.HKEY_CURRENT_USER
HKLM = winreg.HKEY_LOCAL_MACHINE
# Error Classes
class BIOSKeyNotFoundError(Exception):
pass
class BinNotFoundError(Exception):
pass
class GenericError(Exception):
pass
class GenericRepair(Exception):
pass
class MultipleInstallationsError(Exception):
pass
class NotInstalledError(Exception):
pass
class NoProfilesError(Exception):
pass
class PathNotFoundException(Exception):
pass
class UnsupportedOSError(Exception):
pass
# General functions
def abort():
"""Abort script."""
print_warning('Aborted.')
sleep(5)
exit_script()
def ask(prompt='Kotaero!'):
"""Prompt the user with a Y/N question, log answer, and return a bool."""
answer = None
prompt = '{} [Y/N]: '.format(prompt)
while answer is None:
tmp = input(prompt)
if re.search(r'^y(es|)$', tmp, re.IGNORECASE):
answer = True
elif re.search(r'^n(o|ope|)$', tmp, re.IGNORECASE):
answer = False
message = '{prompt}{answer_text}'.format(
prompt = prompt,
answer_text = 'Yes' if answer else 'No')
print_log(message=message)
return answer
def convert_to_bytes(size):
"""Convert human-readable size str to bytes and return an int."""
size = str(size)
tmp = re.search(r'(\d+)\s+([KMGT]B)', size.upper())
if tmp:
size = int(tmp.group(1))
units = tmp.group(2)
if units == 'TB':
size *= 1099511627776
elif units == 'GB':
size *= 1073741824
elif units == 'MB':
size *= 1048576
elif units == 'KB':
size *= 1024
else:
return -1
return size
def exit_script(return_value=0):
"""Exits the script after some cleanup and opens the log (if set)."""
# Remove dirs (if empty)
for dir in ['BackupDir', 'LogDir', 'TmpDir']:
try:
dir = global_vars[dir]
os.rmdir(dir)
except Exception:
pass
# Open Log (if it exists)
log = global_vars.get('LogFile', '')
if log and os.path.exists(log):
try:
extract_item('NotepadPlusPlus', silent=True)
popen_program(
[global_vars['Tools']['NotepadPlusPlus'],
global_vars['LogFile']])
except Exception:
print_error('ERROR: Failed to extract Notepad++ and open log.')
pause('Press Enter to exit...')
# Kill Caffeine if still running
kill_process('caffeine.exe')
# Exit
sys.exit(return_value)
def extract_item(item, filter='', silent=False):
"""Extract item from .cbin into .bin."""
cmd = [
global_vars['Tools']['SevenZip'], 'x', '-aos', '-bso0', '-bse0',
'-p{ArchivePassword}'.format(**global_vars),
r'-o{BinDir}\{item}'.format(item=item, **global_vars),
r'{CBinDir}\{item}.7z'.format(item=item, **global_vars),
filter]
if not silent:
print_standard('Extracting "{item}"...'.format(item=item))
try:
run_program(cmd)
except subprocess.CalledProcessError:
if not silent:
print_warning('WARNING: Errors encountered while exctracting data')
def get_ticket_number():
"""Get TicketNumber from user, save in LogDir, and return as str."""
ticket_number = None
while ticket_number is None:
_input = input('Enter ticket number: ')
if re.match(r'^([0-9]+([-_]?\w+|))$', _input):
ticket_number = _input
with open(r'{LogDir}\TicketNumber'.format(**global_vars), 'w') as f:
f.write(ticket_number)
return ticket_number
def human_readable_size(size, decimals=0):
"""Convert size in bytes to a human-readable format and return a str."""
# Prep string formatting
width = 3+decimals
if decimals > 0:
width += 1
# Convert size to int
try:
size = int(size)
except ValueError:
size = convert_to_bytes(size)
# Verify we have a valid size
if size <= 0:
return '{size:>{width}} b'.format(size='???', width=width)
# Convert to sensible units
if size >= 1099511627776:
size /= 1099511627776
units = 'Tb'
elif size >= 1073741824:
size /= 1073741824
units = 'Gb'
elif size >= 1048576:
size /= 1048576
units = 'Mb'
elif size >= 1024:
size /= 1024
units = 'Kb'
else:
units = ' b'
# Return
return '{size:>{width}.{decimals}f} {units}'.format(
size=size, width=width, decimals=decimals, units=units)
def kill_process(name):
"""Kill any running caffeine.exe processes."""
for proc in psutil.process_iter():
if proc.name() == name:
proc.kill()
def major_exception():
"""Display traceback and exit"""
print_error('Major exception')
print_warning(SUPPORT_MESSAGE)
print(traceback.format_exc())
print_log(traceback.format_exc())
sleep(30)
pause('Press Enter to exit...')
exit_script(1)
def menu_select(title='~ Untitled Menu ~',
prompt='Please make a selection', secret_exit=False,
main_entries=[], action_entries=[], disabled_label='DISABLED'):
"""Display options in a menu and return selected option as a str."""
# Bail early
if not main_entries and not action_entries:
raise Exception("MenuError: No items given")
# Build menu
menu_splash = '{}\n\n'.format(title)
width = len(str(len(main_entries)))
valid_answers = []
if (secret_exit):
valid_answers.append('Q')
# Add main entries
for i in range(len(main_entries)):
entry = main_entries[i]
# Add Spacer
if ('CRLF' in entry):
menu_splash += '\n'
entry_str = '{number:>{width}}: {name}'.format(
number = i+1,
width = width,
name = entry.get('Display Name', entry['Name']))
if entry.get('Disabled', False):
entry_str = '{YELLOW}{entry_str} ({disabled}){CLEAR}'.format(
entry_str = entry_str,
disabled = disabled_label,
**COLORS)
else:
valid_answers.append(str(i+1))
menu_splash += '{}\n'.format(entry_str)
menu_splash += '\n'
# Add action entries
for entry in action_entries:
# Add Spacer
if ('CRLF' in entry):
menu_splash += '\n'
valid_answers.append(entry['Letter'])
menu_splash += '{letter:>{width}}: {name}\n'.format(
letter = entry['Letter'].upper(),
width = len(str(len(action_entries))),
name = entry['Name'])
menu_splash += '\n'
answer = ''
while (answer.upper() not in valid_answers):
os.system('cls')
print(menu_splash)
answer = input('{}: '.format(prompt))
return answer.upper()
def non_clobber_rename(full_path):
"""Append suffix to path, if necessary, to avoid clobbering path"""
new_path = full_path
_i = 1;
while os.path.exists(new_path):
new_path = '{path}_{i}'.format(i=_i, path=full_path)
_i += 1
return new_path
def pause(prompt='Press Enter to continue... '):
"""Simple pause implementation."""
input(prompt)
def ping(addr='google.com'):
"""Attempt to ping addr."""
cmd = ['ping', '-n', '2', addr]
run_program(cmd)
def popen_program(cmd, pipe=False, minimized=False, shell=False, **kwargs):
"""Run program and return a subprocess.Popen object."""
startupinfo=None
if minimized:
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = 6
if pipe:
popen_obj = subprocess.Popen(cmd, shell=shell, startupinfo=startupinfo,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
else:
popen_obj = subprocess.Popen(cmd, shell=shell, startupinfo=startupinfo)
return popen_obj
def print_error(*args, **kwargs):
"""Prints message to screen in RED."""
print_standard(*args, color=COLORS['RED'], **kwargs)
def print_info(*args, **kwargs):
"""Prints message to screen in BLUE."""
print_standard(*args, color=COLORS['BLUE'], **kwargs)
def print_standard(message='Generic info',
color=None, end='\n', timestamp=True, **kwargs):
"""Prints message to screen and log (if set)."""
display_message = message
if color:
display_message = color + message + COLORS['CLEAR']
# **COLORS is used below to support non-"standard" color printing
print(display_message.format(**COLORS), end=end, **kwargs)
print_log(message, end, timestamp)
def print_success(*args, **kwargs):
"""Prints message to screen in GREEN."""
print_standard(*args, color=COLORS['GREEN'], **kwargs)
def print_warning(*args, **kwargs):
"""Prints message to screen in YELLOW."""
print_standard(*args, color=COLORS['YELLOW'], **kwargs)
def print_log(message='', end='\n', timestamp=True):
time_str = time.strftime("%Y-%m-%d %H%M%z: ") if timestamp else ''
if 'LogFile' in global_vars and global_vars['LogFile'] is not None:
with open(global_vars['LogFile'], 'a') as f:
for line in message.splitlines():
f.write('{timestamp}{line}{end}'.format(
timestamp = time_str,
line = line,
end = end))
def run_program(cmd, args=[], check=True, pipe=True, shell=False):
"""Run program and return a subprocess.CompletedProcess object."""
if args:
# Deprecated so let's raise an exception to find & fix all occurances
print_error('ERROR: Using args is no longer supported.')
raise Exception
cmd = [c for c in cmd if c]
if shell:
cmd = ' '.join(cmd)
if pipe:
process_return = subprocess.run(cmd, check=check, shell=shell,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
else:
process_return = subprocess.run(cmd, check=check, shell=shell)
return process_return
def show_info(message='~Some message~', info='~Some info~', indent=8, width=32):
"""Display info with formatting."""
print_standard('{indent}{message:<{width}}{info}'.format(
indent=' '*indent, width=width, message=message, info=info))
def sleep(seconds=2):
"""Wait for a while."""
time.sleep(seconds)
def stay_awake():
"""Prevent the system from sleeping or hibernating."""
# Bail if caffeine is already running
for proc in psutil.process_iter():
if proc.name() == 'caffeine.exe':
return
# Extract and run
extract_item('Caffeine', silent=True)
try:
popen_program(global_vars['Tools']['Caffeine'])
except Exception:
print_error('ERROR: No caffeine available.')
print_warning('Please set the power setting to High Performance.')
def get_exception(s):
"""Get exception by name, returns Exception object."""
return getattr(sys.modules[__name__], s)
def try_and_print(message='Trying...',
function=None, cs='CS', ns='NS', other_results={},
catch_all=True, print_return=False, silent_function=True,
indent=8, width=32, *args, **kwargs):
"""Run function, print if successful or not, and return dict.
other_results is in the form of
{
'Warning': {'ExceptionClassName': 'Result Message'},
'Error': {'ExceptionClassName': 'Result Message'}
}
The the ExceptionClassNames will be excepted conditions
and the result string will be printed in the correct color.
catch_all=False will result in unspecified exceptions being re-raised."""
err = None
w_exceptions = other_results.get('Warning', {}).keys()
w_exceptions = tuple(get_exception(e) for e in w_exceptions)
e_exceptions = other_results.get('Error', {}).keys()
e_exceptions = tuple(get_exception(e) for e in e_exceptions)
w_results = other_results.get('Warning', {})
e_results = other_results.get('Error', {})
# Run function and catch errors
print_standard('{indent}{message:<{width}}'.format(
indent=' '*indent, message=message, width=width), end='', flush=True)
try:
out = function(*args, **kwargs)
if print_return:
print_standard(out[0], timestamp=False)
for item in out[1:]:
print_standard('{indent}{item}'.format(
indent=' '*(indent+width), item=item))
elif silent_function:
print_success(cs, timestamp=False)
except w_exceptions as e:
_result = w_results.get(e.__class__.__name__, 'Warning')
print_warning(_result, timestamp=False)
err = e
except e_exceptions as e:
_result = e_results.get(e.__class__.__name__, 'Error')
print_error(_result, timestamp=False)
err = e
except Exception:
print_error(ns, timestamp=False)
err = traceback.format_exc()
# Return or raise?
if bool(err) and not catch_all:
raise
else:
return {'CS': not bool(err), 'Error': err}
def upload_data(path, file):
"""Add CLIENT_INFO_SERVER to authorized connections and upload file."""
if not ENABLED_UPLOAD_DATA:
raise GenericError('Feature disabled.')
extract_item('PuTTY', filter='wizkit.ppk psftp.exe', silent=True)
# Authorize connection to the server
winreg.CreateKey(HKCU, r'Software\SimonTatham\PuTTY\SshHostKeys')
with winreg.OpenKey(HKCU, r'Software\SimonTatham\PuTTY\SshHostKeys',
access=winreg.KEY_WRITE) as key:
winreg.SetValueEx(key,
'rsa2@22:{IP}'.format(**CLIENT_INFO_SERVER), 0,
winreg.REG_SZ, CLIENT_INFO_SERVER['RegEntry'])
# Write batch file
with open(r'{TmpDir}\psftp.batch'.format(**global_vars),
'w', encoding='ascii') as f:
f.write('lcd "{path}"\n'.format(path=path))
f.write('cd "{Share}"\n'.format(**CLIENT_INFO_SERVER))
f.write('mkdir {TicketNumber}\n'.format(**global_vars))
f.write('cd {TicketNumber}\n'.format(**global_vars))
f.write('put "{file}"\n'.format(file=file))
# Upload Info
cmd = [
global_vars['Tools']['PuTTY-PSFTP'],
'-noagent',
'-i', r'{BinDir}\PuTTY\wizkit.ppk'.format(**global_vars),
'{User}@{IP}'.format(**CLIENT_INFO_SERVER),
'-b', r'{TmpDir}\psftp.batch'.format(**global_vars)]
run_program(cmd)
def upload_info():
"""Upload compressed Info file to the NAS as set in settings.main.py."""
if not ENABLED_UPLOAD_DATA:
raise GenericError('Feature disabled.')
path = '{ClientDir}'.format(**global_vars)
file = 'Info_{Date-Time}.7z'.format(**global_vars)
upload_data(path, file)
def compress_info():
"""Compress ClientDir info folders with 7-Zip for upload_info()."""
path = '{ClientDir}'.format(**global_vars)
file = 'Info_{Date-Time}.7z'.format(**global_vars)
_cmd = [
global_vars['Tools']['SevenZip'],
'a', '-t7z', '-mx=9', '-bso0', '-bse0',
r'{}\{}'.format(path, file),
r'{ClientDir}\Info'.format(**global_vars)]
run_program(_cmd)
def wait_for_process(name, poll_rate=3):
"""Wait for process by name."""
running = True
while running:
sleep(poll_rate)
running = False
for proc in psutil.process_iter():
if re.search(r'^{}'.format(name), proc.name(), re.IGNORECASE):
running = True
sleep(1)
# global_vars functions
def init_global_vars():
"""Sets global variables based on system info."""
print_info('Initializing')
os.system('title Wizard Kit')
init_functions = [
['Checking .bin...', find_bin],
['Checking environment...', set_common_vars],
['Checking OS...', check_os],
['Checking tools...', check_tools],
['Creating folders...', make_tmp_dirs],
['Clearing collisions...', clean_env_vars],
]
try:
for f in init_functions:
try_and_print(
message=f[0], function=f[1],
cs='Done', ns='Error', catch_all=False)
except:
major_exception()
def check_os():
"""Set OS specific variables."""
tmp = {}
# Query registry
_reg_path = winreg.OpenKey(
HKLM, r'SOFTWARE\Microsoft\Windows NT\CurrentVersion')
for key in ['CSDVersion', 'CurrentBuild', 'CurrentBuildNumber',
'CurrentVersion', 'ProductName']:
try:
tmp[key] = winreg.QueryValueEx(_reg_path, key)[0]
if key in ['CurrentBuild', 'CurrentBuildNumber']:
tmp[key] = int(tmp[key])
except ValueError:
# Couldn't convert Build to int so this should be interesting...
tmp[key] = 0
except Exception:
tmp[key] = 'Unknown'
# Determine OS bit depth
tmp['Arch'] = 32
if 'PROGRAMFILES(X86)' in global_vars['Env']:
tmp['Arch'] = 64
# Determine OS Name
tmp['Name'] = '{ProductName} {CSDVersion}'.format(**tmp)
if tmp['CurrentBuild'] == 9600:
tmp['Name'] += ' Update' # Win 8.1u
if tmp['CurrentBuild'] == 10240:
tmp['Name'] += ' Release 1507 "Threshold 1"'
if tmp['CurrentBuild'] == 10586:
tmp['Name'] += ' Release 1511 "Threshold 2"'
if tmp['CurrentBuild'] == 14393:
tmp['Name'] += ' Release 1607 "Redstone 1" / "Anniversary Update"'
if tmp['CurrentBuild'] == 15063:
tmp['Name'] += ' Release 1703 "Redstone 2" / "Creators Update"'
if tmp['CurrentBuild'] == 16299:
tmp['Name'] += ' Release 1709 "Redstone 3" / "Fall Creators Update"'
tmp['Name'] = tmp['Name'].replace('Service Pack ', 'SP')
tmp['Name'] = tmp['Name'].replace('Unknown Release', 'Release')
tmp['Name'] = re.sub(r'\s+', ' ', tmp['Name'])
# Determine OS version
name = '{Name} x{Arch}'.format(**tmp)
if tmp['CurrentVersion'] == '6.0':
tmp['Version'] = 'Vista'
name += ' (very outdated)'
elif tmp['CurrentVersion'] == '6.1':
tmp['Version'] = '7'
if tmp['CSDVersion'] == 'Service Pack 1':
name += ' (outdated)'
else:
name += ' (very outdated)'
elif tmp['CurrentVersion'] in ['6.2', '6.3']:
if int(tmp['CurrentBuildNumber']) <= 9600:
tmp['Version'] = '8'
elif int(tmp['CurrentBuildNumber']) >= 10240:
tmp['Version'] = '10'
if tmp['CurrentBuild'] in [9200, 10240, 10586]:
name += ' (very outdated)'
elif tmp['CurrentBuild'] in [9600, 14393, 15063]:
name += ' (outdated)'
elif tmp['CurrentBuild'] == 16299:
pass # Current Win10
else:
name += ' (unrecognized)'
tmp['DisplayName'] = name
# == vista ==
# 6.0.6000
# 6.0.6001
# 6.0.6002
# ==== 7 ====
# 6.1.7600
# 6.1.7601
# 6.1.7602
# ==== 8 ====
# 6.2.9200
# === 8.1 ===
# 6.3.9200
# === 8.1u ==
# 6.3.9600
# === 10 v1507 "Threshold 1" ==
# 6.3.10240
# === 10 v1511 "Threshold 2" ==
# 6.3.10586
# === 10 v1607 "Redstone 1" "Anniversary Update" ==
# 6.3.14393
# === 10 v1703 "Redstone 2" "Creators Update" ==
# 6.3.15063
# === 10 v1709 "Redstone 3" "Fall Creators Update" ==
# 6.3.16299
global_vars['OS'] = tmp
def check_tools():
"""Set tool variables based on OS bit-depth and tool availability."""
if global_vars['OS'].get('Arch', 32) == 64:
global_vars['Tools'] = {
k: v.get('64', v.get('32')) for (k, v) in TOOLS.items()}
else:
global_vars['Tools'] = {k: v.get('32') for (k, v) in TOOLS.items()}
# Fix paths
global_vars['Tools'] = {k: os.path.join(global_vars['BinDir'], v)
for (k, v) in global_vars['Tools'].items()}
def clean_env_vars():
"""Remove conflicting global_vars and env variables.
This fixes an issue where both global_vars and
global_vars['Env'] are expanded at the same time."""
for key in global_vars.keys():
global_vars['Env'].pop(key, None)
def find_bin():
"""Find .bin folder in the cwd or it's parents."""
wd = os.getcwd()
base = None
while base is None:
if os.path.exists('.bin'):
base = os.getcwd()
break
if re.fullmatch(r'\w:\\', os.getcwd()):
break
os.chdir('..')
os.chdir(wd)
if base is None:
raise BinNotFoundError
global_vars['BaseDir'] = base
def make_tmp_dirs():
"""Make temp directories."""
os.makedirs(global_vars['BackupDir'], exist_ok=True)
os.makedirs(global_vars['LogDir'], exist_ok=True)
os.makedirs(global_vars['TmpDir'], exist_ok=True)
def set_common_vars():
"""Set common variables."""
global_vars['Date'] = time.strftime("%Y-%m-%d")
global_vars['Date-Time'] = time.strftime("%Y-%m-%d_%H%M_%z")
global_vars['Env'] = os.environ.copy()
global_vars['ArchivePassword'] = ARCHIVE_PASSWORD
global_vars['BinDir'] = r'{BaseDir}\.bin'.format(
**global_vars)
global_vars['CBinDir'] = r'{BaseDir}\.cbin'.format(
**global_vars)
global_vars['ClientDir'] = r'{SYSTEMDRIVE}\{prefix}'.format(
prefix=KIT_NAME_SHORT, **global_vars['Env'])
global_vars['BackupDir'] = r'{ClientDir}\Backups\{Date}'.format(
**global_vars)
global_vars['LogDir'] = r'{ClientDir}\Info\{Date}'.format(
**global_vars)
global_vars['ProgBackupDir'] = r'{ClientDir}\Backups'.format(
**global_vars)
global_vars['QuarantineDir'] = r'{ClientDir}\Quarantine'.format(
**global_vars)
global_vars['TmpDir'] = r'{BinDir}\tmp'.format(
**global_vars)
if __name__ == '__main__':
print("This file is not meant to be called directly.")

607
Scripts/functions/data.py Normal file
View file

@ -0,0 +1,607 @@
# Wizard Kit PE: Functions - Data
import ctypes
from operator import itemgetter
from functions.common import *
# Classes
class LocalDisk():
def __init__(self, disk):
self.disk = disk
self.name = disk.mountpoint.upper()
self.path = self.name
def is_dir(self):
# Should always be true
return True
# Regex
REGEX_EXCL_ITEMS = re.compile(
r'^(\.(AppleDB|AppleDesktop|AppleDouble'
r'|com\.apple\.timemachine\.supported|dbfseventsd'
r'|DocumentRevisions-V100.*|DS_Store|fseventsd|PKInstallSandboxManager'
r'|Spotlight.*|SymAV.*|symSchedScanLockxz|TemporaryItems|Trash.*'
r'|vol|VolumeIcon\.icns)|desktop\.(ini|.*DB|.*DF)'
r'|(hiberfil|pagefile)\.sys|lost\+found|Network\.*Trash\.*Folder'
r'|Recycle[dr]|System\.*Volume\.*Information|Temporary\.*Items'
r'|Thumbs\.db)$',
re.IGNORECASE)
REGEX_EXCL_ROOT_ITEMS = re.compile(
r'^\\?(boot(mgr|nxt)$|Config.msi'
r'|(eula|globdata|install|vc_?red)'
r'|.*.sys$|System Volume Information|RECYCLER?|\$Recycle\.bin'
r'|\$?Win(dows(.old.*|\.~BT|)$|RE_)|\$GetCurrent|Windows10Upgrade'
r'|PerfLogs|Program Files|SYSTEM.SAV'
r'|.*\.(esd|swm|wim|dd|map|dmg|image)$)',
re.IGNORECASE)
REGEX_INCL_ROOT_ITEMS = re.compile(
r'^\\?(AdwCleaner|(My\s*|)(Doc(uments?( and Settings|)|s?)|Downloads'
r'|Media|Music|Pic(ture|)s?|Vid(eo|)s?)'
r'|{prefix}(-?Info|-?Transfer|)'
r'|(ProgramData|Recovery|Temp.*|Users)$'
r'|.*\.(log|txt|rtf|qb\w*|avi|m4a|m4v|mp4|mkv|jpg|png|tiff?)$)'
r''.format(prefix=KIT_NAME_SHORT),
re.IGNORECASE)
REGEX_WIM_FILE = re.compile(
r'\.wim$',
re.IGNORECASE)
REGEX_WINDOWS_OLD = re.compile(
r'^\\Win(dows|)\.old',
re.IGNORECASE)
# STATIC VARIABLES
FAST_COPY_EXCLUDES = [
r'\*.esd',
r'\*.swm',
r'\*.wim',
r'\*.dd',
r'\*.dd.tgz',
r'\*.dd.txz',
r'\*.map',
r'\*.dmg',
r'\*.image',
r'$RECYCLE.BIN',
r'$Recycle.Bin',
r'.AppleDB',
r'.AppleDesktop',
r'.AppleDouble',
r'.com.apple.timemachine.supported',
r'.dbfseventsd',
r'.DocumentRevisions-V100*',
r'.DS_Store',
r'.fseventsd',
r'.PKInstallSandboxManager',
r'.Spotlight*',
r'.SymAV*',
r'.symSchedScanLockxz',
r'.TemporaryItems',
r'.Trash*',
r'.vol',
r'.VolumeIcon.icns',
r'desktop.ini',
r'Desktop?DB',
r'Desktop?DF',
r'hiberfil.sys',
r'lost+found',
r'Network?Trash?Folder',
r'pagefile.sys',
r'Recycled',
r'RECYCLER',
r'System?Volume?Information',
r'Temporary?Items',
r'Thumbs.db',
]
FAST_COPY_ARGS = [
'/cmd=noexist_only',
'/utf8',
'/skip_empty_dir',
'/linkdest',
'/no_ui',
'/auto_close',
'/exclude={}'.format(';'.join(FAST_COPY_EXCLUDES)),
]
# Code borrowed from: https://stackoverflow.com/a/29075319
SEM_NORMAL = ctypes.c_uint()
SEM_FAILCRITICALERRORS = 1
SEM_NOOPENFILEERRORBOX = 0x8000
SEM_FAIL = SEM_NOOPENFILEERRORBOX | SEM_FAILCRITICALERRORS
def cleanup_transfer(dest_path):
"""Fix attributes and move extraneous items outside the Transfer folder."""
try:
# Remove dest_path if empty
os.rmdir(dest_path)
except OSError:
pass
if not os.path.exists(dest_path):
# Bail if dest_path was empty and removed
raise Exception
# Fix attributes
cmd = ['attrib', '-a', '-h', '-r', '-s', dest_path]
run_program(cmd, check=False)
for root, dirs, files in os.walk(dest_path, topdown=False):
for name in dirs:
# Remove empty directories and junction points
try:
os.rmdir(os.path.join(root, name))
except OSError:
pass
for name in files:
# "Remove" files based on exclusion regex
if REGEX_EXCL_ITEMS.search(name):
# Make dest folder
dest_name = root.replace(dest_path, dest_path+'.Removed')
os.makedirs(dest_name, exist_ok=True)
# Set dest filename
dest_name = os.path.join(dest_name, name)
dest_name = non_clobber_rename(dest_name)
source_name = os.path.join(root, name)
try:
shutil.move(source_name, dest_name)
except Exception:
pass
def is_valid_wim_file(item):
"""Checks if the provided os.DirEntry is a valid WIM file, returns bool."""
valid = bool(item.is_file() and REGEX_WIM_FILE.search(item.name))
if valid:
extract_item('wimlib', silent=True)
cmd = [global_vars['Tools']['wimlib-imagex'], 'info', item.path]
try:
run_program(cmd)
except subprocess.CalledProcessError:
valid = False
print_log('WARNING: Image "{}" damaged.'.format(item.name))
return valid
def mount_backup_shares():
"""Mount the backup shares unless labeled as already mounted."""
for server in BACKUP_SERVERS:
# Blindly skip if we mounted earlier
if server['Mounted']:
continue
mount_network_share(server)
def mount_network_share(server):
"""Mount a network share defined by server."""
# Test connection
try:
ping(server['IP'])
except subprocess.CalledProcessError:
print_error(
r'Failed to mount \\{Name}\{Share}, {IP} unreachable.'.format(
**server))
sleep(1)
return False
# Mount
cmd = r'net use \\{IP}\{Share} /user:{User} {Pass}'.format(**server)
cmd = cmd.split(' ')
try:
run_program(cmd)
except Exception:
print_warning(r'Failed to mount \\{Name}\{Share} ({IP})'.format(
**server))
sleep(1)
else:
print_info('Mounted {Name}'.format(**server))
server['Mounted'] = True
def run_fast_copy(items, dest):
"""Copy items to dest using FastCopy."""
if not items:
raise Exception
cmd = [global_vars['Tools']['FastCopy'], *FAST_COPY_ARGS]
if 'LogFile' in global_vars:
cmd.append('/logfile={LogFile}'.format(**global_vars))
cmd.extend(items)
cmd.append('/to={}\\'.format(dest))
run_program(cmd)
def run_wimextract(source, items, dest):
"""Extract items from source WIM to dest folder."""
if not items:
raise Exception
extract_item('wimlib', silent=True)
# Write files.txt
with open(r'{TmpDir}\wim_files.txt'.format(**global_vars), 'w') as f:
# Defaults
for item in items:
f.write('{item}\n'.format(item=item))
sleep(1) # For safety?
# Extract files
cmd = [
global_vars['Tools']['wimlib-imagex'],
'extract',
source, '1',
r'@{TmpDir}\wim_files.txt'.format(**global_vars),
'--dest-dir={}\\'.format(dest),
'--no-acls',
'--nullglob']
run_program(cmd)
def scan_source(source_obj, dest_path):
"""Scan source for files/folders to transfer."""
selected_items = []
if source_obj.is_dir():
# File-Based
print_standard('Scanning source (folder): {}'.format(source_obj.path))
selected_items = scan_source_path(source_obj.path, dest_path)
else:
# Image-Based
if REGEX_WIM_FILE.search(source_obj.name):
print_standard('Scanning source (image): {}'.format(
source_obj.path))
selected_items = scan_source_wim(source_obj.path, dest_path)
else:
print_error('ERROR: Unsupported image: {}'.format(
source_obj.path))
raise GenericError
return selected_items
def scan_source_path(source_path, dest_path, rel_path=None, interactive=True):
"""Scan source folder for files/folders to transfer, returns list.
This will scan the root and (recursively) any Windows.old folders."""
rel_path = '\\' + rel_path if rel_path else ''
if rel_path:
dest_path = dest_path + rel_path
selected_items = []
win_olds = []
# Root items
root_items = []
for item in os.scandir(source_path):
if REGEX_INCL_ROOT_ITEMS.search(item.name):
root_items.append(item.path)
elif not REGEX_EXCL_ROOT_ITEMS.search(item.name):
if (not interactive
or ask('Copy: "{}{}" ?'.format(rel_path, item.name))):
root_items.append(item.path)
if REGEX_WINDOWS_OLD.search(item.name):
win_olds.append(item)
if root_items:
selected_items.append({
'Message': '{}Root Items...'.format(rel_path),
'Items': root_items.copy(),
'Destination': dest_path})
# Fonts
if os.path.exists(r'{}\Windows\Fonts'.format(source_path)):
selected_items.append({
'Message': '{}Fonts...'.format(rel_path),
'Items': [r'{}\Windows\Fonts'.format(rel_path)],
'Destination': r'{}\Windows'.format(dest_path)})
# Registry
registry_items = []
for folder in ['config', 'OEM']:
folder = r'Windows\System32\{}'.format(folder)
folder = os.path.join(source_path, folder)
if os.path.exists(folder):
registry_items.append(folder)
if registry_items:
selected_items.append({
'Message': '{}Registry...'.format(rel_path),
'Items': registry_items.copy(),
'Destination': r'{}\Windows\System32'.format(dest_path)})
# Windows.old(s)
for old in win_olds:
selected_items.append(
scan_source_path(
old.path, dest_path, rel_path=old.name, interactive=False))
# Done
return selected_items
def scan_source_wim(source_wim, dest_path, rel_path=None, interactive=True):
"""Scan source WIM file for files/folders to transfer, returns list.
This will scan the root and (recursively) any Windows.old folders."""
rel_path = '\\' + rel_path if rel_path else ''
selected_items = []
win_olds = []
# Scan source
extract_item('wimlib', silent=True)
cmd = [
global_vars['Tools']['wimlib-imagex'], 'dir',
source_wim, '1']
try:
file_list = run_program(cmd)
except subprocess.CalledProcessError:
print_error('ERROR: Failed to get file list.')
raise
# Root Items
file_list = [i.strip()
for i in file_list.stdout.decode('utf-8', 'ignore').splitlines()
if i.count('\\') == 1 and i.strip() != '\\']
root_items = []
if rel_path:
file_list = [i.replace(rel_path, '') for i in file_list]
for item in file_list:
if REGEX_INCL_ROOT_ITEMS.search(item):
root_items.append(item)
elif not REGEX_EXCL_ROOT_ITEMS.search(item):
if (not interactive
or ask('Extract: "{}{}" ?'.format(rel_path, item))):
root_items.append('{}{}'.format(rel_path, item))
if REGEX_WINDOWS_OLD.search(item):
win_olds.append(item)
if root_items:
selected_items.append({
'Message': '{}Root Items...'.format(rel_path),
'Items': root_items.copy(),
'Destination': dest_path})
# Fonts
if wim_contains(source_wim, r'{}Windows\Fonts'.format(rel_path)):
selected_items.append({
'Message': '{}Fonts...'.format(rel_path),
'Items': [r'{}\Windows\Fonts'.format(rel_path)],
'Destination': dest_path})
# Registry
registry_items = []
for folder in ['config', 'OEM']:
folder = r'{}Windows\System32\{}'.format(rel_path, folder)
if wim_contains(source_wim, folder):
registry_items.append(folder)
if registry_items:
selected_items.append({
'Message': '{}Registry...'.format(rel_path),
'Items': registry_items.copy(),
'Destination': dest_path})
# Windows.old(s)
for old in win_olds:
scan_source_wim(source_wim, dest_path, rel_path=old, interactive=False)
# Done
return selected_items
def select_destination(folder_path, prompt='Select destination'):
"""Select destination drive, returns path as string."""
disk = select_disk(prompt)
if 'fixed' not in disk['Disk'].opts:
folder_path = folder_path.replace('\\', '-')
path = '{disk}{folder_path}_{Date}'.format(
disk = disk['Disk'].mountpoint,
folder_path = folder_path,
**global_vars)
# Avoid merging with existing folder
path = non_clobber_rename(path)
os.makedirs(path, exist_ok=True)
return path
def select_disk(title='Select disk', auto_select=True):
"""Select disk from attached disks. returns dict."""
actions = [{'Name': 'Quit', 'Letter': 'Q'}]
disks = []
# Build list of disks
set_thread_error_mode(silent=True) # Prevents "No disk" popups
for d in psutil.disk_partitions():
info = {
'Disk': d,
'Name': d.mountpoint}
try:
usage = psutil.disk_usage(d.device)
free = '{free} / {total} available'.format(
free = human_readable_size(usage.free, 2),
total = human_readable_size(usage.total, 2))
except Exception:
# Meh, leaving unsupported destinations out
pass
# free = 'Unknown'
# info['Disabled'] = True
else:
info['Display Name'] = '{} ({})'.format(info['Name'], free)
disks.append(info)
set_thread_error_mode(silent=False) # Return to normal
# Skip menu?
if len(disks) == 1 and auto_select:
return disks[0]
# Show menu
selection = menu_select(title, main_entries=disks, action_entries=actions)
if selection == 'Q':
exit_script()
else:
return disks[int(selection)-1]
def select_source(ticket_number):
"""Select backup from those found on the BACKUP_SERVERS for the ticket."""
selected_source = None
sources = []
mount_backup_shares()
# Check for ticket folders on servers
for server in BACKUP_SERVERS:
if server['Mounted']:
print_standard('Scanning {}...'.format(server['Name']))
for d in os.scandir(r'\\{IP}\{Share}'.format(**server)):
if (d.is_dir()
and d.name.lower().startswith(ticket_number.lower())):
# Add folder to sources
sources.append({
'Name': '{:9}| File-Based: [DIR] {}'.format(
server['Name'], d.name),
'Server': server,
'Source': d})
# Check for images and subfolders
for ticket_path in sources.copy():
for item in os.scandir(ticket_path['Source'].path):
if item.is_dir():
# Add folder to sources
sources.append({
'Name': r'{:9}| File-Based: [DIR] {}\{}'.format(
ticket_path['Server']['Name'], # Server
ticket_path['Source'].name, # Ticket folder
item.name, # Sub-folder
),
'Server': ticket_path['Server'],
'Source': item})
# Check for images in folder
for subitem in os.scandir(item.path):
if REGEX_WIM_FILE.search(item.name):
# Add image to sources
try:
size = human_readable_size(item.stat().st_size)
except Exception:
size = ' ? ?' # unknown
sources.append({
'Disabled': bool(not is_valid_wim_file(subitem)),
'Name': r'{:9}| Image-Based: {:>7} {}\{}\{}'.format(
ticket_path['Server']['Name'], # Server
size, # Size (duh)
ticket_path['Source'].name, # Ticket folder
item.name, # Sub-folder
subitem.name, # Image file
),
'Server': ticket_path['Server'],
'Source': subitem})
elif REGEX_WIM_FILE.search(item.name):
# Add image to sources
try:
size = human_readable_size(item.stat().st_size)
except Exception:
size = ' ? ?' # unknown
sources.append({
'Disabled': bool(not is_valid_wim_file(item)),
'Name': r'{:9}| Image-Based: {:>7} {}\{}'.format(
ticket_path['Server']['Name'], # Server
size, # Size (duh)
ticket_path['Source'].name, # Ticket folder
item.name, # Image file
),
'Server': ticket_path['Server'],
'Source': item})
# Check for local sources
print_standard('Scanning for local sources...')
set_thread_error_mode(silent=True) # Prevents "No disk" popups
sys_drive = global_vars['Env']['SYSTEMDRIVE']
for d in psutil.disk_partitions():
if re.search(r'^{}'.format(sys_drive), d.mountpoint, re.IGNORECASE):
# Skip current OS drive
continue
if 'fixed' in d.opts:
# Skip DVD, etc
sources.append({
'Name': '{:9}| File-Based: [DISK] {}'.format(
' Local', d.mountpoint),
'Source': LocalDisk(d)})
set_thread_error_mode(silent=False) # Return to normal
# Build Menu
sources.sort(key=itemgetter('Name'))
actions = [{'Name': 'Quit', 'Letter': 'Q'}]
# Select backup from sources
if len(sources) > 0:
selection = menu_select(
'Which backup are we using?',
main_entries=sources,
action_entries=actions,
disabled_label='DAMAGED')
if selection == 'Q':
umount_backup_shares()
exit_script()
else:
selected_source = sources[int(selection)-1]['Source']
else:
print_error('ERROR: No backups found for ticket: {}.'.format(
ticket_number))
umount_backup_shares()
pause("Press Enter to exit...")
exit_script()
# Done
return selected_source
def set_thread_error_mode(silent=True):
"""Disable or Enable Windows error message dialogs.
Disable when scanning for disks to avoid popups for empty cardreaders, etc
"""
# Code borrowed from: https://stackoverflow.com/a/29075319
kernel32 = ctypes.WinDLL('kernel32')
if silent:
kernel32.SetThreadErrorMode(SEM_FAIL, ctypes.byref(SEM_NORMAL))
else:
kernel32.SetThreadErrorMode(SEM_NORMAL, ctypes.byref(SEM_NORMAL))
def transfer_source(source_obj, dest_path, selected_items):
"""Transfer, or extract, files/folders from source to destination."""
if source_obj.is_dir():
# Run FastCopy for each selection "group"
for group in selected_items:
try_and_print(message=group['Message'],
function=run_fast_copy, cs='Done',
items=group['Items'],
dest=group['Destination'])
else:
if REGEX_WIM_FILE.search(source_obj.name):
# Extract files from WIM
for group in selected_items:
try_and_print(message=group['Message'],
function=run_wimextract, cs='Done',
source=source_obj.path,
items=group['Items'],
dest=group['Destination'])
else:
print_error('ERROR: Unsupported image: {}'.format(source_obj.path))
raise GenericError
def umount_backup_shares():
"""Unnount the backup shares regardless of current status."""
for server in BACKUP_SERVERS:
umount_network_share(server)
def umount_network_share(server):
"""Unnount a network share defined by server."""
cmd = r'net use \\{IP}\{Share} /delete'.format(**server)
cmd = cmd.split(' ')
try:
run_program(cmd)
except Exception:
print_error(r'Failed to umount \\{Name}\{Share}.'.format(**server))
sleep(1)
else:
print_info('Umounted {Name}'.format(**server))
server['Mounted'] = False
def wim_contains(source_path, file_path):
"""Check if the WIM contains a file or folder."""
_cmd = [
global_vars['Tools']['wimlib-imagex'], 'dir',
source_path, '1',
'--path={}'.format(file_path),
'--one-file-only']
try:
run_program(_cmd)
except subprocess.CalledProcessError:
return False
else:
return True
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -1,4 +1,4 @@
# WK WinPE - PARTITION UIDs
# Wizard Kit PE: Functions - PARTITION UIDs
# sources: https://en.wikipedia.org/wiki/GUID_Partition_Table
# https://en.wikipedia.org/wiki/Partition_type

View file

@ -0,0 +1,273 @@
# Wizard Kit PE: Functions - Windows Setup
from functions.common import *
# STATIC VARIABLES
DISKPART_SCRIPT = '{tmp}\\diskpart.script'.format(tmp=os.environ['TMP'])
WINDOWS_VERSIONS = [
{'Name': 'Windows 7 Home Basic',
'Image File': 'Win7',
'Image Name': 'Windows 7 HOMEBASIC',
'Family': '7'},
{'Name': 'Windows 7 Home Premium',
'Image File': 'Win7',
'Image Name': 'Windows 7 HOMEPREMIUM',
'Family': '7'},
{'Name': 'Windows 7 Professional',
'Image File': 'Win7',
'Image Name': 'Windows 7 PROFESSIONAL',
'Family': '7'},
{'Name': 'Windows 7 Ultimate',
'Image File': 'Win7',
'Image Name': 'Windows 7 ULTIMATE',
'Family': '7'},
{'Name': 'Windows 8.1',
'Image File': 'Win8',
'Image Name': 'Windows 8.1',
'Family': '8',
'CRLF': True},
{'Name': 'Windows 8.1 Pro',
'Image File': 'Win8',
'Image Name': 'Windows 8.1 Pro',
'Family': '8'},
{'Name': 'Windows 10 Home',
'Image File': 'Win10',
'Image Name': 'Windows 10 Home',
'Family': '10',
'CRLF': True},
{'Name': 'Windows 10 Pro',
'Image File': 'Win10',
'Image Name': 'Windows 10 Pro',
'Family': '10'},
]
def is_index_in_image(bin=None, filename=None, imagename=None):
# Bail early
if bin is None:
raise Exception('bin not specified.')
if filename is None:
raise Exception('Filename not specified.')
if imagename is None:
raise Exception('Image Name not specified.')
cmd = '{bin}\\wimlib\\wimlib-imagex info "{filename}" "{imagename}"'.format(bin=bin, filename=filename, imagename=imagename)
try:
run_program(cmd)
except subprocess.CalledProcessError:
print_error('Invalid image: {filename}'.format(filename=filename))
return False
return True
def find_windows_image(bin, windows_version=None):
"""Search for a Windows source image file on local drives and network drives (in that order)"""
image = {}
# Bail early
if windows_version is None:
raise Exception('Windows version not specified.')
imagefile = windows_version['Image File']
# Search local source
process_return = run_program('mountvol')
for tmp in re.findall(r'.*([A-Za-z]):\\', process_return.stdout.decode()):
for ext in ['esd', 'wim', 'swm']:
filename = '{drive}:\\images\\{imagefile}'.format(drive=tmp[0], imagefile=imagefile)
filename_ext = '{filename}.{ext}'.format(filename=filename, ext=ext)
if os.path.isfile(filename_ext):
if is_index_in_image(bin, filename_ext, windows_version['Image Name']):
image['Ext'] = ext
image['File'] = filename
image['Glob'] = '--ref="{File}*.swm"'.format(**image) if ext == 'swm' else ''
image['Source'] = tmp[0]
break
# Check for network source (if necessary)
if not any(image):
if not WINDOWS_SERVER['Mounted']:
mount_windows_share()
for ext in ['esd', 'wim', 'swm']:
filename = '\\\\{IP}\\{Share}\\images\\{imagefile}'.format(imagefile=imagefile, **WINDOWS_SERVER)
filename_ext = '{filename}.{ext}'.format(filename=filename, ext=ext)
if os.path.isfile(filename_ext):
if is_index_in_image(bin, filename_ext, windows_version['Image Name']):
image['Ext'] = ext
image['File'] = filename
image['Glob'] = '--ref="{File}*.swm"'.format(**image) if ext == 'swm' else ''
image['Source'] = None
break
# Display image to be used (if any) and return
if any(image):
print_info('Using image: {File}.{Ext}'.format(**image))
return image
else:
print_error('Failed to find Windows source image for {winver}'.format(winver=windows_version['Name']))
abort_to_main_menu('Aborting Windows setup')
def format_gpt(disk=None, windows_family=None):
"""Format disk for use as a Windows OS drive using the GPT (UEFI) layout."""
# Bail early
if disk is None:
raise Exception('No disk provided.')
if windows_family is None:
raise Exception('No Windows family provided.')
# Format drive
# print_info('Drive will use a GPT (UEFI) layout.')
with open(DISKPART_SCRIPT, 'w') as script:
# Partition table
script.write('select disk {number}\n'.format(number=disk['Number']))
script.write('clean\n')
script.write('convert gpt\n')
# System partition
script.write('create partition efi size=260\n') # NOTE: Allows for Advanced Format 4K drives
script.write('format quick fs=fat32 label="System"\n')
script.write('assign letter="S"\n')
# Microsoft Reserved (MSR) partition
script.write('create partition msr size=128\n')
# Windows partition
script.write('create partition primary\n')
script.write('format quick fs=ntfs label="Windows"\n')
script.write('assign letter="W"\n')
# Recovery Tools partition (Windows 8+)
if re.search(r'^(8|10)', windows_family):
script.write('shrink minimum=500\n')
script.write('create partition primary\n')
script.write('format quick fs=ntfs label="Recovery Tools"\n')
script.write('assign letter="T"\n')
script.write('set id="de94bba4-06d1-4d40-a16a-bfd50179d6ac"\n')
script.write('gpt attributes=0x8000000000000001\n')
# Run script
run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT))
time.sleep(2)
def format_mbr(disk=None, windows_family=None):
"""Format disk for use as a Windows OS drive using the MBR (legacy) layout."""
# Bail early
if disk is None:
raise Exception('No disk provided.')
if windows_family is None:
raise Exception('No Windows family provided.')
# Format drive
# print_info('Drive will use a MBR (legacy) layout.')
with open(DISKPART_SCRIPT, 'w') as script:
# Partition table
script.write('select disk {number}\n'.format(number=disk['Number']))
script.write('clean\n')
# System partition
script.write('create partition primary size=100\n')
script.write('format fs=ntfs quick label="System Reserved"\n')
script.write('active\n')
script.write('assign letter="S"\n')
# Windows partition
script.write('create partition primary\n')
script.write('format fs=ntfs quick label="Windows"\n')
script.write('assign letter="W"\n')
# Recovery Tools partition (Windows 8+)
if re.search(r'^(8|10)', windows_family):
script.write('shrink minimum=500\n')
script.write('create partition primary\n')
script.write('format quick fs=ntfs label="Recovery"\n')
script.write('assign letter="T"\n')
script.write('set id=27\n')
# Run script
run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT))
time.sleep(2)
def get_boot_mode():
boot_mode = 'Legacy'
try:
reg_key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, 'System\\CurrentControlSet\\Control')
reg_value = winreg.QueryValueEx(reg_key, 'PEFirmwareType')[0]
if reg_value == 2:
boot_mode = 'UEFI'
except:
boot_mode = 'Unknown'
return boot_mode
def mount_windows_share():
"""Mount the Windows images share for use in Windows setup"""
# Blindly skip if we mounted earlier
if WINDOWS_SERVER['Mounted']:
return None
else:
try:
# Test connection
run_program('ping -w 800 -n 2 {IP}'.format(**WINDOWS_SERVER))
# Mount
run_program('net use \\\\{IP}\\{Share} /user:{User} {Pass}'.format(**WINDOWS_SERVER))
print_info('Mounted {Name}'.format(**WINDOWS_SERVER))
WINDOWS_SERVER['Mounted'] = True
except subprocess.CalledProcessError:
print_error('Failed to mount \\\\{Name}\\{Share}, {IP} unreachable.'.format(**WINDOWS_SERVER))
time.sleep(1)
except:
print_warning('Failed to mount \\\\{Name}\\{Share}'.format(**WINDOWS_SERVER))
time.sleep(1)
def select_windows_version():
actions = [{'Name': 'Main Menu', 'Letter': 'M'},]
# Menu loop
selection = menu_select('Which version of Windows are we installing?', WINDOWS_VERSIONS, actions)
if selection.isnumeric():
return WINDOWS_VERSIONS[int(selection)-1]
elif selection == 'M':
abort_to_main_menu()
def setup_windows(bin=None, windows_image=None, windows_version=None):
# Bail early
if bin is None:
raise Exception('bin path not specified.')
if windows_image is None:
raise Exception('Windows image not specified.')
if windows_version is None:
raise Exception('Windows version not specified.')
# Apply image
cmd = '{bin}\\wimlib\\wimlib-imagex apply "{File}.{Ext}" "{Image Name}" W:\\ {Glob}'.format(bin=bin, **windows_image, **windows_version)
run_program(cmd)
def setup_windows_re(windows_version=None, windows_letter='W', tools_letter='T'):
# Bail early
if windows_version is None:
raise Exception('Windows version not specified.')
_win = '{win}:\\Windows'.format(win=windows_letter)
_winre = '{win}\\System32\\Recovery\\WinRE.wim'.format(win=_win)
_dest = '{tools}:\\Recovery\\WindowsRE'.format(tools=tools_letter)
if re.search(r'^(8|10)', windows_version['Family']):
# Copy WinRE.wim
os.makedirs(_dest, exist_ok=True)
shutil.copy(_winre, '{dest}\\WinRE.wim'.format(dest=_dest))
# Set location
run_program('{win}\\System32\\reagentc /setreimage /path {dest} /target {win}'.format(dest=_dest, win=_win))
else:
# Only supported on Windows 8 and above
raise SetupError
def update_boot_partition(system_letter='S', windows_letter='W', mode='ALL'):
run_program('bcdboot {win}:\\Windows /s {sys}: /f {mode}'.format(win=windows_letter, sys=system_letter, mode=mode))
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -1,38 +1,66 @@
# WK WinPE Menu
# Wizard Kit PE: Menus
# Init
import os
import re
import subprocess
import sys
import time
import traceback
os.chdir(os.path.dirname(os.path.realpath(__file__)))
bin = os.path.abspath('..\\')
sys.path.append(os.getcwd())
from functions import *
from functions.data import *
## Colors
COLORS = {
'CLEAR': '\033[0m',
'RED': '\033[31m',
'GREEN': '\033[32m',
'YELLOW': '\033[33m',
'BLUE': '\033[34m'}
# STATIC VARIABLES
FAST_COPY_ARGS = [
'/cmd=noexist_only',
'/utf8',
'/skip_empty_dir',
'/linkdest',
'/no_ui',
'/auto_close',
'/exclude={}'.format(';'.join(FAST_COPY_EXCLUDES)),
]
PE_TOOLS = {
'BlueScreenView': {
'Path': r'BlueScreenView\BlueScreenView.exe',
},
'FastCopy': {
'Path': r'FastCopy\FastCopy.exe',
'Args': FAST_COPY_ARGS,
},
'HWiNFO': {
'Path': r'HWiNFO\HWiNFO.exe',
},
'NT Password Editor': {
'Path': r'NT Password Editor\ntpwedit.exe',
},
'Notepad++': {
'Path': r'NotepadPlusPlus\NotepadPlusPlus.exe',
},
'PhotoRec': {
'Path': r'TestDisk\photorec_win.exe',
'Args': ['-new_console:n'],
},
'Prime95': {
'Path': r'Prime95\prime95.exe',
},
'ProduKey': {
'Path': r'ProduKey\ProduKey.exe',
},
'Q-Dir': {
'Path': r'Q-Dir\Q-Dir.exe',
},
'TestDisk': {
'Path': r'TestDisk\testdisk_win.exe',
'Args': ['-new_console:n'],
},
}
def menu_backup_imaging():
def menu_backup():
"""Take backup images of partition(s) in the WIM format and save them to a backup share"""
errors = False
# Set ticket ID
os.system('cls')
ticket_id = get_ticket_id()
ticket_id = get_ticket_number()
# Mount backup shares
mount_backup_shares()
# Select destination
dest = select_destination()
dest = select_backup_destination()
if dest is None:
abort_to_main_menu('Aborting Backup Creation')
@ -90,13 +118,48 @@ def menu_backup_imaging():
time.sleep(5)
pause('\nPress Enter to return to main menu... ')
def menu_windows_setup():
def menu_root():
title = '{}: Main Menu'.format(KIT_NAME_FULL)
menus = [
{'Name': 'Create Backups', 'Menu': menu_backup},
{'Name': 'Setup Windows', 'Menu': menu_setup},
{'Name': 'Misc Tools', 'Menu': menu_tools},
]
actions = [
{'Name': 'Command Prompt', 'Letter': 'C'},
{'Name': 'Reboot', 'Letter': 'R'},
{'Name': 'Shutdown', 'Letter': 'S'},
]
# Main loop
while True:
selection = menu_select(
title=title,
main_entries=menus,
action_entries=actions,
secret_exit=True)
if (selection.isnumeric()):
try:
menus[int(selection)-1]['Menu']()
except AbortError:
pass
elif (selection == 'C'):
run_program(['cmd', '-new_console:n'], check=False)
elif (selection == 'R'):
run_program(['wpeutil', 'reboot'])
elif (selection == 'S'):
run_program(['wpeutil', 'shutdown'])
else:
exit_script()
def menu_setup():
"""Format a drive, partition for MBR or GPT, apply a Windows image, and rebuild the boot files"""
errors = False
# Set ticket ID
os.system('cls')
ticket_id = get_ticket_id()
ticket_id = get_ticket_number()
# Select the version of Windows to apply
windows_version = select_windows_version()
@ -197,80 +260,55 @@ def menu_windows_setup():
pause('\nPress Enter to return to main menu... ')
def menu_tools():
tools = [
{'Name': 'Blue Screen View', 'Folder': 'BlueScreenView', 'File': 'BlueScreenView.exe'},
{'Name': 'Fast Copy', 'Folder': 'FastCopy', 'File': 'FastCopy.exe', 'Args': ['/log', '/logfile=X:\WK\Info\FastCopy.log', '/cmd=noexist_only', '/utf8', '/skip_empty_dir', '/linkdest', '/open_window', '/balloon=FALSE', r'/exclude=$RECYCLE.BIN;$Recycle.Bin;.AppleDB;.AppleDesktop;.AppleDouble;.com.apple.timemachine.supported;.dbfseventsd;.DocumentRevisions-V100*;.DS_Store;.fseventsd;.PKInstallSandboxManager;.Spotlight*;.SymAV*;.symSchedScanLockxz;.TemporaryItems;.Trash*;.vol;.VolumeIcon.icns;desktop.ini;Desktop?DB;Desktop?DF;hiberfil.sys;lost+found;Network?Trash?Folder;pagefile.sys;Recycled;RECYCLER;System?Volume?Information;Temporary?Items;Thumbs.db']},
{'Name': 'HWiNFO', 'Folder': 'HWiNFO', 'File': 'HWiNFO.exe'},
{'Name': 'NT Password Editor', 'Folder': 'NT Password Editor', 'File': 'ntpwedit.exe'},
{'Name': 'Notepad++', 'Folder': 'NotepadPlusPlus', 'File': 'notepadplusplus.exe'},
{'Name': 'PhotoRec', 'Folder': 'TestDisk', 'File': 'photorec_win.exe', 'Args': ['-new_console:n']},
{'Name': 'Prime95', 'Folder': 'Prime95', 'File': 'prime95.exe'},
{'Name': 'ProduKey', 'Folder': 'ProduKey', 'File': 'ProduKey.exe', 'Args': ['/external', '/ExtractEdition:1']},
{'Name': 'Q-Dir', 'Folder': 'Q-Dir', 'File': 'Q-Dir.exe'},
{'Name': 'TestDisk', 'Folder': 'TestDisk', 'File': 'testdisk_win.exe', 'Args': ['-new_console:n']},
]
actions = [
{'Name': 'Main Menu', 'Letter': 'M'},
]
title = '{}: Tools Menu'.format(KIT_NAME_FULL)
tools = [k for k in sorted(PE_TOOLS.keys())]
actions = [{'Name': 'Main Menu', 'Letter': 'M'},]
# Menu loop
while True:
selection = menu_select('Tools Menu', tools, actions)
selection = menu_select(title, tools, actions)
if (selection.isnumeric()):
tool = tools[int(selection)-1]
cmd = ['{bin}\\{folder}\\{file}'.format(bin=bin, folder=tool['Folder'], file=tool['File'])]
if tool['Name'] == 'Blue Screen View':
cmd = [PE_TOOLS[tool]['Path']] + PE_TOOLS[tool].get('Args', [])
if tool == 'Blue Screen View':
# Select path to scan
minidump_path = select_minidump_path()
if minidump_path is not None:
tool['Args'] = ['/MiniDumpFolder', minidump_path]
if 'Args' in tool:
cmd += tool['Args']
if minidump_path:
cmd.extend(['/MiniDumpFolder', minidump_path])
try:
subprocess.Popen(cmd)
except:
popen_program(cmd)
except Exception:
print_error('Failed to run {prog}'.format(prog=tool['Name']))
time.sleep(2)
time.sleep(2)
pause()
elif (selection == 'M'):
break
def menu_main():
menus = [
{'Name': 'Create Backups', 'Menu': menu_backup_imaging},
{'Name': 'Setup Windows', 'Menu': menu_windows_setup},
{'Name': 'Misc Tools', 'Menu': menu_tools},
]
actions = [
{'Name': 'Command Prompt', 'Letter': 'C'},
{'Name': 'Reboot', 'Letter': 'R'},
{'Name': 'Shutdown', 'Letter': 'S'},
]
def select_minidump_path():
dumps = []
# Main loop
while True:
selection = menu_select('Main Menu', menus, actions, secret_exit=True)
# Assign volume letters first
assign_volume_letters()
if (selection.isnumeric()):
try:
menus[int(selection)-1]['Menu']()
except AbortError:
pass
except:
print_error('Major exception in: {menu}'.format(menu=menus[int(selection)-1]['Name']))
print_warning(' Please let The Wizard know and he\'ll look into it (Please include the details below).')
print(traceback.print_exc())
time.sleep(5)
print_info(' You can retry but if this crashes again an alternative approach may be required.')
pause('\nPress enter to return to the main menu')
elif (selection == 'C'):
run_program(['cmd', '-new_console:n'], check=False)
elif (selection == 'R'):
run_program(['wpeutil', 'reboot'])
elif (selection == 'S'):
run_program(['wpeutil', 'shutdown'])
else:
sys.exit(0)
# Search for minidumps
tmp = run_program('mountvol')
tmp = [d for d in re.findall(r'.*([A-Za-z]):\\', tmp.stdout.decode())]
# Remove RAMDisk letter
if 'X' in tmp:
tmp.remove('X')
for drive in tmp:
if os.path.exists('{drive}:\\Windows\\MiniDump'.format(drive=drive)):
dumps.append({'Name': '{drive}:\\Windows\\MiniDump'.format(drive=drive)})
# Check results before showing menu
if len(dumps) == 0:
print_error(' No BSoD / MiniDump paths found')
time.sleep(2)
return None
# Menu
selection = menu_select('Which BSoD / MiniDump path are we scanning?', dumps, [])
return dumps[int(selection) - 1]['Name']
if __name__ == '__main__':
menu_main()
print("This file is not meant to be called directly.")

68
Scripts/settings/main.py Normal file
View file

@ -0,0 +1,68 @@
# Wizard Kit PE: Settings - Main / Branding
# Features
ENABLED_UPLOAD_DATA = False
# STATIC VARIABLES (also used by .cmd files)
## Not using spaces aroung '=' for easier .cmd substrings
ARCHIVE_PASSWORD='Abracadabra'
KIT_NAME_FULL='Wizard Kit PE'
KIT_NAME_SHORT='WKPE'
OFFICE_SERVER_IP='10.0.0.10'
QUICKBOOKS_SERVER_IP='10.0.0.10'
SUPPORT_MESSAGE='Please let 2Shirt know by opening an issue on GitHub'
TIME_ZONE='Pacific Standard Time' # Always use "Standard Time" (DST is applied correctly)
# SERVER VARIABLES
## NOTE: Windows can only use one user per server. This means that if
## one server serves multiple shares then you have to use the same
## user/password for all of those shares.
BACKUP_SERVERS = [
{ 'IP': '10.0.0.10',
'Name': 'ServerOne',
'Mounted': False,
'Share': 'Backups',
'User': 'restore',
'Pass': 'Abracadabra',
},
{ 'IP': '10.0.0.11',
'Name': 'ServerTwo',
'Mounted': False,
'Share': 'Backups',
'User': 'restore',
'Pass': 'Abracadabra',
},
]
CLIENT_INFO_SERVER = {
'IP': '10.0.0.10',
'RegEntry': r'0x10001,0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000',
'Share': '/srv/ClientInfo',
'User': 'upload',
}
OFFICE_SERVER = {
'IP': OFFICE_SERVER_IP,
'Name': 'ServerOne',
'Mounted': False,
'Share': 'Office',
'User': 'restore',
'Pass': 'Abracadabra',
}
QUICKBOOKS_SERVER = {
'IP': QUICKBOOKS_SERVER_IP,
'Name': 'ServerOne',
'Mounted': False,
'Share': 'QuickBooks',
'User': 'restore',
'Pass': 'Abracadabra',
}
WINDOWS_SERVER = {
'IP': '10.0.0.10',
'Name': 'ServerOne',
'Mounted': False,
'Share': 'Windows',
'User': 'restore',
'Pass': 'Abracadabra',
}
if __name__ == '__main__':
print("This file is not meant to be called directly.")

55
Scripts/settings/tools.py Normal file
View file

@ -0,0 +1,55 @@
# Wizard Kit PE: Settings - Tools
TOOLS = {
# NOTE: BinDir will be prepended to these paths at runtime
'AIDA64': {
'32': r'AIDA64\aida64.exe'},
'AutoRuns': {
'32': r'Autoruns\autoruns.exe',
'64': r'Autoruns\autoruns64.exe'},
'BleachBit': {
'32': r'BleachBit\bleachbit_console.exe'},
'Caffeine': {
'32': r'Caffeine\caffeine.exe'},
'Du': {
'32': r'Du\du.exe',
'64': r'Du\du64.exe'},
'ERUNT': {
'32': r'ERUNT\ERUNT.EXE'},
'Everything': {
'32': r'Everything\Everything.exe',
'64': r'Everything\Everything64.exe'},
'FastCopy': {
'32': r'FastCopy\FastCopy.exe',
'64': r'FastCopy\FastCopy64.exe'},
'HitmanPro': {
'32': r'HitmanPro\HitmanPro.exe',
'64': r'HitmanPro\HitmanPro64.exe'},
'HWiNFO': {
'32': r'HWiNFO\HWiNFO.exe',
'64': r'HWiNFO\HWiNFO64.exe'},
'KVRT': {
'32': r'KVRT\KVRT.exe'},
'NotepadPlusPlus': {
'32': r'NotepadPlusPlus\notepadplusplus.exe'},
'ProduKey': {
'32': r'ProduKey\ProduKey.exe',
'64': r'ProduKey\ProduKey64.exe'},
'PuTTY-PSFTP': {
'32': r'PuTTY\PSFTP.EXE'},
'RKill': {
'32': r'RKill\RKill.exe'},
'SevenZip': {
'32': r'7-Zip\7za.exe',
'64': r'7-Zip\7za64.exe'},
'TDSSKiller': {
'32': r'TDSSKiller\TDSSKiller.exe'},
'wimlib-imagex': {
'32': r'wimlib\x32\wimlib-imagex.exe',
'64': r'wimlib\x64\wimlib-imagex.exe'},
'XMPlay': {
'32': r'XMPlay\xmplay.exe'},
}
if __name__ == '__main__':
print("This file is not meant to be called directly.")

View file

@ -0,0 +1,21 @@
# Wizard Kit PE: Root Menu
import os
import sys
# Init
os.chdir(os.path.dirname(os.path.realpath(__file__)))
sys.path.append(os.getcwd())
from functions.common import *
from functions.winpe_menus import *
init_global_vars()
os.system('title {}: Root Menu'.format(KIT_NAME_FULL))
global_vars['LogFile'] = r'{LogDir}\WinPE.log'.format(**global_vars)
if __name__ == '__main__':
try:
menu_root()
except SystemExit:
pass
except:
major_exception()