diff --git a/scripts/wk/exe.py b/scripts/wk/exe.py index ab49018b..e57240c9 100644 --- a/scripts/wk/exe.py +++ b/scripts/wk/exe.py @@ -67,7 +67,7 @@ class NonBlockingStreamReader(): out = self.read(0.1) if out: out_bytes += out - with open(out_path, 'a') as _f: + with open(out_path, 'a', encoding='utf-8') as _f: _f.write(out_bytes.decode('utf-8', errors='ignore')) # Close stream to prevent 100% CPU usage @@ -200,6 +200,7 @@ def popen_program(cmd, minimized=False, pipe=False, shell=False, **kwargs): shell=shell, **kwargs) try: + # pylint: disable=consider-using-with proc = subprocess.Popen(**cmd_kwargs) except FileNotFoundError: LOG.error('Command not found: %s', cmd) diff --git a/scripts/wk/hw/ddrescue.py b/scripts/wk/hw/ddrescue.py index ab869313..17faec9b 100644 --- a/scripts/wk/hw/ddrescue.py +++ b/scripts/wk/hw/ddrescue.py @@ -402,7 +402,7 @@ class State(): # Try loading JSON data if settings_file.exists(): - with open(settings_file, 'r') as _f: + with open(settings_file, 'r', encoding='utf-8') as _f: try: settings = json.loads(_f.read()) except (OSError, json.JSONDecodeError) as err: @@ -450,7 +450,7 @@ class State(): # Try saving JSON data try: - with open(settings_file, 'w') as _f: + with open(settings_file, 'w', encoding='utf-8') as _f: json.dump(settings, _f) except OSError as err: std.print_error('Failed to save clone settings') @@ -874,7 +874,7 @@ class State(): f'{self.working_dir}/' f'sfdisk_{self.destination.path.name}.script' ) - with open(script_path, 'w') as _f: + with open(script_path, 'w', encoding='utf-8') as _f: _f.write('\n'.join(sfdisk_script)) # Skip real format for dry runs @@ -884,7 +884,7 @@ class State(): # Format disk LOG.warning('Formatting destination: %s', self.destination.path) - with open(script_path, 'r') as _f: + with open(script_path, 'r', encoding='utf-8') as _f: proc = exe.run_program( cmd=['sudo', 'sfdisk', self.destination.path], stdin=_f, @@ -912,7 +912,7 @@ class State(): pair.status[name] = 'Pending' # Mark all non-trimmed, non-scraped, and bad areas as non-tried - with open(pair.map_path, 'r') as _f: + with open(pair.map_path, 'r', encoding='utf-8') as _f: for line in _f.readlines(): line = line.strip() if line.startswith('0x') and line.endswith(bad_statuses): @@ -920,7 +920,7 @@ class State(): map_data.append(line) # Save updated map - with open(pair.map_path, 'w') as _f: + with open(pair.map_path, 'w', encoding='utf-8') as _f: _f.write('\n'.join(map_data)) # Reinitialize status @@ -991,14 +991,15 @@ class State(): # State (self) std.save_pickles({'state': self}, debug_dir) - with open(f'{debug_dir}/state.report', 'a') as _f: + with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f: _f.write('[Debug report]\n') _f.write('\n'.join(debug.generate_object_report(self))) _f.write('\n') # Block pairs for _bp in self.block_pairs: - with open(f'{debug_dir}/block_pairs.report', 'a') as _f: + with open( + f'{debug_dir}/block_pairs.report', 'a', encoding='utf-8') as _f: _f.write('[Debug report]\n') _f.write('\n'.join(debug.generate_object_report(_bp))) _f.write('\n') @@ -1063,7 +1064,7 @@ class State(): # Write to progress file out_path = pathlib.Path(f'{self.log_dir}/progress.out') - with open(out_path, 'w') as _f: + with open(out_path, 'w', encoding='utf-8') as _f: _f.write('\n'.join(report)) def update_top_panes(self): @@ -1996,7 +1997,7 @@ def run_ddrescue(state, block_pair, pass_name, settings, dry_run=True): """Update SMART pane every 30 seconds.""" state.source.update_smart_details() now = datetime.datetime.now(tz=TIMEZONE).strftime('%Y-%m-%d %H:%M %Z') - with open(f'{state.log_dir}/smart.out', 'w') as _f: + with open(f'{state.log_dir}/smart.out', 'w', encoding='utf-8') as _f: _f.write( std.color_string( ['SMART Attributes', f'Updated: {now}\n'], diff --git a/scripts/wk/hw/diags.py b/scripts/wk/hw/diags.py index a052b6af..e5991b3f 100644 --- a/scripts/wk/hw/diags.py +++ b/scripts/wk/hw/diags.py @@ -357,11 +357,11 @@ class State(): # State (self) std.save_pickles({'state': self}, debug_dir) - with open(f'{debug_dir}/state.report', 'a') as _f: + with open(f'{debug_dir}/state.report', 'a', encoding='utf-8') as _f: _f.write('\n'.join(debug.generate_object_report(self))) # CPU/RAM - with open(f'{debug_dir}/cpu.report', 'a') as _f: + with open(f'{debug_dir}/cpu.report', 'a', encoding='utf-8') as _f: _f.write('\n'.join(debug.generate_object_report(self.cpu))) _f.write('\n\n[Tests]') for name, test in self.cpu.tests.items(): @@ -370,7 +370,9 @@ class State(): # Disks for disk in self.disks: - with open(f'{debug_dir}/disk_{disk.path.name}.report', 'a') as _f: + with open( + f'{debug_dir}/disk_{disk.path.name}.report', 'a', + encoding='utf-8') as _f: _f.write('\n'.join(debug.generate_object_report(disk))) _f.write('\n\n[Tests]') for name, test in disk.tests.items(): @@ -389,7 +391,7 @@ class State(): except Exception: # pylint: disable=broad-except LOG.ERROR('Error(s) encountered while exporting SMC data') data = [line.strip() for line in data] - with open(f'{debug_dir}/smc.data', 'a') as _f: + with open(f'{debug_dir}/smc.data', 'a', encoding='utf-8') as _f: _f.write('\n'.join(data)) def update_clock(self): @@ -426,7 +428,7 @@ class State(): # Write to progress file out_path = pathlib.Path(f'{self.log_dir}/progress.out') - with open(out_path, 'w') as _f: + with open(out_path, 'w', encoding='utf-8') as _f: _f.write('\n'.join(report)) def update_top_pane(self, text): @@ -632,7 +634,7 @@ def check_mprime_results(test_obj, working_dir): """Read file and split into lines, returns list.""" lines = [] try: - with open(f'{working_dir}/{log_name}', 'r') as _f: + with open(f'{working_dir}/{log_name}', 'r', encoding='utf-8') as _f: lines = _f.readlines() except FileNotFoundError: # File may be missing on older systems @@ -916,7 +918,7 @@ def disk_io_benchmark(state, test_objects, skip_usb=True): match.group(1) # Show progress - with open(log_path, 'a') as _f: + with open(log_path, 'a', encoding='utf-8') as _f: if _i % 5 == 0: percent = (_i / dd_values['Read Chunks']) * 100 _f.write(f' {graph.vertical_graph_line(percent, read_rates[-1])}\n') @@ -1095,7 +1097,7 @@ def disk_surface_scan(state, test_objects): # Start scan cmd = ['sudo', 'badblocks', '-sv', '-b', block_size, '-e', '1', dev_path] - with open(log_path, 'a') as _f: + with open(log_path, 'a', encoding='utf-8') as _f: size_str = std.bytes_to_string(dev.details["size"], use_binary=False) _f.write( std.color_string( @@ -1114,7 +1116,7 @@ def disk_surface_scan(state, test_objects): ) # Check results - with open(log_path, 'r') as _f: + with open(log_path, 'r', encoding='utf-8') as _f: for line in _f.readlines(): line = std.strip_colors(line.strip()) if not line or line.startswith('Checking') or line.startswith('['): @@ -1476,13 +1478,13 @@ def show_results(state): def start_mprime(working_dir, log_path): """Start mprime and save filtered output to log, returns Popen object.""" set_apple_fan_speed('max') - proc_mprime = subprocess.Popen( + proc_mprime = subprocess.Popen( # pylint: disable=consider-using-with ['mprime', '-t'], cwd=working_dir, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) - proc_grep = subprocess.Popen( + proc_grep = subprocess.Popen( # pylint: disable=consider-using-with 'grep --ignore-case --invert-match --line-buffered stress.txt'.split(), stdin=proc_mprime.stdout, stdout=subprocess.PIPE, @@ -1521,7 +1523,9 @@ def start_sysbench(sensors, sensors_out, log_path, pane): tmux.respawn_pane(pane, watch_file=log_path, watch_cmd='tail') # Start sysbench - filehandle_sysbench = open(log_path, 'a') + filehandle_sysbench = open( # pylint: disable=consider-using-with + log_path, 'a', encoding='utf-8', + ) proc_sysbench = exe.popen_program(sysbench_cmd, stdout=filehandle_sysbench) # Done diff --git a/scripts/wk/hw/obj.py b/scripts/wk/hw/obj.py index 3c09d9fd..d6923432 100644 --- a/scripts/wk/hw/obj.py +++ b/scripts/wk/hw/obj.py @@ -356,9 +356,11 @@ class Disk(BaseObj): self.details[attr] = -1 # Set description - self.description = '{size_str} ({bus}) {model} {serial}'.format( - size_str=bytes_to_string(self.details['size'], use_binary=False), - **self.details, + self.description = ( + f'{bytes_to_string(self.details["size"], use_binary=False)}' + f' ({self.details["bus"]})' + f' {self.details["model"]}' + f' {self.details["serial"]}' ) def get_labels(self): @@ -482,7 +484,7 @@ class Disk(BaseObj): test_minutes = int(test_minutes) + 10 # Start test - with open(log_path, 'w') as _f: + with open(log_path, 'w', encoding='utf-8') as _f: _f.write(f'{header_str}\nInitializing...') cmd = [ 'sudo', @@ -507,7 +509,7 @@ class Disk(BaseObj): status_str = status_str.capitalize() # Update log - with open(log_path, 'w') as _f: + with open(log_path, 'w', encoding='utf-8') as _f: _f.write(f'{header_str}\nSMART self-test status:\n {status_str}') # Check if finished diff --git a/scripts/wk/hw/sensors.py b/scripts/wk/hw/sensors.py index e5b27bc1..f0e7b97c 100644 --- a/scripts/wk/hw/sensors.py +++ b/scripts/wk/hw/sensors.py @@ -141,7 +141,7 @@ class Sensors(): if thermal_action: run_program(thermal_action, check=False) report = self.generate_report(*temp_labels) - with open(out_path, 'w') as _f: + with open(out_path, 'w', encoding='utf-8') as _f: _f.write('\n'.join(report)) # Check if we should stop diff --git a/scripts/wk/kit/ufd.py b/scripts/wk/kit/ufd.py index e1f7ee7a..a29bfcd6 100644 --- a/scripts/wk/kit/ufd.py +++ b/scripts/wk/kit/ufd.py @@ -374,7 +374,7 @@ def get_uuid(path): def hide_items(ufd_dev_first_partition, items): """Set FAT32 hidden flag for items.""" - with open('/root/.mtoolsrc', 'w') as _f: + with open('/root/.mtoolsrc', 'w', encoding='utf-8') as _f: _f.write(f'drive U: file="{ufd_dev_first_partition}"\n') _f.write('mtools_skip_check=1\n') diff --git a/scripts/wk/os/linux.py b/scripts/wk/os/linux.py index 73e6d85b..f3325fb7 100644 --- a/scripts/wk/os/linux.py +++ b/scripts/wk/os/linux.py @@ -218,7 +218,7 @@ def scan_corestorage_container(container, timeout=300): # Create mapper device(s) if necessary for name, cmd in detected_volumes.items(): cmd_file = make_temp_file() - cmd_file.write_text(cmd) + cmd_file.write_text(cmd, encoding='utf-8') proc = run_program( cmd=['sudo', 'dmsetup', 'create', name, cmd_file], check=False, diff --git a/scripts/wk/repairs/win.py b/scripts/wk/repairs/win.py index b2964fd3..7e045c7f 100644 --- a/scripts/wk/repairs/win.py +++ b/scripts/wk/repairs/win.py @@ -987,8 +987,8 @@ def run_bleachbit(cleaners, preview=True): proc = run_tool('BleachBit', 'bleachbit_console', *cmd_args, cbin=True) # Save logs - log_path.write_text(proc.stdout) - log_path.with_suffix('.err').write_text(proc.stderr) + log_path.write_text(proc.stdout, encoding='utf-8') + log_path.with_suffix('.err').write_text(proc.stderr, encoding='utf-8') def run_hitmanpro(): @@ -1025,7 +1025,7 @@ def run_kvrt(): download_tool('KVRT', 'KVRT') kvrt_path = get_tool_path('KVRT', 'KVRT') tmp_file = fr'{os.environ.get("TMP")}\run_kvrt.cmd' - with open(tmp_file, 'w') as _f: + with open(tmp_file, 'w', encoding='utf-8') as _f: _f.write('@echo off\n') _f.write(f'"{kvrt_path}" {" ".join(cmd_args)}\n') cmd = ('cmd', '/c', tmp_file, '-new_console:nb', '-new_console:s33V') @@ -1036,7 +1036,7 @@ def run_kvrt(): # Run in background proc = run_tool('KVRT', 'KVRT', *cmd_args, download=True) - log_path.write_text(proc.stdout) + log_path.write_text(proc.stdout, encoding='utf-8') def run_microsoft_defender(full=True): @@ -1075,7 +1075,7 @@ def run_microsoft_defender(full=True): # Run scan cmd = (defender_path, '-Scan', '-ScanType', '2' if full else '1') proc = run_program(cmd, check=False) - log_path.write_text(proc.stdout) + log_path.write_text(proc.stdout, encoding='utf-8') if proc.returncode > 0: raise GenericError('Failed to run scan or clean items.') @@ -1085,7 +1085,9 @@ def run_rkill(): log_path = format_log_path(log_name='RKill', timestamp=True, tool=True) log_path.parent.mkdir(parents=True, exist_ok=True) whitelist_path = log_path.with_suffix('.wl') - whitelist_path.write_text('\n'.join(map(str, RKILL_WHITELIST))) + whitelist_path.write_text( + '\n'.join(map(str, RKILL_WHITELIST)), encoding='utf-8', + ) cmd_args = ( '-l', log_path, '-w', whitelist_path, @@ -1398,9 +1400,9 @@ def run_sfc_scan(): # Save output os.makedirs(log_path.parent, exist_ok=True) - with open(log_path, 'a') as _f: + with open(log_path, 'a', encoding='utf-8') as _f: _f.write(proc.stdout) - with open(err_path, 'a') as _f: + with open(err_path, 'a', encoding='utf-8') as _f: _f.write(proc.stderr) # Check result diff --git a/scripts/wk/setup/win.py b/scripts/wk/setup/win.py index 7c500af4..80a8ef94 100644 --- a/scripts/wk/setup/win.py +++ b/scripts/wk/setup/win.py @@ -449,7 +449,6 @@ def auto_set_custom_power_plan(): def auto_enable_bsod_minidumps(): """Enable saving minidumps during BSoDs.""" - cmd = ['wmic', 'RECOVEROS', 'set', 'DebugInfoType', '=', '3'] TRY_PRINT.run('Enable BSoD mini dumps...', enable_bsod_minidumps) @@ -732,12 +731,12 @@ def install_firefox(): # Revert default profile if needed if revert_default: out = [] - for line in profiles_ini.read_text().splitlines(): + for line in profiles_ini.read_text(encoding='utf-8').splitlines(): if 'Default=Profile' in line: out.append(f'Default={current_default_profile}') else: out.append(line) - profiles_ini.write_text('\n'.join(out)) + profiles_ini.write_text('\n'.join(out), encoding='utf-8') def install_libreoffice( diff --git a/scripts/wk/std.py b/scripts/wk/std.py index f6979d13..c811bdc5 100644 --- a/scripts/wk/std.py +++ b/scripts/wk/std.py @@ -803,7 +803,7 @@ def generate_debug_report(): if log_path: report.append('------ Start Log -------') report.append('') - with open(log_path, 'r') as log_file: + with open(log_path, 'r', encoding='utf-8') as log_file: report.extend(log_file.read().splitlines()) report.append('') report.append('------- End Log --------')