"""WizardKit: Graph Functions""" # vim: sts=2 sw=2 ts=2 import base64 import json import logging import math import pathlib import time import requests import Gnuplot from wk.cfg.net import BENCHMARK_SERVER, IMGUR_CLIENT_ID from wk.std import color_string # Hack to hide X11 error when running in CLI mode Gnuplot.gp.GnuplotOpts.default_term = 'xterm' # STATIC VARIABLES LOG = logging.getLogger(__name__) GRAPH_HORIZONTAL = ('▁', '▂', '▃', '▄', '▅', '▆', '▇', '█') GRAPH_VERTICAL = ( '▏', '▎', '▍', '▌', '▋', '▊', '▉', '█', '█▏', '█▎', '█▍', '█▌', '█▋', '█▊', '█▉', '██', '██▏', '██▎', '██▍', '██▌', '██▋', '██▊', '██▉', '███', '███▏', '███▎', '███▍', '███▌', '███▋', '███▊', '███▉', '████', ) # SCALE_STEPS: These scales allow showing differences between HDDs and SSDs # on the same graph. SCALE_STEPS = { 8: [2**(0.56*(x+1))+(16*(x+1)) for x in range(8)], 16: [2**(0.56*(x+1))+(16*(x+1)) for x in range(16)], 32: [2**(0.56*(x+1)/2)+(16*(x+1)/2) for x in range(32)], } # THRESHOLDS: These are the rate_list (in MB/s) used to color graphs THRESH_FAIL = 65 * 1024**2 THRESH_WARN = 135 * 1024**2 THRESH_GREAT = 750 * 1024**2 # Functions def export_io_graph(disk, log_dir, read_rates): """Exports PNG graph using gnuplot.""" # Safety check if not read_rates: raise RuntimeError(f'No read rates for {disk.path}') # Prep max_rate = max(read_rates) / (1024**2) max_rate = max(800, max_rate) out_path = pathlib.Path(f'{log_dir}/{disk.path.name}_iobenchmark.png') plot_data = out_path.with_suffix('.data') # Adjust Y-axis range to either 1000 or roughly max rate + 200 ## Round up to the nearest 100 and then add 200 y_range = (math.ceil(max_rate/100)*100) + 200 # Save plot data to file for Gnuplot with open(plot_data, 'w') as _f: for i, rate in enumerate(read_rates): percent = (i+1) / len(read_rates) * 100 rate = int(rate / (1024**2)) _f.write(f'{percent:0.1f} {rate}\n') # Run gnuplot commands _g = Gnuplot.Gnuplot() _g('reset') _g(f'set output "{out_path}"') _g('set terminal png large size 660,300 truecolor font "Noto Sans,11"') _g('set title "I/O Benchmark"') _g(f'set yrange [0:{y_range}]') _g('set style data lines') _g(f'plot "{plot_data}" title "{disk.description.replace("_", " ")}"') # Cleanup _g.close() del _g return out_path def generate_horizontal_graph(rate_list, graph_width=40, oneline=False): """Generate horizontal graph from rate_list, returns list.""" graph = ['', '', '', ''] scale = 8 if oneline else 32 # Build graph for rate in merge_rates(rate_list, graph_width=graph_width): step = get_graph_step(rate, scale=scale) # Set color rate_color = None if rate < THRESH_FAIL: rate_color = 'RED' elif rate < THRESH_WARN: rate_color = 'YELLOW' elif rate > THRESH_GREAT: rate_color = 'GREEN' # Build graph full_block = color_string((GRAPH_HORIZONTAL[-1],), (rate_color,)) if step >= 24: graph[0] += color_string((GRAPH_HORIZONTAL[step-24],), (rate_color,)) graph[1] += full_block graph[2] += full_block graph[3] += full_block elif step >= 16: graph[0] += ' ' graph[1] += color_string((GRAPH_HORIZONTAL[step-16],), (rate_color,)) graph[2] += full_block graph[3] += full_block elif step >= 8: graph[0] += ' ' graph[1] += ' ' graph[2] += color_string((GRAPH_HORIZONTAL[step-8],), (rate_color,)) graph[3] += full_block else: graph[0] += ' ' graph[1] += ' ' graph[2] += ' ' graph[3] += color_string((GRAPH_HORIZONTAL[step],), (rate_color,)) # Done if oneline: graph = graph[-1:] return graph def get_graph_step(rate, scale=16): """Get graph step based on rate and scale, returns int.""" rate_in_mb = rate / (1024**2) step = 0 # Iterate over scale_steps backwards for _r in range(scale-1, -1, -1): if rate_in_mb >= SCALE_STEPS[scale][_r]: step = _r break # Done return step def merge_rates(rates, graph_width=40): """Merge rates to have entries equal to the width, returns list.""" merged_rates = [] offset = 0 slice_width = int(len(rates) / graph_width) # Merge rates for _i in range(graph_width): merged_rates.append(sum(rates[offset:offset+slice_width])/slice_width) offset += slice_width # Done return merged_rates def upload_to_imgur(image_path): """Upload image to Imgur and return image url as str.""" image_data = None image_link = None # Bail early if not image_path: raise RuntimeError(f'Invalid image path: {image_path}') # Read image file and convert to base64 then convert to str with open(image_path, 'rb') as _f: image_data = base64.b64encode(_f.read()).decode() # POST image url = 'https://api.imgur.com/3/image' boundary = '----WebKitFormBoundary7MA4YWxkTrZu0gW' payload = ( f'--{boundary}\r\nContent-Disposition: form-data; ' f'name="image"\r\n\r\n{image_data}\r\n--{boundary}--' ) headers = { 'content-type': f'multipart/form-data; boundary={boundary}', 'Authorization': f'Client-ID {IMGUR_CLIENT_ID}', } response = requests.request('POST', url, data=payload, headers=headers) # Return image link if response.ok: json_data = json.loads(response.text) image_link = json_data['data']['link'] return image_link def upload_to_nextcloud(image_path, ticket_number, dev_name): """Upload image to Nextcloud server and return folder url as str.""" image_data = None # Bail early if not image_path: raise RuntimeError(f'Invalid image path: {image_path}') # Read image file and convert to base64 with open(image_path, 'rb') as _f: image_data = _f.read() # PUT image url = ( f'{BENCHMARK_SERVER["Url"]}/' f'{ticket_number}_iobenchmark' f'_{dev_name}_{time.strftime("%Y-%m-%d_%H%M_%z")}.png' ) requests.put( url, data=image_data, headers={'X-Requested-With': 'XMLHttpRequest'}, auth=(BENCHMARK_SERVER['User'], BENCHMARK_SERVER['Pass']), verify='/usr/local/bin/wk/cfg/1201_Root_CA.crt', ) # Return folder link return BENCHMARK_SERVER['Short Url'] def vertical_graph_line(percent, rate, scale=32): """Build colored graph string using thresholds, returns str.""" color_bar = None color_rate = None step = get_graph_step(rate, scale=scale) # Set colors if rate < THRESH_FAIL: color_bar = 'RED' color_rate = 'YELLOW' elif rate < THRESH_WARN: color_bar = 'YELLOW' color_rate = 'YELLOW' elif rate > THRESH_GREAT: color_bar = 'GREEN' color_rate = 'GREEN' # Build string line = color_string( strings=( f'{percent:5.1f}%', f'{GRAPH_VERTICAL[step]:<4}', f'{rate/(1000**2):6.1f} MB/s', ), colors=( None, color_bar, color_rate, ), sep=' ', ) # Done return line if __name__ == '__main__': print("This file is not meant to be called directly.")