Files
littlefs/scripts/csv.py
Christopher Haster 2f20f53e90 scripts: csv.py: Reverted define filtering to before expr eval
It's just too unintuitive to filter after exprs.

Note this is consistent with how exprs/mods are evaluated. Exprs/mods
can't reference other exprs/mods because csv.py is only single-pass, so
allowing defines to reference exprs/mods is surprising.

And the solution to needing this sort of post-expr/mod reference is
the same for defines: You can always chain multiple csv.py calls.

The reason defines were changed to evaluate after expr eval was because
this seemed inconsistent with other result scripts, but this is not
actually the case. Other result scripts simply don't have exprs/mods, so
filtering in fold is the same as filtering during collection. Note that
even in fold, filtering is done _before_ the actual fold/sum operation.

---

Also fixed a recursive-define regression when folding. Counter-
intuitively, we _don't_ want to recursively apply define filters. If we
do the results will just end up too confusing to be useful.
2025-03-12 19:10:17 -05:00

2564 lines
82 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Script to manipulate CSV files.
#
# Example:
# ./scripts/csv.py lfs.code.csv lfs.stack.csv \
# -bfunction -fcode -fstack='max(stack)'
#
# Copyright (c) 2022, The littlefs authors.
# SPDX-License-Identifier: BSD-3-Clause
#
# prevent local imports
if __name__ == "__main__":
__import__('sys').path.pop(0)
import collections as co
import csv
import functools as ft
import itertools as it
import math as mt
import os
import re
import sys
# various field types
# integer fields
class RInt(co.namedtuple('RInt', 'x')):
    """An integer result field that may also hold +-infinity.

    Wraps a single value x, which is either an int or +-math.inf. Strings
    are parsed with int(x, 0), so prefixed literals such as 0x10 work, and
    an optionally signed "∞"/"inf" is accepted as infinity.

    Raises ValueError if a string can't be parsed as an integer.
    """
    __slots__ = ()

    def __new__(cls, x=0):
        if isinstance(x, RInt):
            return x
        if isinstance(x, str):
            try:
                x = int(x, 0)
            except ValueError:
                # also accept +-∞ and +-inf
                if re.match(r'^\s*\+?\s*(?:∞|inf)\s*$', x):
                    x = mt.inf
                elif re.match(r'^\s*-\s*(?:∞|inf)\s*$', x):
                    x = -mt.inf
                else:
                    raise
        # anything that isn't already an int or an infinity gets truncated
        if not (isinstance(x, int) or mt.isinf(x)):
            x = int(x)
        return super().__new__(cls, x)

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self.x)

    def __str__(self):
        # note this must round-trip through RInt(str(x))
        if self.x == mt.inf:
            return '∞'
        elif self.x == -mt.inf:
            return '-∞'
        else:
            return str(self.x)

    def __bool__(self):
        return bool(self.x)

    def __int__(self):
        assert not mt.isinf(self.x)
        return self.x

    def __float__(self):
        return float(self.x)

    # placeholder shown in tables when there is no value
    none = '%7s' % '-'

    def table(self):
        """Render as a right-aligned, 7-character table cell."""
        return '%7s' % (self,)

    def diff(self, other):
        """Render the signed difference self - other as a table cell.

        A falsy self/other (zero or None) counts as 0.
        """
        new = self.x if self else 0
        old = other.x if other else 0
        diff = new - old
        if diff == +mt.inf:
            return '%7s' % '+∞'
        elif diff == -mt.inf:
            return '%7s' % '-∞'
        else:
            return '%+7d' % diff

    def ratio(self, other):
        """Return the relative change from other to self as a float.

        A falsy self/other (zero or None) counts as 0.
        """
        new = self.x if self else 0
        old = other.x if other else 0
        if mt.isinf(new) and mt.isinf(old):
            return 0.0
        elif mt.isinf(new):
            return +mt.inf
        elif mt.isinf(old):
            return -mt.inf
        elif not old and not new:
            return 0.0
        elif not old:
            return +mt.inf
        else:
            return (new-old) / old

    def __pos__(self):
        return self.__class__(+self.x)

    def __neg__(self):
        return self.__class__(-self.x)

    def __abs__(self):
        return self.__class__(abs(self.x))

    def __add__(self, other):
        return self.__class__(self.x + other.x)

    def __sub__(self, other):
        return self.__class__(self.x - other.x)

    def __mul__(self, other):
        return self.__class__(self.x * other.x)

    def __truediv__(self, other):
        # division by zero saturates to +-∞ instead of raising
        if not other:
            if self >= self.__class__(0):
                return self.__class__(+mt.inf)
            else:
                return self.__class__(-mt.inf)
        # integer fields use floor division
        return self.__class__(self.x // other.x)

    def __mod__(self, other):
        return self.__class__(self.x % other.x)
# float fields
class RFloat(co.namedtuple('RFloat', 'x')):
    """A float result field.

    Wraps a single float x. Strings are parsed with float(x), and an
    optionally signed "∞"/"inf" (with optional whitespace) is accepted as
    infinity.

    Raises ValueError if a string can't be parsed as a float.
    """
    __slots__ = ()

    def __new__(cls, x=0.0):
        if isinstance(x, RFloat):
            return x
        if isinstance(x, str):
            try:
                x = float(x)
            except ValueError:
                # also accept +-∞ and +-inf
                if re.match(r'^\s*\+?\s*(?:∞|inf)\s*$', x):
                    x = mt.inf
                elif re.match(r'^\s*-\s*(?:∞|inf)\s*$', x):
                    x = -mt.inf
                else:
                    raise
        if not isinstance(x, float):
            x = float(x)
        return super().__new__(cls, x)

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self.x)

    def __str__(self):
        # note this must round-trip through RFloat(str(x))
        if self.x == mt.inf:
            return '∞'
        elif self.x == -mt.inf:
            return '-∞'
        else:
            return '%.1f' % self.x

    def __bool__(self):
        return bool(self.x)

    def __int__(self):
        return int(self.x)

    def __float__(self):
        return float(self.x)

    # placeholder shown in tables when there is no value
    none = '%7s' % '-'

    def table(self):
        """Render as a right-aligned, 7-character table cell."""
        return '%7s' % (self,)

    def diff(self, other):
        """Render the signed difference self - other as a table cell.

        A falsy self/other (zero or None) counts as 0.
        """
        new = self.x if self else 0
        old = other.x if other else 0
        diff = new - old
        if diff == +mt.inf:
            return '%7s' % '+∞'
        elif diff == -mt.inf:
            return '%7s' % '-∞'
        else:
            return '%+7.1f' % diff

    def ratio(self, other):
        """Return the relative change from other to self as a float.

        A falsy self/other (zero or None) counts as 0.
        """
        new = self.x if self else 0
        old = other.x if other else 0
        if mt.isinf(new) and mt.isinf(old):
            return 0.0
        elif mt.isinf(new):
            return +mt.inf
        elif mt.isinf(old):
            return -mt.inf
        elif not old and not new:
            return 0.0
        elif not old:
            return +mt.inf
        else:
            return (new-old) / old

    def __pos__(self):
        return self.__class__(+self.x)

    def __neg__(self):
        return self.__class__(-self.x)

    def __abs__(self):
        return self.__class__(abs(self.x))

    def __add__(self, other):
        return self.__class__(self.x + other.x)

    def __sub__(self, other):
        return self.__class__(self.x - other.x)

    def __mul__(self, other):
        return self.__class__(self.x * other.x)

    def __truediv__(self, other):
        # division by zero saturates to +-∞ instead of raising
        if not other:
            if self >= self.__class__(0):
                return self.__class__(+mt.inf)
            else:
                return self.__class__(-mt.inf)
        return self.__class__(self.x / other.x)

    def __mod__(self, other):
        return self.__class__(self.x % other.x)
# fractional fields, a/b
class RFrac(co.namedtuple('RFrac', 'a,b')):
    """A fractional result field, a/b, stored as two RInts.

    Can be built from an existing RFrac, an "a/b" string, a single value
    (giving a/a), or an explicit a and b.

    Raises ValueError if a string has no '/' or either part is not an
    integer.
    """
    __slots__ = ()

    def __new__(cls, a=0, b=None):
        if isinstance(a, RFrac) and b is None:
            return a
        if isinstance(a, str) and b is None:
            # unpacking raises ValueError if there is no '/'
            a, b = a.split('/', 1)
        if b is None:
            b = a
        return super().__new__(cls, RInt(a), RInt(b))

    def __repr__(self):
        return '%s(%r, %r)' % (self.__class__.__name__, self.a.x, self.b.x)

    def __str__(self):
        return '%s/%s' % (self.a, self.b)

    def __bool__(self):
        return bool(self.a)

    def __int__(self):
        return int(self.a)

    def __float__(self):
        return float(self.a)

    # placeholder shown in tables when there is no value
    none = '%11s' % '-'

    def table(self):
        """Render as a right-aligned, 11-character table cell."""
        return '%11s' % (self,)

    def notes(self):
        """Return the fraction as a one-element list holding a percentage.

        A zero denominator is treated as 100%.
        """
        t = self.a.x/self.b.x if self.b.x else 1.0
        return ['∞%' if t == +mt.inf
                else '-∞%' if t == -mt.inf
                else '%.1f%%' % (100*t)]

    def diff(self, other):
        """Render the per-component difference self - other as a cell."""
        new_a, new_b = self if self else (RInt(0), RInt(0))
        old_a, old_b = other if other else (RInt(0), RInt(0))
        return '%11s' % ('%s/%s' % (
                new_a.diff(old_a).strip(),
                new_b.diff(old_b).strip()))

    def ratio(self, other):
        """Return the change in a/b between other and self as a float.

        A zero denominator is treated as a ratio of 1.0.
        """
        new_a, new_b = self if self else (RInt(0), RInt(0))
        old_a, old_b = other if other else (RInt(0), RInt(0))
        new = new_a.x/new_b.x if new_b.x else 1.0
        old = old_a.x/old_b.x if old_b.x else 1.0
        return new - old

    # arithmetic is applied to numerator and denominator independently
    def __pos__(self):
        return self.__class__(+self.a, +self.b)

    def __neg__(self):
        return self.__class__(-self.a, -self.b)

    def __abs__(self):
        return self.__class__(abs(self.a), abs(self.b))

    def __add__(self, other):
        return self.__class__(self.a + other.a, self.b + other.b)

    def __sub__(self, other):
        return self.__class__(self.a - other.a, self.b - other.b)

    def __mul__(self, other):
        return self.__class__(self.a * other.a, self.b * other.b)

    def __truediv__(self, other):
        return self.__class__(self.a / other.a, self.b / other.b)

    def __mod__(self, other):
        return self.__class__(self.a % other.a, self.b % other.b)

    # comparisons cross-multiply to compare the ratios, a zero
    # denominator compares as 1/1
    def __eq__(self, other):
        self_a, self_b = self if self.b.x else (RInt(1), RInt(1))
        other_a, other_b = other if other.b.x else (RInt(1), RInt(1))
        return self_a * other_b == other_a * self_b

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        self_a, self_b = self if self.b.x else (RInt(1), RInt(1))
        other_a, other_b = other if other.b.x else (RInt(1), RInt(1))
        return self_a * other_b < other_a * self_b

    def __gt__(self, other):
        return self.__class__.__lt__(other, self)

    def __le__(self, other):
        return not self.__gt__(other)

    def __ge__(self, other):
        return not self.__lt__(other)
