WizardKit/scripts/wk/kit/tools.py
2Shirt 12326a5e2c
Use new Union syntax
This bumps the minimum Python version to 3.10
2023-05-29 12:35:40 -07:00

239 lines
6.1 KiB
Python

"""WizardKit: Tool Functions"""
# vim: sts=2 sw=2 ts=2
import logging
import pathlib
import platform
from datetime import datetime, timedelta
from subprocess import CompletedProcess, Popen
import requests
from wk.cfg.main import ARCHIVE_PASSWORD
from wk.cfg.sources import DOWNLOAD_FREQUENCY, SOURCES
from wk.exe import popen_program, run_program
from wk.std import GenericError, sleep
# STATIC VARIABLES
ARCH = '64' if platform.architecture()[0] == '64bit' else '32'
LOG = logging.getLogger(__name__)
HEADERS = {
'User-Agent': (
'Mozilla/5.0 (X11; Linux x86_64; rv:97.0) '
'Gecko/20100101 Firefox/97.0'
),
}
# "GLOBAL" VARIABLES
CACHED_DIRS = {}
# Functions
def download_file(
out_path, source_url,
as_new=False, overwrite=False, referer=None) -> pathlib.Path:
"""Download a file using requests, returns pathlib.Path."""
out_path = pathlib.Path(out_path).resolve()
name = out_path.name
download_failed = None
download_msg = f'Downloading {name}...'
if as_new:
out_path = out_path.with_suffix(f'{out_path.suffix}.new')
overwrite = True
print(download_msg, end='', flush=True)
# Avoid clobbering
if out_path.exists() and not overwrite:
raise FileExistsError(f'Refusing to clobber {out_path}')
# Create destination directory
out_path.parent.mkdir(parents=True, exist_ok=True)
# Update headers
headers = HEADERS.copy()
if referer:
headers['referer'] = referer
# Request download
with requests.Session() as session:
try:
response = session.get(
source_url,
allow_redirects=True,
headers=headers,
stream=True,
)
except requests.RequestException:
try:
sleep(1)
response = session.get(
source_url,
allow_redirects=True,
headers=headers,
stream=True,
)
except requests.RequestException as _err:
download_failed = _err
else:
if not response.ok:
download_failed = response
else:
if not response.ok:
download_failed = response
# Download failed
if download_failed:
LOG.error('Failed to download file: %s', download_failed)
raise GenericError(f'Failed to download file: {name}')
# Write to file
with open(out_path, 'wb') as _f:
for chunk in response.iter_content(chunk_size=128):
_f.write(chunk)
# Done
print(f'\033[{len(download_msg)}D\033[0K', end='', flush=True)
return out_path
def download_tool(folder, name, suffix=None) -> None:
"""Download tool."""
name_arch = f'{name}{ARCH}'
out_path = get_tool_path(folder, name, check=False, suffix=suffix)
up_to_date = False
# Check if tool is up to date
try:
ctime = datetime.fromtimestamp(out_path.stat().st_ctime)
up_to_date = datetime.now() - ctime < timedelta(days=DOWNLOAD_FREQUENCY)
except FileNotFoundError:
# Ignore - we'll download it below
pass
if out_path.exists() and up_to_date:
LOG.info('Skip downloading up-to-date tool: %s', name)
return
# Get ARCH specific URL if available
if name_arch in SOURCES:
source_url = SOURCES[name_arch]
out_path = out_path.with_name(f'{name_arch}{out_path.suffix}')
else:
source_url = SOURCES[name]
# Download
LOG.info('Downloading tool: %s', name)
try:
new_file = download_file(out_path, source_url, as_new=True)
new_file.replace(out_path)
except GenericError:
# Ignore as long as there's still a version present
if not out_path.exists():
raise
def extract_archive(archive, out_path, *args, mode='x', silent=True) -> None:
"""Extract an archive to out_path."""
out_path = pathlib.Path(out_path).resolve()
out_path.parent.mkdir(parents=True, exist_ok=True)
cmd = [get_tool_path('7-Zip', '7z'), mode, archive, f'-o{out_path}', *args]
if silent:
cmd.extend(['-bso0', '-bse0', '-bsp0'])
# Extract
run_program(cmd)
def extract_tool(folder) -> None:
"""Extract tool."""
extract_archive(
find_kit_dir('.cbin').joinpath(folder).with_suffix('.7z'),
find_kit_dir('.bin').joinpath(folder),
'-aos', f'-p{ARCHIVE_PASSWORD}',
)
def find_kit_dir(name=None) -> pathlib.Path:
"""Find folder in kit, returns pathlib.Path.
Search is performed in the script's path and then recursively upwards.
If name is given then search for that instead."""
cur_path = pathlib.Path(__file__).resolve().parent
search = name if name else '.bin'
# Search
if name in CACHED_DIRS:
return CACHED_DIRS[name]
while not cur_path.match(cur_path.anchor):
if cur_path.joinpath(search).exists():
break
cur_path = cur_path.parent
# Check
if not cur_path.joinpath(search).exists():
raise FileNotFoundError(f'Failed to find kit dir, {name=}')
if name:
cur_path = cur_path.joinpath(name)
# Done
CACHED_DIRS[name] = cur_path
return cur_path
def get_tool_path(folder, name, check=True, suffix=None) -> pathlib.Path:
"""Get tool path, returns pathlib.Path"""
bin_dir = find_kit_dir('.bin')
if not suffix:
suffix = 'exe'
name_arch = f'{name}{ARCH}'
# "Search"
tool_path = bin_dir.joinpath(f'{folder}/{name_arch}.{suffix}')
if not (tool_path.exists() or name_arch in SOURCES):
# Use "default" path instead
tool_path = tool_path.with_name(f'{name}.{suffix}')
# Missing?
if check and not tool_path.exists():
raise FileNotFoundError(f'Failed to find tool, {folder=}, {name=}')
# Done
return tool_path
def run_tool(
folder, name, *run_args,
cbin=False, cwd=False, download=False, popen=False,
**run_kwargs,
) -> CompletedProcess | Popen:
"""Run tool from the kit or the Internet, returns proc obj.
proc will be either subprocess.CompletedProcess or subprocess.Popen."""
proc = None
# Extract from .cbin
if cbin:
extract_tool(folder)
# Download tool
if download:
download_tool(folder, name)
# Run
tool_path = get_tool_path(folder, name)
cmd = [tool_path, *run_args]
if cwd:
run_kwargs['cwd'] = tool_path.parent
if popen:
proc = popen_program(cmd, **run_kwargs)
else:
proc = run_program(cmd, check=False, **run_kwargs)
# Done
return proc
if __name__ == '__main__':
print("This file is not meant to be called directly.")