pylint cleanup

This commit is contained in:
2Shirt 2019-12-30 18:47:35 -07:00
parent e7fbc21721
commit d9561a0159
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C

View file

@ -9,7 +9,6 @@ import logging
import os import os
import pathlib import pathlib
import plistlib import plistlib
import pytz
import re import re
import shutil import shutil
import time import time
@ -18,10 +17,10 @@ from collections import OrderedDict
from docopt import docopt from docopt import docopt
import psutil import psutil
import pytz
from wk import cfg, debug, exe, io, log, net, std, tmux from wk import cfg, debug, exe, io, log, net, std, tmux
from wk.hw import obj as hw_obj from wk.hw import obj as hw_obj
from wk.hw import sensors as hw_sensors
# STATIC VARIABLES # STATIC VARIABLES
@ -248,9 +247,6 @@ class State():
working_dir=working_dir, working_dir=working_dir,
)) ))
# Safety Checks
# TODO
def add_clone_block_pairs(self, working_dir): def add_clone_block_pairs(self, working_dir):
"""Add device to device block pairs and set settings if necessary.""" """Add device to device block pairs and set settings if necessary."""
source_sep = get_partition_separator(self.source.path.name) source_sep = get_partition_separator(self.source.path.name)
@ -310,22 +306,6 @@ class State():
bp_dest = self.destination bp_dest = self.destination
self.add_block_pair(part, bp_dest, working_dir) self.add_block_pair(part, bp_dest, working_dir)
def clean_working_dir(self, working_dir):
"""Clean working directory to ensure a fresh recovery session.
NOTE: Data from previous sessions will be preserved
in a backup directory.
"""
backup_dir = pathlib.Path(f'{working_dir}/prev')
backup_dir = io.non_clobber_path(backup_dir)
backup_dir.mkdir()
# Move settings, maps, etc to backup_dir
for entry in os.scandir(working_dir):
if entry.name.endswith(('.dd', '.json', '.map')):
new_path = f'{backup_dir}/{entry.name}'
new_path = io.non_clobber_path(new_path)
shutil.move(entry.path, new_path)
def confirm_selections( def confirm_selections(
self, mode, prompt, working_dir=None, source_parts=None): self, mode, prompt, working_dir=None, source_parts=None):
@ -446,35 +426,6 @@ class State():
self.fix_tmux_layout(forced=False) self.fix_tmux_layout(forced=False)
std.sleep(1) std.sleep(1)
def get_etoc(self):
"""Get EToC from ddrescue output, returns str."""
delta = None
delta_dict = {}
etoc = 'Unknown'
now = datetime.datetime.now(tz=TIMEZONE)
output = tmux.capture_pane()
# Search for EToC delta
matches = re.findall(f'remaining time:.*$', output, re.MULTILINE)
if matches:
match = REGEX_REMAINING_TIME.search(matches[-1])
if match.group('na'):
etoc = 'N/A'
else:
for key in ('days', 'hours', 'minutes', 'seconds'):
delta_dict[key] = match.group(key)
delta_dict = {k: int(v) if v else 0 for k, v in delta_dict.items()}
delta = datetime.timedelta(**delta_dict)
# Calc EToC if delta found
if delta:
etoc_datetime = now + delta
etoc = etoc_datetime.strftime('%Y-%m-%d %H:%M %Z')
# Done
return etoc
def init_recovery(self, docopt_args): def init_recovery(self, docopt_args):
"""Select source/dest and set env.""" """Select source/dest and set env."""
std.clear_screen() std.clear_screen()
@ -530,7 +481,7 @@ class State():
# Start fresh if requested # Start fresh if requested
if docopt_args['--start-fresh']: if docopt_args['--start-fresh']:
self.clean_working_dir(working_dir) clean_working_dir(working_dir)
# Add block pairs # Add block pairs
if mode == 'Clone': if mode == 'Clone':
@ -546,7 +497,7 @@ class State():
self.update_progress_pane('Idle') self.update_progress_pane('Idle')
self.confirm_selections(mode, 'Start recovery?', working_dir=working_dir) self.confirm_selections(mode, 'Start recovery?', working_dir=working_dir)
# Prep destination # TODO: Prep destination
# if cloning and not resuming format destination # if cloning and not resuming format destination
# Done # Done
@ -716,11 +667,8 @@ class State():
report.append(std.color_string(f'{"Status":^{width}}', 'BLUE')) report.append(std.color_string(f'{"Status":^{width}}', 'BLUE'))
if 'NEEDS ATTENTION' in overall_status: if 'NEEDS ATTENTION' in overall_status:
report.append( report.append(
std.color_string( std.color_string(f'{overall_status:^{width}}', 'YELLOW_BLINK'),
f'{overall_status:^{width}}', )
'YELLOW_BLINK',
),
)
else: else:
report.append(f'{overall_status:^{width}}') report.append(f'{overall_status:^{width}}')
report.append(separator) report.append(separator)
@ -755,9 +703,8 @@ class State():
report.append(' ') report.append(' ')
# EToC # EToC
# TODO: Finish update_progress_pane() [EToC]
if overall_status in ('Active', 'NEEDS ATTENTION'): if overall_status in ('Active', 'NEEDS ATTENTION'):
etoc = self.get_etoc() etoc = get_etoc()
report.append(separator) report.append(separator)
report.append(std.color_string('Estimated Pass Finish', 'BLUE')) report.append(std.color_string('Estimated Pass Finish', 'BLUE'))
if overall_status == 'NEEDS ATTENTION' or etoc == 'N/A': if overall_status == 'NEEDS ATTENTION' or etoc == 'N/A':
@ -1063,6 +1010,24 @@ def build_settings_menu(silent=True):
return menu return menu
def clean_working_dir(working_dir):
"""Clean working directory to ensure a fresh recovery session.
NOTE: Data from previous sessions will be preserved
in a backup directory.
"""
backup_dir = pathlib.Path(f'{working_dir}/prev')
backup_dir = io.non_clobber_path(backup_dir)
backup_dir.mkdir()
# Move settings, maps, etc to backup_dir
for entry in os.scandir(working_dir):
if entry.name.endswith(('.dd', '.json', '.map')):
new_path = f'{backup_dir}/{entry.name}'
new_path = io.non_clobber_path(new_path)
shutil.move(entry.path, new_path)
def format_status_string(status, width): def format_status_string(status, width):
"""Format colored status string, returns str.""" """Format colored status string, returns str."""
color = None color = None
@ -1127,6 +1092,35 @@ def fstype_is_ok(path, map_dir=False):
return is_ok return is_ok
def get_etoc():
"""Get EToC from ddrescue output, returns str."""
delta = None
delta_dict = {}
etoc = 'Unknown'
now = datetime.datetime.now(tz=TIMEZONE)
output = tmux.capture_pane()
# Search for EToC delta
matches = re.findall(f'remaining time:.*$', output, re.MULTILINE)
if matches:
match = REGEX_REMAINING_TIME.search(matches[-1])
if match.group('na'):
etoc = 'N/A'
else:
for key in ('days', 'hours', 'minutes', 'seconds'):
delta_dict[key] = match.group(key)
delta_dict = {k: int(v) if v else 0 for k, v in delta_dict.items()}
delta = datetime.timedelta(**delta_dict)
# Calc EToC if delta found
if delta:
etoc_datetime = now + delta
etoc = etoc_datetime.strftime('%Y-%m-%d %H:%M %Z')
# Done
return etoc
def get_object(path): def get_object(path):
"""Get object based on path, returns obj.""" """Get object based on path, returns obj."""
obj = None obj = None