Added safety checks for the destination

* Dev size / avail space checks
* Permission checks
  * No mount option checks (yet?)
This commit is contained in:
2Shirt 2018-07-18 20:54:51 -06:00
parent 88c28a3f25
commit f2c557f77c
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C

View file

@ -5,6 +5,7 @@ import pathlib
import psutil
import re
import signal
import stat
import time
from functions.common import *
@ -12,6 +13,7 @@ from functions.data import *
from operator import itemgetter
# STATIC VARIABLES
AUTHORIZED_DEST_FSTYPES = ['ext3', 'ext4', 'xfs']
AUTO_NEXT_PASS_1_THRESHOLD = 85
AUTO_NEXT_PASS_2_THRESHOLD = 98
DDRESCUE_SETTINGS = {
@ -33,7 +35,8 @@ USAGE = """ {script_name} clone [source [destination]]
# Functions
def abort_ddrescue_tui():
run_program(['losetup', '-D'])
# TODO uncomment line below
# run_program(['losetup', '-D'])
abort()
def build_outer_panes(source, dest):
@ -65,7 +68,72 @@ def build_outer_panes(source, dest):
tmux_splitw('-dhl', '21',
'watch', '--color', '--no-title', '--interval', '1',
'cat', source['Progress Out'])
def dest_safety_check(source, dest):
"""Verify the destination is appropriate for the source."""
source_size = source['Details']['size']
if dest['Is Dir']:
cmd = ['findmnt', '-D', '-J',
'-T', dest['Path']]
result = run_program(cmd)
try:
json_data = json.loads(result.stdout.decode())
except Exception:
# Welp, let's abort
print_error('Failed to verify destination usability.')
abort_ddrescue_tui()
else:
dest_size = json_data['filesystems'][0]['avail']
dest['Free Space'] = dest_size
dest['Filesystem'] = json_data['filesystems'][0]['fstype']
else:
dest_size = dest['Details']['size']
# Fix strings before converting to bytes
source_size = re.sub(
r'(\d+\.?\d*)\s*([KMGTB])B?', r'\1 \2B', source_size.upper())
dest_size = re.sub(
r'(\d+\.?\d*)\s*([KMGTB])B?', r'\1 \2B', dest_size.upper())
# Convert to bytes and compare size
source_size = convert_to_bytes(source_size)
dest_size = convert_to_bytes(dest_size)
if source['Type'] == 'Image' and dest_size < (source_size * 1.2):
# Imaging: ensure 120% of source size is available
print_error(
'Not enough free space on destination, refusing to continue.')
print_standard(' Dest {d_size} < Required {s_size}'.format(
d_size = human_readable_size(dest_size),
s_size = human_readable_size(source_size * 1.2)))
abort_ddrescue_tui()
elif source['Type'] == 'Clone' and source_size > dest_size:
# Cloning: ensure dest >= size
print_error('Destination is too small, refusing to continue.')
print_standard(' Dest {d_size} < Source {s_size}'.format(
d_size = human_readable_size(dest_size),
s_size = human_readable_size(source_size)))
abort_ddrescue_tui()
# Filesystem checks
if source['Type'] == 'Image':
# Filesystem Type
if dest['Filesystem'] not in AUTHORIZED_DEST_FSTYPES:
print_error(
'Destination filesystem "{}" is not a recommended type.'.format(
dest['Filesystem']))
if not ask('Proceed anyways? (strongly discouraged by author)'):
abort_ddrescue_tui()
# Read-Write access
## Note: only checks path permissions, not mount options
## if the FS is RO then ddrescue will fail later
dest_ok = True
dest_st_mode = os.stat(dest['Path']).st_mode
dest_ok = dest_ok and dest_st_mode & stat.S_IRUSR
dest_ok = dest_ok and dest_st_mode & stat.S_IWUSR
dest_ok = dest_ok and dest_st_mode & stat.S_IXUSR
if not dest_ok:
print_error('Destination is not writable, refusing to continue.')
abort_ddrescue_tui()
def get_device_details(dev_path):
"""Get device details via lsblk, returns JSON dict."""
@ -180,6 +248,7 @@ def menu_clone(source_path, dest_path):
source['Type'] = 'Clone'
dest = select_device('destination', dest_path,
skip_device = source['Details'], allow_image_file = False)
dest_safety_check(source, dest)
# Show selection details
show_selection_details(source, dest)
@ -237,6 +306,7 @@ def menu_image(source_path, dest_path):
source['Pass 3'] = {'Status': 'Pending', 'Done': False}
source['Type'] = 'Image'
dest = select_dest_path(dest_path, skip_device=source['Details'])
dest_safety_check(source, dest)
# Show selection details
show_selection_details(source, dest)
@ -352,10 +422,6 @@ def menu_main(source):
elif current_pass == 'Pass 2' and recovered > 98:
auto_run = True
# Update current pass for next iteration
print_info('State:')
print_standard(' Pass #: {}\n Auto: {}\n Recovered: {}'.format(
current_pass, auto_run, recovered))
pause()
current_pass = source['Current Pass']
elif selection == 'C':
@ -652,7 +718,8 @@ def run_ddrescue(source, settings):
try:
clear_screen()
print_info('Current dev: {}'.format(dev['Dev Path']))
ddrescue_proc = popen_program(['./__choose_exit', *settings])
#ddrescue_proc = popen_program(['./__choose_exit', *settings])
ddrescue_proc = popen_program(['./__exit_ok', *settings])
ddrescue_proc.wait()
except KeyboardInterrupt:
# Catch user abort
@ -715,7 +782,7 @@ def run_ddrescue(source, settings):
#update_progress(source)
def select_dest_path(provided_path=None, skip_device={}):
dest = {}
dest = {'Is Dir': True, 'Is Image': False}
# Set path
if provided_path:
@ -754,7 +821,7 @@ def select_dest_path(provided_path=None, skip_device={}):
def select_device(description='device', provided_path=None,
skip_device={}, allow_image_file=True):
"""Select device via provided path or menu, return dev as dict."""
dev = {'Is Image': False}
dev = {'Is Dir': False, 'Is Image': False}
# Set path
if provided_path:
@ -777,6 +844,8 @@ def select_device(description='device', provided_path=None,
# Get device details
dev['Details'] = get_device_details(dev['Dev Path'])
if 'Children' not in dev:
dev['Children'] = {}
# Check for parent device(s)
while dev['Details']['pkname']:
@ -877,6 +946,9 @@ def show_selection_details(source, dest):
else:
print_success('Destination path')
print_standard(dest['Path'])
print_info('{:<8}{}'.format('FREE', 'FSTYPE'))
print_standard('{:<8}{}'.format(
dest['Free Space'], dest['Filesystem']))
print_standard(' ')
def show_usage(script_name):