Improved source scanning for user data transfers
* Fixes recursion bug when Windows.old folders are present * Combined logic for file/folder sources and WIM sources * Code uses proper folder separators for the running OS * (e.g. '\' for Windows and '/' for the rest)
This commit is contained in:
parent
f7f3f0d53c
commit
f0ae207890
2 changed files with 127 additions and 123 deletions
|
|
@ -749,6 +749,10 @@ def set_linux_vars():
|
|||
global_vars['Env'] = os.environ.copy()
|
||||
global_vars['BinDir'] = '/usr/local/bin'
|
||||
global_vars['LogDir'] = global_vars['TmpDir']
|
||||
global_vars['Tools'] = {
|
||||
'wimlib-imagex': 'wimlib-imagex',
|
||||
'SevenZip': '7z',
|
||||
}
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("This file is not meant to be called directly.")
|
||||
|
|
|
|||
|
|
@ -17,6 +17,11 @@ class LocalDisk():
|
|||
# Should always be true
|
||||
return True
|
||||
|
||||
class SourceItem():
|
||||
def __init__(self, name, path):
|
||||
self.name = name
|
||||
self.path = path
|
||||
|
||||
# Regex
|
||||
REGEX_EXCL_ITEMS = re.compile(
|
||||
r'^(\.(AppleDB|AppleDesktop|AppleDouble'
|
||||
|
|
@ -29,7 +34,7 @@ REGEX_EXCL_ITEMS = re.compile(
|
|||
r'|Thumbs\.db)$',
|
||||
re.IGNORECASE)
|
||||
REGEX_EXCL_ROOT_ITEMS = re.compile(
|
||||
r'^\\?(boot(mgr|nxt)$|Config.msi'
|
||||
r'^(boot(mgr|nxt)$|Config.msi'
|
||||
r'|(eula|globdata|install|vc_?red)'
|
||||
r'|.*.sys$|System Volume Information|RECYCLER?|\$Recycle\.bin'
|
||||
r'|\$?Win(dows(.old.*|\.~BT|)$|RE_)|\$GetCurrent|Windows10Upgrade'
|
||||
|
|
@ -37,7 +42,7 @@ REGEX_EXCL_ROOT_ITEMS = re.compile(
|
|||
r'|.*\.(esd|swm|wim|dd|map|dmg|image)$)',
|
||||
re.IGNORECASE)
|
||||
REGEX_INCL_ROOT_ITEMS = re.compile(
|
||||
r'^\\?(AdwCleaner|(My\s*|)(Doc(uments?( and Settings|)|s?)|Downloads'
|
||||
r'^(AdwCleaner|(My\s*|)(Doc(uments?( and Settings|)|s?)|Downloads'
|
||||
r'|Media|Music|Pic(ture|)s?|Vid(eo|)s?)'
|
||||
r'|{prefix}(-?Info|-?Transfer|)'
|
||||
r'|(ProgramData|Recovery|Temp.*|Users)$'
|
||||
|
|
@ -48,7 +53,7 @@ REGEX_WIM_FILE = re.compile(
|
|||
r'\.wim$',
|
||||
re.IGNORECASE)
|
||||
REGEX_WINDOWS_OLD = re.compile(
|
||||
r'^\\Win(dows|)\.old',
|
||||
r'^Win(dows|)\.old',
|
||||
re.IGNORECASE)
|
||||
|
||||
# STATIC VARIABLES
|
||||
|
|
@ -334,150 +339,151 @@ def run_wimextract(source, items, dest):
|
|||
'--nullglob']
|
||||
run_program(cmd)
|
||||
|
||||
def scan_source(source_obj, dest_path):
|
||||
"""Scan source for files/folders to transfer."""
|
||||
selected_items = []
|
||||
|
||||
def list_source_items(source_obj, rel_path=None):
|
||||
items = []
|
||||
rel_path = '{}{}'.format(os.sep, rel_path) if rel_path else ''
|
||||
if source_obj.is_dir():
|
||||
# File-Based
|
||||
print_standard('Scanning source (folder): {}'.format(source_obj.path))
|
||||
selected_items = scan_source_path(source_obj.path, dest_path)
|
||||
source_path = '{}{}'.format(source_obj.path, rel_path)
|
||||
items = [SourceItem(name=item.name, path=item.path)
|
||||
for item in os.scandir(source_path)]
|
||||
else:
|
||||
# Image-Based
|
||||
if REGEX_WIM_FILE.search(source_obj.name):
|
||||
print_standard('Scanning source (image): {}'.format(
|
||||
source_obj.path))
|
||||
selected_items = scan_source_wim(source_obj.path, dest_path)
|
||||
else:
|
||||
print_error('ERROR: Unsupported image: {}'.format(
|
||||
source_obj.path))
|
||||
raise GenericError
|
||||
|
||||
return selected_items
|
||||
|
||||
def scan_source_path(source_path, dest_path, rel_path=None, interactive=True):
|
||||
"""Scan source folder for files/folders to transfer, returns list.
|
||||
|
||||
This will scan the root and (recursively) any Windows.old folders."""
|
||||
rel_path = '\\' + rel_path if rel_path else ''
|
||||
if rel_path:
|
||||
dest_path = dest_path + rel_path
|
||||
selected_items = []
|
||||
win_olds = []
|
||||
|
||||
# Root items
|
||||
root_items = []
|
||||
for item in os.scandir(source_path):
|
||||
if REGEX_INCL_ROOT_ITEMS.search(item.name):
|
||||
root_items.append(item.path)
|
||||
elif not REGEX_EXCL_ROOT_ITEMS.search(item.name):
|
||||
if (not interactive
|
||||
or ask('Copy: "{}{}" ?'.format(rel_path, item.name))):
|
||||
root_items.append(item.path)
|
||||
if REGEX_WINDOWS_OLD.search(item.name):
|
||||
win_olds.append(item)
|
||||
if root_items:
|
||||
selected_items.append({
|
||||
'Message': '{}Root Items...'.format(rel_path),
|
||||
'Items': root_items.copy(),
|
||||
'Destination': dest_path})
|
||||
|
||||
# Fonts
|
||||
if os.path.exists(r'{}\Windows\Fonts'.format(source_path)):
|
||||
selected_items.append({
|
||||
'Message': '{}Fonts...'.format(rel_path),
|
||||
'Items': [r'{}\Windows\Fonts'.format(rel_path)],
|
||||
'Destination': r'{}\Windows'.format(dest_path)})
|
||||
|
||||
# Registry
|
||||
registry_items = []
|
||||
for folder in ['config', 'OEM']:
|
||||
folder = r'Windows\System32\{}'.format(folder)
|
||||
folder = os.path.join(source_path, folder)
|
||||
if os.path.exists(folder):
|
||||
registry_items.append(folder)
|
||||
if registry_items:
|
||||
selected_items.append({
|
||||
'Message': '{}Registry...'.format(rel_path),
|
||||
'Items': registry_items.copy(),
|
||||
'Destination': r'{}\Windows\System32'.format(dest_path)})
|
||||
|
||||
# Windows.old(s)
|
||||
for old in win_olds:
|
||||
selected_items.append(
|
||||
scan_source_path(
|
||||
old.path, dest_path, rel_path=old.name, interactive=False))
|
||||
|
||||
# Done
|
||||
return selected_items
|
||||
|
||||
def scan_source_wim(source_wim, dest_path, rel_path=None, interactive=True):
|
||||
"""Scan source WIM file for files/folders to transfer, returns list.
|
||||
|
||||
This will scan the root and (recursively) any Windows.old folders."""
|
||||
rel_path = '\\' + rel_path if rel_path else ''
|
||||
selected_items = []
|
||||
win_olds = []
|
||||
|
||||
# Scan source
|
||||
# Prep wimlib-imagex
|
||||
if psutil.WINDOWS:
|
||||
extract_item('wimlib', silent=True)
|
||||
cmd = [
|
||||
global_vars['Tools']['wimlib-imagex'], 'dir',
|
||||
source_wim, '1']
|
||||
source_obj.path, '1']
|
||||
if rel_path:
|
||||
cmd.append('--path={}'.format(rel_path))
|
||||
|
||||
# Get item list
|
||||
try:
|
||||
file_list = run_program(cmd)
|
||||
items = run_program(cmd)
|
||||
except subprocess.CalledProcessError:
|
||||
print_error('ERROR: Failed to get file list.')
|
||||
raise
|
||||
|
||||
# Root Items
|
||||
file_list = [i.strip()
|
||||
for i in file_list.stdout.decode('utf-8', 'ignore').splitlines()
|
||||
if i.count('\\') == 1 and i.strip() != '\\']
|
||||
root_items = []
|
||||
# Strip non-root items
|
||||
items = [re.sub(r'(\\|/)', os.sep, i.strip())
|
||||
for i in items.stdout.decode('utf-8', 'ignore').splitlines()]
|
||||
if rel_path:
|
||||
file_list = [i.replace(rel_path, '') for i in file_list]
|
||||
for item in file_list:
|
||||
if REGEX_INCL_ROOT_ITEMS.search(item):
|
||||
root_items.append(item)
|
||||
elif not REGEX_EXCL_ROOT_ITEMS.search(item):
|
||||
if (not interactive
|
||||
or ask('Extract: "{}{}" ?'.format(rel_path, item))):
|
||||
root_items.append('{}{}'.format(rel_path, item))
|
||||
if REGEX_WINDOWS_OLD.search(item):
|
||||
items = [i.replace(rel_path, '') for i in items]
|
||||
items = [i for i in items
|
||||
if i.count(os.sep) == 1 and i.strip() != os.sep]
|
||||
items = [SourceItem(name=i[1:], path=rel_path+i) for i in items]
|
||||
|
||||
# Done
|
||||
return items
|
||||
|
||||
def scan_source(source_obj, dest_path, rel_path='', interactive=True):
|
||||
"""Scan source for files/folders to transfer, returns list.
|
||||
|
||||
This will scan the root and (recursively) any Windows.old folders."""
|
||||
selected_items = []
|
||||
win_olds = []
|
||||
|
||||
# Root Items
|
||||
root_items = []
|
||||
item_list = list_source_items(source_obj, rel_path)
|
||||
for item in item_list:
|
||||
if REGEX_INCL_ROOT_ITEMS.search(item.name):
|
||||
print_success('Auto-Selected: {}'.format(item.path))
|
||||
root_items.append('{}'.format(item.path))
|
||||
elif not REGEX_EXCL_ROOT_ITEMS.search(item.name):
|
||||
if not interactive:
|
||||
print_success('Auto-Selected: {}'.format(item.path))
|
||||
root_items.append('{}'.format(item.path))
|
||||
elif ask('Extract: "{}{}{}" ?'.format(
|
||||
rel_path,
|
||||
os.sep if rel_path else '',
|
||||
item.name)):
|
||||
root_items.append('{}'.format(item.path))
|
||||
if REGEX_WINDOWS_OLD.search(item.name):
|
||||
item.name = '{}{}{}'.format(
|
||||
rel_path,
|
||||
os.sep if rel_path else '',
|
||||
item.name)
|
||||
win_olds.append(item)
|
||||
if root_items:
|
||||
selected_items.append({
|
||||
'Message': '{}Root Items...'.format(rel_path),
|
||||
'Message': '{}{}Root Items...'.format(
|
||||
rel_path,
|
||||
' ' if rel_path else ''),
|
||||
'Items': root_items.copy(),
|
||||
'Destination': dest_path})
|
||||
|
||||
# Fonts
|
||||
if wim_contains(source_wim, r'{}Windows\Fonts'.format(rel_path)):
|
||||
font_obj = get_source_item_obj(source_obj, rel_path, 'Windows/Fonts')
|
||||
if font_obj:
|
||||
selected_items.append({
|
||||
'Message': '{}Fonts...'.format(rel_path),
|
||||
'Items': [r'{}\Windows\Fonts'.format(rel_path)],
|
||||
'Message': '{}{}Fonts...'.format(
|
||||
rel_path,
|
||||
' ' if rel_path else ''),
|
||||
'Items': [font_obj.path],
|
||||
'Destination': dest_path})
|
||||
|
||||
# Registry
|
||||
registry_items = []
|
||||
for folder in ['config', 'OEM']:
|
||||
folder = r'{}Windows\System32\{}'.format(rel_path, folder)
|
||||
if wim_contains(source_wim, folder):
|
||||
registry_items.append(folder)
|
||||
folder_obj = get_source_item_obj(
|
||||
source_obj, rel_path, 'Windows/System32/{}'.format(folder))
|
||||
if folder_obj:
|
||||
registry_items.append(folder_obj.path)
|
||||
if registry_items:
|
||||
selected_items.append({
|
||||
'Message': '{}Registry...'.format(rel_path),
|
||||
'Message': '{}{}Registry...'.format(
|
||||
rel_path,
|
||||
' ' if rel_path else ''),
|
||||
'Items': registry_items.copy(),
|
||||
'Destination': dest_path})
|
||||
|
||||
# Windows.old(s)
|
||||
for old in win_olds:
|
||||
scan_source_wim(source_wim, dest_path, rel_path=old, interactive=False)
|
||||
selected_items.extend(scan_source(
|
||||
source_obj,
|
||||
dest_path,
|
||||
rel_path=old.name,
|
||||
interactive=False))
|
||||
|
||||
# Done
|
||||
return selected_items
|
||||
|
||||
def get_source_item_obj(source_obj, rel_path, item_path):
|
||||
item_obj = None
|
||||
item_path = re.sub(r'(\\|/)', os.sep, item_path)
|
||||
if source_obj.is_dir():
|
||||
item_obj = SourceItem(
|
||||
name = item_path,
|
||||
path = '{}{}{}{}{}'.format(
|
||||
source_obj.path,
|
||||
os.sep,
|
||||
rel_path,
|
||||
os.sep if rel_path else '',
|
||||
item_path))
|
||||
else:
|
||||
# Assuming WIM file
|
||||
if psutil.WINDOWS:
|
||||
extract_item('wimlib', silent=True)
|
||||
cmd = [
|
||||
global_vars['Tools']['wimlib-imagex'], 'dir',
|
||||
source_obj.path, '1',
|
||||
'--path={}'.format(item_path),
|
||||
'--one-file-only']
|
||||
try:
|
||||
run_program(cmd)
|
||||
except subprocess.CalledProcessError:
|
||||
# function will return None below
|
||||
pass
|
||||
else:
|
||||
item_obj = SourceItem(
|
||||
name = item_path,
|
||||
path = '{}{}{}{}'.format(
|
||||
os.sep,
|
||||
rel_path,
|
||||
os.sep if rel_path else '',
|
||||
item_path))
|
||||
return item_obj
|
||||
|
||||
def select_destination(folder_path, prompt='Select destination'):
|
||||
"""Select destination drive, returns path as string."""
|
||||
disk = select_volume(prompt)
|
||||
|
|
@ -623,6 +629,14 @@ def select_source(ticket_number):
|
|||
pause("Press Enter to exit...")
|
||||
exit_script()
|
||||
|
||||
# Sanity check
|
||||
if selected_source.is_file():
|
||||
# Image-Based
|
||||
if not REGEX_WIM_FILE.search(selected_source.name):
|
||||
print_error('ERROR: Unsupported image: {}'.format(
|
||||
selected_source.path))
|
||||
raise GenericError
|
||||
|
||||
# Done
|
||||
return selected_source
|
||||
|
||||
|
|
@ -716,19 +730,5 @@ def umount_network_share(server):
|
|||
print_info('Umounted {Name}'.format(**server))
|
||||
server['Mounted'] = False
|
||||
|
||||
def wim_contains(source_path, file_path):
|
||||
"""Check if the WIM contains a file or folder."""
|
||||
_cmd = [
|
||||
global_vars['Tools']['wimlib-imagex'], 'dir',
|
||||
source_path, '1',
|
||||
'--path={}'.format(file_path),
|
||||
'--one-file-only']
|
||||
try:
|
||||
run_program(_cmd)
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("This file is not meant to be called directly.")
|
||||
|
|
|
|||
Loading…
Reference in a new issue