#!/usr/bin/env python3 # # Plot CSV files in terminal. # # Example: # ./scripts/plot.py bench.csv -xSIZE -ybench_read -W80 -H17 # # Copyright (c) 2022, The littlefs authors. # SPDX-License-Identifier: BSD-3-Clause # import bisect import codecs import collections as co import csv import io import itertools as it import math as mt import os import shlex import shutil import time try: import inotify_simple except ModuleNotFoundError: inotify_simple = None COLORS = [ '1;34', # bold blue '1;31', # bold red '1;32', # bold green '1;35', # bold purple '1;33', # bold yellow '1;36', # bold cyan '34', # blue '31', # red '32', # green '35', # purple '33', # yellow '36', # cyan ] CHARS_DOTS = " .':" CHARS_BRAILLE = ( '⠀⢀⡀⣀⠠⢠⡠⣠⠄⢄⡄⣄⠤⢤⡤⣤' '⠐⢐⡐⣐⠰⢰⡰⣰⠔⢔⡔⣔⠴⢴⡴⣴' '⠂⢂⡂⣂⠢⢢⡢⣢⠆⢆⡆⣆⠦⢦⡦⣦' '⠒⢒⡒⣒⠲⢲⡲⣲⠖⢖⡖⣖⠶⢶⡶⣶' '⠈⢈⡈⣈⠨⢨⡨⣨⠌⢌⡌⣌⠬⢬⡬⣬' '⠘⢘⡘⣘⠸⢸⡸⣸⠜⢜⡜⣜⠼⢼⡼⣼' '⠊⢊⡊⣊⠪⢪⡪⣪⠎⢎⡎⣎⠮⢮⡮⣮' '⠚⢚⡚⣚⠺⢺⡺⣺⠞⢞⡞⣞⠾⢾⡾⣾' '⠁⢁⡁⣁⠡⢡⡡⣡⠅⢅⡅⣅⠥⢥⡥⣥' '⠑⢑⡑⣑⠱⢱⡱⣱⠕⢕⡕⣕⠵⢵⡵⣵' '⠃⢃⡃⣃⠣⢣⡣⣣⠇⢇⡇⣇⠧⢧⡧⣧' '⠓⢓⡓⣓⠳⢳⡳⣳⠗⢗⡗⣗⠷⢷⡷⣷' '⠉⢉⡉⣉⠩⢩⡩⣩⠍⢍⡍⣍⠭⢭⡭⣭' '⠙⢙⡙⣙⠹⢹⡹⣹⠝⢝⡝⣝⠽⢽⡽⣽' '⠋⢋⡋⣋⠫⢫⡫⣫⠏⢏⡏⣏⠯⢯⡯⣯' '⠛⢛⡛⣛⠻⢻⡻⣻⠟⢟⡟⣟⠿⢿⡿⣿') CHARS_POINTS_AND_LINES = 'o' SI_PREFIXES = { 18: 'E', 15: 'P', 12: 'T', 9: 'G', 6: 'M', 3: 'K', 0: '', -3: 'm', -6: 'u', -9: 'n', -12: 'p', -15: 'f', -18: 'a', } SI2_PREFIXES = { 60: 'Ei', 50: 'Pi', 40: 'Ti', 30: 'Gi', 20: 'Mi', 10: 'Ki', 0: '', -10: 'mi', -20: 'ui', -30: 'ni', -40: 'pi', -50: 'fi', -60: 'ai', } # format a number to a strict character width using SI prefixes def si(x, w=4): if x == 0: return '0' # figure out prefix and scale # # note we adjust this so that 100K = .1M, which has more info # per character p = 3*int(mt.log(abs(x)*10, 10**3)) p = min(18, max(-18, p)) # format with enough digits s = '%.*f' % (w, abs(x) / (10.0**p)) s = s.lstrip('0') # truncate but only digits that follow the dot if '.' in s: s = s[:max(s.find('.'), w-(2 if x < 0 else 1))] s = s.rstrip('0') s = s.rstrip('.') return '%s%s%s' % ('-' if x < 0 else '', s, SI_PREFIXES[p]) def si2(x, w=5): if x == 0: return '0' # figure out prefix and scale # # note we adjust this so that 128Ki = .1Mi, which has more info # per character p = 10*int(mt.log(abs(x)*10, 2**10)) p = min(30, max(-30, p)) # format with enough digits s = '%.*f' % (w, abs(x) / (2.0**p)) s = s.lstrip('0') # truncate but only digits that follow the dot if '.' in s: s = s[:max(s.find('.'), w-(3 if x < 0 else 2))] s = s.rstrip('0') s = s.rstrip('.') return '%s%s%s' % ('-' if x < 0 else '', s, SI2_PREFIXES[p]) # parse escape strings def escape(s): return codecs.escape_decode(s.encode('utf8'))[0].decode('utf8') def openio(path, mode='r', buffering=-1): # allow '-' for stdin/stdout if path == '-': if 'r' in mode: return os.fdopen(os.dup(sys.stdin.fileno()), mode, buffering) else: return os.fdopen(os.dup(sys.stdout.fileno()), mode, buffering) else: return open(path, mode, buffering) if inotify_simple is None: Inotify = None else: class Inotify(inotify_simple.INotify): def __init__(self, paths): super().__init__() # wait for interesting events flags = (inotify_simple.flags.ATTRIB | inotify_simple.flags.CREATE | inotify_simple.flags.DELETE | inotify_simple.flags.DELETE_SELF | inotify_simple.flags.MODIFY | inotify_simple.flags.MOVED_FROM | inotify_simple.flags.MOVED_TO | inotify_simple.flags.MOVE_SELF) # recurse into directories for path in paths: if os.path.isdir(path): for dir, _, files in os.walk(path): self.add_watch(dir, flags) for f in files: self.add_watch(os.path.join(dir, f), flags) else: self.add_watch(path, flags) class RingIO: def __init__(self, maxlen=None, head=False): self.maxlen = maxlen self.head = head self.lines = co.deque(maxlen=maxlen) self.tail = io.StringIO() # trigger automatic sizing if maxlen == 0: self.resize(0) def __len__(self): return len(self.lines) def write(self, s): # note using split here ensures the trailing string has no newline lines = s.split('\n') if len(lines) > 1 and self.tail.getvalue(): self.tail.write(lines[0]) lines[0] = self.tail.getvalue() self.tail = io.StringIO() self.lines.extend(lines[:-1]) if lines[-1]: self.tail.write(lines[-1]) def resize(self, maxlen): self.maxlen = maxlen if maxlen == 0: maxlen = shutil.get_terminal_size((80, 5))[1] if maxlen != self.lines.maxlen: self.lines = co.deque(self.lines, maxlen=maxlen) canvas_lines = 1 def draw(self): # did terminal size change? if self.maxlen == 0: self.resize(0) # copy lines lines = self.lines.copy() # pad to fill any existing canvas, but truncate to terminal size h = shutil.get_terminal_size((80, 5))[1] lines.extend('' for _ in range( len(lines), min(RingIO.canvas_lines, h))) while len(lines) > h: if self.head: lines.pop() else: lines.popleft() # first thing first, give ourself a canvas while RingIO.canvas_lines < len(lines): sys.stdout.write('\n') RingIO.canvas_lines += 1 # write lines from top to bottom so later lines overwrite earlier # lines, note [xA/[xB stop at terminal boundaries for i, line in enumerate(lines): # move cursor, clear line, disable/reenable line wrapping sys.stdout.write('\r') if len(lines)-1-i > 0: sys.stdout.write('\x1b[%dA' % (len(lines)-1-i)) sys.stdout.write('\x1b[K') sys.stdout.write('\x1b[?7l') sys.stdout.write(line) sys.stdout.write('\x1b[?7h') if len(lines)-1-i > 0: sys.stdout.write('\x1b[%dB' % (len(lines)-1-i)) sys.stdout.flush() # parse different data representations def dat(x): # allow the first part of an a/b fraction if '/' in x: x, _ = x.split('/', 1) # first try as int try: return int(x, 0) except ValueError: pass # then try as float try: return float(x) # just don't allow infinity or nan if mt.isinf(x) or mt.isnan(x): raise ValueError("invalid dat %r" % x) except ValueError: pass # else give up raise ValueError("invalid dat %r" % x) # a hack log that preserves sign, with a linear region between -1 and 1 def symlog(x): if x > 1: return mt.log(x)+1 elif x < -1: return -mt.log(-x)-1 else: return x class Plot: def __init__(self, width, height, *, xlim=None, ylim=None, xlog=False, ylog=False, braille=False, dots=False): # scale if we're printing with dots or braille self.width = 2*width if braille else width self.height = (4*height if braille else 2*height if dots else height) self.xlim = xlim or (0, width) self.ylim = ylim or (0, height) self.xlog = xlog self.ylog = ylog self.braille = braille self.dots = dots self.grid = [('',False)]*(self.width*self.height) def scale(self, x, y): # scale and clamp try: if self.xlog: x = int(self.width * ( (symlog(x)-symlog(self.xlim[0])) / (symlog(self.xlim[1])-symlog(self.xlim[0])))) else: x = int(self.width * ( (x-self.xlim[0]) / (self.xlim[1]-self.xlim[0]))) if self.ylog: y = int(self.height * ( (symlog(y)-symlog(self.ylim[0])) / (symlog(self.ylim[1])-symlog(self.ylim[0])))) else: y = int(self.height * ( (y-self.ylim[0]) / (self.ylim[1]-self.ylim[0]))) except ZeroDivisionError: x = 0 y = 0 return x, y def point(self, x, y, *, color=COLORS[0], char=True): # scale x, y = self.scale(x, y) # ignore out of bounds points if x >= 0 and x < self.width and y >= 0 and y < self.height: self.grid[x + y*self.width] = (color, char) def line(self, x1, y1, x2, y2, *, color=COLORS[0], char=True): # scale x1, y1 = self.scale(x1, y1) x2, y2 = self.scale(x2, y2) # incremental error line algorithm ex = abs(x2 - x1) ey = -abs(y2 - y1) dx = +1 if x1 < x2 else -1 dy = +1 if y1 < y2 else -1 e = ex + ey while True: if x1 >= 0 and x1 < self.width and y1 >= 0 and y1 < self.height: self.grid[x1 + y1*self.width] = (color, char) e2 = 2*e if x1 == x2 and y1 == y2: break if e2 > ey: e += ey x1 += dx if x1 == x2 and y1 == y2: break if e2 < ex: e += ex y1 += dy if x2 >= 0 and x2 < self.width and y2 >= 0 and y2 < self.height: self.grid[x2 + y2*self.width] = (color, char) def plot(self, coords, *, color=COLORS[0], char=True, line_char=True): # draw lines if line_char: for (x1, y1), (x2, y2) in zip(coords, coords[1:]): if y1 is not None and y2 is not None: self.line(x1, y1, x2, y2, color=color, char=line_char) # draw points if char and (not line_char or char is not True): for x, y in coords: if y is not None: self.point(x, y, color=color, char=char) def draw(self, row, *, color=False): # scale if needed if self.braille: xscale, yscale = 2, 4 elif self.dots: xscale, yscale = 1, 2 else: xscale, yscale = 1, 1 y = self.height//yscale-1 - row row_ = [] for x in range(self.width//xscale): best_f = '' best_c = False # encode into a byte b = 0 for i in range(xscale*yscale): f, c = self.grid[x*xscale+(xscale-1-(i%xscale)) + (y*yscale+(i//xscale))*self.width] if c: b |= 1 << i if f: best_f = f if c and c is not True: best_c = c # use byte to lookup character if b: if best_c: c = best_c elif self.braille: c = CHARS_BRAILLE[b] else: c = CHARS_DOTS[b] else: c = ' ' # color? if b and color and best_f: c = '\x1b[%sm%s\x1b[m' % (best_f, c) # draw axis in blank spaces if not b: if x == 0 and y == 0: c = '+' elif x == 0 and y == self.height//yscale-1: c = '^' elif x == self.width//xscale-1 and y == 0: c = '>' elif x == 0: c = '|' elif y == 0: c = '-' row_.append(c) return ''.join(row_) def collect(csv_paths, renames=[], defines=[]): # collect results from CSV files fields = [] results = [] for path in csv_paths: try: with openio(path) as f: reader = csv.DictReader(f, restval='') fields.extend( k for k in reader.fieldnames if k not in fields) for r in reader: # apply any renames if renames: # make a copy so renames can overlap r_ = {} for new_k, old_k in renames: if old_k in r: r_[new_k] = r[old_k] r.update(r_) # filter by matching defines if not all(k in r and r[k] in vs for k, vs in defines): continue results.append(r) except FileNotFoundError: pass return fields, results def fold(results, by=None, x=None, y=None, defines=[], labels=None): # filter by matching defines if defines: results_ = [] for r in results: if all(k in r and r[k] in vs for k, vs in defines): results_.append(r) results = results_ if by: # find all 'by' values keys = set() for r in results: keys.add(tuple(r.get(k, '') for k in by)) keys = sorted(keys) # collect all datasets datasets = co.OrderedDict() for key in (keys if by else [()]): for x_ in (x if x else [None]): for y_ in y: # organize by 'by', x, and y dataset = [] i = 0 for r in results: # filter by 'by' if by and not all( k in r and r[k] == v for k, v in zip(by, key)): continue # find xs if x_ is not None: if x_ not in r: continue try: x__ = dat(r[x_]) except ValueError: continue else: # fallback to enumeration x__ = i i += 1 # find ys if y_ is not None: if y_ not in r: continue try: y__ = dat(r[y_]) except ValueError: continue else: y__ = None dataset.append((x__, y__)) # hide x/y if there is only one field key_ = key if len(x or []) > 1: key_ += (x_,) if len(y or []) > 1 or not key_: key_ += (y_,) datasets[key_] = dataset # filter/order by labels if labels: datasets_ = co.OrderedDict() for _, key in labels: if key in datasets: datasets_[key] = datasets[key] datasets = datasets_ return datasets # some classes for organizing subplots into a grid class Subplot: def __init__(self, **args): self.x = 0 self.y = 0 self.xspan = 1 self.yspan = 1 self.args = args class Grid: def __init__(self, subplot, width=1.0, height=1.0): self.xweights = [width] self.yweights = [height] self.map = {(0,0): subplot} self.subplots = [subplot] def __repr__(self): return 'Grid(%r, %r)' % (self.xweights, self.yweights) @property def width(self): return len(self.xweights) @property def height(self): return len(self.yweights) def __iter__(self): return iter(self.subplots) def __getitem__(self, i): x, y = i if x < 0: x += len(self.xweights) if y < 0: y += len(self.yweights) return self.map[(x,y)] def merge(self, other, dir): if dir in ['above', 'below']: # first scale the two grids so they line up self_xweights = self.xweights other_xweights = other.xweights self_w = sum(self_xweights) other_w = sum(other_xweights) ratio = self_w / other_w other_xweights = [s*ratio for s in other_xweights] # now interleave xweights as needed new_xweights = [] self_map = {} other_map = {} self_i = 0 other_i = 0 self_xweight = (self_xweights[self_i] if self_i < len(self_xweights) else mt.inf) other_xweight = (other_xweights[other_i] if other_i < len(other_xweights) else mt.inf) while self_i < len(self_xweights) and other_i < len(other_xweights): if other_xweight - self_xweight > 0.0000001: new_xweights.append(self_xweight) other_xweight -= self_xweight new_i = len(new_xweights)-1 for j in range(len(self.yweights)): self_map[(new_i, j)] = self.map[(self_i, j)] for j in range(len(other.yweights)): other_map[(new_i, j)] = other.map[(other_i, j)] for s in other.subplots: if s.x+s.xspan-1 == new_i: s.xspan += 1 elif s.x > new_i: s.x += 1 self_i += 1 self_xweight = (self_xweights[self_i] if self_i < len(self_xweights) else mt.inf) elif self_xweight - other_xweight > 0.0000001: new_xweights.append(other_xweight) self_xweight -= other_xweight new_i = len(new_xweights)-1 for j in range(len(other.yweights)): other_map[(new_i, j)] = other.map[(other_i, j)] for j in range(len(self.yweights)): self_map[(new_i, j)] = self.map[(self_i, j)] for s in self.subplots: if s.x+s.xspan-1 == new_i: s.xspan += 1 elif s.x > new_i: s.x += 1 other_i += 1 other_xweight = (other_xweights[other_i] if other_i < len(other_xweights) else mt.inf) else: new_xweights.append(self_xweight) new_i = len(new_xweights)-1 for j in range(len(self.yweights)): self_map[(new_i, j)] = self.map[(self_i, j)] for j in range(len(other.yweights)): other_map[(new_i, j)] = other.map[(other_i, j)] self_i += 1 self_xweight = (self_xweights[self_i] if self_i < len(self_xweights) else mt.inf) other_i += 1 other_xweight = (other_xweights[other_i] if other_i < len(other_xweights) else mt.inf) # squish so ratios are preserved self_h = sum(self.yweights) other_h = sum(other.yweights) ratio = (self_h-other_h) / self_h self_yweights = [s*ratio for s in self.yweights] # finally concatenate the two grids if dir == 'above': for s in other.subplots: s.y += len(self_yweights) self.subplots.extend(other.subplots) self.xweights = new_xweights self.yweights = self_yweights + other.yweights self.map = self_map | { (x, y+len(self_yweights)): s for (x, y), s in other_map.items()} else: for s in self.subplots: s.y += len(other.yweights) self.subplots.extend(other.subplots) self.xweights = new_xweights self.yweights = other.yweights + self_yweights self.map = other_map | { (x, y+len(other.yweights)): s for (x, y), s in self_map.items()} if dir in ['right', 'left']: # first scale the two grids so they line up self_yweights = self.yweights other_yweights = other.yweights self_h = sum(self_yweights) other_h = sum(other_yweights) ratio = self_h / other_h other_yweights = [s*ratio for s in other_yweights] # now interleave yweights as needed new_yweights = [] self_map = {} other_map = {} self_i = 0 other_i = 0 self_yweight = (self_yweights[self_i] if self_i < len(self_yweights) else mt.inf) other_yweight = (other_yweights[other_i] if other_i < len(other_yweights) else mt.inf) while self_i < len(self_yweights) and other_i < len(other_yweights): if other_yweight - self_yweight > 0.0000001: new_yweights.append(self_yweight) other_yweight -= self_yweight new_i = len(new_yweights)-1 for j in range(len(self.xweights)): self_map[(j, new_i)] = self.map[(j, self_i)] for j in range(len(other.xweights)): other_map[(j, new_i)] = other.map[(j, other_i)] for s in other.subplots: if s.y+s.yspan-1 == new_i: s.yspan += 1 elif s.y > new_i: s.y += 1 self_i += 1 self_yweight = (self_yweights[self_i] if self_i < len(self_yweights) else mt.inf) elif self_yweight - other_yweight > 0.0000001: new_yweights.append(other_yweight) self_yweight -= other_yweight new_i = len(new_yweights)-1 for j in range(len(other.xweights)): other_map[(j, new_i)] = other.map[(j, other_i)] for j in range(len(self.xweights)): self_map[(j, new_i)] = self.map[(j, self_i)] for s in self.subplots: if s.y+s.yspan-1 == new_i: s.yspan += 1 elif s.y > new_i: s.y += 1 other_i += 1 other_yweight = (other_yweights[other_i] if other_i < len(other_yweights) else mt.inf) else: new_yweights.append(self_yweight) new_i = len(new_yweights)-1 for j in range(len(self.xweights)): self_map[(j, new_i)] = self.map[(j, self_i)] for j in range(len(other.xweights)): other_map[(j, new_i)] = other.map[(j, other_i)] self_i += 1 self_yweight = (self_yweights[self_i] if self_i < len(self_yweights) else mt.inf) other_i += 1 other_yweight = (other_yweights[other_i] if other_i < len(other_yweights) else mt.inf) # squish so ratios are preserved self_w = sum(self.xweights) other_w = sum(other.xweights) ratio = (self_w-other_w) / self_w self_xweights = [s*ratio for s in self.xweights] # finally concatenate the two grids if dir == 'right': for s in other.subplots: s.x += len(self_xweights) self.subplots.extend(other.subplots) self.xweights = self_xweights + other.xweights self.yweights = new_yweights self.map = self_map | { (x+len(self_xweights), y): s for (x, y), s in other_map.items()} else: for s in self.subplots: s.x += len(other.xweights) self.subplots.extend(other.subplots) self.xweights = other.xweights + self_xweights self.yweights = new_yweights self.map = other_map | { (x+len(other.xweights), y): s for (x, y), s in self_map.items()} def scale(self, width, height): self.xweights = [s*width for s in self.xweights] self.yweights = [s*height for s in self.yweights] @classmethod def fromargs(cls, width=1.0, height=1.0, *, subplots=[], **args): grid = cls(Subplot(**args)) for dir, subargs in subplots: subgrid = cls.fromargs( width=subargs.pop('width', 0.5 if dir in ['right', 'left'] else width), height=subargs.pop('height', 0.5 if dir in ['above', 'below'] else height), **subargs) grid.merge(subgrid, dir) grid.scale(width, height) return grid def main(csv_paths, *, by=None, x=None, y=None, define=[], label=None, color=False, braille=False, colors=None, chars=None, line_chars=None, points=False, points_and_lines=False, width=None, height=None, xlim=(None,None), ylim=(None,None), xlog=False, ylog=False, x2=False, y2=False, xunits='', yunits='', xlabel=None, ylabel=None, xticklabels=None, yticklabels=None, title=None, legend_right=False, legend_above=False, legend_below=False, subplot={}, subplots=[], head=False, cat=False, keep_open=False, sleep=None, **args): # figure out what color should be if color == 'auto': color = sys.stdout.isatty() elif color == 'always': color = True else: color = False # what colors to use? if colors is not None: colors_ = colors else: colors_ = COLORS if chars is not None: chars_ = chars elif points_and_lines: chars_ = CHARS_POINTS_AND_LINES else: chars_ = [True] if line_chars is not None: line_chars_ = line_chars elif points_and_lines or not points: line_chars_ = [True] else: line_chars_ = [False] # allow escape codes in labels/titles title = escape(title).splitlines() if title is not None else [] xlabel = escape(xlabel).splitlines() if xlabel is not None else [] ylabel = escape(ylabel).splitlines() if ylabel is not None else [] # subplot can also contribute to subplots, resolve this here or things # become a mess... subplots += subplot.pop('subplots', []) # allow any subplots to contribute to by/x/y def subplots_get(k, *, subplots=[], **args): v = args.get(k, []).copy() for _, subargs in subplots: v.extend(subplots_get(k, **subargs)) return v all_by = (by or []) + subplots_get('by', **subplot, subplots=subplots) all_x = (x or []) + subplots_get('x', **subplot, subplots=subplots) all_y = (y or []) + subplots_get('y', **subplot, subplots=subplots) all_defines = co.defaultdict(lambda: set()) for k, vs in it.chain(define or [], subplots_get('define', **subplot, subplots=subplots)): all_defines[k] |= vs all_defines = sorted(all_defines.items()) all_labels = ((label or []) + subplots_get('label', **subplot, subplots=subplots)) # separate out renames all_renames = list(it.chain.from_iterable( ((k, v) for v in vs) for k, vs in it.chain(all_by, all_x, all_y))) all_by = [k for k, _ in all_by] all_x = [k for k, _ in all_x] all_y = [k for k, _ in all_y] if not all_by and not all_y: print("error: needs --by or -y to figure out fields", file=sys.stderr) sys.exit(-1) # create a grid of subplots grid = Grid.fromargs(**subplot, subplots=subplots) for s in grid: # allow subplot params to override global params x2_ = s.args.get('x2', False) or x2 y2_ = s.args.get('y2', False) or y2 xunits_ = s.args.get('xunits', xunits) yunits_ = s.args.get('yunits', yunits) xticklabels_ = s.args.get('xticklabels', xticklabels) yticklabels_ = s.args.get('yticklabels', yticklabels) # label/titles are handled a bit differently in subplots subtitle = s.args.get('title') xsublabel = s.args.get('xlabel') ysublabel = s.args.get('ylabel') # allow escape codes in sublabels/subtitles subtitle = (escape(subtitle).splitlines() if subtitle is not None else []) xsublabel = (escape(xsublabel).splitlines() if xsublabel is not None else []) ysublabel = (escape(ysublabel).splitlines() if ysublabel is not None else []) # don't allow >2 ticklabels and render single ticklabels only once if xticklabels_ is not None: if len(xticklabels_) == 1: xticklabels_ = ["", xticklabels_[0]] elif len(xticklabels_) > 2: xticklabels_ = [xticklabels_[0], xticklabels_[-1]] if yticklabels_ is not None: if len(yticklabels_) == 1: yticklabels_ = ["", yticklabels_[0]] elif len(yticklabels_) > 2: yticklabels_ = [yticklabels_[0], yticklabels_[-1]] s.x2 = x2_ s.y2 = y2_ s.xunits = xunits_ s.yunits = yunits_ s.xticklabels = xticklabels_ s.yticklabels = yticklabels_ s.title = subtitle s.xlabel = xsublabel s.ylabel = ysublabel # preprocess margins so they can be shared for s in grid: s.xmargin = ( len(s.ylabel) + (1 if s.ylabel else 0) # fit ysublabel + (1 if s.x > 0 else 0), # space between ((5 if s.y2 else 4) + len(s.yunits) # fit yticklabels if s.yticklabels is None else max((len(t) for t in s.yticklabels), default=0)) + (1 if s.yticklabels != [] else 0), ) s.ymargin = ( len(s.xlabel), # fit xsublabel 1 if s.xticklabels != [] else 0, # fit xticklabels len(s.title), # fit subtitle ) for s in grid: # share margins so everything aligns nicely s.xmargin = ( max(s_.xmargin[0] for s_ in grid if s_.x == s.x), max(s_.xmargin[1] for s_ in grid if s_.x == s.x), ) s.ymargin = ( max(s_.ymargin[0] for s_ in grid if s_.y == s.y), max(s_.ymargin[1] for s_ in grid if s_.y == s.y), max(s_.ymargin[-1] for s_ in grid if s_.y+s_.yspan == s.y+s.yspan), ) def draw(f): def writeln(s=''): f.write(s) f.write('\n') f.writeln = writeln # first collect results from CSV files fields_, results = collect(csv_paths, all_renames, all_defines) # if y not specified, guess it's anything not in by/defines/x/renames all_y_ = all_y if not all_y: all_y_ = [k for k in fields_ if k not in all_by and not any(k == k_ for k_, _ in all_defines) and not any(k == old_k for _, old_k in all_renames)] # then extract the requested datasets datasets_ = fold(results, all_by, all_x, all_y_, None, all_labels) # figure out colors/chars here so that subplot defines # don't change them later, that'd be bad datacolors_ = { name: colors_[i % len(colors_)] for i, name in enumerate(datasets_.keys())} datachars_ = { name: chars_[i % len(chars_)] for i, name in enumerate(datasets_.keys())} dataline_chars_ = { name: line_chars_[i % len(line_chars_)] for i, name in enumerate(datasets_.keys())} # build legend? legend_width = 0 if legend_right or legend_above or legend_below: legend_ = [] if all_labels: all_labels_ = {key: l for l, key in all_labels} for i, name in enumerate(datasets_.keys()): if all_labels and not all_labels_[name]: continue label = '%s%s' % ( '%s ' % datachars_[name] if chars is not None else '%s ' % dataline_chars_[name] if line_chars is not None else '', all_labels_[name] if all_labels else ','.join(name)) if label: legend_.append((label, colors_[i % len(colors_)])) legend_width = max(legend_width, len(label)+1) # figure out our canvas size if width is None: width_ = min(80, shutil.get_terminal_size((80, None))[0]) elif width: width_ = width else: width_ = shutil.get_terminal_size((80, None))[0] if height is None: height_ = 17 + len(title) + len(xlabel) elif height: height_ = height else: height_ = shutil.get_terminal_size((None, 17 + len(title) + len(xlabel)))[1] # make space for shell prompt if not keep_open: height_ -= 1 # carve out space for the xlabel height_ -= len(xlabel) # carve out space for the ylabel width_ -= len(ylabel) + (1 if ylabel else 0) # carve out space for title height_ -= len(title) # carve out space for the legend if legend_right and legend_: width_ -= legend_width if legend_above and legend_: legend_cols = len(legend_) while True: legend_widths = [ max(len(l) for l, _ in legend_[i::legend_cols]) for i in range(legend_cols)] if (legend_cols <= 1 or sum(legend_widths)+2*(legend_cols-1) + max(sum(s.xmargin[:2]) for s in grid if s.x == 0) <= width_): break legend_cols -= 1 height_ -= (len(legend_)+legend_cols-1) // legend_cols if legend_below and legend_: legend_cols = len(legend_) while True: legend_widths = [ max(len(l) for l, _ in legend_[i::legend_cols]) for i in range(legend_cols)] if (legend_cols <= 1 or sum(legend_widths)+2*(legend_cols-1) + max(sum(s.xmargin[:2]) for s in grid if s.x == 0) <= width_): break legend_cols -= 1 height_ -= (len(legend_)+legend_cols-1) // legend_cols # figure out the grid dimensions # # note we floor to give the dimension tweaks the best chance of not # exceeding the requested dimensions, this means we usually are less # than the requested dimensions by quite a bit when we have many # subplots, but it's a tradeoff for a relatively simple implementation widths = [mt.floor(w*width_) for w in grid.xweights] heights = [mt.floor(w*height_) for w in grid.yweights] # tweak dimensions to allow all plots to have a minimum width, # this may force the plot to be larger than the requested dimensions, # but that's the best we can do for s in grid: # fit xunits minwidth = sum(s.xmargin) + max( 2, 2*((5 if s.x2 else 4)+len(s.xunits)) if s.xticklabels is None else sum(len(t) for t in s.xticklabels)) # fit yunits minheight = sum(s.ymargin) + 2 i = 0 while minwidth > sum(widths[s.x:s.x+s.xspan]): widths[s.x+i] += 1 i = (i + 1) % s.xspan i = 0 while minheight > sum(heights[s.y:s.y+s.yspan]): heights[s.y+i] += 1 i = (i + 1) % s.yspan width_ = sum(widths) height_ = sum(heights) # create a plot for each subplot for s in grid: # allow subplot params to override global params x_ = {k for k,_ in (x or []) + s.args.get('x', [])} y_ = {k for k,_ in (y or []) + s.args.get('y', [])} define_ = define + s.args.get('define', []) xlim_ = s.args.get('xlim', xlim) ylim_ = s.args.get('ylim', ylim) xlog_ = s.args.get('xlog', False) or xlog ylog_ = s.args.get('ylog', False) or ylog # allow shortened ranges if len(xlim_) == 1: xlim_ = (0, xlim_[0]) if len(ylim_) == 1: ylim_ = (0, ylim_[0]) # data can be constrained by subplot-specific defines, # so re-extract for each plot subdatasets = fold(results, all_by, all_x, all_y_, define_, all_labels) # filter by subplot x/y subdatasets = co.OrderedDict([(name, dataset) for name, dataset in subdatasets.items() if len(all_x) <= 1 or name[-(1 if len(all_y_) <= 1 else 2)] in x_ if len(all_y_) <= 1 or name[-1] in y_]) # find actual xlim/ylim xlim_ = ( xlim_[0] if xlim_[0] is not None else min(it.chain([0], (x for dataset in subdatasets.values() for x, y in dataset if y is not None))), xlim_[1] if xlim_[1] is not None else max(it.chain([0], (x for dataset in subdatasets.values() for x, y in dataset if y is not None)))) ylim_ = ( ylim_[0] if ylim_[0] is not None else min(it.chain([0], (y for dataset in subdatasets.values() for _, y in dataset if y is not None))), ylim_[1] if ylim_[1] is not None else max(it.chain([0], (y for dataset in subdatasets.values() for _, y in dataset if y is not None)))) # find actual width/height subwidth = sum(widths[s.x:s.x+s.xspan]) - sum(s.xmargin) subheight = sum(heights[s.y:s.y+s.yspan]) - sum(s.ymargin) # plot! plot = Plot( subwidth, subheight, xlim=xlim_, ylim=ylim_, xlog=xlog_, ylog=ylog_, braille=line_chars is None and braille, dots=line_chars is None and not braille) for name, dataset in subdatasets.items(): plot.plot( sorted((x,y) for x,y in dataset), color=datacolors_[name], char=datachars_[name], line_char=dataline_chars_[name]) s.plot = plot s.width = subwidth s.height = subheight s.xlim = xlim_ s.ylim = ylim_ # now that everything's plotted, let's render things to the terminal # figure out margin xmargin = ( len(ylabel) + (1 if ylabel else 0), sum(grid[0,0].xmargin[:2]), ) ymargin = ( sum(grid[0,0].ymargin[:2]), grid[-1,-1].ymargin[-1], ) # draw title? for line in title: f.writeln('%*s%s' % ( sum(xmargin[:2]), '', line.center(width_-xmargin[1]))) # draw legend_above? if legend_above and legend_: for i in range(0, len(legend_), legend_cols): f.writeln('%*s%s' % ( max( sum(xmargin[:2]) + (width_-xmargin[1] - (sum(legend_widths)+2*(legend_cols-1))) // 2, 0), '', ' '.join('%s%s%s' % ( '\x1b[%sm' % legend_[i+j][1] if color else '', '%-*s' % (legend_widths[j], legend_[i+j][0]), '\x1b[m' if color else '') for j in range(min(legend_cols, len(legend_)-i))))) for row in range(height_): # draw ylabel? f.write('%s ' % ''.join( ('%*s%s%*s' % ( ymargin[-1], '', line.center(height_-sum(ymargin)), ymargin[0], ''))[row] for line in ylabel) if ylabel else '') for x_ in range(grid.width): # figure out the grid x/y position subrow = row y_ = len(heights)-1 while subrow >= heights[y_]: subrow -= heights[y_] y_ -= 1 s = grid[x_, y_] subrow = row - sum(heights[s.y+s.yspan:]) # header if subrow < s.ymargin[-1]: # draw subtitle? if subrow < len(s.title): f.write('%*s%s' % ( sum(s.xmargin[:2]), '', s.title[subrow].center(s.width))) else: f.write('%*s%*s' % ( sum(s.xmargin[:2]), '', s.width, '')) # draw plot? elif subrow-s.ymargin[-1] < s.height: subrow = subrow-s.ymargin[-1] # draw ysublabel? f.write('%-*s' % ( s.xmargin[0], '%s ' % ''.join( line.center(s.height)[subrow] for line in s.ylabel) if s.ylabel else '')) # draw yunits? if subrow == 0 and s.yticklabels != []: f.write('%*s' % ( s.xmargin[1], ((si2 if s.y2 else si)(s.ylim[1]) + s.yunits if s.yticklabels is None else s.yticklabels[1]) + ' ')) elif subrow == s.height-1 and s.yticklabels != []: f.write('%*s' % ( s.xmargin[1], ((si2 if s.y2 else si)(s.ylim[0]) + s.yunits if s.yticklabels is None else s.yticklabels[0]) + ' ')) else: f.write('%*s' % ( s.xmargin[1], '')) # draw plot! f.write(s.plot.draw(subrow, color=color)) # footer else: subrow = subrow-s.ymargin[-1]-s.height # draw xunits? if subrow < (1 if s.xticklabels != [] else 0): f.write('%*s%-*s%*s%*s' % ( sum(s.xmargin[:2]), '', (5 if s.x2 else 4) + len(s.xunits) if s.xticklabels is None else len(s.xticklabels[0]), (si2 if s.x2 else si)(s.xlim[0]) + s.xunits if s.xticklabels is None else s.xticklabels[0], s.width - (2*((5 if s.x2 else 4)+len(s.xunits)) if s.xticklabels is None else sum(len(t) for t in s.xticklabels)), '', (5 if s.x2 else 4) + len(s.xunits) if s.xticklabels is None else len(s.xticklabels[1]), (si2 if s.x2 else si)(s.xlim[1]) + s.xunits if s.xticklabels is None else s.xticklabels[1])) # draw xsublabel? elif (subrow < s.ymargin[1] or subrow-s.ymargin[1] >= len(s.xlabel)): f.write('%*s%*s' % ( sum(s.xmargin[:2]), '', s.width, '')) else: f.write('%*s%s' % ( sum(s.xmargin[:2]), '', s.xlabel[subrow-s.ymargin[1]].center(s.width))) # draw legend_right? if (legend_right and legend_ and row >= ymargin[-1] and row-ymargin[-1] < len(legend_)): j = row-ymargin[-1] f.write(' %s%s%s' % ( '\x1b[%sm' % legend_[j][1] if color else '', legend_[j][0], '\x1b[m' if color else '')) f.writeln() # draw xlabel? for line in xlabel: f.writeln('%*s%s' % ( sum(xmargin[:2]), '', line.center(width_-xmargin[1]))) # draw legend below? if legend_below and legend_: for i in range(0, len(legend_), legend_cols): f.writeln('%*s%s' % ( max( sum(xmargin[:2]) + (width_-xmargin[1] - (sum(legend_widths)+2*(legend_cols-1))) // 2, 0), '', ' '.join('%s%s%s' % ( '\x1b[%sm' % legend_[i+j][1] if color else '', '%-*s' % (legend_widths[j], legend_[i+j][0]), '\x1b[m' if color else '') for j in range(min(legend_cols, len(legend_)-i))))) if keep_open: try: while True: # register inotify before running the command, this avoids # modification race conditions if keep_open and Inotify: inotify = Inotify(csv_paths) if cat: draw(sys.stdout) else: ring = RingIO(head=head) draw(ring) ring.draw() # try to inotifywait if keep_open and Inotify: ptime = time.time() inotify.read() inotify.close() # sleep for a minimum amount of time, this helps reduce # flicker issues time.sleep(max(0, (sleep or 0.01) - (time.time()-ptime))) else: time.sleep(sleep or 0.1) except KeyboardInterrupt: pass if not cat: sys.stdout.write('\n') else: draw(sys.stdout) if __name__ == "__main__": import sys import argparse import re parser = argparse.ArgumentParser( description="Plot CSV files in terminal.", allow_abbrev=False) parser.add_argument( 'csv_paths', nargs='*', help="Input *.csv files.") parser.add_argument( '-b', '--by', action='append', type=lambda x: ( lambda k, vs=None: ( k.strip(), tuple(v.strip() for v in vs.split(',')) if vs is not None else ()) )(*x.split('=', 1)), help="Group by this field. Can rename fields with " "new_name=old_name.") parser.add_argument( '-x', action='append', type=lambda x: ( lambda k, vs=None: ( k.strip(), tuple(v.strip() for v in vs.split(',')) if vs is not None else ()) )(*x.split('=', 1)), help="Field to use for the x-axis. Can rename fields with " "new_name=old_name.") parser.add_argument( '-y', action='append', type=lambda x: ( lambda k, vs=None: ( k.strip(), tuple(v.strip() for v in vs.split(',')) if vs is not None else ()) )(*x.split('=', 1)), help="Field to use for the y-axis. Can rename fields with " "new_name=old_name.") parser.add_argument( '-D', '--define', type=lambda x: ( lambda k, vs: ( k.strip(), {v.strip() for v in vs.split(',')}) )(*x.split('=', 1)), action='append', help="Only include results where this field is this value. May " "include comma-separated options.") parser.add_argument( '-L', '--label', action='append', type=lambda x: ( lambda k, vs: ( re.sub(r'\\([=\\])', r'\1', k.strip()), tuple(v.strip() for v in vs.split(','))) )(*re.split(r'(?