# various fold operations
class RSum:
    """Fold operation that adds up a non-empty sequence of values."""

    def __call__(self, xs):
        # seed with the first element so non-numeric addable types work
        head, tail = xs[0], xs[1:]
        return sum(tail, start=head)
class RProd:
    """Fold operation that multiplies a non-empty sequence of values."""

    def __call__(self, xs):
        # seed with the first element so the result keeps its type
        head, tail = xs[0], xs[1:]
        return mt.prod(tail, start=head)
class RMin:
    """Fold operation that picks the smallest value."""

    def __call__(self, xs):
        smallest = min(xs)
        return smallest
class RMax:
    """Fold operation that picks the largest value."""

    def __call__(self, xs):
        largest = max(xs)
        return largest
class RAvg:
    """Fold operation that finds the arithmetic mean, as an RFloat."""

    def __call__(self, xs):
        total = sum(float(x) for x in xs)
        return RFloat(total / len(xs))
class RStddev:
    """Fold operation that finds the population stddev, as an RFloat."""

    def __call__(self, xs):
        n = len(xs)
        mean = sum(float(x) for x in xs) / n
        # mean of the squared deviations from the mean
        variance = sum((float(x) - mean)**2 for x in xs) / n
        return RFloat(mt.sqrt(variance))
class RGMean:
    """Fold operation that finds the geometric mean, as an RFloat."""

    def __call__(self, xs):
        product = mt.prod(float(x) for x in xs)
        return RFloat(product**(1/len(xs)))
class RGStddev:
    """Fold operation that finds the geometric stddev, as an RFloat."""

    def __call__(self, xs):
        gmean = mt.prod(float(x) for x in xs)**(1/len(xs))
        # a zero geometric mean has no meaningful stddev
        if not gmean:
            return RFloat(mt.inf)
        log_variance = (
                sum(mt.log(float(x)/gmean)**2 for x in xs) / len(xs))
        return RFloat(mt.exp(mt.sqrt(log_variance)))
# a simple general-purpose parser class
#
# basically just because memoryview doesn't support strs
class Parser:
    """A simple general-purpose regex-driven parser over a string.

    Tracks a position i into data and the most recent match m. Whitespace
    (the ws pattern) is skipped at construction and after every chomp.

    lookahead() saves the current state on a stack, and can be used as a
    context manager: the new state is kept if the body succeeds and
    rolled back if it raises.
    """

    class Error(Exception):
        pass

    def __init__(self, data, ws=r'\s*', ws_flags=0):
        self.data = data
        self.i = 0
        self.m = None
        # saved (i, m) states for nested lookaheads
        self.stack = []
        # also consume whitespace
        self.ws = re.compile(ws, ws_flags)
        self.i = self.ws.match(self.data, self.i).end()

    def __repr__(self):
        # truncate long input so error messages stay readable
        if len(self.data) - self.i <= 32:
            return repr(self.data[self.i:])
        else:
            return "%s..." % repr(self.data[self.i:self.i+32])[:32]

    def __str__(self):
        return self.data[self.i:]

    def __len__(self):
        return len(self.data) - self.i

    def __bool__(self):
        # true while there is unparsed input left
        return self.i != len(self.data)

    def match(self, pattern, flags=0):
        """Try to match pattern at the current position, without advancing.

        Returns the match object, or None, and remembers it for
        group/chomp.
        """
        # compile so we can use the pos arg, this is still cached
        self.m = re.compile(pattern, flags).match(self.data, self.i)
        return self.m

    def group(self, *groups):
        """Return group(s) of the most recent match."""
        return self.m.group(*groups)

    def chomp(self, *groups):
        """Advance past the most recent match and return its group(s)."""
        g = self.group(*groups)
        self.i = self.m.end()
        # also consume whitespace
        self.i = self.ws.match(self.data, self.i).end()
        return g

    def chompmatch(self, pattern, flags=0, *groups):
        """Match and chomp pattern, raising Parser.Error if it is absent."""
        if not self.match(pattern, flags):
            raise Parser.Error("expected %r, found %r" % (pattern, self))
        return self.chomp(*groups)

    def unexpected(self):
        """Raise Parser.Error describing the remaining input."""
        raise Parser.Error("unexpected %r" % self)

    def lookahead(self):
        """Save the current state, returns self for use in with blocks."""
        # push state on the stack
        self.stack.append((self.i, self.m))
        return self

    def consume(self):
        """Drop the most recently saved state, keeping the new state."""
        # pop and use new state
        self.stack.pop()

    def discard(self):
        """Restore the most recently saved state."""
        # pop and discard new state
        self.i, self.m = self.stack.pop()

    def __enter__(self):
        return self

    def __exit__(self, et, ev, tb):
        # keep new state if no exception occurred
        if et is None:
            self.consume()
        else:
            self.discard()
