Fix various pylint warnings

This commit is contained in:
2Shirt 2021-09-27 21:48:11 -06:00
parent 2e485505d4
commit 90fb97ad91
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C
10 changed files with 52 additions and 43 deletions

View file

@ -67,7 +67,7 @@ class NonBlockingStreamReader():
out = self.read(0.1) out = self.read(0.1)
if out: if out:
out_bytes += out out_bytes += out
with open(out_path, 'a') as _f: with open(out_path, 'a', encoding='utf-8') as _f:
_f.write(out_bytes.decode('utf-8', errors='ignore')) _f.write(out_bytes.decode('utf-8', errors='ignore'))
# Close stream to prevent 100% CPU usage # Close stream to prevent 100% CPU usage
@ -200,6 +200,7 @@ def popen_program(cmd, minimized=False, pipe=False, shell=False, **kwargs):
shell=shell, shell=shell,
**kwargs) **kwargs)
try: try:
# pylint: disable=consider-using-with
proc = subprocess.Popen(**cmd_kwargs) proc = subprocess.Popen(**cmd_kwargs)
except FileNotFoundError: except FileNotFoundError:
LOG.error('Command not found: %s', cmd) LOG.error('Command not found: %s', cmd)

View file

@ -402,7 +402,7 @@ class State():
# Try loading JSON data # Try loading JSON data
if settings_file.exists(): if settings_file.exists():
with open(settings_file, 'r') as _f: with open(settings_file, 'r', encoding='utf-8') as _f:
try: try:
settings = json.loads(_f.read()) settings = json.loads(_f.read())
except (OSError, json.JSONDecodeError) as err: except (OSError, json.JSONDecodeError) as err:
@ -450,7 +450,7 @@ class State():
# Try saving JSON data # Try saving JSON data
try: try:
with open(settings_file, 'w') as _f: with open(settings_file, 'w', encoding='utf-8') as _f:
json.dump(settings, _f) json.dump(settings, _f)
except OSError as err: except OSError as err:
std.print_error('Failed to save clone settings') std.print_error('Failed to save clone settings')
@ -874,7 +874,7 @@ class State():
f'{self.working_dir}/' f'{self.working_dir}/'
f'sfdisk_{self.destination.path.name}.script' f'sfdisk_{self.destination.path.name}.script'
) )
with open(script_path, 'w') as _f: with open(script_path, 'w', encoding='utf-8') as _f:
_f.write('\n'.join(sfdisk_script)) _f.write('\n'.join(sfdisk_script))
# Skip real format for dry runs # Skip real format for dry runs
@ -884,7 +884,7 @@ class State():
# Format disk # Format disk
LOG.warning('Formatting destination: %s', self.destination.path) LOG.warning('Formatting destination: %s', self.destination.path)
with open(script_path, 'r') as _f: with open(script_path, 'r', encoding='utf-8') as _f:
proc = exe.run_program( proc = exe.run_program(
cmd=['sudo', 'sfdisk', self.destination.path], cmd=['sudo', 'sfdisk', self.destination.path],
stdin=_f, stdin=_f,
@ -912,7 +912,7 @@ class State():
pair.status[name] = 'Pending' pair.status[name] = 'Pending'
# Mark all non-trimmed, non-scraped, and bad areas as non-tried # Mark all non-trimmed, non-scraped, and bad areas as non-tried
with open(pair.map_path, 'r') as _f: with open(pair.map_path, 'r', encoding='utf-8') as _f:
for line in _f.readlines(): for line in _f.readlines():
line = line.strip() line = line.strip()
if line.startswith('0x') and line.endswith(bad_statuses): if line.startswith('0x') and line.endswith(bad_statuses):
@ -920,7 +920,7 @@ class State():
map_data.append(line) map_data.append(line)
# Save updated map # Save updated map
with open(pair.map_path, 'w') as _f: with open(pair.map_path, 'w', encoding='utf-8') as _f:
_f.write('\n'.join(map_data)) _f.write('\n'.join(map_data))
# Reinitialize status # Reinitialize status
@ -991,14 +991,15 @@ class State():
# State (self) # State (self)
std.save_pickles({'state': self}, debug_dir) std.save_pickles({'state': self}, debug_dir)
with open(f'{debug_dir}/state.report', 'a') as _f: with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f:
_f.write('[Debug report]\n') _f.write('[Debug report]\n')
_f.write('\n'.join(debug.generate_object_report(self))) _f.write('\n'.join(debug.generate_object_report(self)))
_f.write('\n') _f.write('\n')
# Block pairs # Block pairs
for _bp in self.block_pairs: for _bp in self.block_pairs:
with open(f'{debug_dir}/block_pairs.report', 'a') as _f: with open(
f'{debug_dir}/block_pairs.report', 'a', encoding='utf-8') as _f:
_f.write('[Debug report]\n') _f.write('[Debug report]\n')
_f.write('\n'.join(debug.generate_object_report(_bp))) _f.write('\n'.join(debug.generate_object_report(_bp)))
_f.write('\n') _f.write('\n')
@ -1063,7 +1064,7 @@ class State():
# Write to progress file # Write to progress file
out_path = pathlib.Path(f'{self.log_dir}/progress.out') out_path = pathlib.Path(f'{self.log_dir}/progress.out')
with open(out_path, 'w') as _f: with open(out_path, 'w', encoding='utf-8') as _f:
_f.write('\n'.join(report)) _f.write('\n'.join(report))
def update_top_panes(self): def update_top_panes(self):
@ -1996,7 +1997,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True):
"""Update SMART pane every 30 seconds.""" """Update SMART pane every 30 seconds."""
state.source.update_smart_details() state.source.update_smart_details()
now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z') now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z')
with open(f'{state.log_dir}/smart.out', 'w') as _f: with open(f'{state.log_dir}/smart.out', 'w', encoding='utf-8') as _f:
_f.write( _f.write(
std.color_string( std.color_string(
['SMART Attributes', f'Updated: {now}\n'], ['SMART Attributes', f'Updated: {now}\n'],

View file

@ -357,11 +357,11 @@ class State():
# State (self) # State (self)
std.save_pickles({'state': self}, debug_dir) std.save_pickles({'state': self}, debug_dir)
with open(f'{debug_dir}/state.report', 'a') as _f: with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f:
_f.write('\n'.join(debug.generate_object_report(self))) _f.write('\n'.join(debug.generate_object_report(self)))
# CPU/RAM # CPU/RAM
with open(f'{debug_dir}/cpu.report', 'a') as _f: with open(f'{debug_dir}/cpu.report', 'a', encoding='utf-8') as _f:
_f.write('\n'.join(debug.generate_object_report(self.cpu))) _f.write('\n'.join(debug.generate_object_report(self.cpu)))
_f.write('\n\n[Tests]') _f.write('\n\n[Tests]')
for name, test in self.cpu.tests.items(): for name, test in self.cpu.tests.items():
@ -370,7 +370,9 @@ class State():
# Disks # Disks
for disk in self.disks: for disk in self.disks:
with open(f'{debug_dir}/disk_{disk.path.name}.report', 'a') as _f: with open(
f'{debug_dir}/disk_{disk.path.name}.report', 'a',
encoding='utf-8') as _f:
_f.write('\n'.join(debug.generate_object_report(disk))) _f.write('\n'.join(debug.generate_object_report(disk)))
_f.write('\n\n[Tests]') _f.write('\n\n[Tests]')
for name, test in disk.tests.items(): for name, test in disk.tests.items():
@ -389,7 +391,7 @@ class State():
except Exception: # pylint: disable=broad-except except Exception: # pylint: disable=broad-except
LOG.ERROR('Error(s) encountered while exporting SMC data') LOG.ERROR('Error(s) encountered while exporting SMC data')
data = [line.strip() for line in data] data = [line.strip() for line in data]
with open(f'{debug_dir}/smc.data', 'a') as _f: with open(f'{debug_dir}/smc.data', 'a', encoding='utf-8') as _f:
_f.write('\n'.join(data)) _f.write('\n'.join(data))
def update_clock(self): def update_clock(self):
@ -426,7 +428,7 @@ class State():
# Write to progress file # Write to progress file
out_path = pathlib.Path(f'{self.log_dir}/progress.out') out_path = pathlib.Path(f'{self.log_dir}/progress.out')
with open(out_path, 'w') as _f: with open(out_path, 'w', encoding='utf-8') as _f:
_f.write('\n'.join(report)) _f.write('\n'.join(report))
def update_top_pane(self, text): def update_top_pane(self, text):
@ -632,7 +634,7 @@ def check_mprime_results(test_obj, working_dir):
"""Read file and split into lines, returns list.""" """Read file and split into lines, returns list."""
lines = [] lines = []
try: try:
with open(f'{working_dir}/{log_name}', 'r') as _f: with open(f'{working_dir}/{log_name}', 'r', encoding='utf-8') as _f:
lines = _f.readlines() lines = _f.readlines()
except FileNotFoundError: except FileNotFoundError:
# File may be missing on older systems # File may be missing on older systems
@ -916,7 +918,7 @@ def disk_io_benchmark(state, test_objects, skip_usb=True):
match.group(1) match.group(1)
# Show progress # Show progress
with open(log_path, 'a') as _f: with open(log_path, 'a', encoding='utf-8') as _f:
if _i % 5 == 0: if _i % 5 == 0:
percent = (_i / dd_values['Read Chunks']) * 100 percent = (_i / dd_values['Read Chunks']) * 100
_f.write(f' {graph.vertical_graph_line(percent, read_rates[-1])}\n') _f.write(f' {graph.vertical_graph_line(percent, read_rates[-1])}\n')
@ -1095,7 +1097,7 @@ def disk_surface_scan(state, test_objects):
# Start scan # Start scan
cmd = ['sudo', 'badblocks', '-sv', '-b', block_size, '-e', '1', dev_path] cmd = ['sudo', 'badblocks', '-sv', '-b', block_size, '-e', '1', dev_path]
with open(log_path, 'a') as _f: with open(log_path, 'a', encoding='utf-8') as _f:
size_str = std.bytes_to_string(dev.details["size"], use_binary=False) size_str = std.bytes_to_string(dev.details["size"], use_binary=False)
_f.write( _f.write(
std.color_string( std.color_string(
@ -1114,7 +1116,7 @@ def disk_surface_scan(state, test_objects):
) )
# Check results # Check results
with open(log_path, 'r') as _f: with open(log_path, 'r', encoding='utf-8') as _f:
for line in _f.readlines(): for line in _f.readlines():
line = std.strip_colors(line.strip()) line = std.strip_colors(line.strip())
if not line or line.startswith('Checking') or line.startswith('['): if not line or line.startswith('Checking') or line.startswith('['):
@ -1476,13 +1478,13 @@ def show_results(state):
def start_mprime(working_dir, log_path): def start_mprime(working_dir, log_path):
"""Start mprime and save filtered output to log, returns Popen object.""" """Start mprime and save filtered output to log, returns Popen object."""
set_apple_fan_speed('max') set_apple_fan_speed('max')
proc_mprime = subprocess.Popen( proc_mprime = subprocess.Popen( # pylint: disable=consider-using-with
['mprime', '-t'], ['mprime', '-t'],
cwd=working_dir, cwd=working_dir,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
) )
proc_grep = subprocess.Popen( proc_grep = subprocess.Popen( # pylint: disable=consider-using-with
'grep --ignore-case --invert-match --line-buffered stress.txt'.split(), 'grep --ignore-case --invert-match --line-buffered stress.txt'.split(),
stdin=proc_mprime.stdout, stdin=proc_mprime.stdout,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
@ -1521,7 +1523,9 @@ def start_sysbench(sensors, sensors_out, log_path, pane):
tmux.respawn_pane(pane, watch_file=log_path, watch_cmd='tail') tmux.respawn_pane(pane, watch_file=log_path, watch_cmd='tail')
# Start sysbench # Start sysbench
filehandle_sysbench = open(log_path, 'a') filehandle_sysbench = open( # pylint: disable=consider-using-with
log_path, 'a', encoding='utf-8',
)
proc_sysbench = exe.popen_program(sysbench_cmd, stdout=filehandle_sysbench) proc_sysbench = exe.popen_program(sysbench_cmd, stdout=filehandle_sysbench)
# Done # Done

View file

@ -356,9 +356,11 @@ class Disk(BaseObj):
self.details[attr] = -1 self.details[attr] = -1
# Set description # Set description
self.description = '{size_str} ({bus}) {model} {serial}'.format( self.description = (
size_str=bytes_to_string(self.details['size'], use_binary=False), f'{bytes_to_string(self.details["size"], use_binary=False)}'
**self.details, f' ({self.details["bus"]})'
f' {self.details["model"]}'
f' {self.details["serial"]}'
) )
def get_labels(self): def get_labels(self):
@ -482,7 +484,7 @@ class Disk(BaseObj):
test_minutes = int(test_minutes) + 10 test_minutes = int(test_minutes) + 10
# Start test # Start test
with open(log_path, 'w') as _f: with open(log_path, 'w', encoding='utf-8') as _f:
_f.write(f'{header_str}\nInitializing...') _f.write(f'{header_str}\nInitializing...')
cmd = [ cmd = [
'sudo', 'sudo',
@ -507,7 +509,7 @@ class Disk(BaseObj):
status_str = status_str.capitalize() status_str = status_str.capitalize()
# Update log # Update log
with open(log_path, 'w') as _f: with open(log_path, 'w', encoding='utf-8') as _f:
_f.write(f'{header_str}\nSMART self-test status:\n {status_str}') _f.write(f'{header_str}\nSMART self-test status:\n {status_str}')
# Check if finished # Check if finished

View file

@ -141,7 +141,7 @@ class Sensors():
if thermal_action: if thermal_action:
run_program(thermal_action, check=False) run_program(thermal_action, check=False)
report = self.generate_report(*temp_labels) report = self.generate_report(*temp_labels)
with open(out_path, 'w') as _f: with open(out_path, 'w', encoding='utf-8') as _f:
_f.write('\n'.join(report)) _f.write('\n'.join(report))
# Check if we should stop # Check if we should stop

View file

@ -374,7 +374,7 @@ def get_uuid(path):
def hide_items(ufd_dev_first_partition, items): def hide_items(ufd_dev_first_partition, items):
"""Set FAT32 hidden flag for items.""" """Set FAT32 hidden flag for items."""
with open('/root/.mtoolsrc', 'w') as _f: with open('/root/.mtoolsrc', 'w', encoding='utf-8') as _f:
_f.write(f'drive U: file="{ufd_dev_first_partition}"\n') _f.write(f'drive U: file="{ufd_dev_first_partition}"\n')
_f.write('mtools_skip_check=1\n') _f.write('mtools_skip_check=1\n')

View file

@ -218,7 +218,7 @@ def scan_corestorage_container(container, timeout=300):
# Create mapper device(s) if necessary # Create mapper device(s) if necessary
for name, cmd in detected_volumes.items(): for name, cmd in detected_volumes.items():
cmd_file = make_temp_file() cmd_file = make_temp_file()
cmd_file.write_text(cmd) cmd_file.write_text(cmd, encoding='utf-8')
proc = run_program( proc = run_program(
cmd=['sudo', 'dmsetup', 'create', name, cmd_file], cmd=['sudo', 'dmsetup', 'create', name, cmd_file],
check=False, check=False,

View file

@ -987,8 +987,8 @@ def run_bleachbit(cleaners, preview=True):
proc = run_tool('BleachBit', 'bleachbit_console', *cmd_args, cbin=True) proc = run_tool('BleachBit', 'bleachbit_console', *cmd_args, cbin=True)
# Save logs # Save logs
log_path.write_text(proc.stdout) log_path.write_text(proc.stdout, encoding='utf-8')
log_path.with_suffix('.err').write_text(proc.stderr) log_path.with_suffix('.err').write_text(proc.stderr, encoding='utf-8')
def run_hitmanpro(): def run_hitmanpro():
@ -1025,7 +1025,7 @@ def run_kvrt():
download_tool('KVRT', 'KVRT') download_tool('KVRT', 'KVRT')
kvrt_path = get_tool_path('KVRT', 'KVRT') kvrt_path = get_tool_path('KVRT', 'KVRT')
tmp_file = fr'{os.environ.get("TMP")}\run_kvrt.cmd' tmp_file = fr'{os.environ.get("TMP")}\run_kvrt.cmd'
with open(tmp_file, 'w') as _f: with open(tmp_file, 'w', encoding='utf-8') as _f:
_f.write('@echo off\n') _f.write('@echo off\n')
_f.write(f'"{kvrt_path}" {" ".join(cmd_args)}\n') _f.write(f'"{kvrt_path}" {" ".join(cmd_args)}\n')
cmd = ('cmd', '/c', tmp_file, '-new_console:nb', '-new_console:s33V') cmd = ('cmd', '/c', tmp_file, '-new_console:nb', '-new_console:s33V')
@ -1036,7 +1036,7 @@ def run_kvrt():
# Run in background # Run in background
proc = run_tool('KVRT', 'KVRT', *cmd_args, download=True) proc = run_tool('KVRT', 'KVRT', *cmd_args, download=True)
log_path.write_text(proc.stdout) log_path.write_text(proc.stdout, encoding='utf-8')
def run_microsoft_defender(full=True): def run_microsoft_defender(full=True):
@ -1075,7 +1075,7 @@ def run_microsoft_defender(full=True):
# Run scan # Run scan
cmd = (defender_path, '-Scan', '-ScanType', '2' if full else '1') cmd = (defender_path, '-Scan', '-ScanType', '2' if full else '1')
proc = run_program(cmd, check=False) proc = run_program(cmd, check=False)
log_path.write_text(proc.stdout) log_path.write_text(proc.stdout, encoding='utf-8')
if proc.returncode > 0: if proc.returncode > 0:
raise GenericError('Failed to run scan or clean items.') raise GenericError('Failed to run scan or clean items.')
@ -1085,7 +1085,9 @@ def run_rkill():
log_path = format_log_path(log_name='RKill', timestamp=True, tool=True) log_path = format_log_path(log_name='RKill', timestamp=True, tool=True)
log_path.parent.mkdir(parents=True, exist_ok=True) log_path.parent.mkdir(parents=True, exist_ok=True)
whitelist_path = log_path.with_suffix('.wl') whitelist_path = log_path.with_suffix('.wl')
whitelist_path.write_text('\n'.join(map(str, RKILL_WHITELIST))) whitelist_path.write_text(
'\n'.join(map(str, RKILL_WHITELIST)), encoding='utf-8',
)
cmd_args = ( cmd_args = (
'-l', log_path, '-l', log_path,
'-w', whitelist_path, '-w', whitelist_path,
@ -1398,9 +1400,9 @@ def run_sfc_scan():
# Save output # Save output
os.makedirs(log_path.parent, exist_ok=True) os.makedirs(log_path.parent, exist_ok=True)
with open(log_path, 'a') as _f: with open(log_path, 'a', encoding='utf-8') as _f:
_f.write(proc.stdout) _f.write(proc.stdout)
with open(err_path, 'a') as _f: with open(err_path, 'a', encoding='utf-8') as _f:
_f.write(proc.stderr) _f.write(proc.stderr)
# Check result # Check result

View file

@ -449,7 +449,6 @@ def auto_set_custom_power_plan():
def auto_enable_bsod_minidumps(): def auto_enable_bsod_minidumps():
"""Enable saving minidumps during BSoDs.""" """Enable saving minidumps during BSoDs."""
cmd = ['wmic', 'RECOVEROS', 'set', 'DebugInfoType', '=', '3']
TRY_PRINT.run('Enable BSoD mini dumps...', enable_bsod_minidumps) TRY_PRINT.run('Enable BSoD mini dumps...', enable_bsod_minidumps)
@ -732,12 +731,12 @@ def install_firefox():
# Revert default profile if needed # Revert default profile if needed
if revert_default: if revert_default:
out = [] out = []
for line in profiles_ini.read_text().splitlines(): for line in profiles_ini.read_text(encoding='utf-8').splitlines():
if 'Default=Profile' in line: if 'Default=Profile' in line:
out.append(f'Default={current_default_profile}') out.append(f'Default={current_default_profile}')
else: else:
out.append(line) out.append(line)
profiles_ini.write_text('\n'.join(out)) profiles_ini.write_text('\n'.join(out), encoding='utf-8')
def install_libreoffice( def install_libreoffice(

View file

@ -803,7 +803,7 @@ def generate_debug_report():
if log_path: if log_path:
report.append('------ Start Log -------') report.append('------ Start Log -------')
report.append('') report.append('')
with open(log_path, 'r') as log_file: with open(log_path, 'r', encoding='utf-8') as log_file:
report.extend(log_file.read().splitlines()) report.extend(log_file.read().splitlines())
report.append('') report.append('')
report.append('------- End Log --------') report.append('------- End Log --------')