Add type hints to BlockPair

This commit is contained in:
2Shirt 2023-06-03 18:07:30 -07:00
parent fc2b90a2c0
commit 05de5c7294
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C

View file

@ -127,42 +127,42 @@ TIMEZONE = pytz.timezone(cfg.main.LINUX_TIME_ZONE)
# Classes # Classes
class BlockPair(): class BlockPair():
"""Object for tracking source to dest recovery data.""" """Object for tracking source to dest recovery data."""
def __init__(self, source, destination, model, working_dir): def __init__(
"""Initialize BlockPair() self,
source_dev: hw_disk.Disk,
NOTE: source should be a wk.hw.obj.Disk() object destination: pathlib.Path,
and destination should be a pathlib.Path() object. working_dir: pathlib.Path,
""" ):
self.sector_size = source.phy_sec self.sector_size: int = source_dev.phy_sec
self.source = source.path self.source: pathlib.Path = pathlib.Path(source_dev.path)
self.destination = destination self.destination: pathlib.Path = destination
self.map_data = {} self.map_data: dict[str, bool | int] = {}
self.map_path = None self.map_path: pathlib.Path = pathlib.Path()
self.size = source.size self.size: int = source_dev.size
self.status = { self.status: dict[str, float | int | str] = {
'read-skip': 'Pending', 'read-skip': 'Pending',
'read-full': 'Pending', 'read-full': 'Pending',
'trim': 'Pending', 'trim': 'Pending',
'scrape': 'Pending', 'scrape': 'Pending',
} }
self.view_map = 'DISPLAY' in os.environ or 'WAYLAND_DISPLAY' in os.environ self.view_map: bool = 'DISPLAY' in os.environ or 'WAYLAND_DISPLAY' in os.environ
self.view_proc = None self.view_proc: subprocess.Popen | None = None
# Set map path # Set map path
# e.g. '(Clone|Image)_Model[_p#]_Size[_Label].map' # e.g. '(Clone|Image)_Model[_p#]_Size[_Label].map'
map_name = model if model else 'None' map_name = source_dev.model
if source.bus == 'Image': if source_dev.bus == 'Image':
map_name = 'Image' map_name = 'Image'
if source.parent: if source_dev.parent:
part_num = re.sub(r"^.*?(\d+)$", r"\1", source.path.name) part_num = re.sub(r"^.*?(\d+)$", r"\1", self.source.name)
map_name += f'_p{part_num}' map_name += f'_p{part_num}'
size_str = std.bytes_to_string( size_str = std.bytes_to_string(
size=self.size, size=self.size,
use_binary=False, use_binary=False,
) )
map_name += f'_{size_str.replace(" ", "")}' map_name += f'_{size_str.replace(" ", "")}'
if source.raw_details.get('label', ''): if source_dev.raw_details.get('label', ''):
map_name += f'_{source.raw_details["label"]}' map_name += f'_{source_dev.raw_details["label"]}'
map_name = map_name.replace(' ', '_') map_name = map_name.replace(' ', '_')
map_name = map_name.replace('/', '_') map_name = map_name.replace('/', '_')
map_name = map_name.replace('\\', '_') map_name = map_name.replace('\\', '_')
@ -197,7 +197,7 @@ class BlockPair():
"""Get percent rescued from map_data, returns float.""" """Get percent rescued from map_data, returns float."""
return 100 * self.map_data.get('rescued', 0) / self.size return 100 * self.map_data.get('rescued', 0) / self.size
def get_rescued_size(self) -> float | int: def get_rescued_size(self) -> int:
"""Get rescued size using map data. """Get rescued size using map data.
NOTE: Returns 0 if no map data is available. NOTE: Returns 0 if no map data is available.
@ -211,7 +211,7 @@ class BlockPair():
NOTE: If the file is missing it is assumed that recovery hasn't NOTE: If the file is missing it is assumed that recovery hasn't
started yet so default values will be returned instead. started yet so default values will be returned instead.
""" """
data = {'full recovery': False, 'pass completed': False} data: dict[str, bool | int] = {'full recovery': False, 'pass completed': False}
# Get output from ddrescuelog # Get output from ddrescuelog
cmd = [ cmd = [
@ -353,9 +353,8 @@ class State():
"""Add BlockPair object and run safety checks.""" """Add BlockPair object and run safety checks."""
self.block_pairs.append( self.block_pairs.append(
BlockPair( BlockPair(
source=source, source_dev=source,
destination=destination, destination=destination,
model=self.source.model,
working_dir=self.working_dir, working_dir=self.working_dir,
)) ))