# a lazily-evaluated field expression
class RExpr:
    """A lazily-evaluated field expression.

    The expression string is parsed once, at construction, into a tree of
    Expr nodes. The tree can then report the fields it references
    (fields), its result type (type), the fold operation and type used to
    merge results (fold), and be evaluated against a dict of field values
    (eval).

    Any parse/type/fold/eval error is reported on stderr and terminates
    the process with exit code 3.
    """

    # expr parsing/typechecking/etc errors
    class Error(Exception):
        pass

    # expr node base class
    #
    # operands are stored positionally in the attributes a, b, c, ...
    class Expr:
        def __init__(self, *args):
            for k, v in zip('abcdefghijklmnopqrstuvwxyz', args):
                setattr(self, k, v)

        def __iter__(self):
            return (getattr(self, k)
                    for k in it.takewhile(
                        lambda k: hasattr(self, k),
                        'abcdefghijklmnopqrstuvwxyz'))

        def __len__(self):
            return sum(1 for _ in self)

        def __repr__(self):
            return '%s(%s)' % (
                    self.__class__.__name__,
                    ','.join(repr(v) for v in self))

        def fields(self):
            return set(it.chain.from_iterable(v.fields() for v in self))

        def type(self, types={}):
            # by default all operands must agree on a type
            t = self.a.type(types)
            if not all(t == v.type(types) for v in it.islice(self, 1, None)):
                raise RExpr.Error("mismatched types? %r" % self)
            return t

        def fold(self, types={}):
            return self.a.fold(types)

        def eval(self, fields={}):
            return self.a.eval(fields)

    # expr nodes

    # literal exprs
    class IntLit(Expr):
        def fields(self):
            return set()

        def type(self, types={}):
            return RInt

        def fold(self, types={}):
            return RSum, RInt

        def eval(self, fields={}):
            return self.a

    class FloatLit(Expr):
        def fields(self):
            return set()

        def type(self, types={}):
            return RFloat

        def fold(self, types={}):
            return RSum, RFloat

        def eval(self, fields={}):
            return self.a

    # field expr
    class Field(Expr):
        def fields(self):
            return {self.a}

        def type(self, types={}):
            if self.a not in types:
                raise RExpr.Error("untyped field? %s" % self.a)
            return types[self.a]

        def fold(self, types={}):
            if self.a not in types:
                raise RExpr.Error("unfoldable field? %s" % self.a)
            return RSum, types[self.a]

        def eval(self, fields={}):
            if self.a not in fields:
                raise RExpr.Error("unknown field? %s" % self.a)
            return fields[self.a]

    # func expr helper
    def func(name, args="a"):
        def func(f):
            f._func = name
            f._fargs = args
            return f
        return func

    # lazily collects all @func-decorated classes by name
    class Funcs:
        @ft.cache
        def __get__(self, _, cls):
            return {x._func: x
                    for x in cls.__dict__.values()
                    if hasattr(x, '_func')}
    funcs = Funcs()

    # type exprs
    @func('int', 'a')
    class Int(Expr):
        """Convert to an integer"""
        def type(self, types={}):
            return RInt

        def eval(self, fields={}):
            return RInt(self.a.eval(fields))

    @func('float', 'a')
    class Float(Expr):
        """Convert to a float"""
        def type(self, types={}):
            return RFloat

        def eval(self, fields={}):
            return RFloat(self.a.eval(fields))

    @func('frac', 'a[, b]')
    class Frac(Expr):
        """Convert to a fraction"""
        def type(self, types={}):
            return RFrac

        def eval(self, fields={}):
            if len(self) == 1:
                return RFrac(self.a.eval(fields))
            else:
                return RFrac(self.a.eval(fields), self.b.eval(fields))

    # fold expr helpers
    #
    # with one argument these select how a column is folded, with more
    # arguments they fold the arguments themselves
    #
    # these share one implementation parameterized by _rfold, so a fold
    # expr can't accidentally reference the wrong fold operation
    class _Fold(Expr):
        _rfold = None

        def fold(self, types={}):
            if len(self) == 1:
                return self._rfold, self.a.type(types)
            else:
                return self.a.fold(types)

        def eval(self, fields={}):
            if len(self) == 1:
                return self.a.eval(fields)
            else:
                return self._rfold()([v.eval(fields) for v in self])

    # same as _Fold, but the fold operation always produces floats
    class _FloatFold(_Fold):
        def type(self, types={}):
            if len(self) == 1:
                return self.a.type(types)
            else:
                return RFloat

        def fold(self, types={}):
            if len(self) == 1:
                return self._rfold, RFloat
            else:
                return self.a.fold(types)

    # fold exprs
    @func('sum', 'a[, ...]')
    class Sum(_Fold):
        """Find the sum of this column or fields"""
        _rfold = RSum

    @func('prod', 'a[, ...]')
    class Prod(_Fold):
        """Find the product of this column or fields"""
        _rfold = RProd

    @func('min', 'a[, ...]')
    class Min(_Fold):
        """Find the minimum of this column or fields"""
        _rfold = RMin

    @func('max', 'a[, ...]')
    class Max(_Fold):
        """Find the maximum of this column or fields"""
        _rfold = RMax

    @func('avg', 'a[, ...]')
    class Avg(_FloatFold):
        """Find the average of this column or fields"""
        _rfold = RAvg

    @func('stddev', 'a[, ...]')
    class Stddev(_FloatFold):
        """Find the standard deviation of this column or fields"""
        _rfold = RStddev

    @func('gmean', 'a[, ...]')
    class GMean(_FloatFold):
        """Find the geometric mean of this column or fields"""
        _rfold = RGMean

    @func('gstddev', 'a[, ...]')
    class GStddev(_FloatFold):
        """Find the geometric stddev of this column or fields"""
        _rfold = RGStddev

    # functions
    @func('ratio', 'a')
    class Ratio(Expr):
        """Ratio of a fraction as a float"""
        def type(self, types={}):
            return RFloat

        def eval(self, fields={}):
            v = RFrac(self.a.eval(fields))
            if not float(v.b):
                return RFloat(1)
            else:
                return RFloat(float(v.a) / float(v.b))

    @func('total', 'a')
    class Total(Expr):
        """Total part of a fraction"""
        def type(self, types={}):
            return RInt

        def eval(self, fields={}):
            return RFrac(self.a.eval(fields)).b

    @func('abs', 'a')
    class Abs(Expr):
        """Absolute value"""
        def eval(self, fields={}):
            return abs(self.a.eval(fields))

    @func('ceil', 'a')
    class Ceil(Expr):
        """Round up to nearest integer"""
        def type(self, types={}):
            return RFloat

        def eval(self, fields={}):
            return RFloat(mt.ceil(float(self.a.eval(fields))))

    @func('floor', 'a')
    class Floor(Expr):
        """Round down to nearest integer"""
        def type(self, types={}):
            return RFloat

        def eval(self, fields={}):
            return RFloat(mt.floor(float(self.a.eval(fields))))

    @func('log', 'a[, b]')
    class Log(Expr):
        """Log of a with base e, or log of a with base b"""
        def type(self, types={}):
            return RFloat

        def eval(self, fields={}):
            if len(self) == 1:
                return RFloat(mt.log(
                        float(self.a.eval(fields))))
            else:
                return RFloat(mt.log(
                        float(self.a.eval(fields)),
                        float(self.b.eval(fields))))

    @func('pow', 'a[, b]')
    class Pow(Expr):
        """e to the power of a, or a to the power of b"""
        def type(self, types={}):
            return RFloat

        def eval(self, fields={}):
            if len(self) == 1:
                return RFloat(mt.exp(
                        float(self.a.eval(fields))))
            else:
                return RFloat(mt.pow(
                        float(self.a.eval(fields)),
                        float(self.b.eval(fields))))

    @func('sqrt', 'a')
    class Sqrt(Expr):
        """Square root"""
        def type(self, types={}):
            return RFloat

        def eval(self, fields={}):
            return RFloat(mt.sqrt(float(self.a.eval(fields))))

    @func('isint', 'a')
    class IsInt(Expr):
        """1 if a is an integer, otherwise 0"""
        def type(self, types={}):
            return RInt

        def eval(self, fields={}):
            if isinstance(self.a.eval(fields), RInt):
                return RInt(1)
            else:
                return RInt(0)

    @func('isfloat', 'a')
    class IsFloat(Expr):
        """1 if a is a float, otherwise 0"""
        def type(self, types={}):
            return RInt

        def eval(self, fields={}):
            if isinstance(self.a.eval(fields), RFloat):
                return RInt(1)
            else:
                return RInt(0)

    @func('isfrac', 'a')
    class IsFrac(Expr):
        """1 if a is a fraction, otherwise 0"""
        def type(self, types={}):
            return RInt

        def eval(self, fields={}):
            if isinstance(self.a.eval(fields), RFrac):
                return RInt(1)
            else:
                return RInt(0)

    @func('isinf', 'a')
    class IsInf(Expr):
        """1 if a is infinite, otherwise 0"""
        def type(self, types={}):
            return RInt

        def eval(self, fields={}):
            if mt.isinf(self.a.eval(fields)):
                return RInt(1)
            else:
                return RInt(0)

    @func('isnan')
    class IsNan(Expr):
        """1 if a is a NAN, otherwise 0"""
        def type(self, types={}):
            return RInt

        def eval(self, fields={}):
            if mt.isnan(self.a.eval(fields)):
                return RInt(1)
            else:
                return RInt(0)

    # unary expr helper
    def uop(op):
        def uop(f):
            f._uop = op
            return f
        return uop

    class UOps:
        @ft.cache
        def __get__(self, _, cls):
            return {x._uop: x
                    for x in cls.__dict__.values()
                    if hasattr(x, '_uop')}
    uops = UOps()

    # unary ops
    @uop('+')
    class Pos(Expr):
        """Non-negation"""
        def eval(self, fields={}):
            return +self.a.eval(fields)

    @uop('-')
    class Neg(Expr):
        """Negation"""
        def eval(self, fields={}):
            return -self.a.eval(fields)

    @uop('!')
    class NotNot(Expr):
        """1 if a is zero, otherwise 0"""
        def type(self, types={}):
            return RInt

        def eval(self, fields={}):
            if self.a.eval(fields):
                return RInt(0)
            else:
                return RInt(1)

    # binary expr help
    def bop(op, prec):
        def bop(f):
            f._bop = op
            f._bprec = prec
            return f
        return bop

    class BOps:
        @ft.cache
        def __get__(self, _, cls):
            return {x._bop: x
                    for x in cls.__dict__.values()
                    if hasattr(x, '_bop')}
    bops = BOps()

    class BPrecs:
        @ft.cache
        def __get__(self, _, cls):
            return {x._bop: x._bprec
                    for x in cls.__dict__.values()
                    if hasattr(x, '_bop')}
    bprecs = BPrecs()

    # binary ops
    @bop('*', 10)
    class Mul(Expr):
        """Multiplication"""
        def eval(self, fields={}):
            return self.a.eval(fields) * self.b.eval(fields)

    @bop('/', 10)
    class Div(Expr):
        """Division"""
        def eval(self, fields={}):
            return self.a.eval(fields) / self.b.eval(fields)

    @bop('%', 10)
    class Mod(Expr):
        """Modulo"""
        def eval(self, fields={}):
            return self.a.eval(fields) % self.b.eval(fields)

    @bop('+', 9)
    class Add(Expr):
        """Addition"""
        def eval(self, fields={}):
            a = self.a.eval(fields)
            b = self.b.eval(fields)
            return a + b

    @bop('-', 9)
    class Sub(Expr):
        """Subtraction"""
        def eval(self, fields={}):
            return self.a.eval(fields) - self.b.eval(fields)

    @bop('==', 4)
    class Eq(Expr):
        """1 if a equals b, otherwise 0"""
        def eval(self, fields={}):
            if self.a.eval(fields) == self.b.eval(fields):
                return RInt(1)
            else:
                return RInt(0)

    @bop('!=', 4)
    class Ne(Expr):
        """1 if a does not equal b, otherwise 0"""
        def eval(self, fields={}):
            if self.a.eval(fields) != self.b.eval(fields):
                return RInt(1)
            else:
                return RInt(0)

    @bop('<', 4)
    class Lt(Expr):
        """1 if a is less than b"""
        def eval(self, fields={}):
            if self.a.eval(fields) < self.b.eval(fields):
                return RInt(1)
            else:
                return RInt(0)

    @bop('<=', 4)
    class Le(Expr):
        """1 if a is less than or equal to b"""
        def eval(self, fields={}):
            if self.a.eval(fields) <= self.b.eval(fields):
                return RInt(1)
            else:
                return RInt(0)

    @bop('>', 4)
    class Gt(Expr):
        """1 if a is greater than b"""
        def eval(self, fields={}):
            if self.a.eval(fields) > self.b.eval(fields):
                return RInt(1)
            else:
                return RInt(0)

    @bop('>=', 4)
    class Ge(Expr):
        """1 if a is greater than or equal to b"""
        def eval(self, fields={}):
            if self.a.eval(fields) >= self.b.eval(fields):
                return RInt(1)
            else:
                return RInt(0)

    @bop('&&', 3)
    class AndAnd(Expr):
        """b if a is non-zero, otherwise a"""
        def eval(self, fields={}):
            a = self.a.eval(fields)
            if a:
                return self.b.eval(fields)
            else:
                return a

    @bop('||', 2)
    class OrOr(Expr):
        """a if a is non-zero, otherwise b"""
        def eval(self, fields={}):
            a = self.a.eval(fields)
            if a:
                return a
            else:
                return self.b.eval(fields)

    # ternary expr help
    def top(op_a, op_b, prec):
        def top(f):
            f._top = (op_a, op_b)
            f._tprec = prec
            return f
        return top

    class TOps:
        @ft.cache
        def __get__(self, _, cls):
            return {x._top: x
                    for x in cls.__dict__.values()
                    if hasattr(x, '_top')}
    tops = TOps()

    class TPrecs:
        @ft.cache
        def __get__(self, _, cls):
            return {x._top: x._tprec
                    for x in cls.__dict__.values()
                    if hasattr(x, '_top')}
    tprecs = TPrecs()

    # ternary ops
    @top('?', ':', 1)
    class IfElse(Expr):
        """b if a is non-zero, otherwise c"""
        def type(self, types={}):
            t = self.b.type(types)
            u = self.c.type(types)
            if t != u:
                raise RExpr.Error("mismatched types? %r" % self)
            return t

        def fold(self, types={}):
            return self.b.fold(types)

        def eval(self, fields={}):
            a = self.a.eval(fields)
            if a:
                return self.b.eval(fields)
            else:
                return self.c.eval(fields)

    # show expr help text
    @classmethod
    def help(cls):
        print('uops:')
        for op in cls.uops.keys():
            print(' %-21s %s' % ('%sa' % op, RExpr.uops[op].__doc__))
        print('bops:')
        for op in cls.bops.keys():
            print(' %-21s %s' % ('a %s b' % op, RExpr.bops[op].__doc__))
        print('tops:')
        for op in cls.tops.keys():
            print(' %-21s %s' % ('a %s b %s c' % op, RExpr.tops[op].__doc__))
        print('funcs:')
        for func in cls.funcs.keys():
            print(' %-21s %s' % (
                    '%s(%s)' % (func, RExpr.funcs[func]._fargs),
                    RExpr.funcs[func].__doc__))

    # report an error in this expr and exit
    def _error(self, e):
        print('error: in expr: %s' % self.expr,
                file=sys.stderr)
        print('error: %s' % e,
                file=sys.stderr)
        sys.exit(3)

    # parse an expr
    def __init__(self, expr):
        self.expr = expr.strip()

        # parse the expression into a tree
        #
        # this is a precedence-climbing parser, prec is the precedence
        # of the operator we are currently an operand of
        def p_expr(p, prec=0):
            # parens
            if p.match(r'\('):
                p.chomp()
                a = p_expr(p)
                if not p.match(r'\)'):
                    raise RExpr.Error("mismatched parens? %s" % p)
                p.chomp()

            # floats, note nan must not be the prefix of a field name
            elif p.match(
                    r'[+-]?(?:[_0-9]*\.[_0-9]*(?:[eE][+-]?[0-9]+)?'
                    r'|nan(?![_a-zA-Z0-9]))'):
                a = RExpr.FloatLit(RFloat(p.chomp()))

            # ints, note inf must not be the prefix of a field name
            elif p.match(
                    r'[+-]?(?:[0-9][bBoOxX]?[_0-9a-fA-F]*'
                    r'|∞|inf(?![_a-zA-Z0-9]))'):
                a = RExpr.IntLit(RInt(p.chomp()))

            # fields/functions
            elif p.match('[_a-zA-Z][_a-zA-Z0-9]*'):
                a = p.chomp()
                if p.match(r'\('):
                    p.chomp()
                    if a not in RExpr.funcs:
                        raise RExpr.Error("unknown function? %s" % a)
                    args = []
                    while True:
                        args.append(p_expr(p))
                        if p.match(','):
                            p.chomp()
                            continue
                        if not p.match(r'\)'):
                            raise RExpr.Error("mismatched parens? %s" % p)
                        p.chomp()
                        a = RExpr.funcs[a](*args)
                        break
                else:
                    a = RExpr.Field(a)

            # unary ops, sorted so longer ops are tried before their
            # prefixes
            elif (op := next(
                    (op for op in sorted(RExpr.uops.keys(), reverse=True)
                        if p.match(re.escape(op))),
                    None)) is not None:
                p.chomp()
                a = p_expr(p, mt.inf)
                a = RExpr.uops[op](a)

            # unknown expr?
            else:
                raise RExpr.Error("unknown expr? %s" % p)

            # parse tail
            while True:
                # binary ops, these are left associative
                op = next(
                        (op for op in sorted(RExpr.bops.keys(), reverse=True)
                            if p.match(re.escape(op))
                                and prec < RExpr.bprecs[op]),
                        None)
                if op is not None:
                    p.chomp()
                    b = p_expr(p, RExpr.bprecs[op])
                    a = RExpr.bops[op](a, b)
                    continue

                # ternary ops, these are intentionally right associative
                op = next(
                        (op for op in sorted(RExpr.tops.keys(), reverse=True)
                            if p.match(re.escape(op[0]))
                                and prec <= RExpr.tprecs[op]),
                        None)
                if op is not None:
                    p.chomp()
                    b = p_expr(p, RExpr.tprecs[op])
                    if not p.match(re.escape(op[1])):
                        raise RExpr.Error(
                                'mismatched ternary op? %s %s' % op)
                    p.chomp()
                    c = p_expr(p, RExpr.tprecs[op])
                    a = RExpr.tops[op](a, b, c)
                    continue

                # no tail
                return a

        try:
            p = Parser(self.expr)
            self.tree = p_expr(p)
            if p:
                raise RExpr.Error("trailing expr? %s" % p)
        # ValueError covers malformed int/float literals
        except (RExpr.Error, ValueError) as e:
            self._error(e)

    # recursively find all fields
    def fields(self):
        try:
            return self.tree.fields()
        except RExpr.Error as e:
            self._error(e)

    # recursively find the type
    def type(self, types={}):
        try:
            return self.tree.type(types)
        except RExpr.Error as e:
            self._error(e)

    # recursively find the fold operation
    def fold(self, types={}):
        try:
            return self.tree.fold(types)
        except RExpr.Error as e:
            self._error(e)

    # recursively evaluate the expr
    def eval(self, fields={}):
        try:
            return self.tree.eval(fields)
        except RExpr.Error as e:
            self._error(e)
