WizardKit/scripts/wk/net.py
2019-12-13 19:01:26 -07:00

260 lines
6.6 KiB
Python

"""WizardKit: Net Functions"""
# vim: sts=2 sw=2 ts=2
import os
import pathlib
import re
import psutil
from wk.exe import get_json_from_command, run_program
from wk.std import PLATFORM, GenericError, show_data
from wk.cfg.net import BACKUP_SERVERS
# REGEX
REGEX_VALID_IP = re.compile(
r'(10.\d+.\d+.\d+'
r'|172.(1[6-9]|2\d|3[0-1])'
r'|192.168.\d+.\d+)',
re.IGNORECASE)
# Functions
def connected_to_private_network(raise_on_error=False):
"""Check if connected to a private network, returns bool.
This checks for a valid private IP assigned to this system.
NOTE: If one isn't found and raise_on_error=True then an exception is raised.
NOTE 2: If one is found and raise_on_error=True then None is returned.
"""
connected = False
# Check IPs
devs = psutil.net_if_addrs()
for dev in devs.values():
for family in dev:
if REGEX_VALID_IP.search(family.address):
# Valid IP found
connected = True
break
if connected:
break
# No valid IP found
if not connected and raise_on_error:
raise GenericError('Not connected to a network')
# Done
if raise_on_error:
connected = None
return connected
def mount_backup_shares(read_write=False):
"""Mount backup shares using OS specific methods."""
report = []
for name, details in BACKUP_SERVERS.items():
mount_point = None
mount_str = f'{name} (//{details["Address"]}/{details["Share"]})'
# Prep mount point
if PLATFORM in ('Darwin', 'Linux'):
mount_point = pathlib.Path(f'/Backups/{name}')
try:
if not mount_point.exists():
# Script should be run as user so sudo is required
run_program(['sudo', 'mkdir', '-p', mount_point])
except OSError:
# Assuming permission denied under macOS
pass
if mount_point:
mount_str += f' to {mount_point}'
# Check if already mounted
if share_is_mounted(details):
report.append(f'(Already) Mounted {mount_str}')
# Skip to next share
continue
# Mount share
proc = mount_network_share(details, mount_point, read_write=read_write)
if proc.returncode:
report.append(f'Failed to Mount {mount_str}')
else:
report.append(f'Mounted {mount_str}')
# Done
return report
def mount_network_share(details, mount_point=None, read_write=False):
"""Mount network share using OS specific methods."""
cmd = None
address = details['Address']
share = details['Share']
username = details['RO-User']
password = details['RO-Pass']
if read_write:
username = details['RW-User']
password = details['RW-Pass']
# Network check
if not connected_to_private_network():
raise RuntimeError('Not connected to a network')
# Build OS-specific command
if PLATFORM == 'Darwin':
cmd = [
'sudo',
'mount',
'-t', 'smbfs',
'-o', f'{"rw" if read_write else "ro"}',
f'//{username}:{password}@{address}/{share}',
mount_point,
]
elif PLATFORM == 'Linux':
cmd = [
'sudo',
'mount',
'-t', 'cifs',
'-o', (
f'{"rw" if read_write else "ro"}'
f',uid={os.getuid()}'
f',gid={os.getgid()}'
f',username={username}'
f',{"password=" if password else "guest"}{password}'
),
f'//{address}/{share}',
mount_point
]
elif PLATFORM == 'Windows':
cmd = ['net', 'use']
if mount_point:
cmd.append(f'{mount_point}:')
cmd.append(f'/user:{username}')
cmd.append(fr'\\{address}\{share}')
cmd.append(password)
# Mount share
return run_program(cmd, check=False)
def ping(addr='google.com'):
"""Attempt to ping addr."""
cmd = (
'ping',
'-n' if psutil.WINDOWS else '-c',
'2',
addr,
)
run_program(cmd)
def share_is_mounted(details):
"""Check if dev/share/etc is mounted, returns bool."""
mounted = False
if PLATFORM == 'Darwin':
# Weak and naive text search
proc = run_program(['mount'], check=False)
for line in proc.stdout.splitlines():
if f'{details["Address"]}/{details["Share"]}' in line:
mounted = True
break
elif PLATFORM == 'Linux':
cmd = [
'findmnt',
'--list',
'--json',
'--invert',
'--types', (
'autofs,binfmt_misc,bpf,cgroup,cgroup2,configfs,debugfs,devpts,'
'devtmpfs,hugetlbfs,mqueue,proc,pstore,securityfs,sysfs,tmpfs'
),
'--output', 'SOURCE',
]
mount_data = get_json_from_command(cmd)
for row in mount_data.get('filesystems', []):
if row['source'] == f'//{details["Address"]}/{details["Share"]}':
mounted = True
break
#TODO: Check mount status under Windows
#elif PLATFORM == 'Windows':
# Done
return mounted
def show_valid_addresses():
"""Show all valid private IP addresses assigned to the system."""
devs = psutil.net_if_addrs()
for dev, families in sorted(devs.items()):
for family in families:
if REGEX_VALID_IP.search(family.address):
# Valid IP found
show_data(message=dev, data=family.address)
def speedtest():
"""Run a network speedtest using speedtest-cli."""
cmd = ['speedtest-cli', '--simple']
proc = run_program(cmd, check=False)
output = [line.strip() for line in proc.stdout.splitlines() if line.strip()]
output = [line.split() for line in output]
output = [(a, float(b), c) for a, b, c in output]
return [f'{a:<10}{b:6.2f} {c}' for a, b, c in output]
def unmount_backup_shares():
"""Unmount backup shares."""
report = []
for name, details in BACKUP_SERVERS.items():
kwargs = {}
source_str = f'{name} (//{details["Address"]}/{details["Share"]})'
# Check if mounted
if not share_is_mounted(details):
report.append(f'Not mounted {source_str}')
continue
# Build OS specific kwargs
if PLATFORM in ('Darwin', 'Linux'):
kwargs['mount_point'] = f'/Backups/{name}'
elif PLATFORM == 'Windows':
kwargs['details'] = details
# Unmount and add to report
proc = unmount_network_share(**kwargs)
if proc.returncode:
report.append(f'Failed to unmount {source_str}')
else:
report.append(f'Unmounted {source_str}')
# Done
return report
def unmount_network_share(details=None, mount_point=None):
"""Unmount network share"""
cmd = []
# Build OS specific command
if PLATFORM in ('Darwin', 'Linux'):
cmd = ['sudo', 'umount', mount_point]
elif PLATFORM == 'Windows':
cmd = ['net', 'use']
if mount_point:
cmd.append(f'{mount_point}:')
elif details:
cmd.append(fr'\\{details["Address"]}\{details["Share"]}')
cmd.append('/delete')
# Unmount share
return run_program(cmd, check=False)
if __name__ == '__main__':
print("This file is not meant to be called directly.")