"""WizardKit: Graph Functions""" # vim: sts=2 sw=2 ts=2 import base64 import json import logging import pathlib import time from matplotlib import pyplot import requests from wk.cfg.net import BENCHMARK_SERVER, IMGUR_CLIENT_ID from wk.ui import ansi # STATIC VARIABLES LOG = logging.getLogger(__name__) GRAPH_FIGURE_SIZE = ( 9.167, # 660px at 72dpi 4.167, # 300px at 72dpi ) GRAPH_HORIZONTAL = ('▁', '▂', '▃', '▄', '▅', '▆', '▇', '█') GRAPH_VERTICAL = ( '▏', '▎', '▍', '▌', '▋', '▊', '▉', '█', '█▏', '█▎', '█▍', '█▌', '█▋', '█▊', '█▉', '██', '██▏', '██▎', '██▍', '██▌', '██▋', '██▊', '██▉', '███', '███▏', '███▎', '███▍', '███▌', '███▋', '███▊', '███▉', '████', ) # SCALE_STEPS: These scales allow showing differences between HDDs and SSDs # on the same graph. SCALE_STEPS = { 8: [2**(0.56*(x+1))+(16*(x+1)) for x in range(8)], 16: [2**(0.56*(x+1))+(16*(x+1)) for x in range(16)], 32: [2**(0.56*(x+1)/2)+(16*(x+1)/2) for x in range(32)], } # THRESHOLDS: These are the rate_list (in MB/s) used to color graphs THRESH_FAIL = 65 * 1024**2 THRESH_WARN = 135 * 1024**2 THRESH_GREAT = 750 * 1024**2 # Functions def reduce_cpu_temp_list(list_of_temps, factor=5, start=30) -> list[float]: """Reduce temperature list by averaging adjacent values. NOTE: This only averages values after the amount defined by start. NOTE 2: If the last group is less than the factor it is simply dropped. """ new_list = list_of_temps[:start] list_of_temps = list_of_temps[start:] while list_of_temps: group = list_of_temps[:factor] list_of_temps = list_of_temps[factor:] if len(group) < factor: continue new_list.append(sum(group)/factor) return new_list def export_cpu_graph(cpu_description, log_dir, sensor_history): """Exports PNG graph using matplotlib.""" lines = {} offset_labels = [] out_path = pathlib.Path(f'{log_dir}/cpu_tests.png') run_averages = {} # Safety check if not sensor_history: raise RuntimeError('No sensor_data available.') # Prep offset = 0 for run_label, sensor_data in sensor_history: all_run_temps = [] offset_labels.append((offset, run_label)) run_length = 0 for adapter in sensor_data.get('CPUTemps', {}).values(): for source, data in adapter.items(): y_values = data['Temps'] if run_label in ('Sysbench', 'Prime95'): y_values = reduce_cpu_temp_list(y_values) all_run_temps.extend(y_values) run_length = max(run_length, len(y_values)) if source not in lines: lines[source] = [] lines[source].extend(y_values) try: run_averages[run_label] = { 'Start': offset, 'End': offset+run_length, 'Temp': sum(all_run_temps) / len(all_run_temps), } except ZeroDivisionError: # Ignore pass offset += run_length # Build graph _, ax = pyplot.subplots( dpi=72, figsize=list(x*2 for x in GRAPH_FIGURE_SIZE), layout='constrained', ) ax.set_title(cpu_description) ax.set_xticks([]) for label, data in lines.items(): ax.plot(data, label=label) #prev_label = 'Idle' # Always skip Idle prev_label = '' for offset, label in offset_labels: color = 'grey' if label == prev_label: continue if label == 'Sysbench': color = 'orange' if label == 'Prime95': color = 'red' if label == 'Cooldown': color = 'blue' label = '' ax.axvline(x=offset, color=color, label=label, linestyle='--') #ax.axvline(x=offset, color=color) prev_label = label for run_label, data in run_averages.items(): if run_label not in ('Prime95', 'Sysbench'): continue ax.axhline( y = data['Temp'], color = 'orange' if run_label == 'Sysbench' else 'red', label = f'{run_label} (Avg)', linestyle = ':', ) ax.legend() pyplot.savefig(out_path) # Done return out_path def export_io_graph(disk, log_dir, read_rates): """Exports PNG graph using matplotlib.""" # Safety check if not read_rates: raise RuntimeError(f'No read rates for {disk.path}') # Prep max_rate = max(read_rates) / (1024**2) max_rate = max(800, max_rate) out_path = pathlib.Path(f'{log_dir}/{disk.path.name}_iobenchmark.png') plot_data = ([], []) # Prep data for graph for i, rate in enumerate(read_rates): plot_data[0].append((i+1) / len(read_rates) * 100) # Step plot_data[1].append(int(rate / (1024**2))) # Data # Build graph _, ax = pyplot.subplots( dpi=72, figsize=GRAPH_FIGURE_SIZE, layout='constrained', ) ax.set_title('I/O Benchmark') ax.plot(*plot_data, label=disk.description.replace('_', ' ')) ax.legend() pyplot.savefig(out_path) # Done return out_path def generate_horizontal_graph( rate_list: list[float], graph_width: int = 40, oneline: bool = False) -> list[str]: """Generate horizontal graph from rate_list, returns list.""" graph = ['', '', '', ''] scale = 8 if oneline else 32 # Build graph for rate in merge_rates(rate_list, graph_width=graph_width): step = get_graph_step(rate, scale=scale) # Set color rate_color = None if rate < THRESH_FAIL: rate_color = 'RED' elif rate < THRESH_WARN: rate_color = 'YELLOW' elif rate > THRESH_GREAT: rate_color = 'GREEN' # Build graph full_block = ansi.color_string((GRAPH_HORIZONTAL[-1],), (rate_color,)) if step >= 24: graph[0] += ansi.color_string((GRAPH_HORIZONTAL[step-24],), (rate_color,)) graph[1] += full_block graph[2] += full_block graph[3] += full_block elif step >= 16: graph[0] += ' ' graph[1] += ansi.color_string((GRAPH_HORIZONTAL[step-16],), (rate_color,)) graph[2] += full_block graph[3] += full_block elif step >= 8: graph[0] += ' ' graph[1] += ' ' graph[2] += ansi.color_string((GRAPH_HORIZONTAL[step-8],), (rate_color,)) graph[3] += full_block else: graph[0] += ' ' graph[1] += ' ' graph[2] += ' ' graph[3] += ansi.color_string((GRAPH_HORIZONTAL[step],), (rate_color,)) # Done if oneline: graph = graph[-1:] return graph def get_graph_step(rate: float, scale: int = 16) -> int: """Get graph step based on rate and scale, returns int.""" rate_in_mb = rate / (1024**2) step = 0 # Iterate over scale_steps backwards for _r in range(scale-1, -1, -1): if rate_in_mb >= SCALE_STEPS[scale][_r]: step = _r break # Done return step def merge_rates( rates: list[float], graph_width: int = 40, ) -> list[int | float]: """Merge rates to have entries equal to the width, returns list.""" merged_rates = [] offset = 0 slice_width = int(len(rates) / graph_width) # Merge rates for _ in range(graph_width): merged_rates.append(sum(rates[offset:offset+slice_width])/slice_width) offset += slice_width # Done return merged_rates def upload_to_imgur(image_path): """Upload image to Imgur and return image url as str.""" image_data = None image_link = None # Bail early if not image_path: raise RuntimeError(f'Invalid image path: {image_path}') # Read image file and convert to base64 then convert to str with open(image_path, 'rb') as _f: image_data = base64.b64encode(_f.read()).decode() # POST image url = 'https://api.imgur.com/3/image' boundary = '----WebKitFormBoundary7MA4YWxkTrZu0gW' payload = ( f'--{boundary}\r\nContent-Disposition: form-data; ' f'name="image"\r\n\r\n{image_data}\r\n--{boundary}--' ) headers = { 'content-type': f'multipart/form-data; boundary={boundary}', 'Authorization': f'Client-ID {IMGUR_CLIENT_ID}', } response = requests.request( 'POST', url, data=payload, headers=headers, timeout=60, ) # Return image link if response.ok: json_data = json.loads(response.text) image_link = json_data['data']['link'] return image_link def upload_to_nextcloud(image_path, ticket_number, dev_name): """Upload image to Nextcloud server and return folder url as str.""" image_data = None ticket_range = f'{ticket_number[:3]}00-{ticket_number[:3]}99' # Bail early if not image_path: raise RuntimeError(f'Invalid image path: {image_path}') # Read image file and convert to base64 with open(image_path, 'rb') as _f: image_data = _f.read() # PUT image url = ( f'{BENCHMARK_SERVER["Url"]}/' f'{ticket_range}/{ticket_number}_iobenchmark' f'_{dev_name}_{time.strftime("%Y-%m-%d_%H%M_%z")}.png' ) requests.put( url, data=image_data, auth=(BENCHMARK_SERVER['User'], BENCHMARK_SERVER['Pass']), headers={'X-Requested-With': 'XMLHttpRequest'}, timeout=60, ) # Return folder link return BENCHMARK_SERVER['Short Url'] def vertical_graph_line(percent: float, rate: float, scale: int = 32) -> str: """Build colored graph string using thresholds, returns str.""" color_bar = None color_rate = None step = get_graph_step(rate, scale=scale) # Set colors if rate < THRESH_FAIL: color_bar = 'RED' color_rate = 'YELLOW' elif rate < THRESH_WARN: color_bar = 'YELLOW' color_rate = 'YELLOW' elif rate > THRESH_GREAT: color_bar = 'GREEN' color_rate = 'GREEN' # Build string line = ansi.color_string( strings=( f'{percent:5.1f}%', f'{GRAPH_VERTICAL[step]:<4}', f'{rate/(1000**2):6.1f} MB/s', ), colors=( None, color_bar, color_rate, ), sep=' ', ) # Done return line if __name__ == '__main__': print("This file is not meant to be called directly.")