#!/usr/bin/env python3 # # Script to manipulate CSV files. # # Example: # ./scripts/csv.py lfs.code.csv lfs.stack.csv \ # -bfunction -fcode -fstack='max(stack)' # # Copyright (c) 2022, The littlefs authors. # SPDX-License-Identifier: BSD-3-Clause # # prevent local imports if __name__ == "__main__": __import__('sys').path.pop(0) import collections as co import csv import functools as ft import itertools as it import math as mt import os import re import sys # various field types # integer fields class RInt(co.namedtuple('RInt', 'x')): __slots__ = () def __new__(cls, x=0): if isinstance(x, RInt): return x if isinstance(x, str): try: x = int(x, 0) except ValueError: # also accept +-∞ and +-inf if re.match('^\s*\+?\s*(?:∞|inf)\s*$', x): x = mt.inf elif re.match('^\s*-\s*(?:∞|inf)\s*$', x): x = -mt.inf else: raise if not (isinstance(x, int) or mt.isinf(x)): x = int(x) return super().__new__(cls, x) def __repr__(self): return '%s(%r)' % (self.__class__.__name__, self.x) def __str__(self): if self.x == mt.inf: return '∞' elif self.x == -mt.inf: return '-∞' else: return str(self.x) def __bool__(self): return bool(self.x) def __int__(self): assert not mt.isinf(self.x) return self.x def __float__(self): return float(self.x) none = '%7s' % '-' def table(self): return '%7s' % (self,) def diff(self, other): new = self.x if self else 0 old = other.x if other else 0 diff = new - old if diff == +mt.inf: return '%7s' % '+∞' elif diff == -mt.inf: return '%7s' % '-∞' else: return '%+7d' % diff def ratio(self, other): new = self.x if self else 0 old = other.x if other else 0 if mt.isinf(new) and mt.isinf(old): return 0.0 elif mt.isinf(new): return +mt.inf elif mt.isinf(old): return -mt.inf elif not old and not new: return 0.0 elif not old: return +mt.inf else: return (new-old) / old def __pos__(self): return self.__class__(+self.x) def __neg__(self): return self.__class__(-self.x) def __abs__(self): return self.__class__(abs(self.x)) def __add__(self, other): return self.__class__(self.x + other.x) def __sub__(self, other): return self.__class__(self.x - other.x) def __mul__(self, other): return self.__class__(self.x * other.x) def __truediv__(self, other): if not other: if self >= self.__class__(0): return self.__class__(+mt.inf) else: return self.__class__(-mt.inf) return self.__class__(self.x // other.x) def __mod__(self, other): return self.__class__(self.x % other.x) # float fields class RFloat(co.namedtuple('RFloat', 'x')): __slots__ = () def __new__(cls, x=0.0): if isinstance(x, RFloat): return x if isinstance(x, str): try: x = float(x) except ValueError: # also accept +-∞ and +-inf if re.match('^\s*\+?\s*(?:∞|inf)\s*$', x): x = mt.inf elif re.match('^\s*-\s*(?:∞|inf)\s*$', x): x = -mt.inf else: raise if not isinstance(x, float): x = float(x) return super().__new__(cls, x) def __repr__(self): return '%s(%r)' % (self.__class__.__name__, self.x) def __str__(self): if self.x == mt.inf: return '∞' elif self.x == -mt.inf: return '-∞' else: return '%.1f' % self.x def __bool__(self): return bool(self.x) def __int__(self): return int(self.x) def __float__(self): return float(self.x) none = '%7s' % '-' def table(self): return '%7s' % (self,) def diff(self, other): new = self.x if self else 0 old = other.x if other else 0 diff = new - old if diff == +mt.inf: return '%7s' % '+∞' elif diff == -mt.inf: return '%7s' % '-∞' else: return '%+7.1f' % diff def ratio(self, other): new = self.x if self else 0 old = other.x if other else 0 if mt.isinf(new) and mt.isinf(old): return 0.0 elif mt.isinf(new): return +mt.inf elif mt.isinf(old): return -mt.inf elif not old and not new: return 0.0 elif not old: return +mt.inf else: return (new-old) / old def __pos__(self): return self.__class__(+self.x) def __neg__(self): return self.__class__(-self.x) def __abs__(self): return self.__class__(abs(self.x)) def __add__(self, other): return self.__class__(self.x + other.x) def __sub__(self, other): return self.__class__(self.x - other.x) def __mul__(self, other): return self.__class__(self.x * other.x) def __truediv__(self, other): if not other: if self >= self.__class__(0): return self.__class__(+mt.inf) else: return self.__class__(-mt.inf) return self.__class__(self.x / other.x) def __mod__(self, other): return self.__class__(self.x % other.x) # fractional fields, a/b class RFrac(co.namedtuple('RFrac', 'a,b')): __slots__ = () def __new__(cls, a=0, b=None): if isinstance(a, RFrac) and b is None: return a if isinstance(a, str) and b is None: a, b = a.split('/', 1) if b is None: b = a return super().__new__(cls, RInt(a), RInt(b)) def __repr__(self): return '%s(%r, %r)' % (self.__class__.__name__, self.a.x, self.b.x) def __str__(self): return '%s/%s' % (self.a, self.b) def __bool__(self): return bool(self.a) def __int__(self): return int(self.a) def __float__(self): return float(self.a) none = '%11s' % '-' def table(self): return '%11s' % (self,) def notes(self): t = self.a.x/self.b.x if self.b.x else 1.0 return ['∞%' if t == +mt.inf else '-∞%' if t == -mt.inf else '%.1f%%' % (100*t)] def diff(self, other): new_a, new_b = self if self else (RInt(0), RInt(0)) old_a, old_b = other if other else (RInt(0), RInt(0)) return '%11s' % ('%s/%s' % ( new_a.diff(old_a).strip(), new_b.diff(old_b).strip())) def ratio(self, other): new_a, new_b = self if self else (RInt(0), RInt(0)) old_a, old_b = other if other else (RInt(0), RInt(0)) new = new_a.x/new_b.x if new_b.x else 1.0 old = old_a.x/old_b.x if old_b.x else 1.0 return new - old def __pos__(self): return self.__class__(+self.a, +self.b) def __neg__(self): return self.__class__(-self.a, -self.b) def __abs__(self): return self.__class__(abs(self.a), abs(self.b)) def __add__(self, other): return self.__class__(self.a + other.a, self.b + other.b) def __sub__(self, other): return self.__class__(self.a - other.a, self.b - other.b) def __mul__(self, other): return self.__class__(self.a * other.a, self.b * other.b) def __truediv__(self, other): return self.__class__(self.a / other.a, self.b / other.b) def __mod__(self, other): return self.__class__(self.a % other.a, self.b % other.b) def __eq__(self, other): self_a, self_b = self if self.b.x else (RInt(1), RInt(1)) other_a, other_b = other if other.b.x else (RInt(1), RInt(1)) return self_a * other_b == other_a * self_b def __ne__(self, other): return not self.__eq__(other) def __lt__(self, other): self_a, self_b = self if self.b.x else (RInt(1), RInt(1)) other_a, other_b = other if other.b.x else (RInt(1), RInt(1)) return self_a * other_b < other_a * self_b def __gt__(self, other): return self.__class__.__lt__(other, self) def __le__(self, other): return not self.__gt__(other) def __ge__(self, other): return not self.__lt__(other) # various fold operations class RSum: def __call__(self, xs): return sum(xs[1:], start=xs[0]) class RProd: def __call__(self, xs): return mt.prod(xs[1:], start=xs[0]) class RMin: def __call__(self, xs): return min(xs) class RMax: def __call__(self, xs): return max(xs) class RAvg: def __call__(self, xs): return RFloat(sum(float(x) for x in xs) / len(xs)) class RStddev: def __call__(self, xs): avg = sum(float(x) for x in xs) / len(xs) return RFloat(mt.sqrt(sum((float(x) - avg)**2 for x in xs) / len(xs))) class RGMean: def __call__(self, xs): return RFloat(mt.prod(float(x) for x in xs)**(1/len(xs))) class RGStddev: def __call__(self, xs): gmean = mt.prod(float(x) for x in xs)**(1/len(xs)) return RFloat( mt.exp(mt.sqrt( sum(mt.log(float(x)/gmean)**2 for x in xs) / len(xs))) if gmean else mt.inf) # a simple general-purpose parser class # # basically just because memoryview doesn't support strs class Parser: def __init__(self, data, ws='\s*', ws_flags=0): self.data = data self.i = 0 self.m = None # also consume whitespace self.ws = re.compile(ws, ws_flags) self.i = self.ws.match(self.data, self.i).end() def __repr__(self): if len(self.data) - self.i <= 32: return repr(self.data[self.i:]) else: return "%s..." % repr(self.data[self.i:self.i+32])[:32] def __str__(self): return self.data[self.i:] def __len__(self): return len(self.data) - self.i def __bool__(self): return self.i != len(self.data) def match(self, pattern, flags=0): # compile so we can use the pos arg, this is still cached self.m = re.compile(pattern, flags).match(self.data, self.i) return self.m def group(self, *groups): return self.m.group(*groups) def chomp(self, *groups): g = self.group(*groups) self.i = self.m.end() # also consume whitespace self.i = self.ws.match(self.data, self.i).end() return g class Error(Exception): pass def chompmatch(self, pattern, flags=0, *groups): if not self.match(pattern, flags): raise Parser.Error("expected %r, found %r" % (pattern, self)) return self.chomp(*groups) def unexpected(self): raise Parser.Error("unexpected %r" % self) def lookahead(self): # push state on the stack if not hasattr(self, 'stack'): self.stack = [] self.stack.append((self.i, self.m)) return self def consume(self): # pop and use new state self.stack.pop() def discard(self): # pop and discard new state self.i, self.m = self.stack.pop() def __enter__(self): return self def __exit__(self, et, ev, tb): # keep new state if no exception occured if et is None: self.consume() else: self.discard() # a lazily-evaluated field expression class RExpr: # expr parsing/typechecking/etc errors class Error(Exception): pass # expr node base class class Expr: def __init__(self, *args): for k, v in zip('abcdefghijklmnopqrstuvwxyz', args): setattr(self, k, v) def __iter__(self): return (getattr(self, k) for k in it.takewhile( lambda k: hasattr(self, k), 'abcdefghijklmnopqrstuvwxyz')) def __len__(self): return sum(1 for _ in self) def __repr__(self): return '%s(%s)' % ( self.__class__.__name__, ','.join(repr(v) for v in self)) def fields(self): return set(it.chain.from_iterable(v.fields() for v in self)) def type(self, types={}): t = self.a.type(types) if not all(t == v.type(types) for v in it.islice(self, 1, None)): raise RExpr.Error("mismatched types? %r" % self) return t def fold(self, types={}): return self.a.fold(types) def eval(self, fields={}): return self.a.eval(fields) # expr nodes # literal exprs class IntLit(Expr): def fields(self): return set() def type(self, types={}): return RInt def fold(self, types={}): return RSum, RInt def eval(self, fields={}): return self.a class FloatLit(Expr): def fields(self): return set() def type(self, types={}): return RFloat def fold(self, types={}): return RSum, RFloat def eval(self, fields={}): return self.a # field expr class Field(Expr): def fields(self): return {self.a} def type(self, types={}): if self.a not in types: raise RExpr.Error("untyped field? %s" % self.a) return types[self.a] def fold(self, types={}): if self.a not in types: raise RExpr.Error("unfoldable field? %s" % self.a) return RSum, types[self.a] def eval(self, fields={}): if self.a not in fields: raise RExpr.Error("unknown field? %s" % self.a) return fields[self.a] # func expr helper def func(name, args="a"): def func(f): f._func = name f._fargs = args return f return func class Funcs: @ft.cache def __get__(self, _, cls): return {x._func: x for x in cls.__dict__.values() if hasattr(x, '_func')} funcs = Funcs() # type exprs @func('int', 'a') class Int(Expr): """Convert to an integer""" def type(self, types={}): return RInt def eval(self, fields={}): return RInt(self.a.eval(fields)) @func('float', 'a') class Float(Expr): """Convert to a float""" def type(self, types={}): return RFloat def eval(self, fields={}): return RFloat(self.a.eval(fields)) @func('frac', 'a[, b]') class Frac(Expr): """Convert to a fraction""" def type(self, types={}): return RFrac def eval(self, fields={}): if len(self) == 1: return RFrac(self.a.eval(fields)) else: return RFrac(self.a.eval(fields), self.b.eval(fields)) # fold exprs @func('sum', 'a[, ...]') class Sum(Expr): """Find the sum of this column or fields""" def fold(self, types={}): if len(self) == 1: return RSum, self.a.type(types) else: return self.a.fold(types) def eval(self, fields={}): if len(self) == 1: return self.a.eval(fields) else: return RSum()([v.eval(fields) for v in self]) @func('prod', 'a[, ...]') class Prod(Expr): """Find the product of this column or fields""" def fold(self, types={}): if len(self) == 1: return Prod, self.a.type(types) else: return self.a.fold(types) def eval(self, fields={}): if len(self) == 1: return self.a.eval(fields) else: return Prod()([v.eval(fields) for v in self]) @func('min', 'a[, ...]') class Min(Expr): """Find the minimum of this column or fields""" def fold(self, types={}): if len(self) == 1: return RMin, self.a.type(types) else: return self.a.fold(types) def eval(self, fields={}): if len(self) == 1: return self.a.eval(fields) else: return RMin()([v.eval(fields) for v in self]) @func('max', 'a[, ...]') class Max(Expr): """Find the maximum of this column or fields""" def fold(self, types={}): if len(self) == 1: return RMax, self.a.type(types) else: return self.a.fold(types) def eval(self, fields={}): if len(self) == 1: return self.a.eval(fields) else: return RMax()([v.eval(fields) for v in self]) @func('avg', 'a[, ...]') class Avg(Expr): """Find the average of this column or fields""" def type(self, types={}): if len(self) == 1: return self.a.type(types) else: return RFloat def fold(self, types={}): if len(self) == 1: return RAvg, RFloat else: return self.a.fold(types) def eval(self, fields={}): if len(self) == 1: return self.a.eval(fields) else: return RAvg()([v.eval(fields) for v in self]) @func('stddev', 'a[, ...]') class Stddev(Expr): """Find the standard deviation of this column or fields""" def type(self, types={}): if len(self) == 1: return self.a.type(types) else: return RFloat def fold(self, types={}): if len(self) == 1: return RStddev, RFloat else: return self.a.fold(types) def eval(self, fields={}): if len(self) == 1: return self.a.eval(fields) else: return RStddev()([v.eval(fields) for v in self]) @func('gmean', 'a[, ...]') class GMean(Expr): """Find the geometric mean of this column or fields""" def type(self, types={}): if len(self) == 1: return self.a.type(types) else: return RFloat def fold(self, types={}): if len(self) == 1: return RGMean, RFloat else: return self.a.fold(types) def eval(self, fields={}): if len(self) == 1: return self.a.eval(fields) else: return RGMean()([v.eval(fields) for v in self]) @func('gstddev', 'a[, ...]') class GStddev(Expr): """Find the geometric stddev of this column or fields""" def type(self, types={}): if len(self) == 1: return self.a.type(types) else: return RFloat def fold(self, types={}): if len(self) == 1: return RGStddev, RFloat else: return self.a.fold(types) def eval(self, fields={}): if len(self) == 1: return self.a.eval(fields) else: return RGStddev()([v.eval(fields) for v in self]) # functions @func('ratio', 'a') class Ratio(Expr): """Ratio of a fraction as a float""" def type(self, types={}): return RFloat def eval(self, fields={}): v = RFrac(self.a.eval(fields)) if not float(v.b): return RFloat(1) else: return RFloat(float(v.a) / float(v.b)) @func('total', 'a') class Total(Expr): """Total part of a fraction""" def type(self, types={}): return RInt def eval(self, fields={}): return RFrac(self.a.eval(fields)).b @func('abs', 'a') class Abs(Expr): """Absolute value""" def eval(self, fields={}): return abs(self.a.eval(fields)) @func('ceil', 'a') class Ceil(Expr): """Round up to nearest integer""" def type(self, types={}): return RFloat def eval(self, fields={}): return RFloat(mt.ceil(float(self.a.eval(fields)))) @func('floor', 'a') class Floor(Expr): """Round down to nearest integer""" def type(self, types={}): return RFloat def eval(self, fields={}): return RFloat(mt.floor(float(self.a.eval(fields)))) @func('log', 'a[, b]') class Log(Expr): """Log of a with base e, or log of a with base b""" def type(self, types={}): return RFloat def eval(self, fields={}): if len(self) == 1: return RFloat(mt.log( float(self.a.eval(fields)))) else: return RFloat(mt.log( float(self.a.eval(fields)), float(self.b.eval(fields)))) @func('pow', 'a[, b]') class Pow(Expr): """e to the power of a, or a to the power of b""" def type(self, types={}): return RFloat def eval(self, fields={}): if len(self) == 1: return RFloat(mt.exp( float(self.a.eval(fields)))) else: return RFloat(mt.pow( float(self.a.eval(fields)), float(self.b.eval(fields)))) @func('sqrt', 'a') class Sqrt(Expr): """Square root""" def type(self, types={}): return RFloat def eval(self, fields={}): return RFloat(mt.sqrt(float(self.a.eval(fields)))) @func('isint', 'a') class IsInt(Expr): """1 if a is an integer, otherwise 0""" def type(self, types={}): return RInt def eval(self, fields={}): if isinstance(self.a.eval(fields), RInt): return RInt(1) else: return RInt(0) @func('isfloat', 'a') class IsFloat(Expr): """1 if a is a float, otherwise 0""" def type(self, types={}): return RInt def eval(self, fields={}): if isinstance(self.a.eval(fields), RFloat): return RInt(1) else: return RInt(0) @func('isfrac', 'a') class IsFrac(Expr): """1 if a is a fraction, otherwise 0""" def type(self, types={}): return RInt def eval(self, fields={}): if isinstance(self.a.eval(fields), RFrac): return RInt(1) else: return RInt(0) @func('isinf', 'a') class IsInf(Expr): """1 if a is infinite, otherwise 0""" def type(self, types={}): return RInt def eval(self, fields={}): if mt.isinf(self.a.eval(fields)): return RInt(1) else: return RInt(0) @func('isnan') class IsNan(Expr): """1 if a is a NAN, otherwise 0""" def type(self, types={}): return RInt def eval(self, fields={}): if mt.isnan(self.a.eval(fields)): return RInt(1) else: return RInt(0) # unary expr helper def uop(op): def uop(f): f._uop = op return f return uop class UOps: @ft.cache def __get__(self, _, cls): return {x._uop: x for x in cls.__dict__.values() if hasattr(x, '_uop')} uops = UOps() # unary ops @uop('+') class Pos(Expr): """Non-negation""" def eval(self, fields={}): return +self.a.eval(fields) @uop('-') class Neg(Expr): """Negation""" def eval(self, fields={}): return -self.a.eval(fields) @uop('!') class NotNot(Expr): """1 if a is zero, otherwise 0""" def type(self, types={}): return RInt def eval(self, fields={}): if self.a.eval(fields): return RInt(0) else: return RInt(1) # binary expr help def bop(op, prec): def bop(f): f._bop = op f._bprec = prec return f return bop class BOps: @ft.cache def __get__(self, _, cls): return {x._bop: x for x in cls.__dict__.values() if hasattr(x, '_bop')} bops = BOps() class BPrecs: @ft.cache def __get__(self, _, cls): return {x._bop: x._bprec for x in cls.__dict__.values() if hasattr(x, '_bop')} bprecs = BPrecs() # binary ops @bop('*', 10) class Mul(Expr): """Multiplication""" def eval(self, fields={}): return self.a.eval(fields) * self.b.eval(fields) @bop('/', 10) class Div(Expr): """Division""" def eval(self, fields={}): return self.a.eval(fields) / self.b.eval(fields) @bop('%', 10) class Mod(Expr): """Modulo""" def eval(self, fields={}): return self.a.eval(fields) % self.b.eval(fields) @bop('+', 9) class Add(Expr): """Addition""" def eval(self, fields={}): a = self.a.eval(fields) b = self.b.eval(fields) return a + b @bop('-', 9) class Sub(Expr): """Subtraction""" def eval(self, fields={}): return self.a.eval(fields) - self.b.eval(fields) @bop('==', 4) class Eq(Expr): """1 if a equals b, otherwise 0""" def eval(self, fields={}): if self.a.eval(fields) == self.b.eval(fields): return RInt(1) else: return RInt(0) @bop('!=', 4) class Ne(Expr): """1 if a does not equal b, otherwise 0""" def eval(self, fields={}): if self.a.eval(fields) != self.b.eval(fields): return RInt(1) else: return RInt(0) @bop('<', 4) class Lt(Expr): """1 if a is less than b""" def eval(self, fields={}): if self.a.eval(fields) < self.b.eval(fields): return RInt(1) else: return RInt(0) @bop('<=', 4) class Le(Expr): """1 if a is less than or equal to b""" def eval(self, fields={}): if self.a.eval(fields) <= self.b.eval(fields): return RInt(1) else: return RInt(0) @bop('>', 4) class Gt(Expr): """1 if a is greater than b""" def eval(self, fields={}): if self.a.eval(fields) > self.b.eval(fields): return RInt(1) else: return RInt(0) @bop('>=', 4) class Ge(Expr): """1 if a is greater than or equal to b""" def eval(self, fields={}): if self.a.eval(fields) >= self.b.eval(fields): return RInt(1) else: return RInt(0) @bop('&&', 3) class AndAnd(Expr): """b if a is non-zero, otherwise a""" def eval(self, fields={}): a = self.a.eval(fields) if a: return self.b.eval(fields) else: return a @bop('||', 2) class OrOr(Expr): """a if a is non-zero, otherwise b""" def eval(self, fields={}): a = self.a.eval(fields) if a: return a else: return self.b.eval(fields) # ternary expr help def top(op_a, op_b, prec): def top(f): f._top = (op_a, op_b) f._tprec = prec return f return top class TOps: @ft.cache def __get__(self, _, cls): return {x._top: x for x in cls.__dict__.values() if hasattr(x, '_top')} tops = TOps() class TPrecs: @ft.cache def __get__(self, _, cls): return {x._top: x._tprec for x in cls.__dict__.values() if hasattr(x, '_top')} tprecs = TPrecs() # ternary ops @top('?', ':', 1) class IfElse(Expr): """b if a is non-zero, otherwise c""" def type(self, types={}): t = self.b.type(types) u = self.c.type(types) if t != u: raise RExpr.Error("mismatched types? %r" % self) return t def fold(self, types={}): return self.b.fold(types) def eval(self, fields={}): a = self.a.eval(fields) if a: return self.b.eval(fields) else: return self.c.eval(fields) # show expr help text @classmethod def help(cls): print('uops:') for op in cls.uops.keys(): print(' %-21s %s' % ('%sa' % op, RExpr.uops[op].__doc__)) print('bops:') for op in cls.bops.keys(): print(' %-21s %s' % ('a %s b' % op, RExpr.bops[op].__doc__)) print('tops:') for op in cls.tops.keys(): print(' %-21s %s' % ('a %s b %s c' % op, RExpr.tops[op].__doc__)) print('funcs:') for func in cls.funcs.keys(): print(' %-21s %s' % ( '%s(%s)' % (func, RExpr.funcs[func]._fargs), RExpr.funcs[func].__doc__)) # parse an expr def __init__(self, expr): self.expr = expr.strip() # parse the expression into a tree def p_expr(p, prec=0): # parens if p.match('\('): p.chomp() a = p_expr(p) if not p.match('\)'): raise RExpr.Error("mismatched parens? %s" % p) p.chomp() # floats elif p.match('[+-]?(?:[_0-9]*\.[_0-9eE]|nan)'): a = RExpr.FloatLit(RFloat(p.chomp())) # ints elif p.match('[+-]?(?:[0-9][bBoOxX]?[_0-9a-fA-F]*|∞|inf)'): a = RExpr.IntLit(RInt(p.chomp())) # fields/functions elif p.match('[_a-zA-Z][_a-zA-Z0-9]*'): a = p.chomp() if p.match('\('): p.chomp() if a not in RExpr.funcs: raise RExpr.Error("unknown function? %s" % a) args = [] while True: b = p_expr(p) args.append(b) if p.match(','): p.chomp() continue else: if not p.match('\)'): raise RExpr.Error("mismatched parens? %s" % p) p.chomp() a = RExpr.funcs[a](*args) break else: a = RExpr.Field(a) # unary ops elif any(p.match(re.escape(op)) for op in RExpr.uops.keys()): # sort by len to avoid ambiguities for op in sorted(RExpr.uops.keys(), reverse=True): if p.match(re.escape(op)): p.chomp() a = p_expr(p, mt.inf) a = RExpr.uops[op](a) break else: assert False # unknown expr? else: raise RExpr.Error("unknown expr? %s" % p) # parse tail while True: # binary ops if any(p.match(re.escape(op)) and prec < RExpr.bprecs[op] for op in RExpr.bops.keys()): # sort by len to avoid ambiguities for op in sorted(RExpr.bops.keys(), reverse=True): if (p.match(re.escape(op)) and prec < RExpr.bprecs[op]): p.chomp() b = p_expr(p, RExpr.bprecs[op]) a = RExpr.bops[op](a, b) break else: assert False # ternary ops, these are intentionally right associative elif any(p.match(re.escape(op[0])) and prec <= RExpr.tprecs[op] for op in RExpr.tops.keys()): # sort by len to avoid ambiguities for op in sorted(RExpr.tops.keys(), reverse=True): if (p.match(re.escape(op[0])) and prec <= RExpr.tprecs[op]): p.chomp() b = p_expr(p, RExpr.tprecs[op]) if not p.match(re.escape(op[1])): raise RExpr.Error( 'mismatched ternary op? %s %s' % op) p.chomp() c = p_expr(p, RExpr.tprecs[op]) a = RExpr.tops[op](a, b, c) break else: assert False # no tail else: return a try: p = Parser(self.expr) self.tree = p_expr(p) if p: raise RExpr.Error("trailing expr? %s" % p) except (RExpr.Error, ValueError) as e: print('error: in expr: %s' % self.expr, file=sys.stderr) print('error: %s' % e, file=sys.stderr) sys.exit(3) # recursively find all fields def fields(self): try: return self.tree.fields() except RExpr.Error as e: print('error: in expr: %s' % self.expr, file=sys.stderr) print('error: %s' % e, file=sys.stderr) sys.exit(3) # recursively find the type def type(self, types={}): try: return self.tree.type(types) except RExpr.Error as e: print('error: in expr: %s' % self.expr, file=sys.stderr) print('error: %s' % e, file=sys.stderr) sys.exit(3) # recursively find the fold operation def fold(self, types={}): try: return self.tree.fold(types) except RExpr.Error as e: print('error: in expr: %s' % self.expr, file=sys.stderr) print('error: %s' % e, file=sys.stderr) sys.exit(3) # recursive evaluate the expr def eval(self, fields={}): try: return self.tree.eval(fields) except RExpr.Error as e: print('error: in expr: %s' % self.expr, file=sys.stderr) print('error: %s' % e, file=sys.stderr) sys.exit(3) # parse %-escaped strings def punescape(s, attrs=None): if attrs is None: attrs = {} if isinstance(attrs, dict): attrs_ = attrs attrs = lambda k: attrs_[k] pattern = re.compile( '%[%n]' '|' '%x..' '|' '%u....' '|' '%U........' '|' '%\((?P[^)]*)\)' '(?P[+\- #0-9\.]*[sdboxXfFeEgG])') def unescape(m): if m.group()[1] == '%': return '%' elif m.group()[1] == 'n': return '\n' elif m.group()[1] == 'x': return chr(int(m.group()[2:], 16)) elif m.group()[1] == 'u': return chr(int(m.group()[2:], 16)) elif m.group()[1] == 'U': return chr(int(m.group()[2:], 16)) elif m.group()[1] == '(': try: v = attrs(m.group('field')) except KeyError: return m.group() f = m.group('format') if f[-1] in 'dboxX': if isinstance(v, str): v = try_dat(v) or 0 v = int(v) elif f[-1] in 'fFeEgG': if isinstance(v, str): v = try_dat(v) or 0 v = float(v) else: f = ('<' if '-' in f else '>') + f.replace('-', '') v = str(v) # note we need Python's new format syntax for binary return ('{:%s}' % f).format(v) else: assert False return re.sub(pattern, unescape, s) def punescape_help(): print('mods:') print(' %-21s %s' % ('%%', 'A literal % character')) print(' %-21s %s' % ('%n', 'A newline')) print(' %-21s %s' % ( '%xaa', 'A character with the hex value aa')) print(' %-21s %s' % ( '%uaaaa', 'A character with the hex value aaaa')) print(' %-21s %s' % ( '%Uaaaaaaaa', 'A character with the hex value aaaaaaaa')) print(' %-21s %s' % ( '%(field)s', 'An existing field formatted as a string')) print(' %-21s %s' % ( '%(field)[dboxX]', 'An existing field formatted as an integer')) print(' %-21s %s' % ( '%(field)[fFeEgG]', 'An existing field formatted as a float')) def openio(path, mode='r', buffering=-1): # allow '-' for stdin/stdout if path == '-': if 'r' in mode: return os.fdopen(os.dup(sys.stdin.fileno()), mode, buffering) else: return os.fdopen(os.dup(sys.stdout.fileno()), mode, buffering) else: return open(path, mode, buffering) def collect_csv(csv_paths, *, depth=1, children=None, notes=None, **_): # collect both results and fields from CSV files fields = co.OrderedDict() results = [] for path in csv_paths: try: with openio(path) as f: # csv or json? assume json starts with [ json = (f.buffer.peek(1)[:1] == b'[') # read csv? if not json: reader = csv.DictReader(f, restval='') # collect fields fields.update((k, True) for k in reader.fieldnames) for r in reader: # strip and drop empty fields r_ = {k: v.strip() for k, v in r.items() if k not in {'notes'} and v.strip()} # special handling for notes field if notes is not None and notes in r: r_[notes] = set(r[notes].split(',')) results.append(r_) # read json? else: import json def unjsonify(results, depth_): results_ = [] for r in results: # collect fields fields.update((k, True) for k in r.keys()) # convert to strings, we'll reparse these later # # this may seem a bit backwards, but it keeps # the rest of the script simpler if we pretend # everything came from a csv r_ = {k: str(v).strip() for k, v in r.items() if k not in {'children', 'notes'} and str(v).strip()} # special handling for children field if (children is not None and children in r and r[children] is not None and depth_ > 1): r_[children] = unjsonify( r[children], depth_-1) # special handling for notes field if (notes is not None and notes in r and r[notes] is not None): r_[notes] = set(r[notes]) results_.append(r_) return results_ results.extend(unjsonify(json.load(f), depth)) except FileNotFoundError: pass return list(fields.keys()), results def compile(fields_, results, by=None, fields=None, mods=[], exprs=[], sort=None, children=None, hot=None, notes=None): by = by.copy() fields = fields.copy() # make sure sort/hot fields are included for k, reverse in it.chain(sort or [], hot or []): # this defaults to typechecking sort/hot fields, which is # probably safer, if you really want to sort by strings you # can use --by + --label to create hidden by fields if k and k not in by and k not in fields: fields.append(k) # make sure all expr targets are in fields so they get typechecked # correctly for k, _ in exprs: if k not in fields: fields.append(k) # we only really care about the last mod/expr for each field mods = {k: mod for k, mod in mods} exprs = {k: expr for k, expr in exprs} # find best type for all fields used by field exprs fields__ = set(it.chain.from_iterable( exprs[k].fields() if k in exprs else [k] for k in fields)) types = {} for k in fields__: # check if dependency is in original fields # # it's tempting to also allow enumerate fields here, but this # currently doesn't work when hotifying if k not in fields_: print("error: no field %r?" % k, file=sys.stderr) sys.exit(2) for t in [RInt, RFloat, RFrac]: for r in results: if k in r and r[k].strip(): try: t(r[k]) except ValueError: break else: types[k] = t break else: print("error: no type matches field %r?" % k, file=sys.stderr) sys.exit(2) # typecheck exprs, note these may reference input fields # with the same name types__ = types.copy() for k, expr in exprs.items(): types__[k] = expr.type(types) # foldcheck field exprs folds = {k: (RSum, t) for k, v in types.items()} for k, expr in exprs.items(): folds[k] = expr.fold(types) folds = {k: (f(), t) for k, (f, t) in folds.items()} # create result class def __new__(cls, **r): # evaluate types r_ = r.copy() for k, t in types.items(): r_[k] = t(r[k]) if k in r else t() # evaluate exprs r__ = r_.copy() for k, expr in exprs.items(): r__[k] = expr.eval(r_) # evaluate mods for k, m in mods.items(): r__[k] = punescape(m, r) # return result return cls.__mro__[1].__new__(cls, **( {k: r__.get(k, '') for k in by} | {k: ([r__[k]], 1) if k in r__ else ([], 0) for k in fields} | ({children: r[children] if children in r else []} if children is not None else {}) | ({notes: r[notes] if notes in r else set()} if notes is not None else {}))) def __add__(self, other): # reuse lists if possible def extend(a, b): if len(a[0]) == a[1]: a[0].extend(b[0][:b[1]]) return (a[0], a[1] + b[1]) else: return (a[0][:a[1]] + b[0][:b[1]], a[1] + b[1]) # lazily fold results return self.__class__.__mro__[1].__new__(self.__class__, **( {k: getattr(self, k) for k in by} | {k: extend( object.__getattribute__(self, k), object.__getattribute__(other, k)) for k in fields} | ({children: self.children + other.children} if children is not None else {}) | ({notes: self.notes | other.notes} if notes is not None else {}))) def __getattribute__(self, k): # lazily fold results on demand, this avoids issues with fold # operations that depend on the number of results if k in fields: v = object.__getattribute__(self, k) if v[1]: return folds[k][0](v[0][:v[1]]) else: return None return object.__getattribute__(self, k) return type( 'Result', (co.namedtuple('Result', list(co.OrderedDict.fromkeys(it.chain( by, fields, [children] if children is not None else [], [notes] if notes is not None else [])).keys())),), dict( __slots__=(), __new__=__new__, __add__=__add__, __getattribute__=__getattribute__, _by=by, _fields=fields, _sort=fields, _types={k: t for k, (_, t) in folds.items()}, _mods=mods, _exprs=exprs, **{'_children': children} if children is not None else {}, **{'_notes': notes} if notes is not None else {})) def homogenize(Result, results, *, enumerates=None, defines=[], depth=1): # this just converts all (possibly recursive) results to our # result type results_ = [] for r in results: # filter by matching defines # # we do this here instead of in fold to be consistent with # evaluation order of exprs/mods/etc, note this isn't really # inconsistent with the other scripts, since they don't really # evaluate anything if not all(k in r and str(r[k]) in vs for k, vs in defines): continue # append a result results_.append(Result(**( r # enumerate? | ({e: len(results_) for e in enumerates} if enumerates is not None else {}) # recurse? | ({Result._children: homogenize( Result, r[Result._children], # only filter defines at the top level! enumerates=enumerates, depth=depth-1)} if hasattr(Result, '_children') and Result._children in r and r[Result._children] is not None and depth > 1 else {})))) return results_ # common folding/tabling/read/write code class Rev(co.namedtuple('Rev', 'x')): __slots__ = () # yes we need all of these because we're a namedtuple def __lt__(self, other): return self.x > other.x def __gt__(self, other): return self.x < other.x def __le__(self, other): return self.x >= other.x def __ge__(self, other): return self.x <= other.x def fold(Result, results, *, by=None, defines=[], sort=None, depth=1, **_): # stop when depth hits zero if depth == 0: return [] # organize by by if by is None: by = Result._by for k in it.chain(by or [], (k for k, _ in defines)): if k not in Result._by and k not in Result._fields: print("error: could not find field %r?" % k, file=sys.stderr) sys.exit(-1) # filter by matching defines if defines: results_ = [] for r in results: if all(str(getattr(r, k)) in vs for k, vs in defines): results_.append(r) results = results_ # organize results into conflicts folding = co.OrderedDict() for r in results: name = tuple(getattr(r, k) for k in by) if name not in folding: folding[name] = [] folding[name].append(r) # merge conflicts folded = [] for name, rs in folding.items(): folded.append(sum(rs[1:], start=rs[0])) # sort, note that python's sort is stable folded.sort(key=lambda r: ( # sort by explicit sort fields tuple((Rev if reverse ^ (not k or k in Result._fields) else lambda x: x)( tuple((getattr(r, k_),) if getattr(r, k_) is not None else () for k_ in ([k] if k else Result._sort))) for k, reverse in (sort or [])), # sort by result r)) # recurse if we have recursive results if hasattr(Result, '_children'): folded = [r._replace(**{ Result._children: fold( Result, getattr(r, Result._children), by=by, # only filter defines at the top level! sort=sort, depth=depth-1)}) for r in folded] return folded def hotify(Result, results, *, enumerates=None, depth=1, hot=None, **_): # note! hotifying risks confusion if you don't enumerate/have a # z field, since it will allow folding across recursive boundaries # hotify only makes sense for recursive results assert hasattr(Result, '_children') results_ = [] for r in results: hot_ = [] def recurse(results_, depth_): nonlocal hot_ if not results_: return # find the hottest result r = min(results_, key=lambda r: tuple((Rev if reverse ^ (not k or k in Result._fields) else lambda x: x)( tuple((getattr(r, k_),) if getattr(r, k_) is not None else () for k_ in ([k] if k else Result._sort))) for k, reverse in it.chain(hot, [(None, False)]))) hot_.append(r._replace(**( # enumerate? ({e: len(hot_) for e in enumerates} if enumerates is not None else {}) | {Result._children: []}))) # recurse? if depth_ > 1: recurse(getattr(r, Result._children), depth_-1) recurse(getattr(r, Result._children), depth-1) results_.append(r._replace(**{Result._children: hot_})) return results_ def table(Result, results, diff_results=None, *, by=None, fields=None, sort=None, labels=None, depth=1, hot=None, diff=None, percent=None, all=False, compare=None, no_header=False, small_header=False, no_total=False, small_table=False, summary=False, **_): import builtins all_, all = all, builtins.all if by is None: by = Result._by if fields is None: fields = Result._fields types = Result._types # organize by name table = { ','.join(str(getattr(r, k) if getattr(r, k) is not None else '') for k in by): r for r in results} diff_table = { ','.join(str(getattr(r, k) if getattr(r, k) is not None else '') for k in by): r for r in diff_results or []} # lost results? this only happens if we didn't fold by the same # by field, which is an error and risks confusing results assert len(table) == len(results) if diff_results is not None: assert len(diff_table) == len(diff_results) # find compare entry if there is one if compare: compare_r = table.get(','.join(str(k) for k in compare)) # build up our lines lines = [] # header if not no_header: header = ['%s%s' % ( ','.join(labels if labels is not None else by), ' (%d added, %d removed)' % ( sum(1 for n in table if n not in diff_table), sum(1 for n in diff_table if n not in table)) if diff else '') if not small_header and not small_table and not summary else ''] if not diff: for k in fields: header.append(k) else: for k in fields: header.append('o'+k) for k in fields: header.append('n'+k) for k in fields: header.append('d'+k) lines.append(header) # delete these to try to catch typos below, we need to rebuild # these tables at each recursive layer del table del diff_table # entry helper def table_entry(name, r, diff_r=None): # prepend name entry = [name] # normal entry? if ((compare is None or r == compare_r) and not percent and not diff): for k in fields: entry.append( (getattr(r, k).table(), getattr(getattr(r, k), 'notes', lambda: [])()) if getattr(r, k, None) is not None else types[k].none) # compare entry? elif not percent and not diff: for k in fields: entry.append( (getattr(r, k).table() if getattr(r, k, None) is not None else types[k].none, (lambda t: ['+∞%'] if t == +mt.inf else ['-∞%'] if t == -mt.inf else ['%+.1f%%' % (100*t)])( types[k].ratio( getattr(r, k, None), getattr(compare_r, k, None))))) # percent entry? elif not diff: for k in fields: entry.append( (getattr(r, k).table() if getattr(r, k, None) is not None else types[k].none, (lambda t: ['+∞%'] if t == +mt.inf else ['-∞%'] if t == -mt.inf else ['%+.1f%%' % (100*t)])( types[k].ratio( getattr(r, k, None), getattr(diff_r, k, None))))) # diff entry? else: for k in fields: entry.append(getattr(diff_r, k).table() if getattr(diff_r, k, None) is not None else types[k].none) for k in fields: entry.append(getattr(r, k).table() if getattr(r, k, None) is not None else types[k].none) for k in fields: entry.append( (types[k].diff( getattr(r, k, None), getattr(diff_r, k, None)), (lambda t: ['+∞%'] if t == +mt.inf else ['-∞%'] if t == -mt.inf else ['%+.1f%%' % (100*t)] if t else [])( types[k].ratio( getattr(r, k, None), getattr(diff_r, k, None))))) # append any notes if hasattr(Result, '_notes') and r is not None: notes = sorted(getattr(r, Result._notes)) if isinstance(entry[-1], tuple): entry[-1] = (entry[-1][0], entry[-1][1] + notes) else: entry[-1] = (entry[-1], notes) return entry # recursive entry helper def table_recurse(results_, diff_results_, depth_, prefixes=('', '', '', '')): # build the children table at each layer table_ = { ','.join(str(getattr(r, k) if getattr(r, k) is not None else '') for k in by): r for r in results_} diff_table_ = { ','.join(str(getattr(r, k) if getattr(r, k) is not None else '') for k in by): r for r in diff_results_ or []} names_ = [n for n in table_.keys() | diff_table_.keys() if diff_results is None or all_ or any( types[k].ratio( getattr(table_.get(n), k, None), getattr(diff_table_.get(n), k, None)) for k in fields)] # sort again, now with diff info, note that python's sort is stable names_.sort(key=lambda n: ( # sort by explicit sort fields next( tuple((Rev if reverse ^ (not k or k in Result._fields) else lambda x: x)( tuple((getattr(r_, k_),) if getattr(r_, k_) is not None else () for k_ in ([k] if k else Result._sort))) for k, reverse in (sort or [])) for r_ in [table_.get(n), diff_table_.get(n)] if r_ is not None), # sort by ratio if diffing Rev(tuple(types[k].ratio( getattr(table_.get(n), k, None), getattr(diff_table_.get(n), k, None)) for k in fields)) if diff or percent else (), # move compare entry to the top, note this can be # overridden by explicitly sorting by fields (table_.get(n) != compare_r, # sort by ratio if comparing Rev(tuple( types[k].ratio( getattr(table_.get(n), k, None), getattr(compare_r, k, None)) for k in fields))) if compare else (), # sort by result (table_[n],) if n in table_ else (), # and finally by name (diffs may be missing results) n)) for i, name in enumerate(names_): # find comparable results r = table_.get(name) diff_r = diff_table_.get(name) # figure out a good label if labels is not None: label = next( ','.join(str(getattr(r_, k) if getattr(r_, k) is not None else '') for k in labels) for r_ in [r, diff_r] if r_ is not None) else: label = name # build line line = table_entry(label, r, diff_r) # add prefixes line = [x if isinstance(x, tuple) else (x, []) for x in line] line[0] = (prefixes[0+(i==len(names_)-1)] + line[0][0], line[0][1]) lines.append(line) # recurse? if name in table_ and depth_ > 1: table_recurse( getattr(r, Result._children), getattr(diff_r, Result._children, None), depth_-1, (prefixes[2+(i==len(names_)-1)] + "|-> ", prefixes[2+(i==len(names_)-1)] + "'-> ", prefixes[2+(i==len(names_)-1)] + "| ", prefixes[2+(i==len(names_)-1)] + " ")) # build entries if not summary: table_recurse(results, diff_results, depth) # total if not no_total and not (small_table and not summary): r = next(iter(fold(Result, results, by=[])), None) if diff_results is None: diff_r = None else: diff_r = next(iter(fold(Result, diff_results, by=[])), None) lines.append(table_entry('TOTAL', r, diff_r)) # homogenize lines = [[x if isinstance(x, tuple) else (x, []) for x in line] for line in lines] # find the best widths, note that column 0 contains the names and is # handled a bit differently widths = co.defaultdict(lambda: 7, {0: 7}) nwidths = co.defaultdict(lambda: 0) for line in lines: for i, x in enumerate(line): widths[i] = max(widths[i], ((len(x[0])+1+4-1)//4)*4-1) if i != len(line)-1: nwidths[i] = max(nwidths[i], 1+sum(2+len(n) for n in x[1])) # print our table for line in lines: print('%-*s %s' % ( widths[0], line[0][0], ' '.join('%*s%-*s' % ( widths[i], x[0], nwidths[i], ' (%s)' % ', '.join(x[1]) if x[1] else '') for i, x in enumerate(line[1:], 1)))) def read_csv(path, Result, *, depth=1, **_): with openio(path, 'r') as f: # csv or json? assume json starts with [ json = (f.buffer.peek(1)[:1] == b'[') # read csv? if not json: results = [] reader = csv.DictReader(f, restval='') for r in reader: if not any(k in r and r[k].strip() for k in Result._fields): continue try: # note this allows by/fields to overlap results.append(Result(**( {k: r[k] for k in Result._by if k in r and r[k].strip()} | {k: r[k] for k in Result._fields if k in r and r[k].strip()}))) except TypeError: pass return results # read json? else: import json def unjsonify(results, depth_): results_ = [] for r in results: if not any(k in r and r[k].strip() for k in Result._fields): continue try: # note this allows by/fields to overlap results_.append(Result(**( {k: r[k] for k in Result._by if k in r and r[k] is not None} | {k: r[k] for k in Result._fields if k in r and r[k] is not None} | ({Result._children: unjsonify( r[Result._children], depth_-1)} if hasattr(Result, '_children') and Result._children in r and r[Result._children] is not None and depth_ > 1 else {}) | ({Result._notes: set(r[Result._notes])} if hasattr(Result, '_notes') and Result._notes in r and r[Result._notes] is not None else {})))) except TypeError: pass return results_ return unjsonify(json.load(f), depth) def write_csv(path, Result, results, *, json=False, by=None, fields=None, depth=1, **_): with openio(path, 'w') as f: # write csv? if not json: writer = csv.DictWriter(f, list(co.OrderedDict.fromkeys(it.chain( by if by is not None else Result._by, fields if fields is not None else Result._fields)).keys())) writer.writeheader() for r in results: # note this allows by/fields to overlap writer.writerow( {k: getattr(r, k) for k in (by if by is not None else Result._by) if getattr(r, k) is not None} | {k: str(getattr(r, k)) for k in (fields if fields is not None else Result._fields) if getattr(r, k) is not None}) # write json? else: import json # the neat thing about json is we can include recursive results def jsonify(results, depth_): results_ = [] for r in results: # note this allows by/fields to overlap results_.append( {k: getattr(r, k) for k in (by if by is not None else Result._by) if getattr(r, k) is not None} | {k: str(getattr(r, k)) for k in (fields if fields is not None else Result._fields) if getattr(r, k) is not None} | ({Result._children: jsonify( getattr(r, Result._children), depth_-1)} if hasattr(Result, '_children') and getattr(r, Result._children) and depth_ > 1 else {}) | ({Result._notes: list( getattr(r, Result._notes))} if hasattr(Result, '_notes') and getattr(r, Result._notes) else {})) return results_ json.dump(jsonify(results, depth), f, separators=(',', ':')) def main(csv_paths, *, by=None, fields=None, defines=[], sort=None, depth=None, children=None, hot=None, notes=None, **args): # show mod help text? if args.get('help_mods'): return punescape_help() # show expr help text? if args.get('help_exprs'): return RExpr.help() if by is None and fields is None: print("error: needs --by or --fields to figure out fields", file=sys.stderr) sys.exit(-1) if children is not None: if len(children) > 1: print("error: multiple --children fields currently not supported", file=sys.stderr) sys.exit(-1) children = children[0] if notes is not None: if len(notes) > 1: print("error: multiple --notes fields currently not supported", file=sys.stderr) sys.exit(-1) notes = notes[0] # recursive results imply --children if (depth is not None or hot is not None) and children is None: children = 'children' # figure out depth if depth is None: depth = mt.inf if hot else 1 elif depth == 0: depth = mt.inf # separate out enumerates/mods/exprs # # enumerate enumerates: -ia # by supports mods: -ba=%(b)s # fields/sort/etc supports exprs: -fa=b+c # enumerates = [k for (k, v), hidden in (by or []) if v == enumerate] mods = [(k, v) for k, v in it.chain( ((k, v) for (k, v), hidden in (by or []) if v != enumerate)) if v is not None] exprs = [(k, v) for k, v in it.chain( ((k, v) for (k, v), hidden in (fields or [])), ((k, v) for (k, v), reverse in (sort or [])), ((k, v) for (k, v), reverse in (hot or []))) if v is not None] labels = None if by is not None: labels = [k for (k, v), hidden in by if not hidden] by = [k for (k, v), hidden in by] if fields is not None: fields = [k for (k, v), hidden in fields if not hidden or args.get('output') or args.get('output_json')] if sort is not None: sort = [(k, reverse) for (k, v), reverse in sort] if hot is not None: hot = [(k, reverse) for (k, v), reverse in hot] # find results if not args.get('use', None): # not enough info? if not csv_paths: print("error: no *.csv files?", file=sys.stderr) sys.exit(1) # collect info fields_, results = collect_csv(csv_paths, depth=depth, children=children, notes=notes, **args) else: # use is just an alias but takes priority fields_, results = collect_csv([args['use']], depth=depth, children=children, notes=notes, **args) # if by not specified, guess it's anything not in fields/defines/exprs/etc if not by: by = [k for k in fields_ if k not in (fields or []) and not any(k == k_ for k_, _ in defines) and not any(k == k_ for k_, _ in (sort or [])) and k != children and not any(k == k_ for k_, _ in (hot or [])) and k != notes and not any(k == k_ for _, expr in exprs for k_ in expr.fields())] # if fields not specified, guess it's anything not in by/defines/exprs/etc if not fields: fields = [k for k in fields_ if k not in (by or []) and not any(k == k_ for k_, _ in defines) and not any(k == k_ for k_, _ in (sort or [])) and k != children and not any(k == k_ for k_, _ in (hot or [])) and k != notes and not any(k == k_ for _, expr in exprs for k_ in expr.fields())] # build result type Result = compile(fields_, results, by=by, fields=fields, mods=mods, exprs=exprs, sort=sort, children=children, hot=hot, notes=notes) # homogenize results = homogenize(Result, results, enumerates=enumerates, defines=defines, depth=depth) # fold results = fold(Result, results, by=by, depth=depth) # hotify? if hot: results = hotify(Result, results, enumerates=enumerates, depth=depth, hot=hot) # find previous results? diff_results = None if args.get('diff') or args.get('percent'): # note! don't use read_csv here # # it's tempting now that we have a Result type, but we want to # make sure all the defines/exprs/mods/etc are evaluated in the # same order try: _, diff_results = collect_csv( [args.get('diff') or args.get('percent')], depth=depth, children=children, notes=notes, **args) except FileNotFoundError: diff_results = [] # homogenize diff_results = homogenize(Result, diff_results, enumerates=enumerates, defines=defines, depth=depth) # fold diff_results = fold(Result, diff_results, by=by, depth=depth) # hotify? if hot: diff_results = hotify(Result, diff_results, enumerates=enumerates, depth=depth, hot=hot) # write results to JSON if args.get('output_json'): write_csv(args['output_json'], Result, results, json=True, by=by, fields=fields, depth=depth, **args) # write results to CSV elif args.get('output'): write_csv(args['output'], Result, results, by=by, fields=fields, depth=depth, **args) # print table else: table(Result, results, diff_results, by=by, fields=fields, sort=sort, labels=labels, depth=depth, **args) if __name__ == "__main__": import argparse import sys parser = argparse.ArgumentParser( description="Script to manipulate CSV files.", allow_abbrev=False) parser.add_argument( 'csv_paths', nargs='*', help="Input *.csv files.") parser.add_argument( '--help-mods', action='store_true', help="Show what %% modifiers are available.") parser.add_argument( '--help-exprs', action='store_true', help="Show what field exprs are available.") parser.add_argument( '-o', '--output', help="Specify CSV file to store results.") parser.add_argument( '-O', '--output-json', help="Specify JSON file to store results. This may contain " "recursive info.") parser.add_argument( '-u', '--use', help="Don't parse anything, use this CSV/JSON file.") parser.add_argument( '-d', '--diff', help="Specify CSV/JSON file to diff against.") parser.add_argument( '-p', '--percent', help="Specify CSV/JSON file to diff against, but only show " "percentage change, not a full diff.") parser.add_argument( '-c', '--compare', type=lambda x: tuple(v.strip() for v in x.split(',')), help="Compare results to the row matching this by pattern.") parser.add_argument( '-a', '--all', action='store_true', help="Show all, not just the ones that changed.") class AppendBy(argparse.Action): def __call__(self, parser, namespace, value, option): if namespace.by is None: namespace.by = [] namespace.by.append((value, option in { '-B', '--hidden-by', '-I', '--hidden-enumerate'})) parser.add_argument( '-i', '--enumerate', action=AppendBy, nargs='?', type=lambda x: (x, enumerate), const=('i', enumerate), help="Enumerate results with this field. This will prevent " "result folding.") parser.add_argument( '-I', '--hidden-enumerate', action=AppendBy, nargs='?', type=lambda x: (x, enumerate), const=('i', enumerate), help="Like -i/--enumerate, but hidden from the table renderer.") parser.add_argument( '-b', '--by', action=AppendBy, type=lambda x: ( lambda k, v=None: ( k.strip(), v.strip() if v is not None else None) )(*x.split('=', 1)), help="Group by this field. This does _not_ support expressions, " "but can be assigned a string with %% modifiers.") parser.add_argument( '-B', '--hidden-by', action=AppendBy, type=lambda x: ( lambda k, v=None: ( k.strip(), v.strip() if v is not None else None) )(*x.split('=', 1)), help="Like -b/--by, but hidden from the table renderer.") class AppendField(argparse.Action): def __call__(self, parser, namespace, value, option): if namespace.fields is None: namespace.fields = [] namespace.fields.append((value, option in { '-F', '--hidden-field'})) parser.add_argument( '-f', '--field', dest='fields', action=AppendField, type=lambda x: ( lambda k, v=None: ( k.strip(), RExpr(v) if v is not None else None) )(*x.split('=', 1)), help="Show this field. Can include an expression of the form " "field=expr.") parser.add_argument( '-F', '--hidden-field', dest='fields', action=AppendField, type=lambda x: ( lambda k, v=None: ( k.strip(), v.strip() if v is not None else None) )(*x.split('=', 1)), help="Like -f/--field, but hidden from the table renderer.") parser.add_argument( '-D', '--define', dest='defines', action='append', type=lambda x: ( lambda k, vs: ( k.strip(), {v.strip() for v in vs.split(',')}) )(*x.split('=', 1)), help="Only include results where this field is this value. May " "include comma-separated options.") class AppendSort(argparse.Action): def __call__(self, parser, namespace, value, option): if namespace.sort is None: namespace.sort = [] namespace.sort.append((value, option in {'-S', '--reverse-sort'})) parser.add_argument( '-s', '--sort', nargs='?', action=AppendSort, type=lambda x: ( lambda k, v=None: ( k.strip(), RExpr(v) if v is not None else None) )(*x.split('=', 1)), const=(None, None), help="Sort by this field. Can include an expression of the form " "field=expr.") parser.add_argument( '-S', '--reverse-sort', nargs='?', action=AppendSort, type=lambda x: ( lambda k, v=None: ( k.strip(), RExpr(v) if v is not None else None) )(*x.split('=', 1)), const=(None, None), help="Sort by this field, but backwards. Can include an expression " "of the form field=expr.") parser.add_argument( '-z', '--depth', nargs='?', type=lambda x: int(x, 0), const=0, help="Depth of function calls to show. 0 shows all calls unless " "we find a cycle. Defaults to 0.") parser.add_argument( '-Z', '--children', nargs='?', const='children', action='append', help="Field to use for recursive results. This expects a list " "and really only works with JSON input.") class AppendHot(argparse.Action): def __call__(self, parser, namespace, value, option): if namespace.hot is None: namespace.hot = [] namespace.hot.append((value, option in {'-R', '--reverse-hot'})) parser.add_argument( '-r', '--hot', nargs='?', action=AppendHot, type=lambda x: ( lambda k, v=None: ( k.strip(), RExpr(v) if v is not None else None) )(*x.split('=', 1)), const=(None, None), help="Show only the hot path for each function call. Can " "optionally provide fields like sort. Can include an " "expression in the form of field=expr.") parser.add_argument( '-R', '--reverse-hot', nargs='?', action=AppendHot, type=lambda x: ( lambda k, v=None: ( k.strip(), RExpr(v) if v is not None else None) )(*x.split('=', 1)), const=(None, None), help="Like -r/--hot, but backwards.") parser.add_argument( '-N', '--notes', nargs='?', const='notes', action='append', help="Field to use for notes.") parser.add_argument( '--no-header', action='store_true', help="Don't show the header.") parser.add_argument( '--small-header', action='store_true', help="Don't show by field names.") parser.add_argument( '--no-total', action='store_true', help="Don't show the total.") parser.add_argument( '-Q', '--small-table', action='store_true', help="Equivalent to --small-header + --no-total.") parser.add_argument( '-Y', '--summary', action='store_true', help="Only show the total.") sys.exit(main(**{k: v for k, v in vars(parser.parse_intermixed_args()).items() if v is not None}))