Pylint cleanup for ddrescue-tui part 1

This commit is contained in:
2Shirt 2019-05-13 19:26:44 -06:00
parent fa39c85fe4
commit df33152a38
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C

View file

@ -1,15 +1,14 @@
# Wizard Kit: Functions - ddrescue-tui # pylint: disable=no-name-in-module,too-many-lines,wildcard-import
# vim: sts=2 sw=2 ts=2
'''Wizard Kit: Functions - ddrescue-tui'''
import datetime import datetime
import pathlib import pathlib
import psutil
import pytz
import re import re
import signal
import stat import stat
import time import time
import pytz
from collections import OrderedDict
from functions.data import * from functions.data import *
from functions.hw_diags import * from functions.hw_diags import *
from functions.json import * from functions.json import *
@ -20,6 +19,7 @@ from settings.ddrescue import *
# Clases # Clases
class BaseObj(): class BaseObj():
# pylint: disable=missing-docstring
"""Base object used by DevObj, DirObj, and ImageObj.""" """Base object used by DevObj, DirObj, and ImageObj."""
def __init__(self, path): def __init__(self, path):
self.type = 'base' self.type = 'base'
@ -44,6 +44,7 @@ class BaseObj():
class BlockPair(): class BlockPair():
# pylint: disable=too-many-instance-attributes
"""Object to track data and methods together for source and dest.""" """Object to track data and methods together for source and dest."""
def __init__(self, mode, source, dest): def __init__(self, mode, source, dest):
self.mode = mode self.mode = mode
@ -153,6 +154,7 @@ class BlockPair():
class DevObj(BaseObj): class DevObj(BaseObj):
# pylint: disable=too-many-instance-attributes
"""Block device object.""" """Block device object."""
def self_check(self): def self_check(self):
"""Verify that self.path points to a block device.""" """Verify that self.path points to a block device."""
@ -186,6 +188,7 @@ class DevObj(BaseObj):
self.update_filename_prefix() self.update_filename_prefix()
def update_filename_prefix(self): def update_filename_prefix(self):
# pylint: disable=attribute-defined-outside-init
"""Set filename prefix based on details.""" """Set filename prefix based on details."""
self.prefix = '{m_size}_{model}'.format( self.prefix = '{m_size}_{model}'.format(
m_size=self.model_size, m_size=self.model_size,
@ -205,6 +208,7 @@ class DevObj(BaseObj):
class DirObj(BaseObj): class DirObj(BaseObj):
"""Directory object."""
def self_check(self): def self_check(self):
"""Verify that self.path points to a directory.""" """Verify that self.path points to a directory."""
if not pathlib.Path(self.path).is_dir(): if not pathlib.Path(self.path).is_dir():
@ -222,6 +226,7 @@ class DirObj(BaseObj):
class ImageObj(BaseObj): class ImageObj(BaseObj):
"""Image file object."""
def self_check(self): def self_check(self):
"""Verify that self.path points to a file.""" """Verify that self.path points to a file."""
if not pathlib.Path(self.path).is_file(): if not pathlib.Path(self.path).is_file():
@ -247,6 +252,7 @@ class ImageObj(BaseObj):
class RecoveryState(): class RecoveryState():
# pylint: disable=too-many-instance-attributes
"""Object to track BlockPair objects and overall state.""" """Object to track BlockPair objects and overall state."""
def __init__(self, mode, source, dest): def __init__(self, mode, source, dest):
self.mode = mode.lower() self.mode = mode.lower()
@ -317,15 +323,15 @@ class RecoveryState():
def current_pass_done(self): def current_pass_done(self):
"""Checks if pass is done for all block-pairs, returns bool.""" """Checks if pass is done for all block-pairs, returns bool."""
done = True done = True
for bp in self.block_pairs: for b_p in self.block_pairs:
done &= bp.pass_done[self.current_pass] done &= b_p.pass_done[self.current_pass]
return done return done
def current_pass_min(self): def current_pass_min(self):
"""Gets minimum pass rescued percentage, returns float.""" """Gets minimum pass rescued percentage, returns float."""
min_percent = 100 min_percent = 100
for bp in self.block_pairs: for b_p in self.block_pairs:
min_percent = min(min_percent, bp.rescued_percent) min_percent = min(min_percent, b_p.rescued_percent)
return min_percent return min_percent
def get_smart_source(self): def get_smart_source(self):
@ -339,10 +345,10 @@ class RecoveryState():
def retry_all_passes(self): def retry_all_passes(self):
"""Mark all passes as pending for all block-pairs.""" """Mark all passes as pending for all block-pairs."""
self.finished = False self.finished = False
for bp in self.block_pairs: for b_p in self.block_pairs:
bp.pass_done = [False, False, False] b_p.pass_done = [False, False, False]
bp.status = ['Pending', 'Pending', 'Pending'] b_p.status = ['Pending', 'Pending', 'Pending']
bp.fix_status_strings() b_p.fix_status_strings()
self.set_pass_num() self.set_pass_num()
def self_checks(self): def self_checks(self):
@ -374,10 +380,10 @@ class RecoveryState():
# Run BlockPair self checks and get total size # Run BlockPair self checks and get total size
self.total_size = 0 self.total_size = 0
for bp in self.block_pairs: for b_p in self.block_pairs:
bp.self_check() b_p.self_check()
self.resumed |= bp.resumed self.resumed |= b_p.resumed
self.total_size += bp.size self.total_size += b_p.size
def set_pass_num(self): def set_pass_num(self):
"""Set current pass based on all block-pair's progress.""" """Set current pass based on all block-pair's progress."""
@ -385,8 +391,8 @@ class RecoveryState():
for pass_num in (2, 1, 0): for pass_num in (2, 1, 0):
# Iterate backwards through passes # Iterate backwards through passes
pass_done = True pass_done = True
for bp in self.block_pairs: for b_p in self.block_pairs:
pass_done &= bp.pass_done[pass_num] pass_done &= b_p.pass_done[pass_num]
if pass_done: if pass_done:
# All block-pairs reported being done # All block-pairs reported being done
# Set to next pass, unless we're on the last pass (2) # Set to next pass, unless we're on the last pass (2)
@ -413,7 +419,7 @@ class RecoveryState():
# Just set to N/A (NOTE: this overrules the refresh rate below) # Just set to N/A (NOTE: this overrules the refresh rate below)
self.etoc = 'N/A' self.etoc = 'N/A'
return return
elif 'In Progress' not in self.status: if 'In Progress' not in self.status:
# Don't update when EToC is hidden # Don't update when EToC is hidden
return return
if now.second % ETOC_REFRESH_RATE != 0: if now.second % ETOC_REFRESH_RATE != 0:
@ -427,7 +433,7 @@ class RecoveryState():
# Capture main tmux pane # Capture main tmux pane
try: try:
text = tmux_capture_pane() text = tmux_capture_pane()
except Exception: except Exception: # pylint: disable=broad-except
# Ignore # Ignore
pass pass
@ -450,7 +456,7 @@ class RecoveryState():
minutes=int(minutes), minutes=int(minutes),
seconds=int(seconds), seconds=int(seconds),
) )
except Exception: except Exception: # pylint: disable=broad-except
# Ignore and leave as raw string # Ignore and leave as raw string
pass pass
@ -460,15 +466,16 @@ class RecoveryState():
now = datetime.datetime.now(tz=self.timezone) now = datetime.datetime.now(tz=self.timezone)
_etoc = now + etoc_delta _etoc = now + etoc_delta
self.etoc = _etoc.strftime('%Y-%m-%d %H:%M %Z') self.etoc = _etoc.strftime('%Y-%m-%d %H:%M %Z')
except Exception: except Exception: # pylint: disable=broad-except
# Ignore and leave as current string # Ignore and leave as current string
pass pass
def update_progress(self): def update_progress(self):
# pylint: disable=attribute-defined-outside-init
"""Update overall progress using block_pairs.""" """Update overall progress using block_pairs."""
self.rescued = 0 self.rescued = 0
for bp in self.block_pairs: for b_p in self.block_pairs:
self.rescued += bp.rescued self.rescued += b_p.rescued
self.rescued_percent = (self.rescued / self.total_size) * 100 self.rescued_percent = (self.rescued / self.total_size) * 100
self.status_percent = get_formatted_status( self.status_percent = get_formatted_status(
label='Recovered:', data=self.rescued_percent) label='Recovered:', data=self.rescued_percent)
@ -514,10 +521,13 @@ def create_path_obj(path):
def double_confirm_clone(): def double_confirm_clone():
"""Display warning and get 2nd confirmation, returns bool.""" """Display warning and get 2nd confirmation, returns bool."""
print_standard('\nSAFETY CHECK') print_standard('\nSAFETY CHECK')
print_warning('All data will be DELETED from the ' print_warning(
'destination device and partition(s) listed above.') 'All data will be DELETED from the '
print_warning('This is irreversible and will lead ' 'destination device and partition(s) listed above.'
'to {CLEAR}{RED}DATA LOSS.'.format(**COLORS)) )
print_warning(
'This is irreversible and will lead to {CLEAR}{RED}DATA LOSS.'.format(
**COLORS))
return ask('Asking again to confirm, is this correct?') return ask('Asking again to confirm, is this correct?')
@ -701,12 +711,14 @@ def get_formatted_status(label, data):
try: try:
data_str = '{data:>{data_width}.2f} %'.format( data_str = '{data:>{data_width}.2f} %'.format(
data=data, data=data,
data_width=data_width-2) data_width=data_width-2,
)
except ValueError: except ValueError:
# Assuming non-numeric data # Assuming non-numeric data
data_str = '{data:>{data_width}}'.format( data_str = '{data:>{data_width}}'.format(
data=data, data=data,
data_width=data_width) data_width=data_width,
)
status = '{label}{s_color}{data_str}{CLEAR}'.format( status = '{label}{s_color}{data_str}{CLEAR}'.format(
label=label, label=label,
s_color=get_status_color(data), s_color=get_status_color(data),
@ -742,9 +754,9 @@ def is_writable_dir(dir_obj):
"""Check if we have read-write-execute permissions, returns bool.""" """Check if we have read-write-execute permissions, returns bool."""
is_ok = True is_ok = True
path_st_mode = os.stat(dir_obj.path).st_mode path_st_mode = os.stat(dir_obj.path).st_mode
is_ok == is_ok and path_st_mode & stat.S_IRUSR is_ok &= path_st_mode & stat.S_IRUSR
is_ok == is_ok and path_st_mode & stat.S_IWUSR is_ok &= path_st_mode & stat.S_IWUSR
is_ok == is_ok and path_st_mode & stat.S_IXUSR is_ok &= path_st_mode & stat.S_IXUSR
return is_ok return is_ok
@ -825,8 +837,7 @@ def menu_main(state):
] ]
actions = [ actions = [
{'Name': 'Start', 'Letter': 'S'}, {'Name': 'Start', 'Letter': 'S'},
{'Name': 'Change settings {YELLOW}(experts only){CLEAR}'.format( {'Name': 'Change settings {YELLOW}(experts only){CLEAR}'.format(**COLORS),
**COLORS),
'Letter': 'C'}, 'Letter': 'C'},
{'Name': 'Quit', 'Letter': 'Q', 'CRLF': True}, {'Name': 'Quit', 'Letter': 'Q', 'CRLF': True},
] ]
@ -1019,8 +1030,8 @@ def run_ddrescue(state, pass_settings):
fix_tmux_panes(state, forced=True) fix_tmux_panes(state, forced=True)
# Run pass for each block-pair # Run pass for each block-pair
for bp in state.block_pairs: for b_p in state.block_pairs:
if bp.pass_done[state.current_pass]: if b_p.pass_done[state.current_pass]:
# Skip to next block-pair # Skip to next block-pair
continue continue
update_sidepane(state) update_sidepane(state)
@ -1028,7 +1039,7 @@ def run_ddrescue(state, pass_settings):
# Set ddrescue cmd # Set ddrescue cmd
cmd = [ cmd = [
'ddrescue', *pass_settings, 'ddrescue', *pass_settings,
bp.source_path, bp.dest_path, bp.map_path] b_p.source_path, b_p.dest_path, b_p.map_path]
if state.mode == 'clone': if state.mode == 'clone':
cmd.append('--force') cmd.append('--force')
if state.current_pass == 0: if state.current_pass == 0:
@ -1043,7 +1054,7 @@ def run_ddrescue(state, pass_settings):
# Start ddrescue # Start ddrescue
try: try:
clear_screen() clear_screen()
print_info('Current dev: {}'.format(bp.source_path)) print_info('Current dev: {}'.format(b_p.source_path))
ddrescue_proc = popen_program(cmd) ddrescue_proc = popen_program(cmd)
i = 0 i = 0
while True: while True:
@ -1058,7 +1069,7 @@ def run_ddrescue(state, pass_settings):
i += 1 i += 1
# Update progress # Update progress
bp.update_progress(state.current_pass) b_p.update_progress(state.current_pass)
update_sidepane(state) update_sidepane(state)
# Fix panes # Fix panes
@ -1068,11 +1079,11 @@ def run_ddrescue(state, pass_settings):
try: try:
ddrescue_proc.wait(timeout=1) ddrescue_proc.wait(timeout=1)
sleep(2) sleep(2)
bp.update_progress(state.current_pass) b_p.update_progress(state.current_pass)
update_sidepane(state) update_sidepane(state)
break break
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
# Catch to update smart/bp/sidepane # Catch to update smart/b_p/sidepane
pass pass
except KeyboardInterrupt: except KeyboardInterrupt:
@ -1081,7 +1092,7 @@ def run_ddrescue(state, pass_settings):
ddrescue_proc.wait(timeout=10) ddrescue_proc.wait(timeout=10)
# Update progress/sidepane again # Update progress/sidepane again
bp.update_progress(state.current_pass) b_p.update_progress(state.current_pass)
update_sidepane(state) update_sidepane(state)
# Was ddrescue aborted? # Was ddrescue aborted?
@ -1103,7 +1114,7 @@ def run_ddrescue(state, pass_settings):
break break
else: else:
# Mark pass finished # Mark pass finished
bp.finish_pass(state.current_pass) b_p.finish_pass(state.current_pass)
update_sidepane(state) update_sidepane(state)
# Done # Done
@ -1378,14 +1389,14 @@ def update_sidepane(state):
output.append('─────────────────────') output.append('─────────────────────')
# Source(s) progress # Source(s) progress
for bp in state.block_pairs: for b_p in state.block_pairs:
if state.source.is_image(): if state.source.is_image():
output.append('{BLUE}Image File{CLEAR}'.format(**COLORS)) output.append('{BLUE}Image File{CLEAR}'.format(**COLORS))
else: else:
output.append('{BLUE}{source}{CLEAR}'.format( output.append('{BLUE}{source}{CLEAR}'.format(
source=bp.source_path, source=b_p.source_path,
**COLORS)) **COLORS))
output.extend(bp.status) output.extend(b_p.status)
output.append(' ') output.append(' ')
# EToC # EToC
@ -1410,5 +1421,3 @@ def update_sidepane(state):
if __name__ == '__main__': if __name__ == '__main__':
print("This file is not meant to be called directly.") print("This file is not meant to be called directly.")
# vim: sts=2 sw=2 ts=2