PEP8 Cleanup

This commit is contained in:
Alan Mason 2018-07-22 16:27:34 -06:00 committed by GitHub
parent 9e48c1d1a6
commit 1f63f91144
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -13,7 +13,7 @@ from functions.data import *
from operator import itemgetter
# STATIC VARIABLES
AUTHORIZED_DEST_FSTYPES = ['ext3', 'ext4', 'xfs']
RECOMMENDED_FSTYPES = ['ext3', 'ext4', 'xfs']
AUTO_NEXT_PASS_1_THRESHOLD = 85
AUTO_NEXT_PASS_2_THRESHOLD = 98
DDRESCUE_SETTINGS = {
@ -36,11 +36,13 @@ USAGE = """ {script_name} clone [source [destination]]
(e.g. {script_name} clone /dev/sda /dev/sdb)
"""
# Functions
def abort_ddrescue_tui():
run_program(['losetup', '-D'])
abort()
def build_outer_panes(source, dest):
"""Build top and side panes."""
clear_screen()
@ -67,10 +69,12 @@ def build_outer_panes(source, dest):
# Side pane
update_progress(source)
tmux_splitw('-dhl', '21',
tmux_splitw(
'-dhl', '21',
'watch', '--color', '--no-title', '--interval', '1',
'cat', source['Progress Out'])
def check_dest_paths(source):
"""Check for image and/or map file and alert user about details."""
dd_image_exists = os.path.exists(source['Dest Paths']['Image'])
@ -112,11 +116,13 @@ def check_dest_paths(source):
if abort_imaging or (resume_files_exist and not ask(p)):
abort_ddrescue_tui()
def dest_safety_check(source, dest):
"""Verify the destination is appropriate for the source."""
source_size = source['Details']['size']
if dest['Is Dir']:
cmd = ['findmnt', '-J',
cmd = [
'findmnt', '-J',
'-o', 'SOURCE,TARGET,FSTYPE,OPTIONS,SIZE,AVAIL,USED',
'-T', dest['Path']]
result = run_program(cmd)
@ -141,14 +147,16 @@ def dest_safety_check(source, dest):
# Imaging: ensure 120% of source size is available
print_error(
'Not enough free space on destination, refusing to continue.')
print_standard(' Dest {d_size} < Required {s_size}'.format(
print_standard(
' Dest {d_size} < Required {s_size}'.format(
d_size=human_readable_size(dest_size),
s_size=human_readable_size(source_size * 1.2)))
abort_ddrescue_tui()
elif source['Type'] == 'Clone' and source_size > dest_size:
# Cloning: ensure dest >= size
print_error('Destination is too small, refusing to continue.')
print_standard(' Dest {d_size} < Source {s_size}'.format(
print_standard(
' Dest {d_size} < Source {s_size}'.format(
d_size=human_readable_size(dest_size),
s_size=human_readable_size(source_size)))
abort_ddrescue_tui()
@ -156,12 +164,12 @@ def dest_safety_check(source, dest):
# Imaging specific checks
if source['Type'] == 'Image':
# Filesystem Type
if dest['Filesystem'] not in AUTHORIZED_DEST_FSTYPES:
if dest['Filesystem'] not in RECOMMENDED_FSTYPES:
print_error(
'Destination filesystem "{}" is not a recommended type.'.format(
'Destination filesystem "{}" is not recommended.'.format(
dest['Filesystem']))
print_info('Authorized types are: {}'.format(
' / '.join(AUTHORIZED_DEST_FSTYPES).upper()))
print_info('Recommended types are: {}'.format(
' / '.join(RECOMMENDED_FSTYPES).upper()))
print_standard(' ')
if not ask('Proceed anyways? (Strongly discouraged)'):
abort_ddrescue_tui()
@ -181,6 +189,7 @@ def dest_safety_check(source, dest):
'Destination is not mounted read-write, refusing to continue.')
abort_ddrescue_tui()
def get_device_details(dev_path):
"""Get device details via lsblk, returns JSON dict."""
try:
@ -199,11 +208,13 @@ def get_device_details(dev_path):
# Just return the first device (there should only be one)
return json_data['blockdevices'][0]
def get_device_size_in_bytes(s):
"""Convert size string from lsblk string to bytes, returns int."""
s = re.sub(r'(\d+\.?\d*)\s*([KMGTB])B?', r'\1 \2B', s, re.IGNORECASE)
return convert_to_bytes(s)
def get_recovery_scope_size(source):
"""Calculate total size of selected dev(s)."""
source['Total Size'] = 0
@ -216,6 +227,7 @@ def get_recovery_scope_size(source):
source['Size'] = get_device_size_in_bytes(source['Details']['size'])
source['Total Size'] = source['Size']
def get_status_color(s, t_success=99, t_warn=90):
"""Get color based on status, returns str."""
color = COLORS['CLEAR']
@ -238,6 +250,7 @@ def get_status_color(s, t_success=99, t_warn=90):
color = COLORS['RED']
return color
def mark_all_passes_pending(source):
"""Mark all devs and passes as pending in preparation for retry."""
source['Current Pass'] = 'Pass 1'
@ -248,6 +261,7 @@ def mark_all_passes_pending(source):
child[p_num]['Status'] = 'Pending'
child[p_num]['Done'] = False
def menu_clone(source_path, dest_path):
"""ddrescue cloning menu."""
@ -260,7 +274,8 @@ def menu_clone(source_path, dest_path):
source['Recovered Size'] = 0,
source['Total Size'] = 0,
source['Type'] = 'Clone'
dest = select_device('destination', dest_path,
dest = select_device(
'destination', dest_path,
skip_device=source['Details'], allow_image_file=False)
dest_safety_check(source, dest)
@ -284,6 +299,7 @@ def menu_clone(source_path, dest_path):
run_program(['tmux', 'kill-window'])
exit_script()
def menu_ddrescue(*args):
"""Main ddrescue loop/menu."""
args = list(args)
@ -312,6 +328,7 @@ def menu_ddrescue(*args):
show_usage(script_name)
exit_script()
def menu_image(source_path, dest_path):
"""ddrescue imaging menu."""
@ -349,6 +366,7 @@ def menu_image(source_path, dest_path):
run_program(['tmux', 'kill-window'])
exit_script()
def menu_main(source, dest):
"""Main menu is used to set ddrescue settings."""
title = '{GREEN}ddrescue TUI: Main Menu{CLEAR}\n\n'.format(**COLORS)
@ -435,11 +453,11 @@ def menu_main(source, dest):
break
if source[current_pass]['Done']:
min_status = source[current_pass]['Min Status']
if (current_pass == 'Pass 1'
and min_status < AUTO_NEXT_PASS_1_THRESHOLD):
if (current_pass == 'Pass 1' and
min_status < AUTO_NEXT_PASS_1_THRESHOLD):
auto_run = False
elif (current_pass == 'Pass 2'
and min_status < AUTO_NEXT_PASS_2_THRESHOLD):
elif (current_pass == 'Pass 2' and
min_status < AUTO_NEXT_PASS_2_THRESHOLD):
auto_run = False
# Update current pass for next iteration
current_pass = source['Current Pass']
@ -449,6 +467,7 @@ def menu_main(source, dest):
elif selection == 'Q':
break
def menu_select_children(source):
"""Select child device(s) or whole disk, returns list."""
dev_options = [{
@ -513,6 +532,7 @@ def menu_select_children(source):
if d['Selected'] and 'Whole device' not in d['Base Name']]
return selected_children
def menu_select_device(title='Which device?', skip_device={}):
"""Select block device via a menu, returns dev_path as str."""
skip_names = [
@ -565,6 +585,7 @@ def menu_select_device(title='Which device?', skip_device={}):
elif selection == 'Q':
abort_ddrescue_tui()
def menu_select_path(skip_device={}):
"""Select path via menu, returns path as str."""
pwd = os.path.realpath(global_vars['Env']['PWD'])
@ -636,6 +657,7 @@ def menu_select_path(skip_device={}):
print_error('Invalid path "{}"'.format(m_path))
return s_path
def menu_settings(source):
"""Change advanced ddrescue settings."""
title = '{GREEN}ddrescue TUI: Expert Settings{CLEAR}\n\n'.format(**COLORS)
@ -686,6 +708,7 @@ def menu_settings(source):
elif selection == 'M':
break
def read_map_file(map_path):
"""Read map file with ddrescuelog and return data as dict."""
map_data = {}
@ -718,6 +741,7 @@ def read_map_file(map_path):
return map_data
def run_ddrescue(source, dest, settings):
"""Run ddrescue pass."""
current_pass = source['Current Pass']
@ -748,7 +772,7 @@ def run_ddrescue(source, dest, settings):
source_devs = source['Children']
# Set heights
## NOTE: 12/33 is based on min heights for SMART/ddrescue panes (12+22+1sep)
# NOTE: 12/33 is based on min heights for SMART/ddrescue panes (12+22+1sep)
result = run_program(['tput', 'lines'])
height = int(result.stdout.decode().strip())
height_smart = int(height * (12 / 33))
@ -772,12 +796,13 @@ def run_ddrescue(source, dest, settings):
# Set ddrescue cmd
if source['Type'] == 'Clone':
cmd = ['ddrescue', *settings, '--force',
s_dev['Dev Path'], dest['Dev Path'], s_dev['Dest Paths']['Map']]
cmd = [
'ddrescue', *settings, '--force', s_dev['Dev Path'],
dest['Dev Path'], s_dev['Dest Paths']['Map']]
else:
cmd = ['ddrescue', *settings,
s_dev['Dev Path'], s_dev['Dest Paths']['Image'],
s_dev['Dest Paths']['Map']]
cmd = [
'ddrescue', *settings, s_dev['Dev Path'],
s_dev['Dest Paths']['Image'], s_dev['Dest Paths']['Map']]
# Start ddrescue
try:
@ -814,6 +839,7 @@ def run_ddrescue(source, dest, settings):
pause('Press Enter to return to main menu... ')
run_program(['tmux', 'kill-pane', '-t', smart_pane])
def select_dest_path(provided_path=None, skip_device={}):
dest = {'Is Dir': True, 'Is Image': False}
@ -851,6 +877,7 @@ def select_dest_path(provided_path=None, skip_device={}):
return dest
def select_device(description='device', provided_path=None,
skip_device={}, allow_image_file=True):
"""Select device via provided path or menu, return dev as dict."""
@ -903,14 +930,17 @@ def select_device(description='device', provided_path=None,
width = int((int(result.stdout.decode().strip()) - 21) / 2) - 2
if len(dev['Display Name']) > width:
if dev['Is Image']:
dev['Display Name'] = '...{}'.format(dev['Display Name'][-(width-3):])
dev['Display Name'] = '...{}'.format(
dev['Display Name'][-(width-3):])
else:
dev['Display Name'] = '{}...'.format(dev['Display Name'][:(width-3)])
dev['Display Name'] = '{}...'.format(
dev['Display Name'][:(width-3)])
else:
dev['Display Name'] = dev['Display Name']
return dev
def set_dest_image_paths(source, dest):
"""Set destination image path for source and any child devices."""
if source['Type'] == 'Clone':
@ -944,6 +974,7 @@ def set_dest_image_paths(source, dest):
'Image': '{}.dd'.format(base),
'Map': '{}.map'.format(base)}
def setup_loopback_device(source_path):
"""Setup a loopback device for source_path, returns dev_path as str."""
cmd = (
@ -962,6 +993,7 @@ def setup_loopback_device(source_path):
else:
return dev_path
def show_device_details(dev_path):
"""Display device details on screen."""
cmd = (
@ -982,6 +1014,7 @@ def show_device_details(dev_path):
for line in output:
print_standard(line)
def show_safety_check():
"""Display safety check message and get confirmation from user."""
print_standard('\nSAFETY CHECK')
@ -992,6 +1025,7 @@ def show_safety_check():
if not ask('Asking again to confirm, is this correct?'):
abort_ddrescue_tui()
def show_selection_details(source, dest):
clear_screen()
@ -1017,17 +1051,20 @@ def show_selection_details(source, dest):
dest['Free Space'], dest['Filesystem']))
print_standard(' ')
def show_usage(script_name):
print_info('Usage:')
print_standard(USAGE.format(script_name=script_name))
pause()
def tmux_splitw(*args):
"""Run tmux split-window command and return output as str."""
cmd = ['tmux', 'split-window', *args]
result = run_program(cmd)
return result.stdout.decode().strip()
def update_progress(source, end_run=False):
"""Update progress file."""
current_pass = source['Current Pass']
@ -1049,7 +1086,8 @@ def update_progress(source, end_run=False):
next_pass = 'Done'
if 'Progress Out' not in source:
source['Progress Out'] = '{}/progress.out'.format(global_vars['LogDir'])
source['Progress Out'] = '{}/progress.out'.format(
global_vars['LogDir'])
output = []
if source['Type'] == 'Clone':
output.append(' {BLUE}Cloning Status{CLEAR}'.format(**COLORS))
@ -1163,7 +1201,8 @@ def update_progress(source, end_run=False):
pass
else:
s_display = '{:0.2f} %'.format(s_display)
output.append('{p_num}{s_color}{s_display:>15}{CLEAR}'.format(
output.append(
'{p_num}{s_color}{s_display:>15}{CLEAR}'.format(
p_num=p_num,
s_color=get_status_color(child[p_num]['Status']),
s_display=s_display,
@ -1184,7 +1223,8 @@ def update_progress(source, end_run=False):
pass
else:
s_display = '{:0.2f} %'.format(s_display)
output.append('{p_num}{s_color}{s_display:>15}{CLEAR}'.format(
output.append(
'{p_num}{s_color}{s_display:>15}{CLEAR}'.format(
p_num=p_num,
s_color=get_status_color(source[p_num]['Status']),
s_display=s_display,
@ -1196,6 +1236,7 @@ def update_progress(source, end_run=False):
with open(source['Progress Out'], 'w') as f:
f.writelines(output)
if __name__ == '__main__':
print("This file is not meant to be called directly.")