diff --git a/.gitignore b/.gitignore index ed8a9f58..6e041ca5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,11 +1,10 @@ +**/__pycache__/* *.bak *.iso -.bin/Scripts/__pycache__ .bin/tmp Drivers Logs Mount PEFiles -Scripts/__pycache__ WK/amd64/ WK/x86/ diff --git a/Scripts/functions.py b/Scripts/functions.py deleted file mode 100644 index 03f03bf3..00000000 --- a/Scripts/functions.py +++ /dev/null @@ -1,1006 +0,0 @@ -# WK WinPE Functions - -# Init -import os -import re -import shutil -import subprocess -import sys -import time -import winreg -os.chdir(os.path.dirname(os.path.realpath(__file__))) -bin = os.path.abspath('..\\') -sys.path.append(os.getcwd()) -from functions import * -import partition_uids - -# Init -BACKUP_SERVERS = [ - { 'IP': '10.0.0.10', - 'Mounted': False, - 'Name': 'ServerOne', - 'Share': 'Backups', - 'User': 'backup', - 'Pass': 'Abracadabra', - }, - { 'IP': '10.0.0.11', - 'Name': 'ServerTwo', - 'Mounted': False, - 'Share': 'Backups', - 'User': 'backup', - 'Pass': 'Abracadabra', - }, -] -WINDOWS_SERVER = { - 'IP': '10.0.0.10', - 'Name': 'ServerOne', - 'Mounted': False, - 'Share': 'Windows', - 'User': 'backup', # Using these credentials in case both the windows source and backup shares are mounted. - 'Pass': 'Abracadabra', # This is because Windows only allows one set of credentials to be used per server at a time. -} -WINDOWS_VERSIONS = [ - {'Name': 'Windows 7 Home Basic', - 'Image File': 'Win7', - 'Image Name': 'Windows 7 HOMEBASIC', - 'Family': '7'}, - {'Name': 'Windows 7 Home Premium', - 'Image File': 'Win7', - 'Image Name': 'Windows 7 HOMEPREMIUM', - 'Family': '7'}, - {'Name': 'Windows 7 Professional', - 'Image File': 'Win7', - 'Image Name': 'Windows 7 PROFESSIONAL', - 'Family': '7'}, - {'Name': 'Windows 7 Ultimate', - 'Image File': 'Win7', - 'Image Name': 'Windows 7 ULTIMATE', - 'Family': '7'}, - - {'Name': 'Windows 8.1', - 'Image File': 'Win8', - 'Image Name': 'Windows 8.1', - 'Family': '8', - 'CRLF': True}, - {'Name': 'Windows 8.1 Pro', - 'Image File': 'Win8', - 'Image Name': 'Windows 8.1 Pro', - 'Family': '8'}, - - {'Name': 'Windows 10 Home', - 'Image File': 'Win10', - 'Image Name': 'Windows 10 Home', - 'Family': '10', - 'CRLF': True}, - {'Name': 'Windows 10 Pro', - 'Image File': 'Win10', - 'Image Name': 'Windows 10 Pro', - 'Family': '10'}, -] -diskpart_script = '{tmp}\\diskpart.script'.format(tmp=os.environ['TMP']) - -## Colors -COLORS = { - 'CLEAR': '\033[0m', - 'RED': '\033[31m', - 'GREEN': '\033[32m', - 'YELLOW': '\033[33m', - 'BLUE': '\033[34m'} - -class AbortError(Exception): - pass - -class BackupError(Exception): - pass - -class SetupError(Exception): - pass - -def abort_to_main_menu(message='Returning to main menu...'): - print_warning(message) - pause('Press Enter to return to main menu... ') - raise AbortError - -def ask(prompt='Kotaero'): - answer = None - prompt = prompt + ' [Y/N]: ' - while answer is None: - tmp = input(prompt) - if re.search(r'^y(es|)$', tmp, re.IGNORECASE): - answer = True - elif re.search(r'^n(o|ope|)$', tmp, re.IGNORECASE): - answer = False - return answer - -def assign_volume_letters(): - try: - # Run script - with open(diskpart_script, 'w') as script: - for vol in get_volumes(): - script.write('select volume {Number}\n'.format(**vol)) - script.write('assign\n') - run_program('diskpart /s {script}'.format(script=diskpart_script)) - except subprocess.CalledProcessError: - pass - -def backup_partition(bin=None, disk=None, par=None): - # Bail early - if bin is None: - raise Exception('bin path not specified.') - if disk is None: - raise Exception('Disk not specified.') - if par is None: - raise Exception('Partition not specified.') - - print(' Partition {Number} Backup...\t\t'.format(**par), end='', flush=True) - if par['Number'] in disk['Bad Partitions']: - print_warning('Skipped.') - else: - cmd = '{bin}\\wimlib\\wimlib-imagex capture {Letter}:\\ "{Image Path}\\{Image File}" "{Image Name}" "{Image Name}" --compress=none'.format(bin=bin, **par) - if par['Image Exists']: - print_warning('Skipped.') - else: - try: - os.makedirs('{Image Path}'.format(**par), exist_ok=True) - run_program(cmd) - print_success('Complete.') - except subprocess.CalledProcessError as err: - print_error('Failed.') - par['Error'] = err.stderr.decode().splitlines() - raise BackupError - -def convert_to_bytes(size): - size = str(size) - tmp = re.search(r'(\d+)\s+([KMGT]B)', size.upper()) - if tmp: - size = int(tmp.group(1)) - units = tmp.group(2) - if units == 'TB': - size *= 1099511627776 - elif units == 'GB': - size *= 1073741824 - elif units == 'MB': - size *= 1048576 - elif units == 'KB': - size *= 1024 - else: - return -1 - - return size - -def is_valid_image(bin=None, filename=None, imagename=None): - # Bail early - if bin is None: - raise Exception('bin not specified.') - if filename is None: - raise Exception('Filename not specified.') - if imagename is None: - raise Exception('Image Name not specified.') - - cmd = '{bin}\\wimlib\\wimlib-imagex info "{filename}" "{imagename}"'.format(bin=bin, filename=filename, imagename=imagename) - try: - run_program(cmd) - except subprocess.CalledProcessError: - print_error('Invalid image: {filename}'.format(filename=filename)) - return False - - return True - -def find_windows_image(bin, windows_version=None): - """Search for a Windows source image file on local drives and network drives (in that order)""" - image = {} - - # Bail early - if windows_version is None: - raise Exception('Windows version not specified.') - imagefile = windows_version['Image File'] - - # Search local source - process_return = run_program('mountvol') - for tmp in re.findall(r'.*([A-Za-z]):\\', process_return.stdout.decode()): - for ext in ['esd', 'wim', 'swm']: - filename = '{drive}:\\images\\{imagefile}'.format(drive=tmp[0], imagefile=imagefile) - filename_ext = '{filename}.{ext}'.format(filename=filename, ext=ext) - if os.path.isfile(filename_ext): - if is_valid_image(bin, filename_ext, windows_version['Image Name']): - image['Ext'] = ext - image['File'] = filename - image['Glob'] = '--ref="{File}*.swm"'.format(**image) if ext == 'swm' else '' - image['Source'] = tmp[0] - break - - # Check for network source (if necessary) - if not any(image): - if not WINDOWS_SERVER['Mounted']: - mount_windows_share() - for ext in ['esd', 'wim', 'swm']: - filename = '\\\\{IP}\\{Share}\\images\\{imagefile}'.format(imagefile=imagefile, **WINDOWS_SERVER) - filename_ext = '{filename}.{ext}'.format(filename=filename, ext=ext) - if os.path.isfile(filename_ext): - if is_valid_image(bin, filename_ext, windows_version['Image Name']): - image['Ext'] = ext - image['File'] = filename - image['Glob'] = '--ref="{File}*.swm"'.format(**image) if ext == 'swm' else '' - image['Source'] = None - break - - # Display image to be used (if any) and return - if any(image): - print_info('Using image: {File}.{Ext}'.format(**image)) - return image - else: - print_error('Failed to find Windows source image for {winver}'.format(winver=windows_version['Name'])) - abort_to_main_menu('Aborting Windows setup') - -def format_gpt(disk=None, windows_family=None): - """Format disk for use as a Windows OS drive using the GPT (UEFI) layout.""" - - # Bail early - if disk is None: - raise Exception('No disk provided.') - if windows_family is None: - raise Exception('No Windows family provided.') - - # Format drive - # print_info('Drive will use a GPT (UEFI) layout.') - with open(diskpart_script, 'w') as script: - # Partition table - script.write('select disk {number}\n'.format(number=disk['Number'])) - script.write('clean\n') - script.write('convert gpt\n') - - # System partition - script.write('create partition efi size=260\n') # NOTE: Allows for Advanced Format 4K drives - script.write('format quick fs=fat32 label="System"\n') - script.write('assign letter="S"\n') - - # Microsoft Reserved (MSR) partition - script.write('create partition msr size=128\n') - - # Windows partition - script.write('create partition primary\n') - script.write('format quick fs=ntfs label="Windows"\n') - script.write('assign letter="W"\n') - - # Recovery Tools partition (Windows 8+) - if re.search(r'^(8|10)', windows_family): - script.write('shrink minimum=500\n') - script.write('create partition primary\n') - script.write('format quick fs=ntfs label="Recovery Tools"\n') - script.write('assign letter="T"\n') - script.write('set id="de94bba4-06d1-4d40-a16a-bfd50179d6ac"\n') - script.write('gpt attributes=0x8000000000000001\n') - - # Run script - run_program('diskpart /s {script}'.format(script=diskpart_script)) - time.sleep(2) - -def format_mbr(disk=None, windows_family=None): - """Format disk for use as a Windows OS drive using the MBR (legacy) layout.""" - - # Bail early - if disk is None: - raise Exception('No disk provided.') - if windows_family is None: - raise Exception('No Windows family provided.') - - # Format drive - # print_info('Drive will use a MBR (legacy) layout.') - with open(diskpart_script, 'w') as script: - # Partition table - script.write('select disk {number}\n'.format(number=disk['Number'])) - script.write('clean\n') - - # System partition - script.write('create partition primary size=100\n') - script.write('format fs=ntfs quick label="System Reserved"\n') - script.write('active\n') - script.write('assign letter="S"\n') - - # Windows partition - script.write('create partition primary\n') - script.write('format fs=ntfs quick label="Windows"\n') - script.write('assign letter="W"\n') - - # Recovery Tools partition (Windows 8+) - if re.search(r'^(8|10)', windows_family): - script.write('shrink minimum=500\n') - script.write('create partition primary\n') - script.write('format quick fs=ntfs label="Recovery"\n') - script.write('assign letter="T"\n') - script.write('set id=27\n') - - # Run script - run_program('diskpart /s {script}'.format(script=diskpart_script)) - time.sleep(2) - -def get_attached_disk_info(): - """Get details about the attached disks""" - disks = [] - print_info('Getting drive info...') - - # Assign all the letters - assign_volume_letters() - - # Get disks - disks = get_disks() - - # Get disk details - for disk in disks: - # Get partition style - disk['Table'] = get_table_type(disk) - - # Get disk name/model and physical details - disk.update(get_disk_details(disk)) - - # Get partition info for disk - disk['Partitions'] = get_partitions(disk) - - for par in disk['Partitions']: - # Get partition details - par.update(get_partition_details(disk, par)) - - # Done - return disks - -def get_boot_mode(): - boot_mode = 'Legacy' - try: - reg_key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, 'System\\CurrentControlSet\\Control') - reg_value = winreg.QueryValueEx(reg_key, 'PEFirmwareType')[0] - if reg_value == 2: - boot_mode = 'UEFI' - except: - boot_mode = 'Unknown' - - return boot_mode - -def get_disk_details(disk=None): - details = {} - - # Bail early - if disk is None: - raise Exception('Disk not specified.') - - try: - # Run script - with open(diskpart_script, 'w') as script: - script.write('select disk {Number}\n'.format(**disk)) - script.write('detail disk\n') - process_return = run_program('diskpart /s {script}'.format(script=diskpart_script)) - process_return = process_return.stdout.decode().strip() - except subprocess.CalledProcessError: - pass - else: - # Remove empty lines - tmp = [s.strip() for s in process_return.splitlines() if s.strip() != ''] - - # Set disk name - details['Name'] = tmp[4] - - # Remove lines without a ':' and split each remaining line at the ':' to form a key/value pair - tmp = [s.split(':') for s in tmp if ':' in s] - - # Add key/value pairs to the details variable and return dict - details.update({key.strip(): value.strip() for (key, value) in tmp}) - - return details - -def get_disks(): - disks = [] - - try: - # Run script - with open(diskpart_script, 'w') as script: - script.write('list disk\n') - process_return = run_program('diskpart /s {script}'.format(script=diskpart_script)) - process_return = process_return.stdout.decode().strip() - except subprocess.CalledProcessError: - pass - else: - # Append disk numbers - for tmp in re.findall(r'Disk (\d+)\s+\w+\s+(\d+\s+\w+)', process_return): - _num = tmp[0] - _size = human_readable_size(tmp[1]) - disks.append({'Number': _num, 'Size': _size}) - - return disks - -def get_partition_details(disk=None, par=None): - details = {} - - # Bail early - if disk is None: - raise Exception('Disk not specified.') - if par is None: - raise Exception('Partition not specified.') - - # Diskpart details - try: - # Run script - with open(diskpart_script, 'w') as script: - script.write('select disk {Number}\n'.format(**disk)) - script.write('select partition {Number}\n'.format(**par)) - script.write('detail partition\n') - process_return = run_program('diskpart /s {script}'.format(script=diskpart_script)) - process_return = process_return.stdout.decode().strip() - except subprocess.CalledProcessError: - pass - else: - # Get volume letter or RAW status - tmp = re.search(r'Volume\s+\d+\s+(\w|RAW)\s+', process_return) - if tmp: - if tmp.group(1).upper() == 'RAW': - details['FileSystem'] = RAW - else: - details['Letter'] = tmp.group(1) - - # Remove empty lines from process_return - tmp = [s.strip() for s in process_return.splitlines() if s.strip() != ''] - - # Remove lines without a ':' and split each remaining line at the ':' to form a key/value pair - tmp = [s.split(':') for s in tmp if ':' in s] - - # Add key/value pairs to the details variable and return dict - details.update({key.strip(): value.strip() for (key, value) in tmp}) - - # Get MBR type / GPT GUID for extra details on "Unknown" partitions - guid = partition_uids.lookup_guid(details['Type']) - if guid is not None: - details.update({ - 'Description': guid.get('Description', ''), - 'OS': guid.get('OS', '')}) - - if 'Letter' in details: - # Disk usage - tmp = shutil.disk_usage('{Letter}:\\'.format(**details)) - details['Used Space'] = human_readable_size(tmp.used) - - # fsutil details - try: - process_return = run_program('fsutil fsinfo volumeinfo {Letter}:'.format(**details)) - process_return = process_return.stdout.decode().strip() - except subprocess.CalledProcessError: - pass - else: - # Remove empty lines from process_return - tmp = [s.strip() for s in process_return.splitlines() if s.strip() != ''] - - # Add "Feature" lines - details['File System Features'] = [s.strip() for s in tmp if ':' not in s] - - # Remove lines without a ':' and split each remaining line at the ':' to form a key/value pair - tmp = [s.split(':') for s in tmp if ':' in s] - - # Add key/value pairs to the details variable and return dict - details.update({key.strip(): value.strip() for (key, value) in tmp}) - - # Set Volume Name - details['Name'] = details.get('Volume Name', '') - - # Set FileSystem Type - if details.get('FileSystem', '') != 'RAW': - details['FileSystem'] = details.get('File System Name', 'Unknown') - - return details - -def get_partitions(disk=None): - partitions = [] - - # Bail early - if disk is None: - raise Exception('Disk not specified.') - - try: - # Run script - with open(diskpart_script, 'w') as script: - script.write('select disk {Number}\n'.format(**disk)) - script.write('list partition\n') - process_return = run_program('diskpart /s {script}'.format(script=diskpart_script)) - process_return = process_return.stdout.decode().strip() - except subprocess.CalledProcessError: - pass - else: - # Append partition numbers - for tmp in re.findall(r'Partition\s+(\d+)\s+\w+\s+(\d+\s+\w+)\s+', process_return, re.IGNORECASE): - _num = tmp[0] - _size = human_readable_size(tmp[1]) - partitions.append({'Number': _num, 'Size': _size}) - - return partitions - -def get_table_type(disk=None): - _type = 'Unknown' - - # Bail early - if disk is None: - raise Exception('Disk not specified.') - - try: - with open(diskpart_script, 'w') as script: - script.write('select disk {Number}\n'.format(**disk)) - script.write('uniqueid disk\n') - process_return = run_program('diskpart /s {script}'.format(script=diskpart_script)) - process_return = process_return.stdout.decode().strip() - except subprocess.CalledProcessError: - pass - else: - if re.findall(r'Disk ID: {[A-Z0-9]+-[A-Z0-9]+-[A-Z0-9]+-[A-Z0-9]+-[A-Z0-9]+}', process_return, re.IGNORECASE): - _type = 'GPT' - elif re.findall(r'Disk ID: 00000000', process_return, re.IGNORECASE): - _type = 'RAW' - elif re.findall(r'Disk ID: [A-Z0-9]+', process_return, re.IGNORECASE): - _type = 'MBR' - - return _type - -def get_ticket_id(): - ticket_id = None - while ticket_id is None: - tmp = input('Enter ticket number: ') - if re.match(r'^([0-9]+([\-_]*\w+|))$', tmp): - ticket_id = tmp - - return ticket_id - -def get_volumes(): - vols = [] - - try: - # Run script - with open(diskpart_script, 'w') as script: - script.write('list volume\n') - process_return = run_program('diskpart /s {script}'.format(script=diskpart_script)) - process_return = process_return.stdout.decode().strip() - except subprocess.CalledProcessError: - pass - else: - # Append volume numbers - for tmp in re.findall(r'Volume (\d+)\s+([A-Za-z]?)\s+', process_return): - vols.append({'Number': tmp[0], 'Letter': tmp[1]}) - - return vols - -def human_readable_size(size, decimals=0): - # Prep string formatting - width = 3+decimals - if decimals > 0: - width += 1 - human_format = '>{width}.{decimals}f'.format(width=width, decimals=decimals) - tmp = '' - - # Convert size to int - try: - size = int(size) - except ValueError: - size = convert_to_bytes(size) - - # Verify we have a valid size - if size <= 0: - return '{size:>{width}} b'.format(size='???', width=width) - - # Format string - if size >= 1099511627776: - size /= 1099511627776 - tmp = '{size:{human_format}} Tb'.format(size=size, human_format=human_format) - elif size >= 1073741824: - size /= 1073741824 - tmp = '{size:{human_format}} Gb'.format(size=size, human_format=human_format) - elif size >= 1048576: - size /= 1048576 - tmp = '{size:{human_format}} Mb'.format(size=size, human_format=human_format) - elif size >= 1024: - size /= 1024 - tmp = '{size:{human_format}} Kb'.format(size=size, human_format=human_format) - else: - tmp = '{size:{human_format}} b'.format(size=size, human_format=human_format) - - # Return - return tmp - -def menu_select(title='~ Untitled Menu ~', main_entries=[], action_entries=[], prompt='Please make a selection', secret_exit=False): - """Display options in a menu for user selection""" - - # Bail early - if (len(main_entries) + len(action_entries) == 0): - raise Exception("MenuError: No items given") - - # Build menu - menu_splash = '{title}\n\n'.format(title=title) - valid_answers = [] - if (secret_exit): - valid_answers.append('Q') - - # Add main entries - if (len(main_entries) > 0): - for i in range(len(main_entries)): - entry = main_entries[i] - # Add Spacer - if ('CRLF' in entry): - menu_splash += '\n' - valid_answers.append(str(i+1)) - menu_splash += '{number:>{mwidth}}: {name}\n'.format(number=i+1, mwidth=len(str(len(main_entries))), name=entry.get('Display Name', entry['Name'])) - menu_splash += '\n' - - # Add action entries - if (len(action_entries) > 0): - for entry in action_entries: - # Add Spacer - if ('CRLF' in entry): - menu_splash += '\n' - valid_answers.append(entry['Letter']) - menu_splash += '{letter:>{mwidth}}: {name}\n'.format(letter=entry['Letter'].upper(), mwidth=len(str(len(action_entries))), name=entry['Name']) - menu_splash += '\n' - - answer = '' - - while (answer.upper() not in valid_answers): - os.system('cls') - print(menu_splash) - answer = input('{prompt}: '.format(prompt=prompt)) - - return answer.upper() - -def mount_backup_shares(): - """Mount the backup shares for use as destinations for backup image creation""" - - # Attempt to mount share(s) - for server in BACKUP_SERVERS: - # Blindly skip if we mounted earlier - if server['Mounted']: - continue - else: - try: - # Test connection - run_program('ping -w 800 -n 2 {IP}'.format(**server)) - - # Mount - run_program('net use \\\\{IP}\\{Share} /user:{User} {Pass}'.format(**server)) - print_info('Mounted {Name}'.format(**server)) - server['Mounted'] = True - except subprocess.CalledProcessError: - print_error('Failed to mount \\\\{Name}\\{Share}, {IP} unreachable.'.format(**server)) - time.sleep(1) - except: - print_warning('Failed to mount \\\\{Name}\\{Share} ({IP})'.format(**server)) - time.sleep(1) - -def mount_windows_share(): - """Mount the Windows images share for use in Windows setup""" - - # Blindly skip if we mounted earlier - if WINDOWS_SERVER['Mounted']: - return None - else: - try: - # Test connection - run_program('ping -w 800 -n 2 {IP}'.format(**WINDOWS_SERVER)) - # Mount - run_program('net use \\\\{IP}\\{Share} /user:{User} {Pass}'.format(**WINDOWS_SERVER)) - print_info('Mounted {Name}'.format(**WINDOWS_SERVER)) - WINDOWS_SERVER['Mounted'] = True - except subprocess.CalledProcessError: - print_error('Failed to mount \\\\{Name}\\{Share}, {IP} unreachable.'.format(**WINDOWS_SERVER)) - time.sleep(1) - except: - print_warning('Failed to mount \\\\{Name}\\{Share}'.format(**WINDOWS_SERVER)) - time.sleep(1) - -def pause(prompt='Press Enter to continue... '): - input(prompt) - -def prep_disk_for_backup(dest=None, disk=None, ticket_id=None): - disk['Backup Warnings'] = '\n' - disk['Clobber Risk'] = [] - width = len(str(len(disk['Partitions']))) - - # Bail early - if dest is None: - raise Exception('Destination not provided.') - if disk is None: - raise Exception('Disk not provided.') - if ticket_id is None: - raise Exception('Ticket ID not provided.') - - # Get partition totals - disk['Bad Partitions'] = [par['Number'] for par in disk['Partitions'] if 'Letter' not in par or re.search(r'(RAW|Unknown)', par['FileSystem'], re.IGNORECASE)] - disk['Valid Partitions'] = len(disk['Partitions']) - len(disk['Bad Partitions']) - - # Bail if no valid partitions are found (those that can be imaged) - if disk['Valid Partitions'] <= 0: - abort_to_main_menu(' No partitions can be imaged for the selected drive') - - # Prep partitions - for par in disk['Partitions']: - if par['Number'] in disk['Bad Partitions']: - par['Display String'] = '{YELLOW} * Partition {Number:>{width}}:\t{Size} {FileSystem}\t\t{q}{Name}{q}\t{Description} ({OS}){CLEAR}'.format( - width=width, - q='"' if par['Name'] != '' else '', - **par, - **COLORS) - else: - # Update info for WIM capturing - par['Image Name'] = str(par['Name']) - if par['Image Name'] == '': - par['Image Name'] = 'Unknown' - if 'IP' in dest: - par['Image Path'] = '\\\\{IP}\\{Share}\\{ticket}'.format(ticket=ticket_id, **dest) - else: - par['Image Path'] = '{Letter}:\\{ticket}'.format(ticket=ticket_id, **dest) - par['Image File'] = '{Number}_{Image Name}'.format(**par) - par['Image File'] = '{fixed_name}.wim'.format(fixed_name=re.sub(r'\W', '_', par['Image File'])) - - # Check for existing backups - par['Image Exists'] = False - if os.path.exists('{Image Path}\\{Image File}'.format(**par)): - par['Image Exists'] = True - disk['Clobber Risk'].append(par['Number']) - par['Display String'] = '{BLUE} + '.format(**COLORS) - else: - par['Display String'] = '{CLEAR} '.format(**COLORS) - - # Append rest of Display String for valid/clobber partitions - par['Display String'] += 'Partition {Number:>{width}}:\t{Size} {FileSystem} (Used: {Used Space})\t{q}{Name}{q}{CLEAR}'.format( - width=width, - q='"' if par['Name'] != '' else '', - **par, - **COLORS) - - # Set description for bad partitions - if len(disk['Bad Partitions']) > 1: - disk['Backup Warnings'] += '{YELLOW} * Unable to backup these partitions{CLEAR}\n'.format(**COLORS) - elif len(disk['Bad Partitions']) == 1: - print_warning(' * Unable to backup this partition') - disk['Backup Warnings'] += '{YELLOW} * Unable to backup this partition{CLEAR}\n'.format(**COLORS) - - # Set description for partitions that would be clobbered - if len(disk['Clobber Risk']) > 1: - disk['Backup Warnings'] += '{BLUE} + These partitions already have backup images on {Name}{CLEAR}\n'.format(**dest, **COLORS) - elif len(disk['Clobber Risk']) == 1: - disk['Backup Warnings'] += '{BLUE} + This partition already has a backup image on {Name}{CLEAR}\n'.format(**dest, **COLORS) - - # Set warning for skipped partitions - if len(disk['Clobber Risk']) + len(disk['Bad Partitions']) > 1: - disk['Backup Warnings'] += '\n{YELLOW}If you continue the partitions marked above will NOT be backed up.{CLEAR}\n'.format(**COLORS) - if len(disk['Clobber Risk']) + len(disk['Bad Partitions']) == 1: - disk['Backup Warnings'] += '\n{YELLOW}If you continue the partition marked above will NOT be backed up.{CLEAR}\n'.format(**COLORS) - -def prep_disk_for_formatting(disk=None): - disk['Format Warnings'] = '\n' - width = len(str(len(disk['Partitions']))) - - # Bail early - if disk is None: - raise Exception('Disk not provided.') - - # Set boot method and partition table type - disk['Use GPT'] = True - if (get_boot_mode() == 'UEFI'): - if (not ask("Setup Windows to use UEFI booting?")): - disk['Use GPT'] = False - else: - if (ask("Setup Windows to use BIOS/Legacy booting?")): - disk['Use GPT'] = False - - # Set Display and Warning Strings - if len(disk['Partitions']) == 0: - disk['Format Warnings'] += 'No partitions found\n' - for par in disk['Partitions']: - if 'Letter' not in par or re.search(r'(RAW|Unknown)', par['FileSystem'], re.IGNORECASE): - # FileSystem not accessible to WinPE. List partition type / OS info for technician - par['Display String'] = ' Partition {Number:>{width}}:\t{Size} {FileSystem}\t\t{q}{Name}{q}\t{Description} ({OS})'.format( - width=width, - q='"' if par['Name'] != '' else '', - **par) - else: - # FileSystem accessible to WinPE. List space used instead of partition type / OS info for technician - par['Display String'] = ' Partition {Number:>{width}}:\t{Size} {FileSystem} (Used: {Used Space})\t{q}{Name}{q}'.format( - width=width, - q='"' if par['Name'] != '' else '', - **par) - -def print_error(message='Generic error', **kwargs): - print('{RED}{message}{CLEAR}'.format(message=message, **COLORS, **kwargs)) - -def print_info(message='Generic info', **kwargs): - print('{BLUE}{message}{CLEAR}'.format(message=message, **COLORS, **kwargs)) - -def print_success(message='Generic success', **kwargs): - print('{GREEN}{message}{CLEAR}'.format(message=message, **COLORS, **kwargs)) - -def print_warning(message='Generic warning', **kwargs): - print('{YELLOW}{message}{CLEAR}'.format(message=message, **COLORS, **kwargs)) - -def remove_volume_letters(keep=''): - if keep is None: - keep = '' - try: - # Run script - with open(diskpart_script, 'w') as script: - for vol in get_volumes(): - if vol['Letter'].upper() != keep.upper(): - script.write('select volume {Number}\n'.format(**vol)) - script.write('remove noerr\n') - run_program('diskpart /s {script}'.format(script=diskpart_script)) - except subprocess.CalledProcessError: - pass - -def run_program(cmd=None, args=[], check=True): - if cmd is None: - raise Exception('No program passed.') - - if len(args) > 0: - args = [cmd] + args - process_return = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=check) - else: - process_return = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=check) - - return process_return - -def select_destination(): - # Build menu - dests = [] - for server in BACKUP_SERVERS: - if server['Mounted']: - dests.append(server) - actions = [ - {'Name': 'Main Menu', 'Letter': 'M'}, - ] - - # Size check - for dest in dests: - if 'IP' in dest: - dest['Usage'] = shutil.disk_usage('\\\\{IP}\\{Share}'.format(**dest)) - else: - dest['Usage'] = shutil.disk_usage('{Letter}:\\'.format(**dest)) - dest['Free Space'] = human_readable_size(dest['Usage'].free) - dest['Display Name'] = '{Name} ({Free Space} available)'.format(**dest) - - # Show menu or bail - if len(dests) > 0: - selection = menu_select('Where are we backing up to?', dests, actions) - if selection == 'M': - return None - else: - return dests[int(selection)-1] - else: - print_warning('No backup destinations found.') - return None - -def select_disk(prompt='Which disk?'): - """Select a disk from the attached disks""" - disks = get_attached_disk_info() - - # Build menu - disk_options = [] - for disk in disks: - display_name = '{Size}\t[{Table}] ({Type}) {Name}'.format(**disk) - if len(disk['Partitions']) > 0: - pwidth=len(str(len(disk['Partitions']))) - for par in disk['Partitions']: - # Show unsupported partition(s) in RED - par_skip = False - if 'Letter' not in par or re.search(r'(RAW|Unknown)', par['FileSystem'], re.IGNORECASE): - par_skip = True - if par_skip: - display_name += COLORS['YELLOW'] - - # Main text - display_name += '\n\t\t\tPartition {Number:>{pwidth}}: {Size} ({FileSystem})'.format(pwidth=pwidth, **par) - if par['Name'] != '': - display_name += '\t"{Name}"'.format(**par) - - # Clear color (if set above) - if par_skip: - display_name += COLORS['CLEAR'] - else: - display_name += '{YELLOW}\n\t\t\tNo partitions found.{CLEAR}'.format(**COLORS) - disk_options.append({'Name': display_name, 'Disk': disk}) - actions = [ - {'Name': 'Main Menu', 'Letter': 'M'}, - ] - - # Menu loop - selection = menu_select(prompt, disk_options, actions) - - if (selection.isnumeric()): - return disk_options[int(selection)-1]['Disk'] - elif (selection == 'M'): - abort_to_main_menu() - -def select_minidump_path(): - dumps = [] - - # Assign volume letters first - assign_volume_letters() - - # Search for minidumps - tmp = run_program('mountvol') - tmp = [d for d in re.findall(r'.*([A-Za-z]):\\', tmp.stdout.decode())] - # Remove RAMDisk letter - if 'X' in tmp: - tmp.remove('X') - for drive in tmp: - if os.path.exists('{drive}:\\Windows\\MiniDump'.format(drive=drive)): - dumps.append({'Name': '{drive}:\\Windows\\MiniDump'.format(drive=drive)}) - - # Check results before showing menu - if len(dumps) == 0: - print_error(' No BSoD / MiniDump paths found') - time.sleep(2) - return None - - # Menu - selection = menu_select('Which BSoD / MiniDump path are we scanning?', dumps, []) - return dumps[int(selection) - 1]['Name'] - -def select_windows_version(): - actions = [{'Name': 'Main Menu', 'Letter': 'M'},] - - # Menu loop - selection = menu_select('Which version of Windows are we installing?', WINDOWS_VERSIONS, actions) - - if selection.isnumeric(): - return WINDOWS_VERSIONS[int(selection)-1] - elif selection == 'M': - abort_to_main_menu() - -def setup_windows(bin=None, windows_image=None, windows_version=None): - # Bail early - if bin is None: - raise Exception('bin path not specified.') - if windows_image is None: - raise Exception('Windows image not specified.') - if windows_version is None: - raise Exception('Windows version not specified.') - - # Apply image - cmd = '{bin}\\wimlib\\wimlib-imagex apply "{File}.{Ext}" "{Image Name}" W:\\ {Glob}'.format(bin=bin, **windows_image, **windows_version) - run_program(cmd) - -def setup_windows_re(windows_version=None, windows_letter='W', tools_letter='T'): - # Bail early - if windows_version is None: - raise Exception('Windows version not specified.') - - _win = '{win}:\\Windows'.format(win=windows_letter) - _winre = '{win}\\System32\\Recovery\\WinRE.wim'.format(win=_win) - _dest = '{tools}:\\Recovery\\WindowsRE'.format(tools=tools_letter) - - if re.search(r'^(8|10)', windows_version['Family']): - # Copy WinRE.wim - os.makedirs(_dest, exist_ok=True) - shutil.copy(_winre, '{dest}\\WinRE.wim'.format(dest=_dest)) - - # Set location - run_program('{win}\\System32\\reagentc /setreimage /path {dest} /target {win}'.format(dest=_dest, win=_win)) - else: - # Only supported on Windows 8 and above - raise SetupError - -def update_boot_partition(system_letter='S', windows_letter='W', mode='ALL'): - run_program('bcdboot {win}:\\Windows /s {sys}: /f {mode}'.format(win=windows_letter, sys=system_letter, mode=mode)) - -def verify_wim_backup(bin=None, par=None): - # Bail early - if bin is None: - raise Exception('bin path not specified.') - if par is None: - raise Exception('Partition not specified.') - - # Verify hiding all output for quicker verification - print(' Partition {Number} Image...\t\t'.format(**par), end='', flush=True) - cmd = '{bin}\\wimlib\\wimlib-imagex verify "{Image Path}\\{Image File}" --nocheck'.format(bin=bin, **par) - if not os.path.exists('{Image Path}\\{Image File}'.format(**par)): - print_error('Missing.') - else: - try: - run_program(cmd) - print_success('OK.') - except subprocess.CalledProcessError as err: - print_error('Damaged.') - par['Error'] = par.get('Error', []) + err.stderr.decode().splitlines() - raise BackupError - -if __name__ == '__main__': - print("This file is not meant to be called directly.") diff --git a/Scripts/functions/backup.py b/Scripts/functions/backup.py new file mode 100644 index 00000000..94f3b103 --- /dev/null +++ b/Scripts/functions/backup.py @@ -0,0 +1,486 @@ +# Wizard Kit PE: Functions - Backup + +from functions.common import * +import partition_uids + +def assign_volume_letters(): + try: + # Run script + with open(DISKPART_SCRIPT, 'w') as script: + for vol in get_volumes(): + script.write('select volume {Number}\n'.format(**vol)) + script.write('assign\n') + run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT)) + except subprocess.CalledProcessError: + pass + +def backup_partition(bin=None, disk=None, par=None): + # Bail early + if bin is None: + raise Exception('bin path not specified.') + if disk is None: + raise Exception('Disk not specified.') + if par is None: + raise Exception('Partition not specified.') + + print(' Partition {Number} Backup...\t\t'.format(**par), end='', flush=True) + if par['Number'] in disk['Bad Partitions']: + print_warning('Skipped.') + else: + cmd = '{bin}\\wimlib\\wimlib-imagex capture {Letter}:\\ "{Image Path}\\{Image File}" "{Image Name}" "{Image Name}" --compress=none'.format(bin=bin, **par) + if par['Image Exists']: + print_warning('Skipped.') + else: + try: + os.makedirs('{Image Path}'.format(**par), exist_ok=True) + run_program(cmd) + print_success('Complete.') + except subprocess.CalledProcessError as err: + print_error('Failed.') + par['Error'] = err.stderr.decode().splitlines() + raise BackupError + +def get_attached_disk_info(): + """Get details about the attached disks""" + disks = [] + print_info('Getting drive info...') + + # Assign all the letters + assign_volume_letters() + + # Get disks + disks = get_disks() + + # Get disk details + for disk in disks: + # Get partition style + disk['Table'] = get_table_type(disk) + + # Get disk name/model and physical details + disk.update(get_disk_details(disk)) + + # Get partition info for disk + disk['Partitions'] = get_partitions(disk) + + for par in disk['Partitions']: + # Get partition details + par.update(get_partition_details(disk, par)) + + # Done + return disks + +def get_disk_details(disk=None): + details = {} + + # Bail early + if disk is None: + raise Exception('Disk not specified.') + + try: + # Run script + with open(DISKPART_SCRIPT, 'w') as script: + script.write('select disk {Number}\n'.format(**disk)) + script.write('detail disk\n') + process_return = run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT)) + process_return = process_return.stdout.decode().strip() + except subprocess.CalledProcessError: + pass + else: + # Remove empty lines + tmp = [s.strip() for s in process_return.splitlines() if s.strip() != ''] + + # Set disk name + details['Name'] = tmp[4] + + # Remove lines without a ':' and split each remaining line at the ':' to form a key/value pair + tmp = [s.split(':') for s in tmp if ':' in s] + + # Add key/value pairs to the details variable and return dict + details.update({key.strip(): value.strip() for (key, value) in tmp}) + + return details + +def get_disks(): + disks = [] + + try: + # Run script + with open(DISKPART_SCRIPT, 'w') as script: + script.write('list disk\n') + process_return = run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT)) + process_return = process_return.stdout.decode().strip() + except subprocess.CalledProcessError: + pass + else: + # Append disk numbers + for tmp in re.findall(r'Disk (\d+)\s+\w+\s+(\d+\s+\w+)', process_return): + _num = tmp[0] + _size = human_readable_size(tmp[1]) + disks.append({'Number': _num, 'Size': _size}) + + return disks + +def get_partition_details(disk=None, par=None): + details = {} + + # Bail early + if disk is None: + raise Exception('Disk not specified.') + if par is None: + raise Exception('Partition not specified.') + + # Diskpart details + try: + # Run script + with open(DISKPART_SCRIPT, 'w') as script: + script.write('select disk {Number}\n'.format(**disk)) + script.write('select partition {Number}\n'.format(**par)) + script.write('detail partition\n') + process_return = run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT)) + process_return = process_return.stdout.decode().strip() + except subprocess.CalledProcessError: + pass + else: + # Get volume letter or RAW status + tmp = re.search(r'Volume\s+\d+\s+(\w|RAW)\s+', process_return) + if tmp: + if tmp.group(1).upper() == 'RAW': + details['FileSystem'] = RAW + else: + details['Letter'] = tmp.group(1) + + # Remove empty lines from process_return + tmp = [s.strip() for s in process_return.splitlines() if s.strip() != ''] + + # Remove lines without a ':' and split each remaining line at the ':' to form a key/value pair + tmp = [s.split(':') for s in tmp if ':' in s] + + # Add key/value pairs to the details variable and return dict + details.update({key.strip(): value.strip() for (key, value) in tmp}) + + # Get MBR type / GPT GUID for extra details on "Unknown" partitions + guid = partition_uids.lookup_guid(details['Type']) + if guid is not None: + details.update({ + 'Description': guid.get('Description', ''), + 'OS': guid.get('OS', '')}) + + if 'Letter' in details: + # Disk usage + tmp = shutil.disk_usage('{Letter}:\\'.format(**details)) + details['Used Space'] = human_readable_size(tmp.used) + + # fsutil details + try: + process_return = run_program('fsutil fsinfo volumeinfo {Letter}:'.format(**details)) + process_return = process_return.stdout.decode().strip() + except subprocess.CalledProcessError: + pass + else: + # Remove empty lines from process_return + tmp = [s.strip() for s in process_return.splitlines() if s.strip() != ''] + + # Add "Feature" lines + details['File System Features'] = [s.strip() for s in tmp if ':' not in s] + + # Remove lines without a ':' and split each remaining line at the ':' to form a key/value pair + tmp = [s.split(':') for s in tmp if ':' in s] + + # Add key/value pairs to the details variable and return dict + details.update({key.strip(): value.strip() for (key, value) in tmp}) + + # Set Volume Name + details['Name'] = details.get('Volume Name', '') + + # Set FileSystem Type + if details.get('FileSystem', '') != 'RAW': + details['FileSystem'] = details.get('File System Name', 'Unknown') + + return details + +def get_partitions(disk=None): + partitions = [] + + # Bail early + if disk is None: + raise Exception('Disk not specified.') + + try: + # Run script + with open(DISKPART_SCRIPT, 'w') as script: + script.write('select disk {Number}\n'.format(**disk)) + script.write('list partition\n') + process_return = run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT)) + process_return = process_return.stdout.decode().strip() + except subprocess.CalledProcessError: + pass + else: + # Append partition numbers + for tmp in re.findall(r'Partition\s+(\d+)\s+\w+\s+(\d+\s+\w+)\s+', process_return, re.IGNORECASE): + _num = tmp[0] + _size = human_readable_size(tmp[1]) + partitions.append({'Number': _num, 'Size': _size}) + + return partitions + +def get_table_type(disk=None): + _type = 'Unknown' + + # Bail early + if disk is None: + raise Exception('Disk not specified.') + + try: + with open(DISKPART_SCRIPT, 'w') as script: + script.write('select disk {Number}\n'.format(**disk)) + script.write('uniqueid disk\n') + process_return = run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT)) + process_return = process_return.stdout.decode().strip() + except subprocess.CalledProcessError: + pass + else: + if re.findall(r'Disk ID: {[A-Z0-9]+-[A-Z0-9]+-[A-Z0-9]+-[A-Z0-9]+-[A-Z0-9]+}', process_return, re.IGNORECASE): + _type = 'GPT' + elif re.findall(r'Disk ID: 00000000', process_return, re.IGNORECASE): + _type = 'RAW' + elif re.findall(r'Disk ID: [A-Z0-9]+', process_return, re.IGNORECASE): + _type = 'MBR' + + return _type + +def get_volumes(): + vols = [] + + try: + # Run script + with open(DISKPART_SCRIPT, 'w') as script: + script.write('list volume\n') + process_return = run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT)) + process_return = process_return.stdout.decode().strip() + except subprocess.CalledProcessError: + pass + else: + # Append volume numbers + for tmp in re.findall(r'Volume (\d+)\s+([A-Za-z]?)\s+', process_return): + vols.append({'Number': tmp[0], 'Letter': tmp[1]}) + + return vols + +def prep_disk_for_backup(dest=None, disk=None, ticket_id=None): + disk['Backup Warnings'] = '\n' + disk['Clobber Risk'] = [] + width = len(str(len(disk['Partitions']))) + + # Bail early + if dest is None: + raise Exception('Destination not provided.') + if disk is None: + raise Exception('Disk not provided.') + if ticket_id is None: + raise Exception('Ticket ID not provided.') + + # Get partition totals + disk['Bad Partitions'] = [par['Number'] for par in disk['Partitions'] if 'Letter' not in par or re.search(r'(RAW|Unknown)', par['FileSystem'], re.IGNORECASE)] + disk['Valid Partitions'] = len(disk['Partitions']) - len(disk['Bad Partitions']) + + # Bail if no valid partitions are found (those that can be imaged) + if disk['Valid Partitions'] <= 0: + abort_to_main_menu(' No partitions can be imaged for the selected drive') + + # Prep partitions + for par in disk['Partitions']: + if par['Number'] in disk['Bad Partitions']: + par['Display String'] = '{YELLOW} * Partition {Number:>{width}}:\t{Size} {FileSystem}\t\t{q}{Name}{q}\t{Description} ({OS}){CLEAR}'.format( + width=width, + q='"' if par['Name'] != '' else '', + **par, + **COLORS) + else: + # Update info for WIM capturing + par['Image Name'] = str(par['Name']) + if par['Image Name'] == '': + par['Image Name'] = 'Unknown' + if 'IP' in dest: + par['Image Path'] = '\\\\{IP}\\{Share}\\{ticket}'.format(ticket=ticket_id, **dest) + else: + par['Image Path'] = '{Letter}:\\{ticket}'.format(ticket=ticket_id, **dest) + par['Image File'] = '{Number}_{Image Name}'.format(**par) + par['Image File'] = '{fixed_name}.wim'.format(fixed_name=re.sub(r'\W', '_', par['Image File'])) + + # Check for existing backups + par['Image Exists'] = False + if os.path.exists('{Image Path}\\{Image File}'.format(**par)): + par['Image Exists'] = True + disk['Clobber Risk'].append(par['Number']) + par['Display String'] = '{BLUE} + '.format(**COLORS) + else: + par['Display String'] = '{CLEAR} '.format(**COLORS) + + # Append rest of Display String for valid/clobber partitions + par['Display String'] += 'Partition {Number:>{width}}:\t{Size} {FileSystem} (Used: {Used Space})\t{q}{Name}{q}{CLEAR}'.format( + width=width, + q='"' if par['Name'] != '' else '', + **par, + **COLORS) + + # Set description for bad partitions + if len(disk['Bad Partitions']) > 1: + disk['Backup Warnings'] += '{YELLOW} * Unable to backup these partitions{CLEAR}\n'.format(**COLORS) + elif len(disk['Bad Partitions']) == 1: + print_warning(' * Unable to backup this partition') + disk['Backup Warnings'] += '{YELLOW} * Unable to backup this partition{CLEAR}\n'.format(**COLORS) + + # Set description for partitions that would be clobbered + if len(disk['Clobber Risk']) > 1: + disk['Backup Warnings'] += '{BLUE} + These partitions already have backup images on {Name}{CLEAR}\n'.format(**dest, **COLORS) + elif len(disk['Clobber Risk']) == 1: + disk['Backup Warnings'] += '{BLUE} + This partition already has a backup image on {Name}{CLEAR}\n'.format(**dest, **COLORS) + + # Set warning for skipped partitions + if len(disk['Clobber Risk']) + len(disk['Bad Partitions']) > 1: + disk['Backup Warnings'] += '\n{YELLOW}If you continue the partitions marked above will NOT be backed up.{CLEAR}\n'.format(**COLORS) + if len(disk['Clobber Risk']) + len(disk['Bad Partitions']) == 1: + disk['Backup Warnings'] += '\n{YELLOW}If you continue the partition marked above will NOT be backed up.{CLEAR}\n'.format(**COLORS) + +def prep_disk_for_formatting(disk=None): + disk['Format Warnings'] = '\n' + width = len(str(len(disk['Partitions']))) + + # Bail early + if disk is None: + raise Exception('Disk not provided.') + + # Set boot method and partition table type + disk['Use GPT'] = True + if (get_boot_mode() == 'UEFI'): + if (not ask("Setup Windows to use UEFI booting?")): + disk['Use GPT'] = False + else: + if (ask("Setup Windows to use BIOS/Legacy booting?")): + disk['Use GPT'] = False + + # Set Display and Warning Strings + if len(disk['Partitions']) == 0: + disk['Format Warnings'] += 'No partitions found\n' + for par in disk['Partitions']: + if 'Letter' not in par or re.search(r'(RAW|Unknown)', par['FileSystem'], re.IGNORECASE): + # FileSystem not accessible to WinPE. List partition type / OS info for technician + par['Display String'] = ' Partition {Number:>{width}}:\t{Size} {FileSystem}\t\t{q}{Name}{q}\t{Description} ({OS})'.format( + width=width, + q='"' if par['Name'] != '' else '', + **par) + else: + # FileSystem accessible to WinPE. List space used instead of partition type / OS info for technician + par['Display String'] = ' Partition {Number:>{width}}:\t{Size} {FileSystem} (Used: {Used Space})\t{q}{Name}{q}'.format( + width=width, + q='"' if par['Name'] != '' else '', + **par) + +def remove_volume_letters(keep=''): + if keep is None: + keep = '' + try: + # Run script + with open(DISKPART_SCRIPT, 'w') as script: + for vol in get_volumes(): + if vol['Letter'].upper() != keep.upper(): + script.write('select volume {Number}\n'.format(**vol)) + script.write('remove noerr\n') + run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT)) + except subprocess.CalledProcessError: + pass + +def select_backup_destination(): + # Build menu + dests = [] + for server in BACKUP_SERVERS: + if server['Mounted']: + dests.append(server) + actions = [ + {'Name': 'Main Menu', 'Letter': 'M'}, + ] + + # Size check + for dest in dests: + if 'IP' in dest: + dest['Usage'] = shutil.disk_usage('\\\\{IP}\\{Share}'.format(**dest)) + else: + dest['Usage'] = shutil.disk_usage('{Letter}:\\'.format(**dest)) + dest['Free Space'] = human_readable_size(dest['Usage'].free) + dest['Display Name'] = '{Name} ({Free Space} available)'.format(**dest) + + # Show menu or bail + if len(dests) > 0: + selection = menu_select('Where are we backing up to?', dests, actions) + if selection == 'M': + return None + else: + return dests[int(selection)-1] + else: + print_warning('No backup destinations found.') + return None + +def select_disk(prompt='Which disk?'): + """Select a disk from the attached disks""" + disks = get_attached_disk_info() + + # Build menu + disk_options = [] + for disk in disks: + display_name = '{Size}\t[{Table}] ({Type}) {Name}'.format(**disk) + if len(disk['Partitions']) > 0: + pwidth=len(str(len(disk['Partitions']))) + for par in disk['Partitions']: + # Show unsupported partition(s) in RED + par_skip = False + if 'Letter' not in par or re.search(r'(RAW|Unknown)', par['FileSystem'], re.IGNORECASE): + par_skip = True + if par_skip: + display_name += COLORS['YELLOW'] + + # Main text + display_name += '\n\t\t\tPartition {Number:>{pwidth}}: {Size} ({FileSystem})'.format(pwidth=pwidth, **par) + if par['Name'] != '': + display_name += '\t"{Name}"'.format(**par) + + # Clear color (if set above) + if par_skip: + display_name += COLORS['CLEAR'] + else: + display_name += '{YELLOW}\n\t\t\tNo partitions found.{CLEAR}'.format(**COLORS) + disk_options.append({'Name': display_name, 'Disk': disk}) + actions = [ + {'Name': 'Main Menu', 'Letter': 'M'}, + ] + + # Menu loop + selection = menu_select(prompt, disk_options, actions) + + if (selection.isnumeric()): + return disk_options[int(selection)-1]['Disk'] + elif (selection == 'M'): + abort_to_main_menu() + +def verify_wim_backup(bin=None, par=None): + # Bail early + if bin is None: + raise Exception('bin path not specified.') + if par is None: + raise Exception('Partition not specified.') + + # Verify hiding all output for quicker verification + print(' Partition {Number} Image...\t\t'.format(**par), end='', flush=True) + cmd = '{bin}\\wimlib\\wimlib-imagex verify "{Image Path}\\{Image File}" --nocheck'.format(bin=bin, **par) + if not os.path.exists('{Image Path}\\{Image File}'.format(**par)): + print_error('Missing.') + else: + try: + run_program(cmd) + print_success('OK.') + except subprocess.CalledProcessError as err: + print_error('Damaged.') + par['Error'] = par.get('Error', []) + err.stderr.decode().splitlines() + raise BackupError + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/Scripts/functions/common.py b/Scripts/functions/common.py new file mode 100644 index 00000000..a0453a95 --- /dev/null +++ b/Scripts/functions/common.py @@ -0,0 +1,682 @@ +# Wizard Kit PE: Functions - Common + +import os +import psutil +import re +import shutil +import subprocess +import sys +import time +import traceback +import winreg + +from subprocess import CalledProcessError + +from settings.main import * +from settings.tools import * + +# Global variables +global_vars = {} + +# STATIC VARIABLES +COLORS = { + 'CLEAR': '\033[0m', + 'RED': '\033[31m', + 'GREEN': '\033[32m', + 'YELLOW': '\033[33m', + 'BLUE': '\033[34m' +} +HKU = winreg.HKEY_USERS +HKCU = winreg.HKEY_CURRENT_USER +HKLM = winreg.HKEY_LOCAL_MACHINE + +# Error Classes +class BIOSKeyNotFoundError(Exception): + pass + +class BinNotFoundError(Exception): + pass + +class GenericError(Exception): + pass + +class GenericRepair(Exception): + pass + +class MultipleInstallationsError(Exception): + pass + +class NotInstalledError(Exception): + pass + +class NoProfilesError(Exception): + pass + +class PathNotFoundException(Exception): + pass + +class UnsupportedOSError(Exception): + pass + +# General functions +def abort(): + """Abort script.""" + print_warning('Aborted.') + sleep(5) + exit_script() + +def ask(prompt='Kotaero!'): + """Prompt the user with a Y/N question, log answer, and return a bool.""" + answer = None + prompt = '{} [Y/N]: '.format(prompt) + while answer is None: + tmp = input(prompt) + if re.search(r'^y(es|)$', tmp, re.IGNORECASE): + answer = True + elif re.search(r'^n(o|ope|)$', tmp, re.IGNORECASE): + answer = False + message = '{prompt}{answer_text}'.format( + prompt = prompt, + answer_text = 'Yes' if answer else 'No') + print_log(message=message) + return answer + +def convert_to_bytes(size): + """Convert human-readable size str to bytes and return an int.""" + size = str(size) + tmp = re.search(r'(\d+)\s+([KMGT]B)', size.upper()) + if tmp: + size = int(tmp.group(1)) + units = tmp.group(2) + if units == 'TB': + size *= 1099511627776 + elif units == 'GB': + size *= 1073741824 + elif units == 'MB': + size *= 1048576 + elif units == 'KB': + size *= 1024 + else: + return -1 + + return size + +def exit_script(return_value=0): + """Exits the script after some cleanup and opens the log (if set).""" + # Remove dirs (if empty) + for dir in ['BackupDir', 'LogDir', 'TmpDir']: + try: + dir = global_vars[dir] + os.rmdir(dir) + except Exception: + pass + + # Open Log (if it exists) + log = global_vars.get('LogFile', '') + if log and os.path.exists(log): + try: + extract_item('NotepadPlusPlus', silent=True) + popen_program( + [global_vars['Tools']['NotepadPlusPlus'], + global_vars['LogFile']]) + except Exception: + print_error('ERROR: Failed to extract Notepad++ and open log.') + pause('Press Enter to exit...') + + # Kill Caffeine if still running + kill_process('caffeine.exe') + + # Exit + sys.exit(return_value) + +def extract_item(item, filter='', silent=False): + """Extract item from .cbin into .bin.""" + cmd = [ + global_vars['Tools']['SevenZip'], 'x', '-aos', '-bso0', '-bse0', + '-p{ArchivePassword}'.format(**global_vars), + r'-o{BinDir}\{item}'.format(item=item, **global_vars), + r'{CBinDir}\{item}.7z'.format(item=item, **global_vars), + filter] + if not silent: + print_standard('Extracting "{item}"...'.format(item=item)) + try: + run_program(cmd) + except subprocess.CalledProcessError: + if not silent: + print_warning('WARNING: Errors encountered while exctracting data') + +def get_ticket_number(): + """Get TicketNumber from user, save in LogDir, and return as str.""" + ticket_number = None + while ticket_number is None: + _input = input('Enter ticket number: ') + if re.match(r'^([0-9]+([-_]?\w+|))$', _input): + ticket_number = _input + with open(r'{LogDir}\TicketNumber'.format(**global_vars), 'w') as f: + f.write(ticket_number) + return ticket_number + +def human_readable_size(size, decimals=0): + """Convert size in bytes to a human-readable format and return a str.""" + # Prep string formatting + width = 3+decimals + if decimals > 0: + width += 1 + + # Convert size to int + try: + size = int(size) + except ValueError: + size = convert_to_bytes(size) + + # Verify we have a valid size + if size <= 0: + return '{size:>{width}} b'.format(size='???', width=width) + + # Convert to sensible units + if size >= 1099511627776: + size /= 1099511627776 + units = 'Tb' + elif size >= 1073741824: + size /= 1073741824 + units = 'Gb' + elif size >= 1048576: + size /= 1048576 + units = 'Mb' + elif size >= 1024: + size /= 1024 + units = 'Kb' + else: + units = ' b' + + # Return + return '{size:>{width}.{decimals}f} {units}'.format( + size=size, width=width, decimals=decimals, units=units) + +def kill_process(name): + """Kill any running caffeine.exe processes.""" + for proc in psutil.process_iter(): + if proc.name() == name: + proc.kill() + +def major_exception(): + """Display traceback and exit""" + print_error('Major exception') + print_warning(SUPPORT_MESSAGE) + print(traceback.format_exc()) + print_log(traceback.format_exc()) + sleep(30) + pause('Press Enter to exit...') + exit_script(1) + +def menu_select(title='~ Untitled Menu ~', + prompt='Please make a selection', secret_exit=False, + main_entries=[], action_entries=[], disabled_label='DISABLED'): + """Display options in a menu and return selected option as a str.""" + # Bail early + if not main_entries and not action_entries: + raise Exception("MenuError: No items given") + + # Build menu + menu_splash = '{}\n\n'.format(title) + width = len(str(len(main_entries))) + valid_answers = [] + if (secret_exit): + valid_answers.append('Q') + + # Add main entries + for i in range(len(main_entries)): + entry = main_entries[i] + # Add Spacer + if ('CRLF' in entry): + menu_splash += '\n' + entry_str = '{number:>{width}}: {name}'.format( + number = i+1, + width = width, + name = entry.get('Display Name', entry['Name'])) + if entry.get('Disabled', False): + entry_str = '{YELLOW}{entry_str} ({disabled}){CLEAR}'.format( + entry_str = entry_str, + disabled = disabled_label, + **COLORS) + else: + valid_answers.append(str(i+1)) + menu_splash += '{}\n'.format(entry_str) + menu_splash += '\n' + + # Add action entries + for entry in action_entries: + # Add Spacer + if ('CRLF' in entry): + menu_splash += '\n' + valid_answers.append(entry['Letter']) + menu_splash += '{letter:>{width}}: {name}\n'.format( + letter = entry['Letter'].upper(), + width = len(str(len(action_entries))), + name = entry['Name']) + menu_splash += '\n' + + answer = '' + + while (answer.upper() not in valid_answers): + os.system('cls') + print(menu_splash) + answer = input('{}: '.format(prompt)) + + return answer.upper() + +def non_clobber_rename(full_path): + """Append suffix to path, if necessary, to avoid clobbering path""" + new_path = full_path + _i = 1; + while os.path.exists(new_path): + new_path = '{path}_{i}'.format(i=_i, path=full_path) + _i += 1 + + return new_path + +def pause(prompt='Press Enter to continue... '): + """Simple pause implementation.""" + input(prompt) + +def ping(addr='google.com'): + """Attempt to ping addr.""" + cmd = ['ping', '-n', '2', addr] + run_program(cmd) + +def popen_program(cmd, pipe=False, minimized=False, shell=False, **kwargs): + """Run program and return a subprocess.Popen object.""" + startupinfo=None + if minimized: + startupinfo = subprocess.STARTUPINFO() + startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW + startupinfo.wShowWindow = 6 + + if pipe: + popen_obj = subprocess.Popen(cmd, shell=shell, startupinfo=startupinfo, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + else: + popen_obj = subprocess.Popen(cmd, shell=shell, startupinfo=startupinfo) + + return popen_obj + +def print_error(*args, **kwargs): + """Prints message to screen in RED.""" + print_standard(*args, color=COLORS['RED'], **kwargs) + +def print_info(*args, **kwargs): + """Prints message to screen in BLUE.""" + print_standard(*args, color=COLORS['BLUE'], **kwargs) + +def print_standard(message='Generic info', + color=None, end='\n', timestamp=True, **kwargs): + """Prints message to screen and log (if set).""" + display_message = message + if color: + display_message = color + message + COLORS['CLEAR'] + # **COLORS is used below to support non-"standard" color printing + print(display_message.format(**COLORS), end=end, **kwargs) + print_log(message, end, timestamp) + +def print_success(*args, **kwargs): + """Prints message to screen in GREEN.""" + print_standard(*args, color=COLORS['GREEN'], **kwargs) + +def print_warning(*args, **kwargs): + """Prints message to screen in YELLOW.""" + print_standard(*args, color=COLORS['YELLOW'], **kwargs) + +def print_log(message='', end='\n', timestamp=True): + time_str = time.strftime("%Y-%m-%d %H%M%z: ") if timestamp else '' + if 'LogFile' in global_vars and global_vars['LogFile'] is not None: + with open(global_vars['LogFile'], 'a') as f: + for line in message.splitlines(): + f.write('{timestamp}{line}{end}'.format( + timestamp = time_str, + line = line, + end = end)) + +def run_program(cmd, args=[], check=True, pipe=True, shell=False): + """Run program and return a subprocess.CompletedProcess object.""" + if args: + # Deprecated so let's raise an exception to find & fix all occurances + print_error('ERROR: Using args is no longer supported.') + raise Exception + cmd = [c for c in cmd if c] + if shell: + cmd = ' '.join(cmd) + + if pipe: + process_return = subprocess.run(cmd, check=check, shell=shell, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + else: + process_return = subprocess.run(cmd, check=check, shell=shell) + + return process_return + +def show_info(message='~Some message~', info='~Some info~', indent=8, width=32): + """Display info with formatting.""" + print_standard('{indent}{message:<{width}}{info}'.format( + indent=' '*indent, width=width, message=message, info=info)) + +def sleep(seconds=2): + """Wait for a while.""" + time.sleep(seconds) + +def stay_awake(): + """Prevent the system from sleeping or hibernating.""" + # Bail if caffeine is already running + for proc in psutil.process_iter(): + if proc.name() == 'caffeine.exe': + return + # Extract and run + extract_item('Caffeine', silent=True) + try: + popen_program(global_vars['Tools']['Caffeine']) + except Exception: + print_error('ERROR: No caffeine available.') + print_warning('Please set the power setting to High Performance.') + +def get_exception(s): + """Get exception by name, returns Exception object.""" + return getattr(sys.modules[__name__], s) + +def try_and_print(message='Trying...', + function=None, cs='CS', ns='NS', other_results={}, + catch_all=True, print_return=False, silent_function=True, + indent=8, width=32, *args, **kwargs): + """Run function, print if successful or not, and return dict. + + other_results is in the form of + { + 'Warning': {'ExceptionClassName': 'Result Message'}, + 'Error': {'ExceptionClassName': 'Result Message'} + } + The the ExceptionClassNames will be excepted conditions + and the result string will be printed in the correct color. + catch_all=False will result in unspecified exceptions being re-raised.""" + err = None + w_exceptions = other_results.get('Warning', {}).keys() + w_exceptions = tuple(get_exception(e) for e in w_exceptions) + e_exceptions = other_results.get('Error', {}).keys() + e_exceptions = tuple(get_exception(e) for e in e_exceptions) + w_results = other_results.get('Warning', {}) + e_results = other_results.get('Error', {}) + + # Run function and catch errors + print_standard('{indent}{message:<{width}}'.format( + indent=' '*indent, message=message, width=width), end='', flush=True) + try: + out = function(*args, **kwargs) + if print_return: + print_standard(out[0], timestamp=False) + for item in out[1:]: + print_standard('{indent}{item}'.format( + indent=' '*(indent+width), item=item)) + elif silent_function: + print_success(cs, timestamp=False) + except w_exceptions as e: + _result = w_results.get(e.__class__.__name__, 'Warning') + print_warning(_result, timestamp=False) + err = e + except e_exceptions as e: + _result = e_results.get(e.__class__.__name__, 'Error') + print_error(_result, timestamp=False) + err = e + except Exception: + print_error(ns, timestamp=False) + err = traceback.format_exc() + + # Return or raise? + if bool(err) and not catch_all: + raise + else: + return {'CS': not bool(err), 'Error': err} + +def upload_data(path, file): + """Add CLIENT_INFO_SERVER to authorized connections and upload file.""" + if not ENABLED_UPLOAD_DATA: + raise GenericError('Feature disabled.') + + extract_item('PuTTY', filter='wizkit.ppk psftp.exe', silent=True) + + # Authorize connection to the server + winreg.CreateKey(HKCU, r'Software\SimonTatham\PuTTY\SshHostKeys') + with winreg.OpenKey(HKCU, r'Software\SimonTatham\PuTTY\SshHostKeys', + access=winreg.KEY_WRITE) as key: + winreg.SetValueEx(key, + 'rsa2@22:{IP}'.format(**CLIENT_INFO_SERVER), 0, + winreg.REG_SZ, CLIENT_INFO_SERVER['RegEntry']) + + # Write batch file + with open(r'{TmpDir}\psftp.batch'.format(**global_vars), + 'w', encoding='ascii') as f: + f.write('lcd "{path}"\n'.format(path=path)) + f.write('cd "{Share}"\n'.format(**CLIENT_INFO_SERVER)) + f.write('mkdir {TicketNumber}\n'.format(**global_vars)) + f.write('cd {TicketNumber}\n'.format(**global_vars)) + f.write('put "{file}"\n'.format(file=file)) + + # Upload Info + cmd = [ + global_vars['Tools']['PuTTY-PSFTP'], + '-noagent', + '-i', r'{BinDir}\PuTTY\wizkit.ppk'.format(**global_vars), + '{User}@{IP}'.format(**CLIENT_INFO_SERVER), + '-b', r'{TmpDir}\psftp.batch'.format(**global_vars)] + run_program(cmd) + +def upload_info(): + """Upload compressed Info file to the NAS as set in settings.main.py.""" + if not ENABLED_UPLOAD_DATA: + raise GenericError('Feature disabled.') + + path = '{ClientDir}'.format(**global_vars) + file = 'Info_{Date-Time}.7z'.format(**global_vars) + upload_data(path, file) + +def compress_info(): + """Compress ClientDir info folders with 7-Zip for upload_info().""" + path = '{ClientDir}'.format(**global_vars) + file = 'Info_{Date-Time}.7z'.format(**global_vars) + _cmd = [ + global_vars['Tools']['SevenZip'], + 'a', '-t7z', '-mx=9', '-bso0', '-bse0', + r'{}\{}'.format(path, file), + r'{ClientDir}\Info'.format(**global_vars)] + run_program(_cmd) + +def wait_for_process(name, poll_rate=3): + """Wait for process by name.""" + running = True + while running: + sleep(poll_rate) + running = False + for proc in psutil.process_iter(): + if re.search(r'^{}'.format(name), proc.name(), re.IGNORECASE): + running = True + sleep(1) + +# global_vars functions +def init_global_vars(): + """Sets global variables based on system info.""" + print_info('Initializing') + os.system('title Wizard Kit') + init_functions = [ + ['Checking .bin...', find_bin], + ['Checking environment...', set_common_vars], + ['Checking OS...', check_os], + ['Checking tools...', check_tools], + ['Creating folders...', make_tmp_dirs], + ['Clearing collisions...', clean_env_vars], + ] + try: + for f in init_functions: + try_and_print( + message=f[0], function=f[1], + cs='Done', ns='Error', catch_all=False) + except: + major_exception() + +def check_os(): + """Set OS specific variables.""" + tmp = {} + + # Query registry + _reg_path = winreg.OpenKey( + HKLM, r'SOFTWARE\Microsoft\Windows NT\CurrentVersion') + for key in ['CSDVersion', 'CurrentBuild', 'CurrentBuildNumber', + 'CurrentVersion', 'ProductName']: + try: + tmp[key] = winreg.QueryValueEx(_reg_path, key)[0] + if key in ['CurrentBuild', 'CurrentBuildNumber']: + tmp[key] = int(tmp[key]) + except ValueError: + # Couldn't convert Build to int so this should be interesting... + tmp[key] = 0 + except Exception: + tmp[key] = 'Unknown' + + # Determine OS bit depth + tmp['Arch'] = 32 + if 'PROGRAMFILES(X86)' in global_vars['Env']: + tmp['Arch'] = 64 + + # Determine OS Name + tmp['Name'] = '{ProductName} {CSDVersion}'.format(**tmp) + if tmp['CurrentBuild'] == 9600: + tmp['Name'] += ' Update' # Win 8.1u + if tmp['CurrentBuild'] == 10240: + tmp['Name'] += ' Release 1507 "Threshold 1"' + if tmp['CurrentBuild'] == 10586: + tmp['Name'] += ' Release 1511 "Threshold 2"' + if tmp['CurrentBuild'] == 14393: + tmp['Name'] += ' Release 1607 "Redstone 1" / "Anniversary Update"' + if tmp['CurrentBuild'] == 15063: + tmp['Name'] += ' Release 1703 "Redstone 2" / "Creators Update"' + if tmp['CurrentBuild'] == 16299: + tmp['Name'] += ' Release 1709 "Redstone 3" / "Fall Creators Update"' + tmp['Name'] = tmp['Name'].replace('Service Pack ', 'SP') + tmp['Name'] = tmp['Name'].replace('Unknown Release', 'Release') + tmp['Name'] = re.sub(r'\s+', ' ', tmp['Name']) + + # Determine OS version + name = '{Name} x{Arch}'.format(**tmp) + if tmp['CurrentVersion'] == '6.0': + tmp['Version'] = 'Vista' + name += ' (very outdated)' + elif tmp['CurrentVersion'] == '6.1': + tmp['Version'] = '7' + if tmp['CSDVersion'] == 'Service Pack 1': + name += ' (outdated)' + else: + name += ' (very outdated)' + elif tmp['CurrentVersion'] in ['6.2', '6.3']: + if int(tmp['CurrentBuildNumber']) <= 9600: + tmp['Version'] = '8' + elif int(tmp['CurrentBuildNumber']) >= 10240: + tmp['Version'] = '10' + if tmp['CurrentBuild'] in [9200, 10240, 10586]: + name += ' (very outdated)' + elif tmp['CurrentBuild'] in [9600, 14393, 15063]: + name += ' (outdated)' + elif tmp['CurrentBuild'] == 16299: + pass # Current Win10 + else: + name += ' (unrecognized)' + tmp['DisplayName'] = name + + # == vista == + # 6.0.6000 + # 6.0.6001 + # 6.0.6002 + # ==== 7 ==== + # 6.1.7600 + # 6.1.7601 + # 6.1.7602 + # ==== 8 ==== + # 6.2.9200 + # === 8.1 === + # 6.3.9200 + # === 8.1u == + # 6.3.9600 + # === 10 v1507 "Threshold 1" == + # 6.3.10240 + # === 10 v1511 "Threshold 2" == + # 6.3.10586 + # === 10 v1607 "Redstone 1" "Anniversary Update" == + # 6.3.14393 + # === 10 v1703 "Redstone 2" "Creators Update" == + # 6.3.15063 + # === 10 v1709 "Redstone 3" "Fall Creators Update" == + # 6.3.16299 + global_vars['OS'] = tmp + +def check_tools(): + """Set tool variables based on OS bit-depth and tool availability.""" + if global_vars['OS'].get('Arch', 32) == 64: + global_vars['Tools'] = { + k: v.get('64', v.get('32')) for (k, v) in TOOLS.items()} + else: + global_vars['Tools'] = {k: v.get('32') for (k, v) in TOOLS.items()} + + # Fix paths + global_vars['Tools'] = {k: os.path.join(global_vars['BinDir'], v) + for (k, v) in global_vars['Tools'].items()} + +def clean_env_vars(): + """Remove conflicting global_vars and env variables. + + This fixes an issue where both global_vars and + global_vars['Env'] are expanded at the same time.""" + for key in global_vars.keys(): + global_vars['Env'].pop(key, None) + +def find_bin(): + """Find .bin folder in the cwd or it's parents.""" + wd = os.getcwd() + base = None + while base is None: + if os.path.exists('.bin'): + base = os.getcwd() + break + if re.fullmatch(r'\w:\\', os.getcwd()): + break + os.chdir('..') + os.chdir(wd) + if base is None: + raise BinNotFoundError + global_vars['BaseDir'] = base + +def make_tmp_dirs(): + """Make temp directories.""" + os.makedirs(global_vars['BackupDir'], exist_ok=True) + os.makedirs(global_vars['LogDir'], exist_ok=True) + os.makedirs(global_vars['TmpDir'], exist_ok=True) + +def set_common_vars(): + """Set common variables.""" + global_vars['Date'] = time.strftime("%Y-%m-%d") + global_vars['Date-Time'] = time.strftime("%Y-%m-%d_%H%M_%z") + global_vars['Env'] = os.environ.copy() + + global_vars['ArchivePassword'] = ARCHIVE_PASSWORD + global_vars['BinDir'] = r'{BaseDir}\.bin'.format( + **global_vars) + global_vars['CBinDir'] = r'{BaseDir}\.cbin'.format( + **global_vars) + global_vars['ClientDir'] = r'{SYSTEMDRIVE}\{prefix}'.format( + prefix=KIT_NAME_SHORT, **global_vars['Env']) + global_vars['BackupDir'] = r'{ClientDir}\Backups\{Date}'.format( + **global_vars) + global_vars['LogDir'] = r'{ClientDir}\Info\{Date}'.format( + **global_vars) + global_vars['ProgBackupDir'] = r'{ClientDir}\Backups'.format( + **global_vars) + global_vars['QuarantineDir'] = r'{ClientDir}\Quarantine'.format( + **global_vars) + global_vars['TmpDir'] = r'{BinDir}\tmp'.format( + **global_vars) + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/Scripts/functions/data.py b/Scripts/functions/data.py new file mode 100644 index 00000000..66797329 --- /dev/null +++ b/Scripts/functions/data.py @@ -0,0 +1,607 @@ +# Wizard Kit PE: Functions - Data + +import ctypes + +from operator import itemgetter + +from functions.common import * + +# Classes +class LocalDisk(): + def __init__(self, disk): + self.disk = disk + self.name = disk.mountpoint.upper() + self.path = self.name + def is_dir(self): + # Should always be true + return True + +# Regex +REGEX_EXCL_ITEMS = re.compile( + r'^(\.(AppleDB|AppleDesktop|AppleDouble' + r'|com\.apple\.timemachine\.supported|dbfseventsd' + r'|DocumentRevisions-V100.*|DS_Store|fseventsd|PKInstallSandboxManager' + r'|Spotlight.*|SymAV.*|symSchedScanLockxz|TemporaryItems|Trash.*' + r'|vol|VolumeIcon\.icns)|desktop\.(ini|.*DB|.*DF)' + r'|(hiberfil|pagefile)\.sys|lost\+found|Network\.*Trash\.*Folder' + r'|Recycle[dr]|System\.*Volume\.*Information|Temporary\.*Items' + r'|Thumbs\.db)$', + re.IGNORECASE) +REGEX_EXCL_ROOT_ITEMS = re.compile( + r'^\\?(boot(mgr|nxt)$|Config.msi' + r'|(eula|globdata|install|vc_?red)' + r'|.*.sys$|System Volume Information|RECYCLER?|\$Recycle\.bin' + r'|\$?Win(dows(.old.*|\.~BT|)$|RE_)|\$GetCurrent|Windows10Upgrade' + r'|PerfLogs|Program Files|SYSTEM.SAV' + r'|.*\.(esd|swm|wim|dd|map|dmg|image)$)', + re.IGNORECASE) +REGEX_INCL_ROOT_ITEMS = re.compile( + r'^\\?(AdwCleaner|(My\s*|)(Doc(uments?( and Settings|)|s?)|Downloads' + r'|Media|Music|Pic(ture|)s?|Vid(eo|)s?)' + r'|{prefix}(-?Info|-?Transfer|)' + r'|(ProgramData|Recovery|Temp.*|Users)$' + r'|.*\.(log|txt|rtf|qb\w*|avi|m4a|m4v|mp4|mkv|jpg|png|tiff?)$)' + r''.format(prefix=KIT_NAME_SHORT), + re.IGNORECASE) +REGEX_WIM_FILE = re.compile( + r'\.wim$', + re.IGNORECASE) +REGEX_WINDOWS_OLD = re.compile( + r'^\\Win(dows|)\.old', + re.IGNORECASE) + +# STATIC VARIABLES +FAST_COPY_EXCLUDES = [ + r'\*.esd', + r'\*.swm', + r'\*.wim', + r'\*.dd', + r'\*.dd.tgz', + r'\*.dd.txz', + r'\*.map', + r'\*.dmg', + r'\*.image', + r'$RECYCLE.BIN', + r'$Recycle.Bin', + r'.AppleDB', + r'.AppleDesktop', + r'.AppleDouble', + r'.com.apple.timemachine.supported', + r'.dbfseventsd', + r'.DocumentRevisions-V100*', + r'.DS_Store', + r'.fseventsd', + r'.PKInstallSandboxManager', + r'.Spotlight*', + r'.SymAV*', + r'.symSchedScanLockxz', + r'.TemporaryItems', + r'.Trash*', + r'.vol', + r'.VolumeIcon.icns', + r'desktop.ini', + r'Desktop?DB', + r'Desktop?DF', + r'hiberfil.sys', + r'lost+found', + r'Network?Trash?Folder', + r'pagefile.sys', + r'Recycled', + r'RECYCLER', + r'System?Volume?Information', + r'Temporary?Items', + r'Thumbs.db', + ] +FAST_COPY_ARGS = [ + '/cmd=noexist_only', + '/utf8', + '/skip_empty_dir', + '/linkdest', + '/no_ui', + '/auto_close', + '/exclude={}'.format(';'.join(FAST_COPY_EXCLUDES)), + ] +# Code borrowed from: https://stackoverflow.com/a/29075319 +SEM_NORMAL = ctypes.c_uint() +SEM_FAILCRITICALERRORS = 1 +SEM_NOOPENFILEERRORBOX = 0x8000 +SEM_FAIL = SEM_NOOPENFILEERRORBOX | SEM_FAILCRITICALERRORS + +def cleanup_transfer(dest_path): + """Fix attributes and move extraneous items outside the Transfer folder.""" + try: + # Remove dest_path if empty + os.rmdir(dest_path) + except OSError: + pass + if not os.path.exists(dest_path): + # Bail if dest_path was empty and removed + raise Exception + + # Fix attributes + cmd = ['attrib', '-a', '-h', '-r', '-s', dest_path] + run_program(cmd, check=False) + + for root, dirs, files in os.walk(dest_path, topdown=False): + for name in dirs: + # Remove empty directories and junction points + try: + os.rmdir(os.path.join(root, name)) + except OSError: + pass + for name in files: + # "Remove" files based on exclusion regex + if REGEX_EXCL_ITEMS.search(name): + # Make dest folder + dest_name = root.replace(dest_path, dest_path+'.Removed') + os.makedirs(dest_name, exist_ok=True) + # Set dest filename + dest_name = os.path.join(dest_name, name) + dest_name = non_clobber_rename(dest_name) + source_name = os.path.join(root, name) + try: + shutil.move(source_name, dest_name) + except Exception: + pass + +def is_valid_wim_file(item): + """Checks if the provided os.DirEntry is a valid WIM file, returns bool.""" + valid = bool(item.is_file() and REGEX_WIM_FILE.search(item.name)) + if valid: + extract_item('wimlib', silent=True) + cmd = [global_vars['Tools']['wimlib-imagex'], 'info', item.path] + try: + run_program(cmd) + except subprocess.CalledProcessError: + valid = False + print_log('WARNING: Image "{}" damaged.'.format(item.name)) + return valid + +def mount_backup_shares(): + """Mount the backup shares unless labeled as already mounted.""" + for server in BACKUP_SERVERS: + # Blindly skip if we mounted earlier + if server['Mounted']: + continue + + mount_network_share(server) + +def mount_network_share(server): + """Mount a network share defined by server.""" + # Test connection + try: + ping(server['IP']) + except subprocess.CalledProcessError: + print_error( + r'Failed to mount \\{Name}\{Share}, {IP} unreachable.'.format( + **server)) + sleep(1) + return False + + # Mount + cmd = r'net use \\{IP}\{Share} /user:{User} {Pass}'.format(**server) + cmd = cmd.split(' ') + try: + run_program(cmd) + except Exception: + print_warning(r'Failed to mount \\{Name}\{Share} ({IP})'.format( + **server)) + sleep(1) + else: + print_info('Mounted {Name}'.format(**server)) + server['Mounted'] = True + +def run_fast_copy(items, dest): + """Copy items to dest using FastCopy.""" + if not items: + raise Exception + + cmd = [global_vars['Tools']['FastCopy'], *FAST_COPY_ARGS] + if 'LogFile' in global_vars: + cmd.append('/logfile={LogFile}'.format(**global_vars)) + cmd.extend(items) + cmd.append('/to={}\\'.format(dest)) + + run_program(cmd) + +def run_wimextract(source, items, dest): + """Extract items from source WIM to dest folder.""" + if not items: + raise Exception + extract_item('wimlib', silent=True) + + # Write files.txt + with open(r'{TmpDir}\wim_files.txt'.format(**global_vars), 'w') as f: + # Defaults + for item in items: + f.write('{item}\n'.format(item=item)) + sleep(1) # For safety? + + # Extract files + cmd = [ + global_vars['Tools']['wimlib-imagex'], + 'extract', + source, '1', + r'@{TmpDir}\wim_files.txt'.format(**global_vars), + '--dest-dir={}\\'.format(dest), + '--no-acls', + '--nullglob'] + run_program(cmd) + +def scan_source(source_obj, dest_path): + """Scan source for files/folders to transfer.""" + selected_items = [] + + if source_obj.is_dir(): + # File-Based + print_standard('Scanning source (folder): {}'.format(source_obj.path)) + selected_items = scan_source_path(source_obj.path, dest_path) + else: + # Image-Based + if REGEX_WIM_FILE.search(source_obj.name): + print_standard('Scanning source (image): {}'.format( + source_obj.path)) + selected_items = scan_source_wim(source_obj.path, dest_path) + else: + print_error('ERROR: Unsupported image: {}'.format( + source_obj.path)) + raise GenericError + + return selected_items + +def scan_source_path(source_path, dest_path, rel_path=None, interactive=True): + """Scan source folder for files/folders to transfer, returns list. + + This will scan the root and (recursively) any Windows.old folders.""" + rel_path = '\\' + rel_path if rel_path else '' + if rel_path: + dest_path = dest_path + rel_path + selected_items = [] + win_olds = [] + + # Root items + root_items = [] + for item in os.scandir(source_path): + if REGEX_INCL_ROOT_ITEMS.search(item.name): + root_items.append(item.path) + elif not REGEX_EXCL_ROOT_ITEMS.search(item.name): + if (not interactive + or ask('Copy: "{}{}" ?'.format(rel_path, item.name))): + root_items.append(item.path) + if REGEX_WINDOWS_OLD.search(item.name): + win_olds.append(item) + if root_items: + selected_items.append({ + 'Message': '{}Root Items...'.format(rel_path), + 'Items': root_items.copy(), + 'Destination': dest_path}) + + # Fonts + if os.path.exists(r'{}\Windows\Fonts'.format(source_path)): + selected_items.append({ + 'Message': '{}Fonts...'.format(rel_path), + 'Items': [r'{}\Windows\Fonts'.format(rel_path)], + 'Destination': r'{}\Windows'.format(dest_path)}) + + # Registry + registry_items = [] + for folder in ['config', 'OEM']: + folder = r'Windows\System32\{}'.format(folder) + folder = os.path.join(source_path, folder) + if os.path.exists(folder): + registry_items.append(folder) + if registry_items: + selected_items.append({ + 'Message': '{}Registry...'.format(rel_path), + 'Items': registry_items.copy(), + 'Destination': r'{}\Windows\System32'.format(dest_path)}) + + # Windows.old(s) + for old in win_olds: + selected_items.append( + scan_source_path( + old.path, dest_path, rel_path=old.name, interactive=False)) + + # Done + return selected_items + +def scan_source_wim(source_wim, dest_path, rel_path=None, interactive=True): + """Scan source WIM file for files/folders to transfer, returns list. + + This will scan the root and (recursively) any Windows.old folders.""" + rel_path = '\\' + rel_path if rel_path else '' + selected_items = [] + win_olds = [] + + # Scan source + extract_item('wimlib', silent=True) + cmd = [ + global_vars['Tools']['wimlib-imagex'], 'dir', + source_wim, '1'] + try: + file_list = run_program(cmd) + except subprocess.CalledProcessError: + print_error('ERROR: Failed to get file list.') + raise + + # Root Items + file_list = [i.strip() + for i in file_list.stdout.decode('utf-8', 'ignore').splitlines() + if i.count('\\') == 1 and i.strip() != '\\'] + root_items = [] + if rel_path: + file_list = [i.replace(rel_path, '') for i in file_list] + for item in file_list: + if REGEX_INCL_ROOT_ITEMS.search(item): + root_items.append(item) + elif not REGEX_EXCL_ROOT_ITEMS.search(item): + if (not interactive + or ask('Extract: "{}{}" ?'.format(rel_path, item))): + root_items.append('{}{}'.format(rel_path, item)) + if REGEX_WINDOWS_OLD.search(item): + win_olds.append(item) + if root_items: + selected_items.append({ + 'Message': '{}Root Items...'.format(rel_path), + 'Items': root_items.copy(), + 'Destination': dest_path}) + + # Fonts + if wim_contains(source_wim, r'{}Windows\Fonts'.format(rel_path)): + selected_items.append({ + 'Message': '{}Fonts...'.format(rel_path), + 'Items': [r'{}\Windows\Fonts'.format(rel_path)], + 'Destination': dest_path}) + + # Registry + registry_items = [] + for folder in ['config', 'OEM']: + folder = r'{}Windows\System32\{}'.format(rel_path, folder) + if wim_contains(source_wim, folder): + registry_items.append(folder) + if registry_items: + selected_items.append({ + 'Message': '{}Registry...'.format(rel_path), + 'Items': registry_items.copy(), + 'Destination': dest_path}) + + # Windows.old(s) + for old in win_olds: + scan_source_wim(source_wim, dest_path, rel_path=old, interactive=False) + + # Done + return selected_items + +def select_destination(folder_path, prompt='Select destination'): + """Select destination drive, returns path as string.""" + disk = select_disk(prompt) + if 'fixed' not in disk['Disk'].opts: + folder_path = folder_path.replace('\\', '-') + path = '{disk}{folder_path}_{Date}'.format( + disk = disk['Disk'].mountpoint, + folder_path = folder_path, + **global_vars) + + # Avoid merging with existing folder + path = non_clobber_rename(path) + os.makedirs(path, exist_ok=True) + + return path + +def select_disk(title='Select disk', auto_select=True): + """Select disk from attached disks. returns dict.""" + actions = [{'Name': 'Quit', 'Letter': 'Q'}] + disks = [] + + # Build list of disks + set_thread_error_mode(silent=True) # Prevents "No disk" popups + for d in psutil.disk_partitions(): + info = { + 'Disk': d, + 'Name': d.mountpoint} + try: + usage = psutil.disk_usage(d.device) + free = '{free} / {total} available'.format( + free = human_readable_size(usage.free, 2), + total = human_readable_size(usage.total, 2)) + except Exception: + # Meh, leaving unsupported destinations out + pass + # free = 'Unknown' + # info['Disabled'] = True + else: + info['Display Name'] = '{} ({})'.format(info['Name'], free) + disks.append(info) + set_thread_error_mode(silent=False) # Return to normal + + # Skip menu? + if len(disks) == 1 and auto_select: + return disks[0] + + # Show menu + selection = menu_select(title, main_entries=disks, action_entries=actions) + if selection == 'Q': + exit_script() + else: + return disks[int(selection)-1] + +def select_source(ticket_number): + """Select backup from those found on the BACKUP_SERVERS for the ticket.""" + selected_source = None + sources = [] + mount_backup_shares() + + # Check for ticket folders on servers + for server in BACKUP_SERVERS: + if server['Mounted']: + print_standard('Scanning {}...'.format(server['Name'])) + for d in os.scandir(r'\\{IP}\{Share}'.format(**server)): + if (d.is_dir() + and d.name.lower().startswith(ticket_number.lower())): + # Add folder to sources + sources.append({ + 'Name': '{:9}| File-Based: [DIR] {}'.format( + server['Name'], d.name), + 'Server': server, + 'Source': d}) + + # Check for images and subfolders + for ticket_path in sources.copy(): + for item in os.scandir(ticket_path['Source'].path): + if item.is_dir(): + # Add folder to sources + sources.append({ + 'Name': r'{:9}| File-Based: [DIR] {}\{}'.format( + ticket_path['Server']['Name'], # Server + ticket_path['Source'].name, # Ticket folder + item.name, # Sub-folder + ), + 'Server': ticket_path['Server'], + 'Source': item}) + + # Check for images in folder + for subitem in os.scandir(item.path): + if REGEX_WIM_FILE.search(item.name): + # Add image to sources + try: + size = human_readable_size(item.stat().st_size) + except Exception: + size = ' ? ?' # unknown + sources.append({ + 'Disabled': bool(not is_valid_wim_file(subitem)), + 'Name': r'{:9}| Image-Based: {:>7} {}\{}\{}'.format( + ticket_path['Server']['Name'], # Server + size, # Size (duh) + ticket_path['Source'].name, # Ticket folder + item.name, # Sub-folder + subitem.name, # Image file + ), + 'Server': ticket_path['Server'], + 'Source': subitem}) + elif REGEX_WIM_FILE.search(item.name): + # Add image to sources + try: + size = human_readable_size(item.stat().st_size) + except Exception: + size = ' ? ?' # unknown + sources.append({ + 'Disabled': bool(not is_valid_wim_file(item)), + 'Name': r'{:9}| Image-Based: {:>7} {}\{}'.format( + ticket_path['Server']['Name'], # Server + size, # Size (duh) + ticket_path['Source'].name, # Ticket folder + item.name, # Image file + ), + 'Server': ticket_path['Server'], + 'Source': item}) + # Check for local sources + print_standard('Scanning for local sources...') + set_thread_error_mode(silent=True) # Prevents "No disk" popups + sys_drive = global_vars['Env']['SYSTEMDRIVE'] + for d in psutil.disk_partitions(): + if re.search(r'^{}'.format(sys_drive), d.mountpoint, re.IGNORECASE): + # Skip current OS drive + continue + if 'fixed' in d.opts: + # Skip DVD, etc + sources.append({ + 'Name': '{:9}| File-Based: [DISK] {}'.format( + ' Local', d.mountpoint), + 'Source': LocalDisk(d)}) + set_thread_error_mode(silent=False) # Return to normal + + # Build Menu + sources.sort(key=itemgetter('Name')) + actions = [{'Name': 'Quit', 'Letter': 'Q'}] + + # Select backup from sources + if len(sources) > 0: + selection = menu_select( + 'Which backup are we using?', + main_entries=sources, + action_entries=actions, + disabled_label='DAMAGED') + if selection == 'Q': + umount_backup_shares() + exit_script() + else: + selected_source = sources[int(selection)-1]['Source'] + else: + print_error('ERROR: No backups found for ticket: {}.'.format( + ticket_number)) + umount_backup_shares() + pause("Press Enter to exit...") + exit_script() + + # Done + return selected_source + +def set_thread_error_mode(silent=True): + """Disable or Enable Windows error message dialogs. + + Disable when scanning for disks to avoid popups for empty cardreaders, etc + """ + # Code borrowed from: https://stackoverflow.com/a/29075319 + kernel32 = ctypes.WinDLL('kernel32') + + if silent: + kernel32.SetThreadErrorMode(SEM_FAIL, ctypes.byref(SEM_NORMAL)) + else: + kernel32.SetThreadErrorMode(SEM_NORMAL, ctypes.byref(SEM_NORMAL)) + +def transfer_source(source_obj, dest_path, selected_items): + """Transfer, or extract, files/folders from source to destination.""" + if source_obj.is_dir(): + # Run FastCopy for each selection "group" + for group in selected_items: + try_and_print(message=group['Message'], + function=run_fast_copy, cs='Done', + items=group['Items'], + dest=group['Destination']) + else: + if REGEX_WIM_FILE.search(source_obj.name): + # Extract files from WIM + for group in selected_items: + try_and_print(message=group['Message'], + function=run_wimextract, cs='Done', + source=source_obj.path, + items=group['Items'], + dest=group['Destination']) + else: + print_error('ERROR: Unsupported image: {}'.format(source_obj.path)) + raise GenericError + +def umount_backup_shares(): + """Unnount the backup shares regardless of current status.""" + for server in BACKUP_SERVERS: + umount_network_share(server) + +def umount_network_share(server): + """Unnount a network share defined by server.""" + cmd = r'net use \\{IP}\{Share} /delete'.format(**server) + cmd = cmd.split(' ') + try: + run_program(cmd) + except Exception: + print_error(r'Failed to umount \\{Name}\{Share}.'.format(**server)) + sleep(1) + else: + print_info('Umounted {Name}'.format(**server)) + server['Mounted'] = False + +def wim_contains(source_path, file_path): + """Check if the WIM contains a file or folder.""" + _cmd = [ + global_vars['Tools']['wimlib-imagex'], 'dir', + source_path, '1', + '--path={}'.format(file_path), + '--one-file-only'] + try: + run_program(_cmd) + except subprocess.CalledProcessError: + return False + else: + return True + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/Scripts/partition_uids.py b/Scripts/functions/partition_uids.py similarity index 98% rename from Scripts/partition_uids.py rename to Scripts/functions/partition_uids.py index 1484e2af..e31117db 100644 --- a/Scripts/partition_uids.py +++ b/Scripts/functions/partition_uids.py @@ -1,4 +1,4 @@ -# WK WinPE - PARTITION UIDs +# Wizard Kit PE: Functions - PARTITION UIDs # sources: https://en.wikipedia.org/wiki/GUID_Partition_Table # https://en.wikipedia.org/wiki/Partition_type diff --git a/Scripts/functions/windows_setup.py b/Scripts/functions/windows_setup.py new file mode 100644 index 00000000..e70e88cf --- /dev/null +++ b/Scripts/functions/windows_setup.py @@ -0,0 +1,273 @@ +# Wizard Kit PE: Functions - Windows Setup + +from functions.common import * + +# STATIC VARIABLES +DISKPART_SCRIPT = '{tmp}\\diskpart.script'.format(tmp=os.environ['TMP']) +WINDOWS_VERSIONS = [ + {'Name': 'Windows 7 Home Basic', + 'Image File': 'Win7', + 'Image Name': 'Windows 7 HOMEBASIC', + 'Family': '7'}, + {'Name': 'Windows 7 Home Premium', + 'Image File': 'Win7', + 'Image Name': 'Windows 7 HOMEPREMIUM', + 'Family': '7'}, + {'Name': 'Windows 7 Professional', + 'Image File': 'Win7', + 'Image Name': 'Windows 7 PROFESSIONAL', + 'Family': '7'}, + {'Name': 'Windows 7 Ultimate', + 'Image File': 'Win7', + 'Image Name': 'Windows 7 ULTIMATE', + 'Family': '7'}, + + {'Name': 'Windows 8.1', + 'Image File': 'Win8', + 'Image Name': 'Windows 8.1', + 'Family': '8', + 'CRLF': True}, + {'Name': 'Windows 8.1 Pro', + 'Image File': 'Win8', + 'Image Name': 'Windows 8.1 Pro', + 'Family': '8'}, + + {'Name': 'Windows 10 Home', + 'Image File': 'Win10', + 'Image Name': 'Windows 10 Home', + 'Family': '10', + 'CRLF': True}, + {'Name': 'Windows 10 Pro', + 'Image File': 'Win10', + 'Image Name': 'Windows 10 Pro', + 'Family': '10'}, + ] + +def is_index_in_image(bin=None, filename=None, imagename=None): + # Bail early + if bin is None: + raise Exception('bin not specified.') + if filename is None: + raise Exception('Filename not specified.') + if imagename is None: + raise Exception('Image Name not specified.') + + cmd = '{bin}\\wimlib\\wimlib-imagex info "{filename}" "{imagename}"'.format(bin=bin, filename=filename, imagename=imagename) + try: + run_program(cmd) + except subprocess.CalledProcessError: + print_error('Invalid image: {filename}'.format(filename=filename)) + return False + + return True + +def find_windows_image(bin, windows_version=None): + """Search for a Windows source image file on local drives and network drives (in that order)""" + image = {} + + # Bail early + if windows_version is None: + raise Exception('Windows version not specified.') + imagefile = windows_version['Image File'] + + # Search local source + process_return = run_program('mountvol') + for tmp in re.findall(r'.*([A-Za-z]):\\', process_return.stdout.decode()): + for ext in ['esd', 'wim', 'swm']: + filename = '{drive}:\\images\\{imagefile}'.format(drive=tmp[0], imagefile=imagefile) + filename_ext = '{filename}.{ext}'.format(filename=filename, ext=ext) + if os.path.isfile(filename_ext): + if is_index_in_image(bin, filename_ext, windows_version['Image Name']): + image['Ext'] = ext + image['File'] = filename + image['Glob'] = '--ref="{File}*.swm"'.format(**image) if ext == 'swm' else '' + image['Source'] = tmp[0] + break + + # Check for network source (if necessary) + if not any(image): + if not WINDOWS_SERVER['Mounted']: + mount_windows_share() + for ext in ['esd', 'wim', 'swm']: + filename = '\\\\{IP}\\{Share}\\images\\{imagefile}'.format(imagefile=imagefile, **WINDOWS_SERVER) + filename_ext = '{filename}.{ext}'.format(filename=filename, ext=ext) + if os.path.isfile(filename_ext): + if is_index_in_image(bin, filename_ext, windows_version['Image Name']): + image['Ext'] = ext + image['File'] = filename + image['Glob'] = '--ref="{File}*.swm"'.format(**image) if ext == 'swm' else '' + image['Source'] = None + break + + # Display image to be used (if any) and return + if any(image): + print_info('Using image: {File}.{Ext}'.format(**image)) + return image + else: + print_error('Failed to find Windows source image for {winver}'.format(winver=windows_version['Name'])) + abort_to_main_menu('Aborting Windows setup') + +def format_gpt(disk=None, windows_family=None): + """Format disk for use as a Windows OS drive using the GPT (UEFI) layout.""" + + # Bail early + if disk is None: + raise Exception('No disk provided.') + if windows_family is None: + raise Exception('No Windows family provided.') + + # Format drive + # print_info('Drive will use a GPT (UEFI) layout.') + with open(DISKPART_SCRIPT, 'w') as script: + # Partition table + script.write('select disk {number}\n'.format(number=disk['Number'])) + script.write('clean\n') + script.write('convert gpt\n') + + # System partition + script.write('create partition efi size=260\n') # NOTE: Allows for Advanced Format 4K drives + script.write('format quick fs=fat32 label="System"\n') + script.write('assign letter="S"\n') + + # Microsoft Reserved (MSR) partition + script.write('create partition msr size=128\n') + + # Windows partition + script.write('create partition primary\n') + script.write('format quick fs=ntfs label="Windows"\n') + script.write('assign letter="W"\n') + + # Recovery Tools partition (Windows 8+) + if re.search(r'^(8|10)', windows_family): + script.write('shrink minimum=500\n') + script.write('create partition primary\n') + script.write('format quick fs=ntfs label="Recovery Tools"\n') + script.write('assign letter="T"\n') + script.write('set id="de94bba4-06d1-4d40-a16a-bfd50179d6ac"\n') + script.write('gpt attributes=0x8000000000000001\n') + + # Run script + run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT)) + time.sleep(2) + +def format_mbr(disk=None, windows_family=None): + """Format disk for use as a Windows OS drive using the MBR (legacy) layout.""" + + # Bail early + if disk is None: + raise Exception('No disk provided.') + if windows_family is None: + raise Exception('No Windows family provided.') + + # Format drive + # print_info('Drive will use a MBR (legacy) layout.') + with open(DISKPART_SCRIPT, 'w') as script: + # Partition table + script.write('select disk {number}\n'.format(number=disk['Number'])) + script.write('clean\n') + + # System partition + script.write('create partition primary size=100\n') + script.write('format fs=ntfs quick label="System Reserved"\n') + script.write('active\n') + script.write('assign letter="S"\n') + + # Windows partition + script.write('create partition primary\n') + script.write('format fs=ntfs quick label="Windows"\n') + script.write('assign letter="W"\n') + + # Recovery Tools partition (Windows 8+) + if re.search(r'^(8|10)', windows_family): + script.write('shrink minimum=500\n') + script.write('create partition primary\n') + script.write('format quick fs=ntfs label="Recovery"\n') + script.write('assign letter="T"\n') + script.write('set id=27\n') + + # Run script + run_program('diskpart /s {script}'.format(script=DISKPART_SCRIPT)) + time.sleep(2) + +def get_boot_mode(): + boot_mode = 'Legacy' + try: + reg_key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, 'System\\CurrentControlSet\\Control') + reg_value = winreg.QueryValueEx(reg_key, 'PEFirmwareType')[0] + if reg_value == 2: + boot_mode = 'UEFI' + except: + boot_mode = 'Unknown' + + return boot_mode + +def mount_windows_share(): + """Mount the Windows images share for use in Windows setup""" + + # Blindly skip if we mounted earlier + if WINDOWS_SERVER['Mounted']: + return None + else: + try: + # Test connection + run_program('ping -w 800 -n 2 {IP}'.format(**WINDOWS_SERVER)) + # Mount + run_program('net use \\\\{IP}\\{Share} /user:{User} {Pass}'.format(**WINDOWS_SERVER)) + print_info('Mounted {Name}'.format(**WINDOWS_SERVER)) + WINDOWS_SERVER['Mounted'] = True + except subprocess.CalledProcessError: + print_error('Failed to mount \\\\{Name}\\{Share}, {IP} unreachable.'.format(**WINDOWS_SERVER)) + time.sleep(1) + except: + print_warning('Failed to mount \\\\{Name}\\{Share}'.format(**WINDOWS_SERVER)) + time.sleep(1) + +def select_windows_version(): + actions = [{'Name': 'Main Menu', 'Letter': 'M'},] + + # Menu loop + selection = menu_select('Which version of Windows are we installing?', WINDOWS_VERSIONS, actions) + + if selection.isnumeric(): + return WINDOWS_VERSIONS[int(selection)-1] + elif selection == 'M': + abort_to_main_menu() + +def setup_windows(bin=None, windows_image=None, windows_version=None): + # Bail early + if bin is None: + raise Exception('bin path not specified.') + if windows_image is None: + raise Exception('Windows image not specified.') + if windows_version is None: + raise Exception('Windows version not specified.') + + # Apply image + cmd = '{bin}\\wimlib\\wimlib-imagex apply "{File}.{Ext}" "{Image Name}" W:\\ {Glob}'.format(bin=bin, **windows_image, **windows_version) + run_program(cmd) + +def setup_windows_re(windows_version=None, windows_letter='W', tools_letter='T'): + # Bail early + if windows_version is None: + raise Exception('Windows version not specified.') + + _win = '{win}:\\Windows'.format(win=windows_letter) + _winre = '{win}\\System32\\Recovery\\WinRE.wim'.format(win=_win) + _dest = '{tools}:\\Recovery\\WindowsRE'.format(tools=tools_letter) + + if re.search(r'^(8|10)', windows_version['Family']): + # Copy WinRE.wim + os.makedirs(_dest, exist_ok=True) + shutil.copy(_winre, '{dest}\\WinRE.wim'.format(dest=_dest)) + + # Set location + run_program('{win}\\System32\\reagentc /setreimage /path {dest} /target {win}'.format(dest=_dest, win=_win)) + else: + # Only supported on Windows 8 and above + raise SetupError + +def update_boot_partition(system_letter='S', windows_letter='W', mode='ALL'): + run_program('bcdboot {win}:\\Windows /s {sys}: /f {mode}'.format(win=windows_letter, sys=system_letter, mode=mode)) + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/Scripts/menu.py b/Scripts/functions/winpe_menus.py similarity index 66% rename from Scripts/menu.py rename to Scripts/functions/winpe_menus.py index 7a72f729..21d2e11b 100644 --- a/Scripts/menu.py +++ b/Scripts/functions/winpe_menus.py @@ -1,38 +1,66 @@ -# WK WinPE Menu +# Wizard Kit PE: Menus -# Init -import os -import re -import subprocess -import sys -import time -import traceback -os.chdir(os.path.dirname(os.path.realpath(__file__))) -bin = os.path.abspath('..\\') -sys.path.append(os.getcwd()) -from functions import * +from functions.data import * -## Colors -COLORS = { - 'CLEAR': '\033[0m', - 'RED': '\033[31m', - 'GREEN': '\033[32m', - 'YELLOW': '\033[33m', - 'BLUE': '\033[34m'} +# STATIC VARIABLES +FAST_COPY_ARGS = [ + '/cmd=noexist_only', + '/utf8', + '/skip_empty_dir', + '/linkdest', + '/no_ui', + '/auto_close', + '/exclude={}'.format(';'.join(FAST_COPY_EXCLUDES)), + ] +PE_TOOLS = { + 'BlueScreenView': { + 'Path': r'BlueScreenView\BlueScreenView.exe', + }, + 'FastCopy': { + 'Path': r'FastCopy\FastCopy.exe', + 'Args': FAST_COPY_ARGS, + }, + 'HWiNFO': { + 'Path': r'HWiNFO\HWiNFO.exe', + }, + 'NT Password Editor': { + 'Path': r'NT Password Editor\ntpwedit.exe', + }, + 'Notepad++': { + 'Path': r'NotepadPlusPlus\NotepadPlusPlus.exe', + }, + 'PhotoRec': { + 'Path': r'TestDisk\photorec_win.exe', + 'Args': ['-new_console:n'], + }, + 'Prime95': { + 'Path': r'Prime95\prime95.exe', + }, + 'ProduKey': { + 'Path': r'ProduKey\ProduKey.exe', + }, + 'Q-Dir': { + 'Path': r'Q-Dir\Q-Dir.exe', + }, + 'TestDisk': { + 'Path': r'TestDisk\testdisk_win.exe', + 'Args': ['-new_console:n'], + }, + } -def menu_backup_imaging(): +def menu_backup(): """Take backup images of partition(s) in the WIM format and save them to a backup share""" errors = False # Set ticket ID os.system('cls') - ticket_id = get_ticket_id() + ticket_id = get_ticket_number() # Mount backup shares mount_backup_shares() # Select destination - dest = select_destination() + dest = select_backup_destination() if dest is None: abort_to_main_menu('Aborting Backup Creation') @@ -90,13 +118,48 @@ def menu_backup_imaging(): time.sleep(5) pause('\nPress Enter to return to main menu... ') -def menu_windows_setup(): +def menu_root(): + title = '{}: Main Menu'.format(KIT_NAME_FULL) + menus = [ + {'Name': 'Create Backups', 'Menu': menu_backup}, + {'Name': 'Setup Windows', 'Menu': menu_setup}, + {'Name': 'Misc Tools', 'Menu': menu_tools}, + ] + actions = [ + {'Name': 'Command Prompt', 'Letter': 'C'}, + {'Name': 'Reboot', 'Letter': 'R'}, + {'Name': 'Shutdown', 'Letter': 'S'}, + ] + + # Main loop + while True: + selection = menu_select( + title=title, + main_entries=menus, + action_entries=actions, + secret_exit=True) + + if (selection.isnumeric()): + try: + menus[int(selection)-1]['Menu']() + except AbortError: + pass + elif (selection == 'C'): + run_program(['cmd', '-new_console:n'], check=False) + elif (selection == 'R'): + run_program(['wpeutil', 'reboot']) + elif (selection == 'S'): + run_program(['wpeutil', 'shutdown']) + else: + exit_script() + +def menu_setup(): """Format a drive, partition for MBR or GPT, apply a Windows image, and rebuild the boot files""" errors = False # Set ticket ID os.system('cls') - ticket_id = get_ticket_id() + ticket_id = get_ticket_number() # Select the version of Windows to apply windows_version = select_windows_version() @@ -197,80 +260,55 @@ def menu_windows_setup(): pause('\nPress Enter to return to main menu... ') def menu_tools(): - tools = [ - {'Name': 'Blue Screen View', 'Folder': 'BlueScreenView', 'File': 'BlueScreenView.exe'}, - {'Name': 'Fast Copy', 'Folder': 'FastCopy', 'File': 'FastCopy.exe', 'Args': ['/log', '/logfile=X:\WK\Info\FastCopy.log', '/cmd=noexist_only', '/utf8', '/skip_empty_dir', '/linkdest', '/open_window', '/balloon=FALSE', r'/exclude=$RECYCLE.BIN;$Recycle.Bin;.AppleDB;.AppleDesktop;.AppleDouble;.com.apple.timemachine.supported;.dbfseventsd;.DocumentRevisions-V100*;.DS_Store;.fseventsd;.PKInstallSandboxManager;.Spotlight*;.SymAV*;.symSchedScanLockxz;.TemporaryItems;.Trash*;.vol;.VolumeIcon.icns;desktop.ini;Desktop?DB;Desktop?DF;hiberfil.sys;lost+found;Network?Trash?Folder;pagefile.sys;Recycled;RECYCLER;System?Volume?Information;Temporary?Items;Thumbs.db']}, - {'Name': 'HWiNFO', 'Folder': 'HWiNFO', 'File': 'HWiNFO.exe'}, - {'Name': 'NT Password Editor', 'Folder': 'NT Password Editor', 'File': 'ntpwedit.exe'}, - {'Name': 'Notepad++', 'Folder': 'NotepadPlusPlus', 'File': 'notepadplusplus.exe'}, - {'Name': 'PhotoRec', 'Folder': 'TestDisk', 'File': 'photorec_win.exe', 'Args': ['-new_console:n']}, - {'Name': 'Prime95', 'Folder': 'Prime95', 'File': 'prime95.exe'}, - {'Name': 'ProduKey', 'Folder': 'ProduKey', 'File': 'ProduKey.exe', 'Args': ['/external', '/ExtractEdition:1']}, - {'Name': 'Q-Dir', 'Folder': 'Q-Dir', 'File': 'Q-Dir.exe'}, - {'Name': 'TestDisk', 'Folder': 'TestDisk', 'File': 'testdisk_win.exe', 'Args': ['-new_console:n']}, - ] - actions = [ - {'Name': 'Main Menu', 'Letter': 'M'}, - ] + title = '{}: Tools Menu'.format(KIT_NAME_FULL) + tools = [k for k in sorted(PE_TOOLS.keys())] + actions = [{'Name': 'Main Menu', 'Letter': 'M'},] # Menu loop while True: - selection = menu_select('Tools Menu', tools, actions) - + selection = menu_select(title, tools, actions) if (selection.isnumeric()): tool = tools[int(selection)-1] - cmd = ['{bin}\\{folder}\\{file}'.format(bin=bin, folder=tool['Folder'], file=tool['File'])] - if tool['Name'] == 'Blue Screen View': + cmd = [PE_TOOLS[tool]['Path']] + PE_TOOLS[tool].get('Args', []) + if tool == 'Blue Screen View': # Select path to scan minidump_path = select_minidump_path() - if minidump_path is not None: - tool['Args'] = ['/MiniDumpFolder', minidump_path] - if 'Args' in tool: - cmd += tool['Args'] + if minidump_path: + cmd.extend(['/MiniDumpFolder', minidump_path]) try: - subprocess.Popen(cmd) - except: + popen_program(cmd) + except Exception: print_error('Failed to run {prog}'.format(prog=tool['Name'])) - time.sleep(2) + time.sleep(2) + pause() elif (selection == 'M'): break -def menu_main(): - menus = [ - {'Name': 'Create Backups', 'Menu': menu_backup_imaging}, - {'Name': 'Setup Windows', 'Menu': menu_windows_setup}, - {'Name': 'Misc Tools', 'Menu': menu_tools}, - ] - actions = [ - {'Name': 'Command Prompt', 'Letter': 'C'}, - {'Name': 'Reboot', 'Letter': 'R'}, - {'Name': 'Shutdown', 'Letter': 'S'}, - ] +def select_minidump_path(): + dumps = [] - # Main loop - while True: - selection = menu_select('Main Menu', menus, actions, secret_exit=True) + # Assign volume letters first + assign_volume_letters() - if (selection.isnumeric()): - try: - menus[int(selection)-1]['Menu']() - except AbortError: - pass - except: - print_error('Major exception in: {menu}'.format(menu=menus[int(selection)-1]['Name'])) - print_warning(' Please let The Wizard know and he\'ll look into it (Please include the details below).') - print(traceback.print_exc()) - time.sleep(5) - print_info(' You can retry but if this crashes again an alternative approach may be required.') - pause('\nPress enter to return to the main menu') - elif (selection == 'C'): - run_program(['cmd', '-new_console:n'], check=False) - elif (selection == 'R'): - run_program(['wpeutil', 'reboot']) - elif (selection == 'S'): - run_program(['wpeutil', 'shutdown']) - else: - sys.exit(0) + # Search for minidumps + tmp = run_program('mountvol') + tmp = [d for d in re.findall(r'.*([A-Za-z]):\\', tmp.stdout.decode())] + # Remove RAMDisk letter + if 'X' in tmp: + tmp.remove('X') + for drive in tmp: + if os.path.exists('{drive}:\\Windows\\MiniDump'.format(drive=drive)): + dumps.append({'Name': '{drive}:\\Windows\\MiniDump'.format(drive=drive)}) + + # Check results before showing menu + if len(dumps) == 0: + print_error(' No BSoD / MiniDump paths found') + time.sleep(2) + return None + + # Menu + selection = menu_select('Which BSoD / MiniDump path are we scanning?', dumps, []) + return dumps[int(selection) - 1]['Name'] if __name__ == '__main__': - menu_main() \ No newline at end of file + print("This file is not meant to be called directly.") diff --git a/Scripts/settings/main.py b/Scripts/settings/main.py new file mode 100644 index 00000000..4b1d9818 --- /dev/null +++ b/Scripts/settings/main.py @@ -0,0 +1,68 @@ +# Wizard Kit PE: Settings - Main / Branding + +# Features +ENABLED_UPLOAD_DATA = False + +# STATIC VARIABLES (also used by .cmd files) +## Not using spaces aroung '=' for easier .cmd substrings +ARCHIVE_PASSWORD='Abracadabra' +KIT_NAME_FULL='Wizard Kit PE' +KIT_NAME_SHORT='WKPE' +OFFICE_SERVER_IP='10.0.0.10' +QUICKBOOKS_SERVER_IP='10.0.0.10' +SUPPORT_MESSAGE='Please let 2Shirt know by opening an issue on GitHub' +TIME_ZONE='Pacific Standard Time' # Always use "Standard Time" (DST is applied correctly) + +# SERVER VARIABLES +## NOTE: Windows can only use one user per server. This means that if +## one server serves multiple shares then you have to use the same +## user/password for all of those shares. +BACKUP_SERVERS = [ + { 'IP': '10.0.0.10', + 'Name': 'ServerOne', + 'Mounted': False, + 'Share': 'Backups', + 'User': 'restore', + 'Pass': 'Abracadabra', + }, + { 'IP': '10.0.0.11', + 'Name': 'ServerTwo', + 'Mounted': False, + 'Share': 'Backups', + 'User': 'restore', + 'Pass': 'Abracadabra', + }, +] +CLIENT_INFO_SERVER = { + 'IP': '10.0.0.10', + 'RegEntry': r'0x10001,0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000', + 'Share': '/srv/ClientInfo', + 'User': 'upload', +} +OFFICE_SERVER = { + 'IP': OFFICE_SERVER_IP, + 'Name': 'ServerOne', + 'Mounted': False, + 'Share': 'Office', + 'User': 'restore', + 'Pass': 'Abracadabra', +} +QUICKBOOKS_SERVER = { + 'IP': QUICKBOOKS_SERVER_IP, + 'Name': 'ServerOne', + 'Mounted': False, + 'Share': 'QuickBooks', + 'User': 'restore', + 'Pass': 'Abracadabra', +} +WINDOWS_SERVER = { + 'IP': '10.0.0.10', + 'Name': 'ServerOne', + 'Mounted': False, + 'Share': 'Windows', + 'User': 'restore', + 'Pass': 'Abracadabra', +} + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/Scripts/settings/tools.py b/Scripts/settings/tools.py new file mode 100644 index 00000000..ab8be9e1 --- /dev/null +++ b/Scripts/settings/tools.py @@ -0,0 +1,55 @@ +# Wizard Kit PE: Settings - Tools + +TOOLS = { + # NOTE: BinDir will be prepended to these paths at runtime + 'AIDA64': { + '32': r'AIDA64\aida64.exe'}, + 'AutoRuns': { + '32': r'Autoruns\autoruns.exe', + '64': r'Autoruns\autoruns64.exe'}, + 'BleachBit': { + '32': r'BleachBit\bleachbit_console.exe'}, + 'Caffeine': { + '32': r'Caffeine\caffeine.exe'}, + 'Du': { + '32': r'Du\du.exe', + '64': r'Du\du64.exe'}, + 'ERUNT': { + '32': r'ERUNT\ERUNT.EXE'}, + 'Everything': { + '32': r'Everything\Everything.exe', + '64': r'Everything\Everything64.exe'}, + 'FastCopy': { + '32': r'FastCopy\FastCopy.exe', + '64': r'FastCopy\FastCopy64.exe'}, + 'HitmanPro': { + '32': r'HitmanPro\HitmanPro.exe', + '64': r'HitmanPro\HitmanPro64.exe'}, + 'HWiNFO': { + '32': r'HWiNFO\HWiNFO.exe', + '64': r'HWiNFO\HWiNFO64.exe'}, + 'KVRT': { + '32': r'KVRT\KVRT.exe'}, + 'NotepadPlusPlus': { + '32': r'NotepadPlusPlus\notepadplusplus.exe'}, + 'ProduKey': { + '32': r'ProduKey\ProduKey.exe', + '64': r'ProduKey\ProduKey64.exe'}, + 'PuTTY-PSFTP': { + '32': r'PuTTY\PSFTP.EXE'}, + 'RKill': { + '32': r'RKill\RKill.exe'}, + 'SevenZip': { + '32': r'7-Zip\7za.exe', + '64': r'7-Zip\7za64.exe'}, + 'TDSSKiller': { + '32': r'TDSSKiller\TDSSKiller.exe'}, + 'wimlib-imagex': { + '32': r'wimlib\x32\wimlib-imagex.exe', + '64': r'wimlib\x64\wimlib-imagex.exe'}, + 'XMPlay': { + '32': r'XMPlay\xmplay.exe'}, + } + +if __name__ == '__main__': + print("This file is not meant to be called directly.") diff --git a/Scripts/winpe_root_menu.py b/Scripts/winpe_root_menu.py new file mode 100644 index 00000000..370186f0 --- /dev/null +++ b/Scripts/winpe_root_menu.py @@ -0,0 +1,21 @@ +# Wizard Kit PE: Root Menu + +import os +import sys + +# Init +os.chdir(os.path.dirname(os.path.realpath(__file__))) +sys.path.append(os.getcwd()) +from functions.common import * +from functions.winpe_menus import * +init_global_vars() +os.system('title {}: Root Menu'.format(KIT_NAME_FULL)) +global_vars['LogFile'] = r'{LogDir}\WinPE.log'.format(**global_vars) + +if __name__ == '__main__': + try: + menu_root() + except SystemExit: + pass + except: + major_exception()