Added functions.json
* Uses safer method to get JSON data from a command * Replaced nearly all uses of json.loads with new get_json_from_command() * Fixes issue #100
This commit is contained in:
parent
480222c1f2
commit
7381547b27
5 changed files with 70 additions and 72 deletions
|
|
@ -78,8 +78,7 @@ def find_core_storage_volumes(device_path=None):
|
|||
'--output', 'NAME,PARTTYPE']
|
||||
if device_path:
|
||||
cmd.append(device_path)
|
||||
result = run_program(cmd)
|
||||
json_data = json.loads(result.stdout.decode())
|
||||
json_data = get_json_from_command(cmd)
|
||||
devs = json_data.get('blockdevices', [])
|
||||
devs = [d for d in devs if d.get('parttype', '') == corestorage_uuid]
|
||||
if devs:
|
||||
|
|
@ -159,8 +158,7 @@ def get_mounted_volumes():
|
|||
'devtmpfs,hugetlbfs,mqueue,proc,pstore,securityfs,sysfs,tmpfs'
|
||||
),
|
||||
'-o', 'SOURCE,TARGET,FSTYPE,LABEL,SIZE,AVAIL,USED']
|
||||
result = run_program(cmd)
|
||||
json_data = json.loads(result.stdout.decode())
|
||||
json_data = get_json_from_command(cmd)
|
||||
mounted_volumes = []
|
||||
for item in json_data.get('filesystems', []):
|
||||
mounted_volumes.append(item)
|
||||
|
|
@ -185,8 +183,7 @@ def mount_volumes(
|
|||
find_core_storage_volumes(device_path)
|
||||
|
||||
# Get list of block devices
|
||||
result = run_program(cmd)
|
||||
json_data = json.loads(result.stdout.decode())
|
||||
json_data = get_json_from_command(cmd)
|
||||
devs = json_data.get('blockdevices', [])
|
||||
|
||||
# Get list of volumes
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
# Wizard Kit: Functions - ddrescue-tui
|
||||
|
||||
import json
|
||||
import pathlib
|
||||
import psutil
|
||||
import re
|
||||
|
|
@ -11,6 +10,7 @@ import time
|
|||
from collections import OrderedDict
|
||||
from functions.data import *
|
||||
from functions.hw_diags import *
|
||||
from functions.json import *
|
||||
from functions.tmux import *
|
||||
from operator import itemgetter
|
||||
from settings.ddrescue import *
|
||||
|
|
@ -346,15 +346,14 @@ class RecoveryState():
|
|||
map_allowed_fstypes = RECOMMENDED_FSTYPES.copy()
|
||||
map_allowed_fstypes.extend(['cifs', 'ext2', 'vfat'])
|
||||
map_allowed_fstypes.sort()
|
||||
json_data = {}
|
||||
json_data = get_json_from_command(cmd)
|
||||
|
||||
# Avoid saving map to non-persistent filesystem
|
||||
try:
|
||||
result = run_program(cmd)
|
||||
json_data = json.loads(result.stdout.decode())
|
||||
except Exception:
|
||||
# Abort if json_data is empty
|
||||
if not json_data:
|
||||
print_error('ERROR: Failed to verify map path')
|
||||
raise GenericAbort()
|
||||
|
||||
# Avoid saving map to non-persistent filesystem
|
||||
fstype = json_data.get(
|
||||
'filesystems', [{}])[0].get(
|
||||
'fstype', 'unknown')
|
||||
|
|
@ -547,21 +546,11 @@ def fix_tmux_panes(state, forced=False):
|
|||
|
||||
def get_device_details(dev_path):
|
||||
"""Get device details via lsblk, returns JSON dict."""
|
||||
try:
|
||||
cmd = (
|
||||
'lsblk',
|
||||
'--json',
|
||||
'--output-all',
|
||||
'--paths',
|
||||
dev_path)
|
||||
result = run_program(cmd)
|
||||
except CalledProcessError:
|
||||
# Return empty dict and let calling section deal with the issue
|
||||
return {}
|
||||
cmd = ['lsblk', '--json', '--output-all', '--paths', dev_path]
|
||||
json_data = get_json_from_command(cmd)
|
||||
|
||||
json_data = json.loads(result.stdout.decode())
|
||||
# Just return the first device (there should only be one)
|
||||
return json_data['blockdevices'][0]
|
||||
return json_data.get('blockdevices', [{}])[0]
|
||||
|
||||
|
||||
def get_device_report(dev_path):
|
||||
|
|
@ -594,17 +583,19 @@ def get_device_report(dev_path):
|
|||
|
||||
def get_dir_details(dir_path):
|
||||
"""Get dir details via findmnt, returns JSON dict."""
|
||||
try:
|
||||
result = run_program([
|
||||
'findmnt', '-J',
|
||||
'-o', 'SOURCE,TARGET,FSTYPE,OPTIONS,SIZE,AVAIL,USED',
|
||||
'-T', dir_path])
|
||||
json_data = json.loads(result.stdout.decode())
|
||||
except Exception:
|
||||
cmd = [
|
||||
'findmnt', '-J',
|
||||
'-o', 'SOURCE,TARGET,FSTYPE,OPTIONS,SIZE,AVAIL,USED',
|
||||
'-T', dir_path,
|
||||
]
|
||||
json_data = get_json_from_command(cmd)
|
||||
|
||||
# Raise exception if json_data is empty
|
||||
if not json_data:
|
||||
raise GenericError(
|
||||
'Failed to get directory details for "{}".'.format(self.path))
|
||||
else:
|
||||
return json_data['filesystems'][0]
|
||||
'Failed to get directory details for "{}".'.format(dir_path))
|
||||
|
||||
return json_data.get('filesystems', [{}])[0]
|
||||
|
||||
|
||||
def get_dir_report(dir_path):
|
||||
|
|
@ -1210,14 +1201,8 @@ def select_path(skip_device=None):
|
|||
|
||||
def select_device(description='device', skip_device=None):
|
||||
"""Select device via a menu, returns DevObj."""
|
||||
cmd = (
|
||||
'lsblk',
|
||||
'--json',
|
||||
'--nodeps',
|
||||
'--output-all',
|
||||
'--paths')
|
||||
result = run_program(cmd)
|
||||
json_data = json.loads(result.stdout.decode())
|
||||
cmd = ['lsblk', '--json', '--nodeps', '--output-all', '--paths']
|
||||
json_data = get_json_from_command(cmd)
|
||||
skip_names = []
|
||||
if skip_device:
|
||||
skip_names.append(skip_device.path)
|
||||
|
|
@ -1226,7 +1211,7 @@ def select_device(description='device', skip_device=None):
|
|||
|
||||
# Build menu
|
||||
dev_options = []
|
||||
for dev in json_data['blockdevices']:
|
||||
for dev in json_data.get('blockdevices', []):
|
||||
# Disable dev if in skip_names
|
||||
disabled = dev['name'] in skip_names or dev['pkname'] in skip_names
|
||||
|
||||
|
|
|
|||
|
|
@ -1,10 +1,10 @@
|
|||
# Wizard Kit: Functions - HW Diagnostics
|
||||
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
|
||||
from collections import OrderedDict
|
||||
from functions.json import *
|
||||
from functions.sensors import *
|
||||
from functions.threading import *
|
||||
from functions.tmux import *
|
||||
|
|
@ -38,15 +38,10 @@ class CpuObj():
|
|||
def get_details(self):
|
||||
"""Get CPU details from lscpu."""
|
||||
cmd = ['lscpu', '--json']
|
||||
try:
|
||||
result = run_program(cmd, check=False)
|
||||
json_data = json.loads(result.stdout.decode())
|
||||
except Exception:
|
||||
# Ignore and leave self.lscpu empty
|
||||
return
|
||||
for line in json_data.get('lscpu', []):
|
||||
_field = line.get('field', None).replace(':', '')
|
||||
_data = line.get('data', None)
|
||||
json_data = get_json_from_command(cmd)
|
||||
for line in json_data.get('lscpu', [{}]):
|
||||
_field = line.get('field', '').replace(':', '')
|
||||
_data = line.get('data', '')
|
||||
if not _field and not _data:
|
||||
# Skip
|
||||
print_warning(_field, _data)
|
||||
|
|
@ -311,13 +306,8 @@ class DiskObj():
|
|||
def get_details(self):
|
||||
"""Get data from lsblk."""
|
||||
cmd = ['lsblk', '--json', '--output-all', '--paths', self.path]
|
||||
try:
|
||||
result = run_program(cmd, check=False)
|
||||
json_data = json.loads(result.stdout.decode())
|
||||
self.lsblk = json_data['blockdevices'][0]
|
||||
except Exception:
|
||||
# Leave self.lsblk empty
|
||||
pass
|
||||
json_data = get_json_from_command(cmd)
|
||||
self.lsblk = json_data.get('blockdevices', [{}])[0]
|
||||
|
||||
# Set necessary details
|
||||
self.lsblk['model'] = self.lsblk.get('model', 'Unknown Model')
|
||||
|
|
@ -358,12 +348,7 @@ class DiskObj():
|
|||
def get_smart_details(self):
|
||||
"""Get data from smartctl."""
|
||||
cmd = ['sudo', 'smartctl', '--all', '--json', self.path]
|
||||
try:
|
||||
result = run_program(cmd, check=False)
|
||||
self.smartctl = json.loads(result.stdout.decode())
|
||||
except Exception:
|
||||
# Leave self.smartctl empty
|
||||
pass
|
||||
self.smartctl = get_json_from_command(cmd)
|
||||
|
||||
# Check for attributes
|
||||
if KEY_NVME in self.smartctl:
|
||||
|
|
@ -518,9 +503,8 @@ class State():
|
|||
|
||||
# Add block devices
|
||||
cmd = ['lsblk', '--json', '--nodeps', '--paths']
|
||||
result = run_program(cmd, check=False)
|
||||
json_data = json.loads(result.stdout.decode())
|
||||
for disk in json_data['blockdevices']:
|
||||
json_data = get_json_from_command(cmd)
|
||||
for disk in json_data.get('blockdevices', []):
|
||||
skip_disk = False
|
||||
disk_obj = DiskObj(disk['name'])
|
||||
|
||||
|
|
|
|||
32
.bin/Scripts/functions/json.py
Normal file
32
.bin/Scripts/functions/json.py
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
# Wizard Kit: Functions - JSON
|
||||
|
||||
import json
|
||||
|
||||
from functions.common import *
|
||||
|
||||
def get_json_from_command(cmd, ignore_errors=True):
|
||||
"""Capture JSON content from cmd output, returns dict.
|
||||
|
||||
If the data can't be decoded then either an exception is raised
|
||||
or an empty dict is returned depending on ignore_errors.
|
||||
"""
|
||||
errors = 'strict'
|
||||
json_data = {}
|
||||
|
||||
if ignore_errors:
|
||||
errors = 'ignore'
|
||||
|
||||
try:
|
||||
result = run_program(cmd, encoding='utf-8', errors=errors)
|
||||
json_data = json.loads(result.stdout)
|
||||
except (subprocess.CalledProcessError, json.decoder.JSONDecodeError):
|
||||
if not ignore_errors:
|
||||
raise
|
||||
|
||||
return json_data
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("This file is not meant to be called directly.")
|
||||
|
||||
# vim: sts=2 sw=2 ts=2
|
||||
|
|
@ -105,7 +105,7 @@ def get_raw_sensor_data():
|
|||
# Get raw data
|
||||
try:
|
||||
result = run_program(cmd)
|
||||
result = result.stdout.decode().splitlines()
|
||||
result = result.stdout.decode('utf-8', errors='ignore').splitlines()
|
||||
except subprocess.CalledProcessError:
|
||||
# Assuming no sensors available, set to empty list
|
||||
result = []
|
||||
|
|
|
|||
Loading…
Reference in a new issue