1006 lines
37 KiB
Python
1006 lines
37 KiB
Python
# WK WinPE Functions
|
|
|
|
# Init
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import winreg
|
|
os.chdir(os.path.dirname(os.path.realpath(__file__)))
|
|
bin = os.path.abspath('..\\')
|
|
sys.path.append(os.getcwd())
|
|
from functions import *
|
|
import partition_uids
|
|
|
|
# Init
|
|
BACKUP_SERVERS = [
|
|
{ 'IP': '10.0.0.10',
|
|
'Mounted': False,
|
|
'Name': 'ServerOne',
|
|
'Share': 'Backups',
|
|
'User': 'backup',
|
|
'Pass': 'Abracadabra',
|
|
},
|
|
{ 'IP': '10.0.0.11',
|
|
'Name': 'ServerTwo',
|
|
'Mounted': False,
|
|
'Share': 'Backups',
|
|
'User': 'backup',
|
|
'Pass': 'Abracadabra',
|
|
},
|
|
]
|
|
WINDOWS_SERVER = {
|
|
'IP': '10.0.0.10',
|
|
'Name': 'ServerOne',
|
|
'Mounted': False,
|
|
'Share': 'Windows',
|
|
'User': 'backup', # Using these credentials in case both the windows source and backup shares are mounted.
|
|
'Pass': 'Abracadabra', # This is because Windows only allows one set of credentials to be used per server at a time.
|
|
}
|
|
WINDOWS_VERSIONS = [
|
|
{'Name': 'Windows 7 Home Basic',
|
|
'Image File': 'Win7',
|
|
'Image Name': 'Windows 7 HOMEBASIC',
|
|
'Family': '7'},
|
|
{'Name': 'Windows 7 Home Premium',
|
|
'Image File': 'Win7',
|
|
'Image Name': 'Windows 7 HOMEPREMIUM',
|
|
'Family': '7'},
|
|
{'Name': 'Windows 7 Professional',
|
|
'Image File': 'Win7',
|
|
'Image Name': 'Windows 7 PROFESSIONAL',
|
|
'Family': '7'},
|
|
{'Name': 'Windows 7 Ultimate',
|
|
'Image File': 'Win7',
|
|
'Image Name': 'Windows 7 ULTIMATE',
|
|
'Family': '7'},
|
|
|
|
{'Name': 'Windows 8.1',
|
|
'Image File': 'Win8',
|
|
'Image Name': 'Windows 8.1',
|
|
'Family': '8',
|
|
'CRLF': True},
|
|
{'Name': 'Windows 8.1 Pro',
|
|
'Image File': 'Win8',
|
|
'Image Name': 'Windows 8.1 Pro',
|
|
'Family': '8'},
|
|
|
|
{'Name': 'Windows 10 Home',
|
|
'Image File': 'Win10',
|
|
'Image Name': 'Windows 10 Home',
|
|
'Family': '10',
|
|
'CRLF': True},
|
|
{'Name': 'Windows 10 Pro',
|
|
'Image File': 'Win10',
|
|
'Image Name': 'Windows 10 Pro',
|
|
'Family': '10'},
|
|
]
|
|
diskpart_script = '{tmp}\\diskpart.script'.format(tmp=os.environ['TMP'])
|
|
|
|
## Colors
|
|
COLORS = {
|
|
'CLEAR': '\033[0m',
|
|
'RED': '\033[31m',
|
|
'GREEN': '\033[32m',
|
|
'YELLOW': '\033[33m',
|
|
'BLUE': '\033[34m'}
|
|
|
|
class AbortError(Exception):
|
|
pass
|
|
|
|
class BackupError(Exception):
|
|
pass
|
|
|
|
class SetupError(Exception):
|
|
pass
|
|
|
|
def abort_to_main_menu(message='Returning to main menu...'):
|
|
print_warning(message)
|
|
pause('Press Enter to return to main menu... ')
|
|
raise AbortError
|
|
|
|
def ask(prompt='Kotaero'):
|
|
answer = None
|
|
prompt = prompt + ' [Y/N]: '
|
|
while answer is None:
|
|
tmp = input(prompt)
|
|
if re.search(r'^y(es|)$', tmp, re.IGNORECASE):
|
|
answer = True
|
|
elif re.search(r'^n(o|ope|)$', tmp, re.IGNORECASE):
|
|
answer = False
|
|
return answer
|
|
|
|
def assign_volume_letters():
|
|
try:
|
|
# Run script
|
|
with open(diskpart_script, 'w') as script:
|
|
for vol in get_volumes():
|
|
script.write('select volume {Number}\n'.format(**vol))
|
|
script.write('assign\n')
|
|
run_program('diskpart /s {script}'.format(script=diskpart_script))
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
|
|
def backup_partition(bin=None, disk=None, par=None):
|
|
# Bail early
|
|
if bin is None:
|
|
raise Exception('bin path not specified.')
|
|
if disk is None:
|
|
raise Exception('Disk not specified.')
|
|
if par is None:
|
|
raise Exception('Partition not specified.')
|
|
|
|
print(' Partition {Number} Backup...\t\t'.format(**par), end='', flush=True)
|
|
if par['Number'] in disk['Bad Partitions']:
|
|
print_warning('Skipped.')
|
|
else:
|
|
cmd = '{bin}\\wimlib\\wimlib-imagex capture {Letter}:\\ "{Image Path}\\{Image File}" "{Image Name}" "{Image Name}" --compress=none'.format(bin=bin, **par)
|
|
if par['Image Exists']:
|
|
print_warning('Skipped.')
|
|
else:
|
|
try:
|
|
os.makedirs('{Image Path}'.format(**par), exist_ok=True)
|
|
run_program(cmd)
|
|
print_success('Complete.')
|
|
except subprocess.CalledProcessError as err:
|
|
print_error('Failed.')
|
|
par['Error'] = err.stderr.decode().splitlines()
|
|
raise BackupError
|
|
|
|
def convert_to_bytes(size):
|
|
size = str(size)
|
|
tmp = re.search(r'(\d+)\s+([KMGT]B)', size.upper())
|
|
if tmp:
|
|
size = int(tmp.group(1))
|
|
units = tmp.group(2)
|
|
if units == 'TB':
|
|
size *= 1099511627776
|
|
elif units == 'GB':
|
|
size *= 1073741824
|
|
elif units == 'MB':
|
|
size *= 1048576
|
|
elif units == 'KB':
|
|
size *= 1024
|
|
else:
|
|
return -1
|
|
|
|
return size
|
|
|
|
def is_valid_image(bin=None, filename=None, imagename=None):
|
|
# Bail early
|
|
if bin is None:
|
|
raise Exception('bin not specified.')
|
|
if filename is None:
|
|
raise Exception('Filename not specified.')
|
|
if imagename is None:
|
|
raise Exception('Image Name not specified.')
|
|
|
|
cmd = '{bin}\\wimlib\\wimlib-imagex info "{filename}" "{imagename}"'.format(bin=bin, filename=filename, imagename=imagename)
|
|
try:
|
|
run_program(cmd)
|
|
except subprocess.CalledProcessError:
|
|
print_error('Invalid image: {filename}'.format(filename=filename))
|
|
return False
|
|
|
|
return True
|
|
|
|
def find_windows_image(bin, windows_version=None):
|
|
"""Search for a Windows source image file on local drives and network drives (in that order)"""
|
|
image = {}
|
|
|
|
# Bail early
|
|
if windows_version is None:
|
|
raise Exception('Windows version not specified.')
|
|
imagefile = windows_version['Image File']
|
|
|
|
# Search local source
|
|
process_return = run_program('mountvol')
|
|
for tmp in re.findall(r'.*([A-Za-z]):\\', process_return.stdout.decode()):
|
|
for ext in ['esd', 'wim', 'swm']:
|
|
filename = '{drive}:\\images\\{imagefile}'.format(drive=tmp[0], imagefile=imagefile)
|
|
filename_ext = '{filename}.{ext}'.format(filename=filename, ext=ext)
|
|
if os.path.isfile(filename_ext):
|
|
if is_valid_image(bin, filename_ext, windows_version['Image Name']):
|
|
image['Ext'] = ext
|
|
image['File'] = filename
|
|
image['Glob'] = '--ref="{File}*.swm"'.format(**image) if ext == 'swm' else ''
|
|
image['Source'] = tmp[0]
|
|
break
|
|
|
|
# Check for network source (if necessary)
|
|
if not any(image):
|
|
if not WINDOWS_SERVER['Mounted']:
|
|
mount_windows_share()
|
|
for ext in ['esd', 'wim', 'swm']:
|
|
filename = '\\\\{IP}\\{Share}\\images\\{imagefile}'.format(imagefile=imagefile, **WINDOWS_SERVER)
|
|
filename_ext = '{filename}.{ext}'.format(filename=filename, ext=ext)
|
|
if os.path.isfile(filename_ext):
|
|
if is_valid_image(bin, filename_ext, windows_version['Image Name']):
|
|
image['Ext'] = ext
|
|
image['File'] = filename
|
|
image['Glob'] = '--ref="{File}*.swm"'.format(**image) if ext == 'swm' else ''
|
|
image['Source'] = None
|
|
break
|
|
|
|
# Display image to be used (if any) and return
|
|
if any(image):
|
|
print_info('Using image: {File}.{Ext}'.format(**image))
|
|
return image
|
|
else:
|
|
print_error('Failed to find Windows source image for {winver}'.format(winver=windows_version['Name']))
|
|
abort_to_main_menu('Aborting Windows setup')
|
|
|
|
def format_gpt(disk=None, windows_family=None):
|
|
"""Format disk for use as a Windows OS drive using the GPT (UEFI) layout."""
|
|
|
|
# Bail early
|
|
if disk is None:
|
|
raise Exception('No disk provided.')
|
|
if windows_family is None:
|
|
raise Exception('No Windows family provided.')
|
|
|
|
# Format drive
|
|
# print_info('Drive will use a GPT (UEFI) layout.')
|
|
with open(diskpart_script, 'w') as script:
|
|
# Partition table
|
|
script.write('select disk {number}\n'.format(number=disk['Number']))
|
|
script.write('clean\n')
|
|
script.write('convert gpt\n')
|
|
|
|
# System partition
|
|
script.write('create partition efi size=260\n') # NOTE: Allows for Advanced Format 4K drives
|
|
script.write('format quick fs=fat32 label="System"\n')
|
|
script.write('assign letter="S"\n')
|
|
|
|
# Microsoft Reserved (MSR) partition
|
|
script.write('create partition msr size=128\n')
|
|
|
|
# Windows partition
|
|
script.write('create partition primary\n')
|
|
script.write('format quick fs=ntfs label="Windows"\n')
|
|
script.write('assign letter="W"\n')
|
|
|
|
# Recovery Tools partition (Windows 8+)
|
|
if re.search(r'^(8|10)', windows_family):
|
|
script.write('shrink minimum=500\n')
|
|
script.write('create partition primary\n')
|
|
script.write('format quick fs=ntfs label="Recovery Tools"\n')
|
|
script.write('assign letter="T"\n')
|
|
script.write('set id="de94bba4-06d1-4d40-a16a-bfd50179d6ac"\n')
|
|
script.write('gpt attributes=0x8000000000000001\n')
|
|
|
|
# Run script
|
|
run_program('diskpart /s {script}'.format(script=diskpart_script))
|
|
time.sleep(2)
|
|
|
|
def format_mbr(disk=None, windows_family=None):
|
|
"""Format disk for use as a Windows OS drive using the MBR (legacy) layout."""
|
|
|
|
# Bail early
|
|
if disk is None:
|
|
raise Exception('No disk provided.')
|
|
if windows_family is None:
|
|
raise Exception('No Windows family provided.')
|
|
|
|
# Format drive
|
|
# print_info('Drive will use a MBR (legacy) layout.')
|
|
with open(diskpart_script, 'w') as script:
|
|
# Partition table
|
|
script.write('select disk {number}\n'.format(number=disk['Number']))
|
|
script.write('clean\n')
|
|
|
|
# System partition
|
|
script.write('create partition primary size=100\n')
|
|
script.write('format fs=ntfs quick label="System Reserved"\n')
|
|
script.write('active\n')
|
|
script.write('assign letter="S"\n')
|
|
|
|
# Windows partition
|
|
script.write('create partition primary\n')
|
|
script.write('format fs=ntfs quick label="Windows"\n')
|
|
script.write('assign letter="W"\n')
|
|
|
|
# Recovery Tools partition (Windows 8+)
|
|
if re.search(r'^(8|10)', windows_family):
|
|
script.write('shrink minimum=500\n')
|
|
script.write('create partition primary\n')
|
|
script.write('format quick fs=ntfs label="Recovery"\n')
|
|
script.write('assign letter="T"\n')
|
|
script.write('set id=27\n')
|
|
|
|
# Run script
|
|
run_program('diskpart /s {script}'.format(script=diskpart_script))
|
|
time.sleep(2)
|
|
|
|
def get_attached_disk_info():
|
|
"""Get details about the attached disks"""
|
|
disks = []
|
|
print_info('Getting drive info...')
|
|
|
|
# Assign all the letters
|
|
assign_volume_letters()
|
|
|
|
# Get disks
|
|
disks = get_disks()
|
|
|
|
# Get disk details
|
|
for disk in disks:
|
|
# Get partition style
|
|
disk['Table'] = get_table_type(disk)
|
|
|
|
# Get disk name/model and physical details
|
|
disk.update(get_disk_details(disk))
|
|
|
|
# Get partition info for disk
|
|
disk['Partitions'] = get_partitions(disk)
|
|
|
|
for par in disk['Partitions']:
|
|
# Get partition details
|
|
par.update(get_partition_details(disk, par))
|
|
|
|
# Done
|
|
return disks
|
|
|
|
def get_boot_mode():
|
|
boot_mode = 'Legacy'
|
|
try:
|
|
reg_key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, 'System\\CurrentControlSet\\Control')
|
|
reg_value = winreg.QueryValueEx(reg_key, 'PEFirmwareType')[0]
|
|
if reg_value == 2:
|
|
boot_mode = 'UEFI'
|
|
except:
|
|
boot_mode = 'Unknown'
|
|
|
|
return boot_mode
|
|
|
|
def get_disk_details(disk=None):
|
|
details = {}
|
|
|
|
# Bail early
|
|
if disk is None:
|
|
raise Exception('Disk not specified.')
|
|
|
|
try:
|
|
# Run script
|
|
with open(diskpart_script, 'w') as script:
|
|
script.write('select disk {Number}\n'.format(**disk))
|
|
script.write('detail disk\n')
|
|
process_return = run_program('diskpart /s {script}'.format(script=diskpart_script))
|
|
process_return = process_return.stdout.decode().strip()
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
else:
|
|
# Remove empty lines
|
|
tmp = [s.strip() for s in process_return.splitlines() if s.strip() != '']
|
|
|
|
# Set disk name
|
|
details['Name'] = tmp[4]
|
|
|
|
# Remove lines without a ':' and split each remaining line at the ':' to form a key/value pair
|
|
tmp = [s.split(':') for s in tmp if ':' in s]
|
|
|
|
# Add key/value pairs to the details variable and return dict
|
|
details.update({key.strip(): value.strip() for (key, value) in tmp})
|
|
|
|
return details
|
|
|
|
def get_disks():
|
|
disks = []
|
|
|
|
try:
|
|
# Run script
|
|
with open(diskpart_script, 'w') as script:
|
|
script.write('list disk\n')
|
|
process_return = run_program('diskpart /s {script}'.format(script=diskpart_script))
|
|
process_return = process_return.stdout.decode().strip()
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
else:
|
|
# Append disk numbers
|
|
for tmp in re.findall(r'Disk (\d+)\s+\w+\s+(\d+\s+\w+)', process_return):
|
|
_num = tmp[0]
|
|
_size = human_readable_size(tmp[1])
|
|
disks.append({'Number': _num, 'Size': _size})
|
|
|
|
return disks
|
|
|
|
def get_partition_details(disk=None, par=None):
|
|
details = {}
|
|
|
|
# Bail early
|
|
if disk is None:
|
|
raise Exception('Disk not specified.')
|
|
if par is None:
|
|
raise Exception('Partition not specified.')
|
|
|
|
# Diskpart details
|
|
try:
|
|
# Run script
|
|
with open(diskpart_script, 'w') as script:
|
|
script.write('select disk {Number}\n'.format(**disk))
|
|
script.write('select partition {Number}\n'.format(**par))
|
|
script.write('detail partition\n')
|
|
process_return = run_program('diskpart /s {script}'.format(script=diskpart_script))
|
|
process_return = process_return.stdout.decode().strip()
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
else:
|
|
# Get volume letter or RAW status
|
|
tmp = re.search(r'Volume\s+\d+\s+(\w|RAW)\s+', process_return)
|
|
if tmp:
|
|
if tmp.group(1).upper() == 'RAW':
|
|
details['FileSystem'] = RAW
|
|
else:
|
|
details['Letter'] = tmp.group(1)
|
|
|
|
# Remove empty lines from process_return
|
|
tmp = [s.strip() for s in process_return.splitlines() if s.strip() != '']
|
|
|
|
# Remove lines without a ':' and split each remaining line at the ':' to form a key/value pair
|
|
tmp = [s.split(':') for s in tmp if ':' in s]
|
|
|
|
# Add key/value pairs to the details variable and return dict
|
|
details.update({key.strip(): value.strip() for (key, value) in tmp})
|
|
|
|
# Get MBR type / GPT GUID for extra details on "Unknown" partitions
|
|
guid = partition_uids.lookup_guid(details['Type'])
|
|
if guid is not None:
|
|
details.update({
|
|
'Description': guid.get('Description', ''),
|
|
'OS': guid.get('OS', '')})
|
|
|
|
if 'Letter' in details:
|
|
# Disk usage
|
|
tmp = shutil.disk_usage('{Letter}:\\'.format(**details))
|
|
details['Used Space'] = human_readable_size(tmp.used)
|
|
|
|
# fsutil details
|
|
try:
|
|
process_return = run_program('fsutil fsinfo volumeinfo {Letter}:'.format(**details))
|
|
process_return = process_return.stdout.decode().strip()
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
else:
|
|
# Remove empty lines from process_return
|
|
tmp = [s.strip() for s in process_return.splitlines() if s.strip() != '']
|
|
|
|
# Add "Feature" lines
|
|
details['File System Features'] = [s.strip() for s in tmp if ':' not in s]
|
|
|
|
# Remove lines without a ':' and split each remaining line at the ':' to form a key/value pair
|
|
tmp = [s.split(':') for s in tmp if ':' in s]
|
|
|
|
# Add key/value pairs to the details variable and return dict
|
|
details.update({key.strip(): value.strip() for (key, value) in tmp})
|
|
|
|
# Set Volume Name
|
|
details['Name'] = details.get('Volume Name', '')
|
|
|
|
# Set FileSystem Type
|
|
if details.get('FileSystem', '') != 'RAW':
|
|
details['FileSystem'] = details.get('File System Name', 'Unknown')
|
|
|
|
return details
|
|
|
|
def get_partitions(disk=None):
|
|
partitions = []
|
|
|
|
# Bail early
|
|
if disk is None:
|
|
raise Exception('Disk not specified.')
|
|
|
|
try:
|
|
# Run script
|
|
with open(diskpart_script, 'w') as script:
|
|
script.write('select disk {Number}\n'.format(**disk))
|
|
script.write('list partition\n')
|
|
process_return = run_program('diskpart /s {script}'.format(script=diskpart_script))
|
|
process_return = process_return.stdout.decode().strip()
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
else:
|
|
# Append partition numbers
|
|
for tmp in re.findall(r'Partition\s+(\d+)\s+\w+\s+(\d+\s+\w+)\s+', process_return, re.IGNORECASE):
|
|
_num = tmp[0]
|
|
_size = human_readable_size(tmp[1])
|
|
partitions.append({'Number': _num, 'Size': _size})
|
|
|
|
return partitions
|
|
|
|
def get_table_type(disk=None):
|
|
_type = 'Unknown'
|
|
|
|
# Bail early
|
|
if disk is None:
|
|
raise Exception('Disk not specified.')
|
|
|
|
try:
|
|
with open(diskpart_script, 'w') as script:
|
|
script.write('select disk {Number}\n'.format(**disk))
|
|
script.write('uniqueid disk\n')
|
|
process_return = run_program('diskpart /s {script}'.format(script=diskpart_script))
|
|
process_return = process_return.stdout.decode().strip()
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
else:
|
|
if re.findall(r'Disk ID: {[A-Z0-9]+-[A-Z0-9]+-[A-Z0-9]+-[A-Z0-9]+-[A-Z0-9]+}', process_return, re.IGNORECASE):
|
|
_type = 'GPT'
|
|
elif re.findall(r'Disk ID: 00000000', process_return, re.IGNORECASE):
|
|
_type = 'RAW'
|
|
elif re.findall(r'Disk ID: [A-Z0-9]+', process_return, re.IGNORECASE):
|
|
_type = 'MBR'
|
|
|
|
return _type
|
|
|
|
def get_ticket_id():
|
|
ticket_id = None
|
|
while ticket_id is None:
|
|
tmp = input('Enter ticket number: ')
|
|
if re.match(r'^([0-9]+([\-_]*\w+|))$', tmp):
|
|
ticket_id = tmp
|
|
|
|
return ticket_id
|
|
|
|
def get_volumes():
|
|
vols = []
|
|
|
|
try:
|
|
# Run script
|
|
with open(diskpart_script, 'w') as script:
|
|
script.write('list volume\n')
|
|
process_return = run_program('diskpart /s {script}'.format(script=diskpart_script))
|
|
process_return = process_return.stdout.decode().strip()
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
else:
|
|
# Append volume numbers
|
|
for tmp in re.findall(r'Volume (\d+)\s+([A-Za-z]?)\s+', process_return):
|
|
vols.append({'Number': tmp[0], 'Letter': tmp[1]})
|
|
|
|
return vols
|
|
|
|
def human_readable_size(size, decimals=0):
|
|
# Prep string formatting
|
|
width = 3+decimals
|
|
if decimals > 0:
|
|
width += 1
|
|
human_format = '>{width}.{decimals}f'.format(width=width, decimals=decimals)
|
|
tmp = ''
|
|
|
|
# Convert size to int
|
|
try:
|
|
size = int(size)
|
|
except ValueError:
|
|
size = convert_to_bytes(size)
|
|
|
|
# Verify we have a valid size
|
|
if size <= 0:
|
|
return '{size:>{width}} b'.format(size='???', width=width)
|
|
|
|
# Format string
|
|
if size >= 1099511627776:
|
|
size /= 1099511627776
|
|
tmp = '{size:{human_format}} Tb'.format(size=size, human_format=human_format)
|
|
elif size >= 1073741824:
|
|
size /= 1073741824
|
|
tmp = '{size:{human_format}} Gb'.format(size=size, human_format=human_format)
|
|
elif size >= 1048576:
|
|
size /= 1048576
|
|
tmp = '{size:{human_format}} Mb'.format(size=size, human_format=human_format)
|
|
elif size >= 1024:
|
|
size /= 1024
|
|
tmp = '{size:{human_format}} Kb'.format(size=size, human_format=human_format)
|
|
else:
|
|
tmp = '{size:{human_format}} b'.format(size=size, human_format=human_format)
|
|
|
|
# Return
|
|
return tmp
|
|
|
|
def menu_select(title='~ Untitled Menu ~', main_entries=[], action_entries=[], prompt='Please make a selection', secret_exit=False):
|
|
"""Display options in a menu for user selection"""
|
|
|
|
# Bail early
|
|
if (len(main_entries) + len(action_entries) == 0):
|
|
raise Exception("MenuError: No items given")
|
|
|
|
# Build menu
|
|
menu_splash = '{title}\n\n'.format(title=title)
|
|
valid_answers = []
|
|
if (secret_exit):
|
|
valid_answers.append('Q')
|
|
|
|
# Add main entries
|
|
if (len(main_entries) > 0):
|
|
for i in range(len(main_entries)):
|
|
entry = main_entries[i]
|
|
# Add Spacer
|
|
if ('CRLF' in entry):
|
|
menu_splash += '\n'
|
|
valid_answers.append(str(i+1))
|
|
menu_splash += '{number:>{mwidth}}: {name}\n'.format(number=i+1, mwidth=len(str(len(main_entries))), name=entry.get('Display Name', entry['Name']))
|
|
menu_splash += '\n'
|
|
|
|
# Add action entries
|
|
if (len(action_entries) > 0):
|
|
for entry in action_entries:
|
|
# Add Spacer
|
|
if ('CRLF' in entry):
|
|
menu_splash += '\n'
|
|
valid_answers.append(entry['Letter'])
|
|
menu_splash += '{letter:>{mwidth}}: {name}\n'.format(letter=entry['Letter'].upper(), mwidth=len(str(len(action_entries))), name=entry['Name'])
|
|
menu_splash += '\n'
|
|
|
|
answer = ''
|
|
|
|
while (answer.upper() not in valid_answers):
|
|
os.system('cls')
|
|
print(menu_splash)
|
|
answer = input('{prompt}: '.format(prompt=prompt))
|
|
|
|
return answer.upper()
|
|
|
|
def mount_backup_shares():
|
|
"""Mount the backup shares for use as destinations for backup image creation"""
|
|
|
|
# Attempt to mount share(s)
|
|
for server in BACKUP_SERVERS:
|
|
# Blindly skip if we mounted earlier
|
|
if server['Mounted']:
|
|
continue
|
|
else:
|
|
try:
|
|
# Test connection
|
|
run_program('ping -w 800 -n 2 {IP}'.format(**server))
|
|
|
|
# Mount
|
|
run_program('net use \\\\{IP}\\{Share} /user:{User} {Pass}'.format(**server))
|
|
print_info('Mounted {Name}'.format(**server))
|
|
server['Mounted'] = True
|
|
except subprocess.CalledProcessError:
|
|
print_error('Failed to mount \\\\{Name}\\{Share}, {IP} unreachable.'.format(**server))
|
|
time.sleep(1)
|
|
except:
|
|
print_warning('Failed to mount \\\\{Name}\\{Share} ({IP})'.format(**server))
|
|
time.sleep(1)
|
|
|
|
def mount_windows_share():
|
|
"""Mount the Windows images share for use in Windows setup"""
|
|
|
|
# Blindly skip if we mounted earlier
|
|
if WINDOWS_SERVER['Mounted']:
|
|
return None
|
|
else:
|
|
try:
|
|
# Test connection
|
|
run_program('ping -w 800 -n 2 {IP}'.format(**WINDOWS_SERVER))
|
|
# Mount
|
|
run_program('net use \\\\{IP}\\{Share} /user:{User} {Pass}'.format(**WINDOWS_SERVER))
|
|
print_info('Mounted {Name}'.format(**WINDOWS_SERVER))
|
|
WINDOWS_SERVER['Mounted'] = True
|
|
except subprocess.CalledProcessError:
|
|
print_error('Failed to mount \\\\{Name}\\{Share}, {IP} unreachable.'.format(**WINDOWS_SERVER))
|
|
time.sleep(1)
|
|
except:
|
|
print_warning('Failed to mount \\\\{Name}\\{Share}'.format(**WINDOWS_SERVER))
|
|
time.sleep(1)
|
|
|
|
def pause(prompt='Press Enter to continue... '):
|
|
input(prompt)
|
|
|
|
def prep_disk_for_backup(dest=None, disk=None, ticket_id=None):
|
|
disk['Backup Warnings'] = '\n'
|
|
disk['Clobber Risk'] = []
|
|
width = len(str(len(disk['Partitions'])))
|
|
|
|
# Bail early
|
|
if dest is None:
|
|
raise Exception('Destination not provided.')
|
|
if disk is None:
|
|
raise Exception('Disk not provided.')
|
|
if ticket_id is None:
|
|
raise Exception('Ticket ID not provided.')
|
|
|
|
# Get partition totals
|
|
disk['Bad Partitions'] = [par['Number'] for par in disk['Partitions'] if 'Letter' not in par or re.search(r'(RAW|Unknown)', par['FileSystem'], re.IGNORECASE)]
|
|
disk['Valid Partitions'] = len(disk['Partitions']) - len(disk['Bad Partitions'])
|
|
|
|
# Bail if no valid partitions are found (those that can be imaged)
|
|
if disk['Valid Partitions'] <= 0:
|
|
abort_to_main_menu(' No partitions can be imaged for the selected drive')
|
|
|
|
# Prep partitions
|
|
for par in disk['Partitions']:
|
|
if par['Number'] in disk['Bad Partitions']:
|
|
par['Display String'] = '{YELLOW} * Partition {Number:>{width}}:\t{Size} {FileSystem}\t\t{q}{Name}{q}\t{Description} ({OS}){CLEAR}'.format(
|
|
width=width,
|
|
q='"' if par['Name'] != '' else '',
|
|
**par,
|
|
**COLORS)
|
|
else:
|
|
# Update info for WIM capturing
|
|
par['Image Name'] = str(par['Name'])
|
|
if par['Image Name'] == '':
|
|
par['Image Name'] = 'Unknown'
|
|
if 'IP' in dest:
|
|
par['Image Path'] = '\\\\{IP}\\{Share}\\{ticket}'.format(ticket=ticket_id, **dest)
|
|
else:
|
|
par['Image Path'] = '{Letter}:\\{ticket}'.format(ticket=ticket_id, **dest)
|
|
par['Image File'] = '{Number}_{Image Name}'.format(**par)
|
|
par['Image File'] = '{fixed_name}.wim'.format(fixed_name=re.sub(r'\W', '_', par['Image File']))
|
|
|
|
# Check for existing backups
|
|
par['Image Exists'] = False
|
|
if os.path.exists('{Image Path}\\{Image File}'.format(**par)):
|
|
par['Image Exists'] = True
|
|
disk['Clobber Risk'].append(par['Number'])
|
|
par['Display String'] = '{BLUE} + '.format(**COLORS)
|
|
else:
|
|
par['Display String'] = '{CLEAR} '.format(**COLORS)
|
|
|
|
# Append rest of Display String for valid/clobber partitions
|
|
par['Display String'] += 'Partition {Number:>{width}}:\t{Size} {FileSystem} (Used: {Used Space})\t{q}{Name}{q}{CLEAR}'.format(
|
|
width=width,
|
|
q='"' if par['Name'] != '' else '',
|
|
**par,
|
|
**COLORS)
|
|
|
|
# Set description for bad partitions
|
|
if len(disk['Bad Partitions']) > 1:
|
|
disk['Backup Warnings'] += '{YELLOW} * Unable to backup these partitions{CLEAR}\n'.format(**COLORS)
|
|
elif len(disk['Bad Partitions']) == 1:
|
|
print_warning(' * Unable to backup this partition')
|
|
disk['Backup Warnings'] += '{YELLOW} * Unable to backup this partition{CLEAR}\n'.format(**COLORS)
|
|
|
|
# Set description for partitions that would be clobbered
|
|
if len(disk['Clobber Risk']) > 1:
|
|
disk['Backup Warnings'] += '{BLUE} + These partitions already have backup images on {Name}{CLEAR}\n'.format(**dest, **COLORS)
|
|
elif len(disk['Clobber Risk']) == 1:
|
|
disk['Backup Warnings'] += '{BLUE} + This partition already has a backup image on {Name}{CLEAR}\n'.format(**dest, **COLORS)
|
|
|
|
# Set warning for skipped partitions
|
|
if len(disk['Clobber Risk']) + len(disk['Bad Partitions']) > 1:
|
|
disk['Backup Warnings'] += '\n{YELLOW}If you continue the partitions marked above will NOT be backed up.{CLEAR}\n'.format(**COLORS)
|
|
if len(disk['Clobber Risk']) + len(disk['Bad Partitions']) == 1:
|
|
disk['Backup Warnings'] += '\n{YELLOW}If you continue the partition marked above will NOT be backed up.{CLEAR}\n'.format(**COLORS)
|
|
|
|
def prep_disk_for_formatting(disk=None):
|
|
disk['Format Warnings'] = '\n'
|
|
width = len(str(len(disk['Partitions'])))
|
|
|
|
# Bail early
|
|
if disk is None:
|
|
raise Exception('Disk not provided.')
|
|
|
|
# Set boot method and partition table type
|
|
disk['Use GPT'] = True
|
|
if (get_boot_mode() == 'UEFI'):
|
|
if (not ask("Setup Windows to use UEFI booting?")):
|
|
disk['Use GPT'] = False
|
|
else:
|
|
if (ask("Setup Windows to use BIOS/Legacy booting?")):
|
|
disk['Use GPT'] = False
|
|
|
|
# Set Display and Warning Strings
|
|
if len(disk['Partitions']) == 0:
|
|
disk['Format Warnings'] += 'No partitions found\n'
|
|
for par in disk['Partitions']:
|
|
if 'Letter' not in par or re.search(r'(RAW|Unknown)', par['FileSystem'], re.IGNORECASE):
|
|
# FileSystem not accessible to WinPE. List partition type / OS info for technician
|
|
par['Display String'] = ' Partition {Number:>{width}}:\t{Size} {FileSystem}\t\t{q}{Name}{q}\t{Description} ({OS})'.format(
|
|
width=width,
|
|
q='"' if par['Name'] != '' else '',
|
|
**par)
|
|
else:
|
|
# FileSystem accessible to WinPE. List space used instead of partition type / OS info for technician
|
|
par['Display String'] = ' Partition {Number:>{width}}:\t{Size} {FileSystem} (Used: {Used Space})\t{q}{Name}{q}'.format(
|
|
width=width,
|
|
q='"' if par['Name'] != '' else '',
|
|
**par)
|
|
|
|
def print_error(message='Generic error', **kwargs):
|
|
print('{RED}{message}{CLEAR}'.format(message=message, **COLORS, **kwargs))
|
|
|
|
def print_info(message='Generic info', **kwargs):
|
|
print('{BLUE}{message}{CLEAR}'.format(message=message, **COLORS, **kwargs))
|
|
|
|
def print_success(message='Generic success', **kwargs):
|
|
print('{GREEN}{message}{CLEAR}'.format(message=message, **COLORS, **kwargs))
|
|
|
|
def print_warning(message='Generic warning', **kwargs):
|
|
print('{YELLOW}{message}{CLEAR}'.format(message=message, **COLORS, **kwargs))
|
|
|
|
def remove_volume_letters(keep=''):
|
|
if keep is None:
|
|
keep = ''
|
|
try:
|
|
# Run script
|
|
with open(diskpart_script, 'w') as script:
|
|
for vol in get_volumes():
|
|
if vol['Letter'].upper() != keep.upper():
|
|
script.write('select volume {Number}\n'.format(**vol))
|
|
script.write('remove noerr\n')
|
|
run_program('diskpart /s {script}'.format(script=diskpart_script))
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
|
|
def run_program(cmd=None, args=[], check=True):
|
|
if cmd is None:
|
|
raise Exception('No program passed.')
|
|
|
|
if len(args) > 0:
|
|
args = [cmd] + args
|
|
process_return = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=check)
|
|
else:
|
|
process_return = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=check)
|
|
|
|
return process_return
|
|
|
|
def select_destination():
|
|
# Build menu
|
|
dests = []
|
|
for server in BACKUP_SERVERS:
|
|
if server['Mounted']:
|
|
dests.append(server)
|
|
actions = [
|
|
{'Name': 'Main Menu', 'Letter': 'M'},
|
|
]
|
|
|
|
# Size check
|
|
for dest in dests:
|
|
if 'IP' in dest:
|
|
dest['Usage'] = shutil.disk_usage('\\\\{IP}\\{Share}'.format(**dest))
|
|
else:
|
|
dest['Usage'] = shutil.disk_usage('{Letter}:\\'.format(**dest))
|
|
dest['Free Space'] = human_readable_size(dest['Usage'].free)
|
|
dest['Display Name'] = '{Name} ({Free Space} available)'.format(**dest)
|
|
|
|
# Show menu or bail
|
|
if len(dests) > 0:
|
|
selection = menu_select('Where are we backing up to?', dests, actions)
|
|
if selection == 'M':
|
|
return None
|
|
else:
|
|
return dests[int(selection)-1]
|
|
else:
|
|
print_warning('No backup destinations found.')
|
|
return None
|
|
|
|
def select_disk(prompt='Which disk?'):
|
|
"""Select a disk from the attached disks"""
|
|
disks = get_attached_disk_info()
|
|
|
|
# Build menu
|
|
disk_options = []
|
|
for disk in disks:
|
|
display_name = '{Size}\t[{Table}] ({Type}) {Name}'.format(**disk)
|
|
if len(disk['Partitions']) > 0:
|
|
pwidth=len(str(len(disk['Partitions'])))
|
|
for par in disk['Partitions']:
|
|
# Show unsupported partition(s) in RED
|
|
par_skip = False
|
|
if 'Letter' not in par or re.search(r'(RAW|Unknown)', par['FileSystem'], re.IGNORECASE):
|
|
par_skip = True
|
|
if par_skip:
|
|
display_name += COLORS['YELLOW']
|
|
|
|
# Main text
|
|
display_name += '\n\t\t\tPartition {Number:>{pwidth}}: {Size} ({FileSystem})'.format(pwidth=pwidth, **par)
|
|
if par['Name'] != '':
|
|
display_name += '\t"{Name}"'.format(**par)
|
|
|
|
# Clear color (if set above)
|
|
if par_skip:
|
|
display_name += COLORS['CLEAR']
|
|
else:
|
|
display_name += '{YELLOW}\n\t\t\tNo partitions found.{CLEAR}'.format(**COLORS)
|
|
disk_options.append({'Name': display_name, 'Disk': disk})
|
|
actions = [
|
|
{'Name': 'Main Menu', 'Letter': 'M'},
|
|
]
|
|
|
|
# Menu loop
|
|
selection = menu_select(prompt, disk_options, actions)
|
|
|
|
if (selection.isnumeric()):
|
|
return disk_options[int(selection)-1]['Disk']
|
|
elif (selection == 'M'):
|
|
abort_to_main_menu()
|
|
|
|
def select_minidump_path():
|
|
dumps = []
|
|
|
|
# Assign volume letters first
|
|
assign_volume_letters()
|
|
|
|
# Search for minidumps
|
|
tmp = run_program('mountvol')
|
|
tmp = [d for d in re.findall(r'.*([A-Za-z]):\\', tmp.stdout.decode())]
|
|
# Remove RAMDisk letter
|
|
if 'X' in tmp:
|
|
tmp.remove('X')
|
|
for drive in tmp:
|
|
if os.path.exists('{drive}:\\Windows\\MiniDump'.format(drive=drive)):
|
|
dumps.append({'Name': '{drive}:\\Windows\\MiniDump'.format(drive=drive)})
|
|
|
|
# Check results before showing menu
|
|
if len(dumps) == 0:
|
|
print_error(' No BSoD / MiniDump paths found')
|
|
time.sleep(2)
|
|
return None
|
|
|
|
# Menu
|
|
selection = menu_select('Which BSoD / MiniDump path are we scanning?', dumps, [])
|
|
return dumps[int(selection) - 1]['Name']
|
|
|
|
def select_windows_version():
|
|
actions = [{'Name': 'Main Menu', 'Letter': 'M'},]
|
|
|
|
# Menu loop
|
|
selection = menu_select('Which version of Windows are we installing?', WINDOWS_VERSIONS, actions)
|
|
|
|
if selection.isnumeric():
|
|
return WINDOWS_VERSIONS[int(selection)-1]
|
|
elif selection == 'M':
|
|
abort_to_main_menu()
|
|
|
|
def setup_windows(bin=None, windows_image=None, windows_version=None):
|
|
# Bail early
|
|
if bin is None:
|
|
raise Exception('bin path not specified.')
|
|
if windows_image is None:
|
|
raise Exception('Windows image not specified.')
|
|
if windows_version is None:
|
|
raise Exception('Windows version not specified.')
|
|
|
|
# Apply image
|
|
cmd = '{bin}\\wimlib\\wimlib-imagex apply "{File}.{Ext}" "{Image Name}" W:\\ {Glob}'.format(bin=bin, **windows_image, **windows_version)
|
|
run_program(cmd)
|
|
|
|
def setup_windows_re(windows_version=None, windows_letter='W', tools_letter='T'):
|
|
# Bail early
|
|
if windows_version is None:
|
|
raise Exception('Windows version not specified.')
|
|
|
|
_win = '{win}:\\Windows'.format(win=windows_letter)
|
|
_winre = '{win}\\System32\\Recovery\\WinRE.wim'.format(win=_win)
|
|
_dest = '{tools}:\\Recovery\\WindowsRE'.format(tools=tools_letter)
|
|
|
|
if re.search(r'^(8|10)', windows_version['Family']):
|
|
# Copy WinRE.wim
|
|
os.makedirs(_dest, exist_ok=True)
|
|
shutil.copy(_winre, '{dest}\\WinRE.wim'.format(dest=_dest))
|
|
|
|
# Set location
|
|
run_program('{win}\\System32\\reagentc /setreimage /path {dest} /target {win}'.format(dest=_dest, win=_win))
|
|
else:
|
|
# Only supported on Windows 8 and above
|
|
raise SetupError
|
|
|
|
def update_boot_partition(system_letter='S', windows_letter='W', mode='ALL'):
|
|
run_program('bcdboot {win}:\\Windows /s {sys}: /f {mode}'.format(win=windows_letter, sys=system_letter, mode=mode))
|
|
|
|
def verify_wim_backup(bin=None, par=None):
|
|
# Bail early
|
|
if bin is None:
|
|
raise Exception('bin path not specified.')
|
|
if par is None:
|
|
raise Exception('Partition not specified.')
|
|
|
|
# Verify hiding all output for quicker verification
|
|
print(' Partition {Number} Image...\t\t'.format(**par), end='', flush=True)
|
|
cmd = '{bin}\\wimlib\\wimlib-imagex verify "{Image Path}\\{Image File}" --nocheck'.format(bin=bin, **par)
|
|
if not os.path.exists('{Image Path}\\{Image File}'.format(**par)):
|
|
print_error('Missing.')
|
|
else:
|
|
try:
|
|
run_program(cmd)
|
|
print_success('OK.')
|
|
except subprocess.CalledProcessError as err:
|
|
print_error('Damaged.')
|
|
par['Error'] = par.get('Error', []) + err.stderr.decode().splitlines()
|
|
raise BackupError
|
|
|
|
if __name__ == '__main__':
|
|
print("This file is not meant to be called directly.")
|