Use new Union syntax
This bumps the minimum Python version to 3.10
This commit is contained in:
parent
171cd0019e
commit
12326a5e2c
10 changed files with 36 additions and 46 deletions
|
|
@ -21,7 +21,7 @@ from . import ui
|
|||
|
||||
|
||||
# Check env
|
||||
if version_info < (3, 7):
|
||||
if version_info < (3, 10):
|
||||
# Unsupported
|
||||
raise RuntimeError(
|
||||
'This package is unsupported on Python '
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ import subprocess
|
|||
import time
|
||||
|
||||
from collections import OrderedDict
|
||||
from typing import Union
|
||||
|
||||
import psutil
|
||||
import pytz
|
||||
|
|
@ -200,7 +199,7 @@ class BlockPair():
|
|||
"""Get percent rescued from map_data, returns float."""
|
||||
return 100 * self.map_data.get('rescued', 0) / self.size
|
||||
|
||||
def get_rescued_size(self) -> Union[float, int]:
|
||||
def get_rescued_size(self) -> float | int:
|
||||
"""Get rescued size using map data.
|
||||
|
||||
NOTE: Returns 0 if no map data is available.
|
||||
|
|
@ -1689,7 +1688,7 @@ def get_fstype_macos(path) -> str:
|
|||
return fstype
|
||||
|
||||
|
||||
def get_object(path) -> Union[hw_disk.Disk, None, pathlib.Path]:
|
||||
def get_object(path) -> hw_disk.Disk | None | pathlib.Path:
|
||||
"""Get object based on path, returns obj."""
|
||||
# TODO: Refactor to avoid returning None
|
||||
obj = None
|
||||
|
|
|
|||
|
|
@ -3,8 +3,6 @@
|
|||
|
||||
import logging
|
||||
|
||||
from typing import Union
|
||||
|
||||
from wk.ui import ansi
|
||||
|
||||
|
||||
|
|
@ -103,7 +101,7 @@ def get_graph_step(rate: float, scale: int = 16) -> int:
|
|||
def merge_rates(
|
||||
rates: list[float],
|
||||
graph_width: int = 40,
|
||||
) -> list[Union[int, float]]:
|
||||
) -> list[int | float]:
|
||||
"""Merge rates to have entries equal to the width, returns list."""
|
||||
merged_rates = []
|
||||
offset = 0
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import plistlib
|
|||
import re
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Union
|
||||
from typing import Any
|
||||
|
||||
from wk.cfg.main import KIT_NAME_SHORT
|
||||
from wk.cfg.python import DATACLASS_DECORATOR_KWARGS
|
||||
|
|
@ -46,7 +46,7 @@ class Disk:
|
|||
model: str = field(init=False)
|
||||
name: str = field(init=False)
|
||||
notes: list[str] = field(init=False, default_factory=list)
|
||||
path: Union[pathlib.Path, str]
|
||||
path: pathlib.Path | str
|
||||
parent: str = field(init=False)
|
||||
phy_sec: int = field(init=False)
|
||||
raw_details: dict[str, Any] = field(init=False)
|
||||
|
|
|
|||
|
|
@ -7,15 +7,13 @@ import pathlib
|
|||
import re
|
||||
import shutil
|
||||
|
||||
from typing import Union
|
||||
|
||||
|
||||
# STATIC VARIABLES
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Functions
|
||||
def case_insensitive_path(path: Union[pathlib.Path, str]) -> pathlib.Path:
|
||||
def case_insensitive_path(path: pathlib.Path | str) -> pathlib.Path:
|
||||
"""Find path case-insensitively, returns pathlib.Path obj."""
|
||||
given_path = pathlib.Path(path).resolve()
|
||||
real_path = None
|
||||
|
|
@ -40,7 +38,7 @@ def case_insensitive_path(path: Union[pathlib.Path, str]) -> pathlib.Path:
|
|||
|
||||
|
||||
def case_insensitive_search(
|
||||
path: Union[pathlib.Path, str], item: str) -> pathlib.Path:
|
||||
path: pathlib.Path | str, item: str) -> pathlib.Path:
|
||||
"""Search path for item case insensitively, returns pathlib.Path obj."""
|
||||
path = pathlib.Path(path).resolve()
|
||||
given_path = path.joinpath(item)
|
||||
|
|
@ -65,8 +63,8 @@ def case_insensitive_search(
|
|||
|
||||
|
||||
def copy_file(
|
||||
source: Union[pathlib.Path, str],
|
||||
dest: Union[pathlib.Path, str],
|
||||
source: pathlib.Path | str,
|
||||
dest: pathlib.Path | str,
|
||||
overwrite: bool = False) -> None:
|
||||
"""Copy file and optionally overwrite the destination."""
|
||||
source = case_insensitive_path(source)
|
||||
|
|
@ -78,7 +76,7 @@ def copy_file(
|
|||
shutil.copy2(source, dest)
|
||||
|
||||
|
||||
def delete_empty_folders(path: Union[pathlib.Path, str]) -> None:
|
||||
def delete_empty_folders(path: pathlib.Path | str) -> None:
|
||||
"""Recursively delete all empty folders in path."""
|
||||
LOG.debug('path: %s', path)
|
||||
|
||||
|
|
@ -96,7 +94,7 @@ def delete_empty_folders(path: Union[pathlib.Path, str]) -> None:
|
|||
|
||||
|
||||
def delete_folder(
|
||||
path: Union[pathlib.Path, str],
|
||||
path: pathlib.Path | str,
|
||||
force: bool = False,
|
||||
ignore_errors: bool = False,
|
||||
) -> None:
|
||||
|
|
@ -117,7 +115,7 @@ def delete_folder(
|
|||
|
||||
|
||||
def delete_item(
|
||||
path: Union[pathlib.Path, str],
|
||||
path: pathlib.Path | str,
|
||||
force: bool = False,
|
||||
ignore_errors: bool = False,
|
||||
) -> None:
|
||||
|
|
@ -139,7 +137,7 @@ def delete_item(
|
|||
|
||||
|
||||
def get_path_obj(
|
||||
path: Union[pathlib.Path, str],
|
||||
path: pathlib.Path | str,
|
||||
expanduser: bool = True,
|
||||
resolve: bool = True,
|
||||
) -> pathlib.Path:
|
||||
|
|
@ -152,7 +150,7 @@ def get_path_obj(
|
|||
return path
|
||||
|
||||
|
||||
def non_clobber_path(path: Union[pathlib.Path, str]) -> pathlib.Path:
|
||||
def non_clobber_path(path: pathlib.Path | str) -> pathlib.Path:
|
||||
"""Update path as needed to non-existing path, returns pathlib.Path."""
|
||||
LOG.debug('path: %s', path)
|
||||
path = pathlib.Path(path)
|
||||
|
|
@ -182,8 +180,8 @@ def non_clobber_path(path: Union[pathlib.Path, str]) -> pathlib.Path:
|
|||
|
||||
|
||||
def recursive_copy(
|
||||
source: Union[pathlib.Path, str],
|
||||
dest: Union[pathlib.Path, str],
|
||||
source: pathlib.Path | str,
|
||||
dest: pathlib.Path | str,
|
||||
overwrite: bool = False) -> None:
|
||||
"""Copy source to dest recursively.
|
||||
|
||||
|
|
@ -235,8 +233,8 @@ def recursive_copy(
|
|||
|
||||
|
||||
def rename_item(
|
||||
path: Union[pathlib.Path, str],
|
||||
new_path: Union[pathlib.Path, str],
|
||||
path: pathlib.Path | str,
|
||||
new_path: pathlib.Path | str,
|
||||
) -> pathlib.Path:
|
||||
"""Rename item, returns pathlib.Path."""
|
||||
path = pathlib.Path(path)
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ import platform
|
|||
|
||||
from datetime import datetime, timedelta
|
||||
from subprocess import CompletedProcess, Popen
|
||||
from typing import Union
|
||||
|
||||
import requests
|
||||
|
||||
|
|
@ -208,7 +207,7 @@ def run_tool(
|
|||
folder, name, *run_args,
|
||||
cbin=False, cwd=False, download=False, popen=False,
|
||||
**run_kwargs,
|
||||
) -> Union[CompletedProcess, Popen]:
|
||||
) -> CompletedProcess | Popen:
|
||||
"""Run tool from the kit or the Internet, returns proc obj.
|
||||
|
||||
proc will be either subprocess.CompletedProcess or subprocess.Popen."""
|
||||
|
|
|
|||
|
|
@ -8,8 +8,6 @@ import pathlib
|
|||
import shutil
|
||||
import time
|
||||
|
||||
from typing import Union
|
||||
|
||||
from wk import cfg
|
||||
from wk.io import non_clobber_path
|
||||
|
||||
|
|
@ -41,8 +39,8 @@ def enable_debug_mode() -> None:
|
|||
|
||||
|
||||
def format_log_path(
|
||||
log_dir: Union[None, pathlib.Path, str] = None,
|
||||
log_name: Union[None, str] = None,
|
||||
log_dir: None | pathlib.Path | str = None,
|
||||
log_name: None | str = None,
|
||||
timestamp: bool = False,
|
||||
kit: bool = False, tool: bool = False, append: bool = False,
|
||||
) -> pathlib.Path:
|
||||
|
|
@ -83,7 +81,7 @@ def get_root_logger_path() -> pathlib.Path:
|
|||
raise RuntimeError('Log path not found.')
|
||||
|
||||
|
||||
def remove_empty_log(log_path: Union[None, pathlib.Path] = None) -> None:
|
||||
def remove_empty_log(log_path: None | pathlib.Path = None) -> None:
|
||||
"""Remove log if empty.
|
||||
|
||||
NOTE: Under Windows an empty log is 2 bytes long.
|
||||
|
|
@ -106,7 +104,7 @@ def remove_empty_log(log_path: Union[None, pathlib.Path] = None) -> None:
|
|||
log_path.unlink()
|
||||
|
||||
|
||||
def start(config: Union[dict[str, str], None] = None) -> None:
|
||||
def start(config: dict[str, str] | None = None) -> None:
|
||||
"""Configure and start logging using safe defaults."""
|
||||
log_path = format_log_path(timestamp=os.name != 'nt')
|
||||
root_logger = logging.getLogger()
|
||||
|
|
@ -129,8 +127,8 @@ def start(config: Union[dict[str, str], None] = None) -> None:
|
|||
|
||||
|
||||
def update_log_path(
|
||||
dest_dir: Union[None, pathlib.Path, str] = None,
|
||||
dest_name: Union[None, str] = None,
|
||||
dest_dir: None | pathlib.Path | str = None,
|
||||
dest_name: None | str = None,
|
||||
keep_history: bool = True, timestamp: bool = True, append: bool = False,
|
||||
) -> None:
|
||||
"""Moves current log file to new path and updates the root logger."""
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ import pathlib
|
|||
import re
|
||||
|
||||
from subprocess import CompletedProcess
|
||||
from typing import Any, Union
|
||||
from typing import Any
|
||||
|
||||
import psutil
|
||||
|
||||
|
|
@ -100,7 +100,7 @@ def mount_backup_shares(read_write: bool = False) -> list[str]:
|
|||
|
||||
def mount_network_share(
|
||||
details: dict[str, Any],
|
||||
mount_point: Union[None, pathlib.Path, str] = None,
|
||||
mount_point: None | pathlib.Path | str = None,
|
||||
read_write: bool = False) -> CompletedProcess:
|
||||
"""Mount network share using OS specific methods."""
|
||||
cmd = None
|
||||
|
|
@ -249,8 +249,8 @@ def unmount_backup_shares() -> list[str]:
|
|||
|
||||
|
||||
def unmount_network_share(
|
||||
details: Union[dict[str, Any], None] = None,
|
||||
mount_point: Union[None, pathlib.Path, str] = None,
|
||||
details: dict[str, Any] | None = None,
|
||||
mount_point: None | pathlib.Path | str = None,
|
||||
) -> CompletedProcess:
|
||||
"""Unmount network share"""
|
||||
cmd = []
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import platform
|
|||
import re
|
||||
|
||||
from contextlib import suppress
|
||||
from typing import Any, Union
|
||||
from typing import Any
|
||||
|
||||
import psutil
|
||||
|
||||
|
|
@ -265,7 +265,7 @@ def get_installed_antivirus() -> list[str]:
|
|||
return report
|
||||
|
||||
|
||||
def get_installed_ram(as_list=False, raise_exceptions=False) -> Union[list, str]:
|
||||
def get_installed_ram(as_list=False, raise_exceptions=False) -> list | str:
|
||||
"""Get installed RAM, returns list or str."""
|
||||
mem = psutil.virtual_memory()
|
||||
mem_str = bytes_to_string(mem.total, decimals=1)
|
||||
|
|
@ -281,7 +281,7 @@ def get_installed_ram(as_list=False, raise_exceptions=False) -> Union[list, str]
|
|||
return [mem_str] if as_list else mem_str
|
||||
|
||||
|
||||
def get_os_activation(as_list=False, check=True) -> Union[list, str]:
|
||||
def get_os_activation(as_list=False, check=True) -> list | str:
|
||||
"""Get OS activation status, returns list or str.
|
||||
|
||||
NOTE: If check=True then raise an exception if OS isn't activated.
|
||||
|
|
|
|||
|
|
@ -6,8 +6,6 @@ import platform
|
|||
import re
|
||||
import time
|
||||
|
||||
from typing import Union
|
||||
|
||||
|
||||
# STATIC VARIABLES
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
|
@ -34,7 +32,7 @@ class GenericWarning(Exception):
|
|||
|
||||
# Functions
|
||||
def bytes_to_string(
|
||||
size: Union[float, int],
|
||||
size: float | int,
|
||||
decimals: int = 0,
|
||||
use_binary: bool = True) -> str:
|
||||
"""Convert size into a human-readable format, returns str.
|
||||
|
|
@ -78,12 +76,12 @@ def bytes_to_string(
|
|||
return size_str
|
||||
|
||||
|
||||
def sleep(seconds: Union[int, float] = 2) -> None:
|
||||
def sleep(seconds: int | float = 2) -> None:
|
||||
"""Simple wrapper for time.sleep."""
|
||||
time.sleep(seconds)
|
||||
|
||||
|
||||
def string_to_bytes(size: Union[float, int, str], assume_binary: bool = False) -> int:
|
||||
def string_to_bytes(size: float | int | str, assume_binary: bool = False) -> int:
|
||||
"""Convert human-readable size to bytes and return an int."""
|
||||
LOG.debug('size: %s, assume_binary: %s', size, assume_binary)
|
||||
scale = 1000
|
||||
|
|
|
|||
Loading…
Reference in a new issue