# parse %-escaped strings
def punescape(s, attrs=None):
    """Expand %-escapes in s.

    Supported escapes are %%, %n, %xaa, %uaaaa, %Uaaaaaaaa, and
    printf-style %(field)format references, which are looked up in attrs.

    attrs may be a dict or a callable taking the field name; a lookup
    that raises KeyError leaves the escape untouched.

    Raises ValueError if a %x/%u/%U escape is not followed by hex digits.
    """
    if attrs is None:
        attrs = {}
    if isinstance(attrs, dict):
        attrs_ = attrs
        attrs = lambda k: attrs_[k]

    pattern = re.compile(
            r'%[%n]'
            r'|%x..'
            r'|%u....'
            r'|%U........'
            r'|%\((?P<field>[^)]*)\)'
            r'(?P<format>[+\- #0-9\.]*[sdboxXfFeEgG])')

    def unescape(m):
        # the character after the % selects the kind of escape
        c = m.group()[1]
        if c == '%':
            return '%'
        elif c == 'n':
            return '\n'
        elif c in 'xuU':
            return chr(int(m.group()[2:], 16))
        elif c == '(':
            try:
                v = attrs(m.group('field'))
            except KeyError:
                return m.group()
            f = m.group('format')
            if f[-1] in 'dboxX':
                # NOTE(review): try_dat is not defined in the visible
                # part of this file, presumably it parses a numeric
                # string or returns None
                if isinstance(v, str):
                    v = try_dat(v) or 0
                v = int(v)
            elif f[-1] in 'fFeEgG':
                if isinstance(v, str):
                    v = try_dat(v) or 0
                v = float(v)
            else:
                # map printf's - flag to an explicit alignment, strings
                # are right-aligned by default like printf
                f = ('<' if '-' in f else '>') + f.replace('-', '')
                v = str(v)
            # note we need Python's new format syntax for binary
            return ('{:%s}' % f).format(v)
        else:
            assert False

    return re.sub(pattern, unescape, s)
def punescape_help():
    """Print a summary of the %-escapes understood by punescape."""
    # (escape, description) pairs, printed in this order
    mods = (
        ('%%', 'A literal % character'),
        ('%n', 'A newline'),
        ('%xaa', 'A character with the hex value aa'),
        ('%uaaaa', 'A character with the hex value aaaa'),
        ('%Uaaaaaaaa', 'A character with the hex value aaaaaaaa'),
        ('%(field)s', 'An existing field formatted as a string'),
        ('%(field)[dboxX]', 'An existing field formatted as an integer'),
        ('%(field)[fFeEgG]', 'An existing field formatted as a float'),
    )
    print('mods:')
    for mod, description in mods:
        print(' %-21s %s' % (mod, description))
def openio(path, mode='r', buffering=-1):
    """Open path, where '-' means stdin (reading) or stdout (writing).

    The standard stream's file descriptor is duplicated, so closing the
    returned file does not close the real stdin/stdout.
    """
    if path != '-':
        return open(path, mode, buffering)

    # allow '-' for stdin/stdout
    stream = sys.stdin if 'r' in mode else sys.stdout
    return os.fdopen(os.dup(stream.fileno()), mode, buffering)
def collect(csv_paths, *,
        depth=1,
        children=None,
        notes=None,
        **_):
    """Collect both results and field names from CSV/JSON files.

    Each path is opened with openio. A file whose first byte is '[' is
    parsed as JSON, anything else as CSV. Files that don't exist are
    skipped.

    children and notes name the fields that hold nested results and
    notes; children are only followed in JSON files, and only while
    depth allows.

    Returns (fields, results), where fields is the list of field names
    in first-seen order and results is a list of dicts mapping field
    names to stripped, non-empty strings.
    """
    import json

    fields = co.OrderedDict()
    results = []

    def unjsonify(results, depth_):
        results_ = []
        for r in results:
            # collect fields
            fields.update((k, True) for k in r.keys())
            # convert to strings, we'll reparse these later
            #
            # this may seem a bit backwards, but it keeps
            # the rest of the script simpler if we pretend
            # everything came from a csv
            r_ = {k: str(v).strip()
                    for k, v in r.items()
                    if k not in {'children', 'notes'}
                        and str(v).strip()}
            # special handling for children field
            if (children is not None
                    and children in r
                    and r[children] is not None
                    and depth_ > 1):
                r_[children] = unjsonify(
                        r[children],
                        depth_-1)
            # special handling for notes field
            if (notes is not None
                    and notes in r
                    and r[notes] is not None):
                r_[notes] = set(r[notes])
            results_.append(r_)
        return results_

    for path in csv_paths:
        try:
            with openio(path) as f:
                # csv or json? assume json starts with [
                is_json = (f.buffer.peek(1)[:1] == b'[')

                # read json?
                if is_json:
                    results.extend(unjsonify(json.load(f), depth))
                    continue

                # read csv?
                reader = csv.DictReader(f, restval='')
                # collect fields, note fieldnames is None if the file
                # is empty
                fields.update((k, True) for k in reader.fieldnames or [])
                for r in reader:
                    # strip and drop empty fields
                    r_ = {k: v.strip()
                            for k, v in r.items()
                            if k not in {'notes'}
                                and v.strip()}
                    # special handling for notes field
                    if notes is not None and notes in r:
                        r_[notes] = set(r[notes].split(','))
                    results.append(r_)
        except FileNotFoundError:
            # missing files are ignored
            pass

    return list(fields.keys()), results
def compile(fields_, results,
by=None,
fields=None,
mods=[],
exprs=[],
sort=None,
children=None,
hot=None,
notes=None):
by = by.copy()
fields = fields.copy()
# make sure sort/hot fields are included
for k, reverse in it.chain(sort or [], hot or []):
# this defaults to typechecking sort/hot fields, which is
# probably safer, if you really want to sort by strings you
# can use --by + --label to create hidden by fields
if k and k not in by and k not in fields:
fields.append(k)
# make sure all expr targets are in fields so they get typechecked
# correctly
for k, _ in exprs:
if k not in fields:
fields.append(k)
# we only really care about the last mod/expr for each field
mods = {k: mod for k, mod in mods}
exprs = {k: expr for k, expr in exprs}
# find best type for all fields used by field exprs
fields__ = set(it.chain.from_iterable(
exprs[k].fields() if k in exprs else [k]
for k in fields))
types = {}
for k in fields__:
# check if dependency is in original fields
#
# it's tempting to also allow enumerate fields here, but this
# currently doesn't work when hotifying
if k not in fields_:
print("error: no field %r?" % k,
file=sys.stderr)
sys.exit(2)
for t in [RInt, RFloat, RFrac]:
for r in results:
if k in r and r[k].strip():
try:
t(r[k])
except ValueError:
break
else:
types[k] = t
break
else:
print("error: no type matches field %r?" % k,
file=sys.stderr)
sys.exit(2)
# typecheck exprs, note these may reference input fields
# with the same name
types__ = types.copy()
for k, expr in exprs.items():
types__[k] = expr.type(types)
# foldcheck field exprs
folds = {k: (RSum, t) for k, v in types.items()}
for k, expr in exprs.items():
folds[k] = expr.fold(types)
folds = {k: (f(), t) for k, (f, t) in folds.items()}
# create result class
def __new__(cls, **r):
# evaluate types
r_ = r.copy()
for k, t in types.items():
r_[k] = t(r[k]) if k in r else t()
# evaluate exprs
r__ = r_.copy()
for k, expr in exprs.items():
r__[k] = expr.eval(r_)
# evaluate mods
for k, m in mods.items():
r__[k] = punescape(m, r)
# return result
return cls.__mro__[1].__new__(cls, **(
{k: r__.get(k, '') for k in by}
| {k: ([r__[k]], 1) if k in r__ else ([], 0)
for k in fields}
| ({children: r[children] if children in r else []}
if children is not None else {})
| ({notes: r[notes] if notes in r else set()}
if notes is not None else {})))
def __add__(self, other):
    # merge two results into a new result
    #
    # by fields are taken from self, fields are merged as unfolded
    # (values, count) pairs, and children/notes are concatenated/unioned

    # merge two (values, count) pairs, reusing a's list if possible
    #
    # a's list can be extended in place only if nothing has been
    # appended past a's count, otherwise we need to copy
    def extend(a, b):
        if len(a[0]) == a[1]:
            a[0].extend(b[0][:b[1]])
            return (a[0], a[1] + b[1])
        else:
            return (a[0][:a[1]] + b[0][:b[1]], a[1] + b[1])

    # lazily fold results, note we use object.__getattribute__ here to
    # get at the raw (values, count) pairs
    #
    # children/notes are looked up by their configured names, these
    # may not literally be "children"/"notes"
    return self.__class__.__mro__[1].__new__(self.__class__, **(
            {k: getattr(self, k) for k in by}
                | {k: extend(
                        object.__getattribute__(self, k),
                        object.__getattribute__(other, k))
                    for k in fields}
                | ({children: getattr(self, children)
                        + getattr(other, children)}
                    if children is not None else {})
                | ({notes: getattr(self, notes)
                        | getattr(other, notes)}
                    if notes is not None else {})))
def __getattribute__(self, k):
    # anything that isn't a field is just a normal attribute
    if k not in fields:
        return object.__getattribute__(self, k)

    # lazily fold results on demand, this avoids issues with fold
    # operations that depend on the number of results
    #
    # fields are stored as (values, count) pairs, no values means
    # no result
    pair = object.__getattribute__(self, k)
    count = pair[1]
    if not count:
        return None
    return folds[k][0](pair[0][:count])
return type(
'Result',
(co.namedtuple('Result', list(co.OrderedDict.fromkeys(it.chain(
by,
fields,
[children] if children is not None else [],
[notes] if notes is not None else [])).keys())),),
dict(
__slots__=(),
__new__=__new__,
__add__=__add__,
__getattribute__=__getattribute__,
_by=by,
_fields=fields,
_sort=fields,
_types={k: t for k, (_, t) in folds.items()},
_mods=mods,
_exprs=exprs,
**{'_children': children} if children is not None else {},
**{'_notes': notes} if notes is not None else {}))
def homogenize(Result, results, *,
        enumerates=None,
        defines=[],
        depth=1):
    # convert all (possibly recursive) raw results into our result type
    #
    # results    - iterable of dicts
    # enumerates - field names to assign each kept result's index to
    # defines    - (field, allowed values) pairs, results that don't
    #              match every define are dropped, top-level only
    # depth      - how many levels of children to convert, children
    #              below this are passed to Result unconverted
    recursive = hasattr(Result, '_children')

    converted = []
    for raw in results:
        # filter by matching defines
        #
        # we do this here instead of in fold to be consistent with
        # evaluation order of exprs/mods/etc, note this isn't really
        # inconsistent with the other scripts, since they don't really
        # evaluate anything
        matched = True
        for k, vs in defines:
            if k not in raw or str(raw[k]) not in vs:
                matched = False
                break
        if not matched:
            continue

        kwargs = dict(raw)

        # enumerate? note the index only counts results we kept
        if enumerates is not None:
            for e in enumerates:
                kwargs[e] = len(converted)

        # recurse?
        if (recursive
                and Result._children in raw
                and raw[Result._children] is not None
                and depth > 1):
            kwargs[Result._children] = homogenize(
                    Result, raw[Result._children],
                    # only filter defines at the top level!
                    enumerates=enumerates,
                    depth=depth-1)

        # append a result
        converted.append(Result(**kwargs))

    return converted
# common folding/tabling/read/write code
class Rev(co.namedtuple('Rev', 'x')):
    # wrapper that inverts the ordering of the wrapped value, useful
    # for reversing a single component of a sort key
    __slots__ = ()

    # yes we need all of these because we're a namedtuple, otherwise
    # we'd inherit tuple's comparisons
    def __lt__(self, other):
        mine, theirs = self.x, other.x
        return mine > theirs

    def __gt__(self, other):
        mine, theirs = self.x, other.x
        return mine < theirs

    def __le__(self, other):
        mine, theirs = self.x, other.x
        return mine >= theirs

    def __ge__(self, other):
        mine, theirs = self.x, other.x
        return mine <= theirs
def fold(Result, results, *,
        by=None,
        defines=[],
        sort=None,
        depth=1,
        **_):
    # merge results that share the same by fields, then sort
    #
    # by      - fields to group by, defaults to Result._by
    # defines - (field, allowed values) pairs, results that don't
    #           match every define are dropped, top-level only
    # sort    - (field, reverse) pairs, a falsy field means sort by
    #           Result._sort
    # depth   - how many levels of children to fold

    # stop when depth hits zero
    if depth == 0:
        return []

    # organize by by
    if by is None:
        by = Result._by

    # every by/define field needs to exist in our result type
    for k in it.chain(by or [], (k for k, _ in defines)):
        if not (k in Result._by or k in Result._fields):
            print("error: could not find field %r?" % k,
                    file=sys.stderr)
            sys.exit(-1)

    # filter by matching defines
    if defines:
        results = [r for r in results
                if all(str(getattr(r, k)) in vs for k, vs in defines)]

    # organize results into conflicts, keeping first-seen order
    conflicts = co.OrderedDict()
    for r in results:
        conflicts.setdefault(
                tuple(getattr(r, k) for k in by),
                []).append(r)

    # merge conflicts
    folded = [sum(rs[1:], start=rs[0]) for rs in conflicts.values()]

    # build a sort key, explicit sort fields first, then the result
    # itself
    def sort_key(r):
        explicit = []
        for k, reverse in (sort or []):
            # fields default to descending order, by fields ascending
            if reverse ^ (not k or k in Result._fields):
                order = Rev
            else:
                order = lambda x: x
            # missing values are represented by empty tuples
            explicit.append(order(tuple(
                    (getattr(r, k_),)
                        if getattr(r, k_) is not None
                        else ()
                    for k_ in ([k] if k else Result._sort))))
        return (tuple(explicit), r)

    # sort, note that python's sort is stable
    folded.sort(key=sort_key)

    # recurse if we have recursive results
    if hasattr(Result, '_children'):
        refolded = []
        for r in folded:
            refolded.append(r._replace(**{
                    Result._children: fold(
                        Result, getattr(r, Result._children),
                        by=by,
                        # only filter defines at the top level!
                        sort=sort,
                        depth=depth-1)}))
        folded = refolded

    return folded
def hotify(Result, results, *,
        enumerates=None,
        depth=1,
        hot=None,
        **_):
    # replace each result's children with its "hot path", a flat list
    # containing the hottest child, then the hottest child of that
    # child, and so on
    #
    # enumerates - field names to assign each hot result's position
    #              in the hot path to
    # depth      - limits the hot path, we always take the first step
    #              if there are children, and keep going while the
    #              remaining depth is > 1
    # hot        - (field, reverse) pairs describing hotness, a falsy
    #              field means Result._sort, may be None
    #
    # note! hotifying risks confusion if you don't enumerate/have a
    # z field, since it will allow folding across recursive boundaries

    # hotify only makes sense for recursive results
    assert hasattr(Result, '_children')

    # hot may be omitted, either way we always fall back to the
    # default sort order to break ties
    hot_ = list(it.chain(hot or [], [(None, False)]))

    # the hottest result is the one with the smallest key
    def hotness(r):
        return tuple(
                (Rev
                        if reverse ^ (not k or k in Result._fields)
                        else lambda x: x)(
                    tuple((getattr(r, k_),)
                        if getattr(r, k_) is not None
                        else ()
                        for k_ in ([k] if k else Result._sort)))
                for k, reverse in hot_)

    results_ = []
    for r in results:
        path = []
        children = getattr(r, Result._children)
        depth_ = depth-1
        # walk down the hot path, this is a loop instead of recursion
        # so deep paths can't hit python's recursion limit
        while children:
            # find the hottest result
            hottest = min(children, key=hotness)
            path.append(hottest._replace(**(
                    # enumerate?
                    ({e: len(path) for e in enumerates}
                        if enumerates is not None
                        else {})
                    # the hot path is flat
                    | {Result._children: []})))

            # keep going?
            if not depth_ > 1:
                break
            children = getattr(hottest, Result._children)
            depth_ -= 1

        results_.append(r._replace(**{Result._children: path}))

    return results_
def table(Result, results, diff_results=None, *,
        by=None,
        fields=None,
        sort=None,
        labels=None,
        depth=1,
        hot=None,
        diff=None,
        percent=None,
        all=False,
        compare=None,
        no_header=False,
        small_header=False,
        no_total=False,
        small_table=False,
        summary=False,
        **_):
    # print results as an aligned text table on stdout
    #
    # results      - folded results to show, one row each
    # diff_results - optional folded results to compare against, if
    #                provided, rows without any change are hidden
    #                unless all is set
    # by           - fields used to name/match rows, defaults to
    #                Result._by
    # fields       - fields to show as columns, defaults to
    #                Result._fields
    # sort         - (field, reverse) pairs, same format as fold
    # labels       - fields to show in the name column instead of by
    # depth        - how many levels of children to show
    # diff         - if truthy, show old/new/diff columns
    # percent      - if truthy, show percentage changes as notes
    # compare      - tuple of by values naming a row to compare all
    #                other rows against
    # no_header/small_header/no_total/small_table/summary
    #              - tweak which of the header/rows/total are shown
    #
    # results and diff_results must have been folded by the same by
    # fields, otherwise names collide and we assert

    # the all parameter shadows the builtin, so recover the builtin
    import builtins
    all_, all = all, builtins.all

    if by is None:
        by = Result._by
    if fields is None:
        fields = Result._fields
    types = Result._types

    # rows are named by comma-separated fields, missing fields show up
    # as empty strings
    def name_of(r, ks):
        return ','.join(str(getattr(r, k)
                    if getattr(r, k) is not None
                    else '')
                for k in ks)

    # render a ratio as a percentage note
    def ratio_notes(t, hide_zero=False):
        if t == +mt.inf:
            return ['+∞%']
        elif t == -mt.inf:
            return ['-∞%']
        elif hide_zero and not t:
            return []
        else:
            return ['%+.1f%%' % (100*t)]

    # organize by name
    table = {name_of(r, by): r for r in results}
    diff_table = {name_of(r, by): r for r in diff_results or []}

    # lost results? this only happens if we didn't fold by the same
    # by field, which is an error and risks confusing results
    assert len(table) == len(results)
    if diff_results is not None:
        assert len(diff_table) == len(diff_results)

    # find compare entry if there is one
    if compare:
        compare_r = table.get(','.join(str(k) for k in compare))

    # build up our lines
    lines = []

    # header
    if not no_header:
        header = ['%s%s' % (
                    ','.join(labels if labels is not None else by),
                    ' (%d added, %d removed)' % (
                            sum(1 for n in table if n not in diff_table),
                            sum(1 for n in diff_table if n not in table))
                        if diff else '')
                if not small_header and not small_table and not summary
                else '']
        if not diff:
            for k in fields:
                header.append(k)
        else:
            # old, new, and diff columns
            for k in fields:
                header.append('o'+k)
            for k in fields:
                header.append('n'+k)
            for k in fields:
                header.append('d'+k)
        lines.append(header)

    # delete these to try to catch typos below, we need to rebuild
    # these tables at each recursive layer
    del table
    del diff_table

    # entry helper, builds one row, cells are either plain strings or
    # (string, notes) tuples, r/diff_r may be None if missing
    def table_entry(name, r, diff_r=None):
        # prepend name
        entry = [name]

        # normal entry?
        if ((compare is None or r == compare_r)
                and not percent
                and not diff):
            for k in fields:
                entry.append(
                        (getattr(r, k).table(),
                                getattr(getattr(r, k), 'notes', lambda: [])())
                            if getattr(r, k, None) is not None
                            else types[k].none)
        # compare entry?
        elif not percent and not diff:
            for k in fields:
                entry.append(
                        (getattr(r, k).table()
                                if getattr(r, k, None) is not None
                                else types[k].none,
                            ratio_notes(
                                types[k].ratio(
                                    getattr(r, k, None),
                                    getattr(compare_r, k, None)))))
        # percent entry?
        elif not diff:
            for k in fields:
                entry.append(
                        (getattr(r, k).table()
                                if getattr(r, k, None) is not None
                                else types[k].none,
                            ratio_notes(
                                types[k].ratio(
                                    getattr(r, k, None),
                                    getattr(diff_r, k, None)))))
        # diff entry?
        else:
            for k in fields:
                entry.append(getattr(diff_r, k).table()
                        if getattr(diff_r, k, None) is not None
                        else types[k].none)
            for k in fields:
                entry.append(getattr(r, k).table()
                        if getattr(r, k, None) is not None
                        else types[k].none)
            for k in fields:
                entry.append(
                        (types[k].diff(
                                getattr(r, k, None),
                                getattr(diff_r, k, None)),
                            # no change gets no note
                            ratio_notes(
                                types[k].ratio(
                                    getattr(r, k, None),
                                    getattr(diff_r, k, None)),
                                hide_zero=True)))

        # append any notes to the last column
        if hasattr(Result, '_notes') and r is not None:
            notes = sorted(getattr(r, Result._notes))
            if isinstance(entry[-1], tuple):
                entry[-1] = (entry[-1][0], entry[-1][1] + notes)
            else:
                entry[-1] = (entry[-1], notes)

        return entry

    # recursive entry helper
    #
    # prefixes are the name prefixes for (entry, last entry, children
    # of entry, children of last entry), all four extensions need to
    # be the same width to keep the tree aligned
    def table_recurse(results_, diff_results_,
            depth_,
            prefixes=('', '', '', '')):
        # build the children table at each layer
        table_ = {name_of(r, by): r for r in results_}
        diff_table_ = {name_of(r, by): r for r in diff_results_ or []}

        # when diffing, only keep names with changes unless all is set
        names_ = [n
                for n in table_.keys() | diff_table_.keys()
                if diff_results is None
                    or all_
                    or any(
                        types[k].ratio(
                                getattr(table_.get(n), k, None),
                                getattr(diff_table_.get(n), k, None))
                            for k in fields)]

        def sort_key(n):
            return (
                    # sort by explicit sort fields
                    tuple((Rev
                                if reverse ^ (not k or k in Result._fields)
                                else lambda x: x)(
                            tuple((getattr(table_[n], k_),)
                                if getattr(table_.get(n), k_, None)
                                    is not None
                                else ()
                                for k_ in ([k] if k else Result._sort)))
                        for k, reverse in (sort or [])),
                    # sort by ratio if diffing
                    Rev(tuple(types[k].ratio(
                                getattr(table_.get(n), k, None),
                                getattr(diff_table_.get(n), k, None))
                            for k in fields))
                        if diff or percent
                        else (),
                    # move compare entry to the top, note this can be
                    # overridden by explicitly sorting by fields
                    (table_.get(n) != compare_r,
                            # sort by ratio if comparing
                            Rev(tuple(
                                types[k].ratio(
                                        getattr(table_.get(n), k, None),
                                        getattr(compare_r, k, None))
                                    for k in fields)))
                        if compare
                        else (),
                    # sort by result
                    (table_[n],) if n in table_ else (),
                    # and finally by name (diffs may be missing results)
                    n)

        # sort again, now with diff info, note that python's sort is stable
        names_.sort(key=sort_key)

        for i, name in enumerate(names_):
            last = (i == len(names_)-1)

            # find comparable results
            r = table_.get(name)
            diff_r = diff_table_.get(name)

            # figure out a good label, note r may be missing if the
            # result only exists in the diff results
            if labels is not None:
                label = name_of(r if r is not None else diff_r, labels)
            else:
                label = name

            # build line
            line = table_entry(label, r, diff_r)

            # add prefixes
            line = [x if isinstance(x, tuple) else (x, []) for x in line]
            line[0] = (prefixes[0+last] + line[0][0], line[0][1])
            lines.append(line)

            # recurse?
            if name in table_ and depth_ > 1:
                table_recurse(
                        getattr(r, Result._children),
                        getattr(diff_r, Result._children, None),
                        depth_-1,
                        (prefixes[2+last] + "|-> ",
                         prefixes[2+last] + "'-> ",
                         prefixes[2+last] + "|   ",
                         prefixes[2+last] + "    "))

    # build entries
    if not summary:
        table_recurse(results, diff_results, depth)

    # total, this is just everything folded into a single result
    if not no_total and not (small_table and not summary):
        r = next(iter(fold(Result, results, by=[])), None)
        if diff_results is None:
            diff_r = None
        else:
            diff_r = next(iter(fold(Result, diff_results, by=[])), None)
        lines.append(table_entry('TOTAL', r, diff_r))

    # homogenize, every cell becomes a (string, notes) tuple
    lines = [[x if isinstance(x, tuple) else (x, []) for x in line]
            for line in lines]

    # find the best widths, note that column 0 contains the names and is
    # handled a bit differently
    widths = co.defaultdict(lambda: 7, {0: 7})
    nwidths = co.defaultdict(lambda: 0)
    for line in lines:
        for i, x in enumerate(line):
            # round up to one less than a multiple of 4
            widths[i] = max(widths[i], ((len(x[0])+1+4-1)//4)*4-1)
            # the last column's notes don't need padding
            if i != len(line)-1:
                nwidths[i] = max(nwidths[i], 1+sum(2+len(n) for n in x[1]))

    # print our table
    for line in lines:
        print('%-*s %s' % (
                widths[0], line[0][0],
                ' '.join('%*s%-*s' % (
                        widths[i], x[0],
                        nwidths[i], ' (%s)' % ', '.join(x[1]) if x[1] else '')
                    for i, x in enumerate(line[1:], 1))))
def read_csv(path, Result, *,
        depth=1,
        **_):
    # read results from a CSV or JSON file into a list of Results
    #
    # only Result._by and Result._fields (and children/notes for JSON)
    # are kept, results without any non-empty field are skipped, as
    # are results that raise a TypeError while being built

    # a result needs at least one non-empty field to be interesting
    def has_fields(r):
        return any(k in r and r[k].strip() for k in Result._fields)

    with openio(path, 'r') as f:
        # csv or json? assume json starts with [
        is_json = (f.buffer.peek(1)[:1] == b'[')

        # read json?
        if is_json:
            import json

            def unjsonify(results, depth_):
                results_ = []
                for r in results:
                    if not has_fields(r):
                        continue
                    try:
                        # note this allows by/fields to overlap
                        kwargs = {}
                        for k in Result._by:
                            if k in r and r[k] is not None:
                                kwargs[k] = r[k]
                        for k in Result._fields:
                            if k in r and r[k] is not None:
                                kwargs[k] = r[k]
                        # recurse into children?
                        if (hasattr(Result, '_children')
                                and Result._children in r
                                and r[Result._children] is not None
                                and depth_ > 1):
                            kwargs[Result._children] = unjsonify(
                                    r[Result._children],
                                    depth_-1)
                        # notes are stored as lists in json
                        if (hasattr(Result, '_notes')
                                and Result._notes in r
                                and r[Result._notes] is not None):
                            kwargs[Result._notes] = set(r[Result._notes])
                        results_.append(Result(**kwargs))
                    except TypeError:
                        pass
                return results_

            return unjsonify(json.load(f), depth)

        # read csv?
        results = []
        for r in csv.DictReader(f, restval=''):
            if not has_fields(r):
                continue
            try:
                # note this allows by/fields to overlap
                kwargs = {}
                for k in Result._by:
                    if k in r and r[k].strip():
                        kwargs[k] = r[k]
                for k in Result._fields:
                    if k in r and r[k].strip():
                        kwargs[k] = r[k]
                results.append(Result(**kwargs))
            except TypeError:
                pass
        return results
def write_csv(path, Result, results, *,
        json=False,
        by=None,
        fields=None,
        depth=1,
        **_):
    # write results to a CSV file, or a JSON file if json is set
    #
    # by/fields choose which attributes are written, defaulting to
    # Result._by/Result._fields, attributes that are None are omitted
    # and fields are written as strings
    with openio(path, 'w') as f:
        by_ = by if by is not None else Result._by
        fields_ = fields if fields is not None else Result._fields

        # flatten a result into a dict, note this allows by/fields
        # to overlap
        def row(r):
            row_ = {}
            for k in by_:
                if getattr(r, k) is not None:
                    row_[k] = getattr(r, k)
            for k in fields_:
                if getattr(r, k) is not None:
                    row_[k] = str(getattr(r, k))
            return row_

        # write csv?
        if not json:
            # deduplicate columns while keeping their order
            columns = list(dict.fromkeys(it.chain(by_, fields_)))
            writer = csv.DictWriter(f, columns)
            writer.writeheader()
            for r in results:
                writer.writerow(row(r))

        # write json?
        else:
            # json the parameter shadows json the module
            import json as json_

            # the neat thing about json is we can include recursive results
            def jsonify(results, depth_):
                results_ = []
                for r in results:
                    obj = row(r)
                    # only include non-empty children
                    if (hasattr(Result, '_children')
                            and getattr(r, Result._children)
                            and depth_ > 1):
                        obj[Result._children] = jsonify(
                                getattr(r, Result._children),
                                depth_-1)
                    # only include non-empty notes
                    if (hasattr(Result, '_notes')
                            and getattr(r, Result._notes)):
                        obj[Result._notes] = list(
                                getattr(r, Result._notes))
                    results_.append(obj)
                return results_

            json_.dump(jsonify(results, depth), f,
                    separators=(',', ':'))
def main(csv_paths, *,
        by=None,
        fields=None,
        defines=[],
        sort=None,
        depth=None,
        children=None,
        hot=None,
        notes=None,
        **args):
    # entry point, collect results from csv_paths, compile a result
    # type, then homogenize -> fold -> hotify -> write -> diff -> table
    #
    # by/fields/sort/hot arrive as ((name, mod-or-expr), flag) pairs,
    # where flag is hidden for by/fields and reverse for sort/hot,
    # children/notes arrive as lists of field names

    # show mod help text?
    if args.get('help_mods'):
        return punescape_help()
    # show expr help text?
    if args.get('help_exprs'):
        return RExpr.help()

    if by is None and fields is None:
        print("error: needs --by or --fields to figure out fields",
                file=sys.stderr)
        sys.exit(-1)

    # we only support one children field
    if children is not None:
        if len(children) > 1:
            print("error: multiple --children fields currently not supported",
                    file=sys.stderr)
            sys.exit(-1)
        children = children[0]

    # we only support one notes field
    if notes is not None:
        if len(notes) > 1:
            print("error: multiple --notes fields currently not supported",
                    file=sys.stderr)
            sys.exit(-1)
        notes = notes[0]

    # recursive results imply --children
    if children is None and (depth is not None or hot is not None):
        children = 'children'

    # figure out depth, 0 means unlimited
    if depth is None:
        depth = 1 if not hot else mt.inf
    elif depth == 0:
        depth = mt.inf

    # separate out enumerates/mods/exprs
    #
    # enumerate enumerates: -ia
    # by supports mods: -ba=%(b)s
    # fields/sort/etc supports exprs: -fa=b+c
    #
    enumerates = []
    mods = []
    for (k, v), hidden in (by or []):
        if v == enumerate:
            enumerates.append(k)
        elif v is not None:
            mods.append((k, v))

    exprs = []
    for pairs in (fields, sort, hot):
        for (k, v), _flag in (pairs or []):
            if v is not None:
                exprs.append((k, v))

    # strip everything down to just names, but remember which by
    # fields/fields should be visible in the table
    labels = None
    if by is not None:
        labels = [k for (k, v), hidden in by if not hidden]
        by = [k for (k, v), hidden in by]

    visible = None
    if fields is not None:
        visible = [k for (k, v), hidden in fields if not hidden]
        fields = [k for (k, v), hidden in fields]

    if sort is not None:
        sort = [(k, reverse) for (k, v), reverse in sort]

    if hot is not None:
        hot = [(k, reverse) for (k, v), reverse in hot]

    # find results
    use = args.get('use', None)
    if use:
        # use is just an alias but takes priority
        paths = [args['use']]
    else:
        # not enough info?
        if not csv_paths:
            print("error: no *.csv files?",
                    file=sys.stderr)
            sys.exit(1)
        paths = csv_paths

    # collect info
    fields_, results = collect(paths,
            depth=depth,
            children=children,
            notes=notes,
            **args)

    # is this input field not already claimed by something else?
    def unclaimed(k, taken):
        return (k not in (taken or [])
                and not any(k == k_ for k_, _ in defines)
                and not any(k == k_ for k_, _ in (sort or []))
                and k != children
                and not any(k == k_ for k_, _ in (hot or []))
                and k != notes
                and not any(k == k_
                    for _, expr in exprs
                    for k_ in expr.fields()))

    # if by not specified, guess it's anything not in fields/defines/exprs/etc
    if not by:
        by = [k for k in fields_ if unclaimed(k, fields)]

    # if fields not specified, guess it's anything not in by/defines/exprs/etc
    if not fields:
        fields = [k for k in fields_ if unclaimed(k, by)]

    # build result type
    Result = compile(fields_, results,
            by=by,
            fields=fields,
            mods=mods,
            exprs=exprs,
            sort=sort,
            children=children,
            hot=hot,
            notes=notes)

    # homogenize, note this is where defines are filtered
    results = homogenize(Result, results,
            enumerates=enumerates,
            defines=defines,
            depth=depth)

    # fold
    results = fold(Result, results,
            by=by,
            depth=depth)

    # hotify?
    if hot:
        results = hotify(Result, results,
                enumerates=enumerates,
                depth=depth,
                hot=hot)

    # write results to CSV/JSON
    if args.get('output'):
        write_csv(args['output'], Result, results,
                by=by,
                fields=fields,
                depth=depth,
                **args)
    if args.get('output_json'):
        write_csv(args['output_json'], Result, results, json=True,
                by=by,
                fields=fields,
                depth=depth,
                **args)

    # find previous results?
    diff_results = None
    diff_path = args.get('diff') or args.get('percent')
    if diff_path:
        # a missing file just means no previous results
        try:
            diff_results = read_csv(
                    diff_path,
                    Result,
                    depth=depth,
                    **args)
        except FileNotFoundError:
            diff_results = []

        # fold, previous results don't go through homogenize so
        # defines are filtered here
        diff_results = fold(Result, diff_results,
                by=by,
                defines=defines,
                depth=depth)

    # print table
    if not args.get('quiet'):
        table(Result, results, diff_results,
                # note the use of labels + visible here
                by=by,
                fields=visible if visible is not None else fields,
                sort=sort,
                labels=labels,
                depth=depth,
                **args)
if __name__ == "__main__":
    # parse command-line arguments and hand them to main
    #
    # by/fields/sort/hot options are collected by custom actions as
    # ((name, mod-or-expr), flag) pairs, where the flag records if the
    # hidden/reverse variant of the option was used
    import argparse
    import sys
    parser = argparse.ArgumentParser(
            description="Script to manipulate CSV files.",
            allow_abbrev=False)
    parser.add_argument(
            'csv_paths',
            nargs='*',
            help="Input *.csv files.")
    parser.add_argument(
            '--help-mods',
            action='store_true',
            help="Show what %% modifiers are available.")
    parser.add_argument(
            '--help-exprs',
            action='store_true',
            help="Show what field exprs are available.")
    parser.add_argument(
            '-q', '--quiet',
            action='store_true',
            help="Don't show anything, useful with -o.")
    parser.add_argument(
            '-o', '--output',
            help="Specify CSV file to store results.")
    parser.add_argument(
            '-O', '--output-json',
            help="Specify JSON file to store results. This may contain "
                "recursive info.")
    parser.add_argument(
            '-u', '--use',
            help="Don't parse anything, use this CSV/JSON file.")
    parser.add_argument(
            '-d', '--diff',
            help="Specify CSV/JSON file to diff against.")
    parser.add_argument(
            '-p', '--percent',
            help="Specify CSV/JSON file to diff against, but only show "
                "percentage change, not a full diff.")
    parser.add_argument(
            '-c', '--compare',
            type=lambda x: tuple(v.strip() for v in x.split(',')),
            help="Compare results to the row matching this by pattern.")
    parser.add_argument(
            '-a', '--all',
            action='store_true',
            help="Show all, not just the ones that changed.")
    # by fields and enumerates share one list, enumerates are marked
    # by using the enumerate builtin as their mod
    class AppendBy(argparse.Action):
        def __call__(self, parser, namespace, value, option):
            if namespace.by is None:
                namespace.by = []
            namespace.by.append((value, option in {
                    '-B', '--hidden-by',
                    '-I', '--hidden-enumerate'}))
    parser.add_argument(
            '-i', '--enumerate',
            action=AppendBy,
            nargs='?',
            type=lambda x: (x, enumerate),
            const=('i', enumerate),
            help="Enumerate results with this field. This will prevent "
                "result folding.")
    parser.add_argument(
            '-I', '--hidden-enumerate',
            action=AppendBy,
            nargs='?',
            type=lambda x: (x, enumerate),
            const=('i', enumerate),
            help="Like -i/--enumerate, but hidden from the table renderer.")
    parser.add_argument(
            '-b', '--by',
            action=AppendBy,
            type=lambda x: (
                lambda k, v=None: (
                    k.strip(),
                    v.strip() if v is not None else None)
                )(*x.split('=', 1)),
            help="Group by this field. This does _not_ support expressions, "
                "but can be assigned a string with %% modifiers.")
    parser.add_argument(
            '-B', '--hidden-by',
            action=AppendBy,
            type=lambda x: (
                lambda k, v=None: (
                    k.strip(),
                    v.strip() if v is not None else None)
                )(*x.split('=', 1)),
            help="Like -b/--by, but hidden from the table renderer.")
    class AppendField(argparse.Action):
        def __call__(self, parser, namespace, value, option):
            if namespace.fields is None:
                namespace.fields = []
            namespace.fields.append((value, option in {
                    '-F', '--hidden-field'}))
    parser.add_argument(
            '-f', '--field',
            dest='fields',
            action=AppendField,
            type=lambda x: (
                lambda k, v=None: (
                    k.strip(),
                    RExpr(v) if v is not None else None)
                )(*x.split('=', 1)),
            help="Show this field. Can include an expression of the form "
                "field=expr.")
    parser.add_argument(
            '-F', '--hidden-field',
            dest='fields',
            action=AppendField,
            # note hidden fields need to be parsed into exprs just
            # like visible fields, main/compile treat them the same
            type=lambda x: (
                lambda k, v=None: (
                    k.strip(),
                    RExpr(v) if v is not None else None)
                )(*x.split('=', 1)),
            help="Like -f/--field, but hidden from the table renderer.")
    parser.add_argument(
            '-D', '--define',
            dest='defines',
            action='append',
            type=lambda x: (
                lambda k, vs: (
                    k.strip(),
                    {v.strip() for v in vs.split(',')})
                )(*x.split('=', 1)),
            help="Only include results where this field is this value. May "
                "include comma-separated options.")
    class AppendSort(argparse.Action):
        def __call__(self, parser, namespace, value, option):
            if namespace.sort is None:
                namespace.sort = []
            namespace.sort.append((value, option in {'-S', '--reverse-sort'}))
    parser.add_argument(
            '-s', '--sort',
            nargs='?',
            action=AppendSort,
            type=lambda x: (
                lambda k, v=None: (
                    k.strip(),
                    RExpr(v) if v is not None else None)
                )(*x.split('=', 1)),
            # no field means sort by the default sort order
            const=(None, None),
            help="Sort by this field. Can include an expression of the form "
                "field=expr.")
    parser.add_argument(
            '-S', '--reverse-sort',
            nargs='?',
            action=AppendSort,
            type=lambda x: (
                lambda k, v=None: (
                    k.strip(),
                    RExpr(v) if v is not None else None)
                )(*x.split('=', 1)),
            const=(None, None),
            help="Sort by this field, but backwards. Can include an expression "
                "of the form field=expr.")
    parser.add_argument(
            '-z', '--depth',
            nargs='?',
            type=lambda x: int(x, 0),
            const=0,
            help="Depth of function calls to show. 0 shows all calls unless "
                "we find a cycle. Defaults to 0.")
    parser.add_argument(
            '-Z', '--children',
            nargs='?',
            const='children',
            action='append',
            help="Field to use for recursive results. This expects a list "
                "and really only works with JSON input.")
    class AppendHot(argparse.Action):
        def __call__(self, parser, namespace, value, option):
            if namespace.hot is None:
                namespace.hot = []
            namespace.hot.append((value, option in {'-R', '--reverse-hot'}))
    parser.add_argument(
            '-r', '--hot',
            nargs='?',
            action=AppendHot,
            type=lambda x: (
                lambda k, v=None: (
                    k.strip(),
                    RExpr(v) if v is not None else None)
                )(*x.split('=', 1)),
            const=(None, None),
            help="Show only the hot path for each function call. Can "
                "optionally provide fields like sort. Can include an "
                "expression in the form of field=expr.")
    parser.add_argument(
            '-R', '--reverse-hot',
            nargs='?',
            action=AppendHot,
            type=lambda x: (
                lambda k, v=None: (
                    k.strip(),
                    RExpr(v) if v is not None else None)
                )(*x.split('=', 1)),
            const=(None, None),
            help="Like -r/--hot, but backwards.")
    parser.add_argument(
            '-N', '--notes',
            nargs='?',
            const='notes',
            action='append',
            help="Field to use for notes.")
    parser.add_argument(
            '--no-header',
            action='store_true',
            help="Don't show the header.")
    parser.add_argument(
            '--small-header',
            action='store_true',
            help="Don't show by field names.")
    parser.add_argument(
            '--no-total',
            action='store_true',
            help="Don't show the total.")
    parser.add_argument(
            '-Q', '--small-table',
            action='store_true',
            help="Equivalent to --small-header + --no-total.")
    parser.add_argument(
            '-Y', '--summary',
            action='store_true',
            help="Only show the total.")
    # drop unset options so main's own defaults apply
    sys.exit(main(**{k: v
            for k, v in vars(parser.parse_intermixed_args()).items()
            if v is not None}))