Added I/O functions for building UFDs
This commit is contained in:
parent
703783406a
commit
8f31e5bd67
2 changed files with 99 additions and 105 deletions
|
|
@ -10,35 +10,6 @@ from collections import OrderedDict
|
||||||
from functions.common import *
|
from functions.common import *
|
||||||
|
|
||||||
|
|
||||||
def case_insensitive_search(path, item):
|
|
||||||
"""Search path for item case insensitively, returns str."""
|
|
||||||
regex_match = '^{}$'.format(item)
|
|
||||||
real_path = ''
|
|
||||||
|
|
||||||
# Quick check first
|
|
||||||
if os.path.exists('{}/{}'.format(path, item)):
|
|
||||||
real_path = '{}{}{}'.format(
|
|
||||||
path,
|
|
||||||
'' if path == '/' else '/',
|
|
||||||
item,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Check all items in dir
|
|
||||||
for entry in os.scandir(path):
|
|
||||||
if re.match(regex_match, entry.name, re.IGNORECASE):
|
|
||||||
real_path = '{}{}{}'.format(
|
|
||||||
path,
|
|
||||||
'' if path == '/' else '/',
|
|
||||||
entry.name,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Done
|
|
||||||
if not real_path:
|
|
||||||
raise FileNotFoundError('{}/{}'.format(path, item))
|
|
||||||
|
|
||||||
return real_path
|
|
||||||
|
|
||||||
|
|
||||||
def confirm_selections(args):
|
def confirm_selections(args):
|
||||||
"""Ask tech to confirm selections, twice if necessary."""
|
"""Ask tech to confirm selections, twice if necessary."""
|
||||||
if not ask('Is the above information correct?'):
|
if not ask('Is the above information correct?'):
|
||||||
|
|
@ -100,33 +71,6 @@ def find_first_partition(dev_path):
|
||||||
return part_path
|
return part_path
|
||||||
|
|
||||||
|
|
||||||
def find_path(path):
|
|
||||||
"""Find path case-insensitively, returns pathlib.Path obj."""
|
|
||||||
path_obj = pathlib.Path(path).resolve()
|
|
||||||
|
|
||||||
# Quick check first
|
|
||||||
if path_obj.exists():
|
|
||||||
return path_obj
|
|
||||||
|
|
||||||
# Fix case
|
|
||||||
parts = path_obj.relative_to('/').parts
|
|
||||||
real_path = '/'
|
|
||||||
for part in parts:
|
|
||||||
try:
|
|
||||||
real_path = case_insensitive_search(real_path, part)
|
|
||||||
except NotADirectoryError:
|
|
||||||
# Reclassify error
|
|
||||||
raise FileNotFoundError(path)
|
|
||||||
|
|
||||||
# Raise error if path doesn't exist
|
|
||||||
path_obj = pathlib.Path(real_path)
|
|
||||||
if not path_obj.exists():
|
|
||||||
raise FileNotFoundError(path_obj)
|
|
||||||
|
|
||||||
# Done
|
|
||||||
return path_obj
|
|
||||||
|
|
||||||
|
|
||||||
def get_user_home(user):
|
def get_user_home(user):
|
||||||
"""Get path to user's home dir, returns str."""
|
"""Get path to user's home dir, returns str."""
|
||||||
home_dir = None
|
home_dir = None
|
||||||
|
|
@ -279,55 +223,6 @@ def prep_device(dev_path, label, use_mbr=False, indent=2):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def recursive_copy(source, dest, overwrite=False):
|
|
||||||
"""Copy source to dest recursively.
|
|
||||||
|
|
||||||
NOTE: This uses rsync style source/dest syntax.
|
|
||||||
If the source has a trailing slash then it's contents are copied,
|
|
||||||
otherwise the source itself is copied.
|
|
||||||
|
|
||||||
Examples assuming "ExDir/ExFile.txt" exists:
|
|
||||||
recursive_copy("ExDir", "Dest/") results in "Dest/ExDir/ExFile.txt"
|
|
||||||
recursive_copy("ExDir/", "Dest/") results in "Dest/ExFile.txt"
|
|
||||||
|
|
||||||
NOTE 2: dest does not use find_path because it might not exist.
|
|
||||||
"""
|
|
||||||
copy_contents = source.endswith('/')
|
|
||||||
source = find_path(source)
|
|
||||||
dest = pathlib.Path(dest).resolve().joinpath(source.name)
|
|
||||||
os.makedirs(dest.parent, exist_ok=True)
|
|
||||||
|
|
||||||
if source.is_dir():
|
|
||||||
if copy_contents:
|
|
||||||
# Trailing slash syntax
|
|
||||||
for item in os.scandir(source):
|
|
||||||
recursive_copy(item.path, dest.parent, overwrite=overwrite)
|
|
||||||
elif not dest.exists():
|
|
||||||
# No conflict, copying whole tree (no merging needed)
|
|
||||||
shutil.copytree(source, dest)
|
|
||||||
elif not dest.is_dir():
|
|
||||||
# Refusing to replace file with dir
|
|
||||||
raise FileExistsError('Refusing to replace file: {}'.format(dest))
|
|
||||||
else:
|
|
||||||
# Dest exists and is a dir, merge dirs
|
|
||||||
for item in os.scandir(source):
|
|
||||||
recursive_copy(item.path, dest, overwrite=overwrite)
|
|
||||||
elif source.is_file():
|
|
||||||
if not dest.exists():
|
|
||||||
# No conflict, copying file
|
|
||||||
shutil.copy2(source, dest)
|
|
||||||
elif not dest.is_file():
|
|
||||||
# Refusing to replace dir with file
|
|
||||||
raise FileExistsError('Refusing to replace dir: {}'.format(dest))
|
|
||||||
elif overwrite:
|
|
||||||
# Dest file exists, deleting and replacing file
|
|
||||||
os.remove(dest)
|
|
||||||
shutil.copy2(source, dest)
|
|
||||||
else:
|
|
||||||
# Refusing to delete file when overwrite=False
|
|
||||||
raise FileExistsError('Refusing to delete file: {}'.format(dest))
|
|
||||||
|
|
||||||
|
|
||||||
def remove_arch():
|
def remove_arch():
|
||||||
"""Remove arch dir from UFD.
|
"""Remove arch dir from UFD.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -12,6 +13,54 @@ LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
# Functions
|
# Functions
|
||||||
|
def case_insensitive_path(path):
|
||||||
|
"""Find path case-insensitively, returns pathlib.Path obj."""
|
||||||
|
given_path = pathlib.Path(path).resolve()
|
||||||
|
real_path = None
|
||||||
|
|
||||||
|
# Quick check
|
||||||
|
if given_path.exists():
|
||||||
|
return given_path
|
||||||
|
|
||||||
|
# Search for real path
|
||||||
|
parts = list(given_path.parts)
|
||||||
|
real_path = parts.pop(0)
|
||||||
|
for part in parts:
|
||||||
|
try:
|
||||||
|
real_path = case_insensitive_search(real_path, part)
|
||||||
|
except NotADirectoryError:
|
||||||
|
# Reclassify error
|
||||||
|
raise FileNotFoundError(given_path)
|
||||||
|
real_path = pathlib.Path(real_path)
|
||||||
|
|
||||||
|
# Done
|
||||||
|
return real_path
|
||||||
|
|
||||||
|
|
||||||
|
def case_insensitive_search(path, item):
|
||||||
|
"""Search path for item case insensitively, returns pathlib.Path obj."""
|
||||||
|
path = pathlib.Path(path).resolve()
|
||||||
|
given_path = path.joinpath(item)
|
||||||
|
real_path = None
|
||||||
|
regex = fr'^{item}'
|
||||||
|
|
||||||
|
# Quick check
|
||||||
|
if given_path.exists():
|
||||||
|
return given_path
|
||||||
|
|
||||||
|
# Check all items in path
|
||||||
|
for entry in os.scandir(path):
|
||||||
|
if re.match(regex, entry.name, re.IGNORECASE):
|
||||||
|
real_path = path.joinpath(entry.name)
|
||||||
|
|
||||||
|
# Raise exception if necessary
|
||||||
|
if not real_path:
|
||||||
|
raise FileNotFoundError(given_path)
|
||||||
|
|
||||||
|
# Done
|
||||||
|
return real_path
|
||||||
|
|
||||||
|
|
||||||
def delete_empty_folders(path):
|
def delete_empty_folders(path):
|
||||||
"""Recursively delete all empty folders in path."""
|
"""Recursively delete all empty folders in path."""
|
||||||
LOG.debug('path: %s', path)
|
LOG.debug('path: %s', path)
|
||||||
|
|
@ -93,5 +142,55 @@ def non_clobber_path(path):
|
||||||
return new_path
|
return new_path
|
||||||
|
|
||||||
|
|
||||||
|
def recursive_copy(source, dest, overwrite=False):
|
||||||
|
"""Copy source to dest recursively.
|
||||||
|
|
||||||
|
NOTE: This uses rsync style source/dest syntax.
|
||||||
|
If the source has a trailing slash then it's contents are copied,
|
||||||
|
otherwise the source itself is copied.
|
||||||
|
|
||||||
|
Examples assuming "ExDir/ExFile.txt" exists:
|
||||||
|
recursive_copy("ExDir", "Dest/") results in "Dest/ExDir/ExFile.txt"
|
||||||
|
recursive_copy("ExDir/", "Dest/") results in "Dest/ExFile.txt"
|
||||||
|
|
||||||
|
NOTE 2: dest does not use find_path because it might not exist.
|
||||||
|
"""
|
||||||
|
copy_contents = str(source).endswith(('/', '\\'))
|
||||||
|
source = case_insensitive_path(source)
|
||||||
|
dest = pathlib.Path(dest).resolve().joinpath(source.name)
|
||||||
|
os.makedirs(dest.parent, exist_ok=True)
|
||||||
|
|
||||||
|
# Recursively copy source to dest
|
||||||
|
if source.is_dir():
|
||||||
|
if copy_contents:
|
||||||
|
# Trailing slash syntax
|
||||||
|
for item in os.scandir(source):
|
||||||
|
recursive_copy(item.path, dest.parent, overwrite=overwrite)
|
||||||
|
elif not dest.exists():
|
||||||
|
# No conflict, copying whole tree (no merging needed)
|
||||||
|
shutil.copytree(source, dest)
|
||||||
|
elif not dest.is_dir():
|
||||||
|
# Refusing to replace file with dir
|
||||||
|
raise FileExistsError(f'Refusing to replace file: {dest}')
|
||||||
|
else:
|
||||||
|
# Dest exists and is a dir, merge dirs
|
||||||
|
for item in os.scandir(source):
|
||||||
|
recursive_copy(item.path, dest, overwrite=overwrite)
|
||||||
|
elif source.is_file():
|
||||||
|
if not dest.exists():
|
||||||
|
# No conflict, copying file
|
||||||
|
shutil.copy2(source, dest)
|
||||||
|
elif not dest.is_file():
|
||||||
|
# Refusing to replace dir with file
|
||||||
|
raise FileExistsError(f'Refusing to replace dir: {dest}')
|
||||||
|
elif overwrite:
|
||||||
|
# Dest file exists, deleting and replacing file
|
||||||
|
os.remove(dest)
|
||||||
|
shutil.copy2(source, dest)
|
||||||
|
else:
|
||||||
|
# Refusing to delete file when overwrite=False
|
||||||
|
raise FileExistsError(f'Refusing to delete file: {dest}')
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
print("This file is not meant to be called directly.")
|
print("This file is not meant to be called directly.")
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue