Files
littlefs/scripts/csv.py
Christopher Haster ac0aa3633e scripts: csv.py: RExpr decorators to help simplify func/uop/bop parsing
This was trickier than expected since Python's class scope is so
funky (I just ended up using lazy cached __get__ functions that
scan the RExpr class for tagged members), but these decorators help avoid
repeated boilerplate for common expr patterns.

We can even deduplicate binary expr parsing without sacrificing
precedence.
2024-11-16 12:33:41 -06:00

1349 lines
39 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Script to manipulate CSV files.
#
# Example:
# ./scripts/code.py lfs.o lfs_util.o -q -o lfs.code.csv
# ./scripts/data.py lfs.o lfs_util.o -q -o lfs.data.csv
# ./scripts/csv.py lfs.code.csv lfs.data.csv -q -o lfs.csv
# ./scripts/csv.py -Y lfs.csv -f code=code_size,data=data_size
#
# Copyright (c) 2022, The littlefs authors.
# SPDX-License-Identifier: BSD-3-Clause
#
# prevent local imports
__import__('sys').path.pop(0)
import collections as co
import csv
import functools as ft
import itertools as it
import math as mt
import os
import re
import sys
# various field types
# integer fields
class RInt(co.namedtuple('RInt', 'x')):
    """An integer result field that can also hold +-∞.

    Accepts an existing RInt (returned as-is), an int, an infinite float,
    or a string. Strings are parsed with int(x, 0), so base prefixes such
    as 0x are understood, and '∞'/'inf' with an optional sign are accepted.
    Unparsable strings raise ValueError.
    """
    __slots__ = ()
    def __new__(cls, x=0):
        if isinstance(x, RInt):
            return x
        if isinstance(x, str):
            try:
                x = int(x, 0)
            except ValueError:
                # also accept +-∞ and +-inf
                if re.match(r'^\s*\+?\s*(?:∞|inf)\s*$', x):
                    x = mt.inf
                elif re.match(r'^\s*-\s*(?:∞|inf)\s*$', x):
                    x = -mt.inf
                else:
                    raise
        assert isinstance(x, int) or mt.isinf(x), x
        return super().__new__(cls, x)

    def __str__(self):
        # render infinities with the same symbol the parser accepts
        if self.x == mt.inf:
            return '∞'
        elif self.x == -mt.inf:
            return '-∞'
        else:
            return str(self.x)

    def __int__(self):
        assert not mt.isinf(self.x)
        return self.x

    def __float__(self):
        return float(self.x)

    # placeholder shown in tables when there is no value
    none = '%7s' % '-'
    def table(self):
        """Return this value right-aligned in a 7-character column."""
        return '%7s' % (self,)

    def diff(self, other):
        """Return the signed difference self-other as a 7-character string.

        Either side may be None, which is treated as 0.
        """
        new = self.x if self else 0
        old = other.x if other else 0
        # equal values, including equal infinities, have no difference,
        # this avoids ∞-∞ = nan which %d can't format
        diff = 0 if new == old else new - old
        if diff == +mt.inf:
            return '%7s' % '+∞'
        elif diff == -mt.inf:
            return '%7s' % '-∞'
        else:
            return '%+7d' % diff

    def ratio(self, other):
        """Return the relative change (self-other)/other as a float.

        Either side may be None, which is treated as 0. Returns 0.0 when
        both sides are infinite or both are zero, and +-inf when only one
        side is infinite or the old value is zero.
        """
        new = self.x if self else 0
        old = other.x if other else 0
        if mt.isinf(new) and mt.isinf(old):
            return 0.0
        elif mt.isinf(new):
            return +mt.inf
        elif mt.isinf(old):
            return -mt.inf
        elif not old and not new:
            return 0.0
        elif not old:
            return +mt.inf
        else:
            return (new-old) / old

    def __add__(self, other):
        return self.__class__(self.x + other.x)

    def __sub__(self, other):
        return self.__class__(self.x - other.x)

    def __mul__(self, other):
        return self.__class__(self.x * other.x)
# float fields
class RFloat(co.namedtuple('RFloat', 'x')):
    """A floating-point result field that can also hold +-∞.

    Accepts an existing RFloat (returned as-is), a string, or anything
    float() can convert. Strings are parsed with float(), additionally
    accepting '∞'/'inf' with an optional sign. Unparsable strings raise
    ValueError.
    """
    __slots__ = ()
    def __new__(cls, x=0.0):
        if isinstance(x, RFloat):
            return x
        if isinstance(x, str):
            try:
                x = float(x)
            except ValueError:
                # also accept +-∞ and +-inf
                if re.match(r'^\s*\+?\s*(?:∞|inf)\s*$', x):
                    x = mt.inf
                elif re.match(r'^\s*-\s*(?:∞|inf)\s*$', x):
                    x = -mt.inf
                else:
                    raise
        elif not isinstance(x, float):
            # coerce ints and other field types via float()/__float__
            x = float(x)
        return super().__new__(cls, x)

    def __str__(self):
        # render infinities with the same symbol the parser accepts
        if self.x == mt.inf:
            return '∞'
        elif self.x == -mt.inf:
            return '-∞'
        else:
            return '%.1f' % self.x

    def __float__(self):
        return float(self.x)

    # table rendering is shared with RInt
    none = RInt.none
    table = RInt.table
    def diff(self, other):
        """Return the signed difference self-other as a 7-character string.

        Either side may be None, which is treated as 0.
        """
        new = self.x if self else 0
        old = other.x if other else 0
        # equal values, including equal infinities, have no difference,
        # this avoids showing ∞-∞ as nan
        diff = 0 if new == old else new - old
        if diff == +mt.inf:
            return '%7s' % '+∞'
        elif diff == -mt.inf:
            return '%7s' % '-∞'
        else:
            return '%+7.1f' % diff

    # ratio and arithmetic only depend on .x, so reuse RInt's
    ratio = RInt.ratio
    __add__ = RInt.__add__
    __sub__ = RInt.__sub__
    __mul__ = RInt.__mul__
# fractional fields, a/b
class RFrac(co.namedtuple('RFrac', 'a,b')):
    """A fractional result field, a/b, where a and b are both RInts.

    Accepts an existing RFrac (returned as-is), an 'a/b' string, or
    explicit a and b values. If only a is given, b defaults to a. A string
    without a '/' raises ValueError.
    """
    __slots__ = ()
    def __new__(cls, a=0, b=None):
        if isinstance(a, RFrac) and b is None:
            return a
        if isinstance(a, str) and b is None:
            a, b = a.split('/', 1)
        if b is None:
            b = a
        return super().__new__(cls, RInt(a), RInt(b))

    def __str__(self):
        return '%s/%s' % (self.a, self.b)

    def __float__(self):
        return float(self.a)

    # placeholder shown in tables when there is no value
    none = '%11s' % '-'
    def table(self):
        """Return this value right-aligned in an 11-character column."""
        return '%11s' % (self,)

    def notes(self):
        """Return a one-element list with a/b as a percentage.

        A zero denominator is treated as 100%.
        """
        t = self.a.x/self.b.x if self.b.x else 1.0
        return ['∞%' if t == +mt.inf
                else '-∞%' if t == -mt.inf
                else '%.1f%%' % (100*t)]

    def diff(self, other):
        """Return the component-wise difference as an 11-character string.

        Either side may be None, which is treated as 0/0.
        """
        new_a, new_b = self if self else (RInt(0), RInt(0))
        old_a, old_b = other if other else (RInt(0), RInt(0))
        return '%11s' % ('%s/%s' % (
                new_a.diff(old_a).strip(),
                new_b.diff(old_b).strip()))

    def ratio(self, other):
        """Return the change in a/b between other and self as a float.

        Either side may be None, which is treated as 0/0. A zero
        denominator is treated as a ratio of 1.0.
        """
        new_a, new_b = self if self else (RInt(0), RInt(0))
        old_a, old_b = other if other else (RInt(0), RInt(0))
        new = new_a.x/new_b.x if new_b.x else 1.0
        old = old_a.x/old_b.x if old_b.x else 1.0
        return new - old

    # note arithmetic is component-wise, numerators and denominators
    # are combined independently
    def __add__(self, other):
        return self.__class__(self.a + other.a, self.b + other.b)

    def __sub__(self, other):
        return self.__class__(self.a - other.a, self.b - other.b)

    def __mul__(self, other):
        # multiply both components, adding the denominators here would
        # not produce the product of the two fractions
        return self.__class__(self.a * other.a, self.b * other.b)

    # comparisons cross-multiply to avoid dividing, a zero denominator
    # compares as 1/1
    def __eq__(self, other):
        self_a, self_b = self if self.b.x else (RInt(1), RInt(1))
        other_a, other_b = other if other.b.x else (RInt(1), RInt(1))
        return self_a * other_b == other_a * self_b

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        self_a, self_b = self if self.b.x else (RInt(1), RInt(1))
        other_a, other_b = other if other.b.x else (RInt(1), RInt(1))
        return self_a * other_b < other_a * self_b

    def __gt__(self, other):
        return self.__class__.__lt__(other, self)

    def __le__(self, other):
        return not self.__gt__(other)

    def __ge__(self, other):
        return not self.__lt__(other)
# various fold operations
class RSum:
    """Fold operation that adds all results together."""
    def __call__(self, xs):
        # seed the sum with the first result so any type with __add__ works
        first = xs[0]
        rest = xs[1:]
        return sum(rest, start=first)
class RProd:
    """Fold operation that multiplies all results together."""
    def __call__(self, xs):
        # seed the product with the first result so any type with __mul__
        # works
        first = xs[0]
        rest = xs[1:]
        return mt.prod(rest, start=first)
class RMin:
    """Fold operation that keeps the smallest result."""
    def __call__(self, xs):
        smallest = min(xs)
        return smallest
class RMax:
    """Fold operation that keeps the largest result."""
    def __call__(self, xs):
        largest = max(xs)
        return largest
class RAvg:
    """Fold operation that takes the arithmetic mean, as an RFloat."""
    def __call__(self, xs):
        total = sum(map(float, xs))
        return RFloat(total / len(xs))
class RStddev:
    """Fold operation that takes the standard deviation, as an RFloat.

    The squared deviations are divided by the number of results.
    """
    def __call__(self, xs):
        mean = sum(map(float, xs)) / len(xs)
        variance = sum((float(x) - mean)**2 for x in xs) / len(xs)
        return RFloat(mt.sqrt(variance))
class RGMean:
    """Fold operation that takes the geometric mean, as an RFloat."""
    def __call__(self, xs):
        product = mt.prod(map(float, xs))
        # the nth root of the product of n results
        return RFloat(product**(1/len(xs)))
class RGStddev:
    """Fold operation that takes the geometric standard deviation, as an
    RFloat.

    Results in ∞ if the geometric mean is zero.
    """
    def __call__(self, xs):
        gmean = mt.prod(map(float, xs))**(1/len(xs))
        # a zero geometric mean can't be divided by
        if not gmean:
            return RFloat(mt.inf)
        log_variance = sum(
                mt.log(float(x)/gmean)**2 for x in xs) / len(xs)
        return RFloat(mt.exp(mt.sqrt(log_variance)))
# a lazily-evaluated field expression
class RExpr:
    """A lazily-evaluated field expression.

    The expression string is parsed into a tree of Expr nodes on
    construction. The tree can then be queried for the fields it depends
    on (fields), its result type (type), its fold operation (fold), and
    evaluated against a dict of field values (eval).

    Any RExpr.Error raised while parsing or walking the tree is reported
    on stderr and the process exits with status 3.
    """
    # expr parsing/typechecking/etc errors
    class Error(Exception):
        def __init__(self, reason):
            super().__init__(reason)
            self.reason = reason

    # expr node base class
    #
    # positional args are stored as attributes a, b, c, ... and by default
    # type/fold/eval just defer to the first arg
    class Expr:
        def __init__(self, *args):
            for k, v in zip('abcdefghijklmnopqrstuvwxyz', args):
                setattr(self, k, v)

        def __repr__(self):
            return '%s(%s)' % (
                    self.__class__.__name__,
                    ','.join(
                        repr(getattr(self, k))
                        for k in it.takewhile(
                            lambda k: hasattr(self, k),
                            'abcdefghijklmnopqrstuvwxyz')))

        def fields(self):
            return set(it.chain.from_iterable(
                    getattr(self, k).fields()
                    for k in it.takewhile(
                        lambda k: hasattr(self, k),
                        'abcdefghijklmnopqrstuvwxyz')))

        def type(self, types={}):
            return self.a.type(types)

        def fold(self, types={}):
            return self.a.fold(types)

        def eval(self, fields={}):
            return self.a.eval(fields)

    # expr nodes

    # field expr
    class Field(Expr):
        def fields(self):
            return {self.a}

        def type(self, types={}):
            if self.a not in types:
                raise RExpr.Error("untyped field? %s" % self.a)
            return types[self.a]

        def fold(self, types={}):
            if self.a not in types:
                raise RExpr.Error("unfoldable field? %s" % self.a)
            return RSum, types[self.a]

        def eval(self, fields={}):
            if self.a not in fields:
                raise RExpr.Error("unknown field? %s" % self.a)
            return fields[self.a]

    # literal exprs
    class StrLit(Expr):
        def fields(self):
            return set()

        def eval(self, fields={}):
            return self.a

    class IntLit(Expr):
        def fields(self):
            return set()

        def type(self, types={}):
            return RInt

        def fold(self, types={}):
            return RSum, RInt

        def eval(self, fields={}):
            return self.a

    class FloatLit(Expr):
        def fields(self):
            return set()

        def type(self, types={}):
            return RFloat

        def fold(self, types={}):
            return RSum, RFloat

        def eval(self, fields={}):
            return self.a

    # func expr helper
    #
    # tags a node class with the function name it implements
    def func(name):
        def func(f):
            f._func = name
            return f
        return func

    # lazily scans the owning class for members tagged by func, this
    # needs to be lazy since the class is incomplete while its body runs
    class Funcs:
        @ft.cache
        def __get__(self, _, cls):
            return {x._func: x
                    for x in cls.__dict__.values()
                    if hasattr(x, '_func')}
    funcs = Funcs()

    # type exprs
    @func('int')
    class Int(Expr):
        def type(self, types={}):
            return RInt

        def eval(self, fields={}):
            return RInt(self.a.eval(fields))

    @func('float')
    class Float(Expr):
        def type(self, types={}):
            return RFloat

        def eval(self, fields={}):
            return RFloat(self.a.eval(fields))

    @func('frac')
    class Frac(Expr):
        def type(self, types={}):
            return RFrac

        def eval(self, fields={}):
            return RFrac(self.a.eval(fields), self.b.eval(fields))

    # fold exprs
    @func('sum')
    class Sum(Expr):
        def fold(self, types={}):
            return RSum, self.a.type(types)

    @func('prod')
    class Prod(Expr):
        def fold(self, types={}):
            return RProd, self.a.type(types)

    @func('min')
    class Min(Expr):
        def fold(self, types={}):
            return RMin, self.a.type(types)

    @func('max')
    class Max(Expr):
        def fold(self, types={}):
            return RMax, self.a.type(types)

    @func('avg')
    class Avg(Expr):
        def fold(self, types={}):
            return RAvg, RFloat

    @func('stddev')
    class Stddev(Expr):
        def fold(self, types={}):
            return RStddev, RFloat

    @func('gmean')
    class GMean(Expr):
        def fold(self, types={}):
            return RGMean, RFloat

    # note this needs its own name, otherwise it replaces stddev in funcs
    @func('gstddev')
    class GStddev(Expr):
        def fold(self, types={}):
            return RGStddev, RFloat

    # functions
    @func('ratio')
    class Ratio(Expr):
        pass

    @func('total')
    class Total(Expr):
        pass

    @func('ceil')
    class Ceil(Expr):
        pass

    @func('floor')
    class Floor(Expr):
        pass

    @func('log')
    class Log(Expr):
        pass

    @func('pow')
    class Pow(Expr):
        pass

    @func('sqrt')
    class Sqrt(Expr):
        pass

    # unary expr helper
    #
    # tags a node class with the unary operator it implements
    def uop(op):
        def uop(f):
            f._uop = op
            return f
        return uop

    # lazily scans the owning class for members tagged by uop
    class UOps:
        @ft.cache
        def __get__(self, _, cls):
            return {x._uop: x
                    for x in cls.__dict__.values()
                    if hasattr(x, '_uop')}
    uops = UOps()

    # unary ops
    @uop('+')
    class Pos(Expr):
        pass

    @uop('-')
    class Neg(Expr):
        pass

    @uop('~')
    class Not(Expr):
        pass

    @uop('!')
    class Notnot(Expr):
        pass

    # binary expr helper
    #
    # tags a node class with the binary operator it implements and its
    # precedence, higher precedence binds tighter
    def bop(op, prec):
        def bop(f):
            f._bop = op
            f._bprec = prec
            return f
        return bop

    # lazily scans the owning class for members tagged by bop
    class BOps:
        @ft.cache
        def __get__(self, _, cls):
            return {x._bop: x
                    for x in cls.__dict__.values()
                    if hasattr(x, '_bop')}
    bops = BOps()

    class BPrecs:
        @ft.cache
        def __get__(self, _, cls):
            return {x._bop: x._bprec
                    for x in cls.__dict__.values()
                    if hasattr(x, '_bop')}
    bprecs = BPrecs()

    # binary ops
    @bop('*', 10)
    class Mul(Expr):
        pass

    @bop('/', 10)
    class Div(Expr):
        pass

    @bop('%', 10)
    class Mod(Expr):
        pass

    @bop('+', 9)
    class Add(Expr):
        pass

    @bop('-', 9)
    class Sub(Expr):
        pass

    @bop('<<', 8)
    class Shl(Expr):
        pass

    @bop('>>', 8)
    class Shr(Expr):
        pass

    @bop('&', 7)
    class And(Expr):
        pass

    @bop('^', 6)
    class Xor(Expr):
        pass

    @bop('|', 5)
    class Or(Expr):
        pass

    @bop('==', 4)
    class Eq(Expr):
        pass

    @bop('!=', 4)
    class Ne(Expr):
        pass

    @bop('<', 4)
    class Lt(Expr):
        pass

    @bop('<=', 4)
    class Le(Expr):
        pass

    @bop('>', 4)
    class Gt(Expr):
        pass

    @bop('>=', 4)
    class Ge(Expr):
        pass

    @bop('&&', 3)
    class Andand(Expr):
        pass

    @bop('||', 2)
    class Oror(Expr):
        pass

    # ternary ops
    class Ife(Expr):
        def type(self, types={}):
            return self.b.type(types)

        def fold(self, types={}):
            return self.b.fold(types)

    # parse an expr
    def __init__(self, expr):
        self.expr = expr.strip()

        # token patterns, all matched at the start of the remaining expr
        ident_re = re.compile(r'[_a-zA-Z][_a-zA-Z0-9]*')
        # quoted strings, a backslash escapes the following character
        str_re = re.compile(
                r'"(?:\\.|[^"\\])*"'
                r"|'(?:\\.|[^'\\])*'")
        # floats need a decimal point, with an optional exponent
        float_re = re.compile(
                r'[+-]?(?:[0-9][_0-9]*\.[_0-9]*|\.[0-9][_0-9]*)'
                r'(?:[eE][+-]?[0-9]+)?')
        # ints must start with a digit or base prefix, otherwise a signed
        # field such as -a would be mistaken for a hex literal
        int_re = re.compile(
                r'[+-]?(?:0[bBoOxX][_0-9a-fA-F]+|[0-9][_0-9]*|∞|inf)')

        # parse the expression into a tree
        #
        # returns the parsed node and the unparsed tail, only binary ops
        # with precedence greater than prec are consumed
        def p_expr(expr, prec=0):
            # parens
            if expr.startswith('('):
                a, tail = p_expr(expr[1:].lstrip())
                if not tail.startswith(')'):
                    raise RExpr.Error("mismatched parens? %s" % tail)
                tail = tail[1:].lstrip()

            # fields/functions
            elif (m := ident_re.match(expr)):
                name = m.group()
                tail = expr[m.end():].lstrip()
                if tail.startswith('('):
                    tail = tail[1:].lstrip()
                    if name not in RExpr.funcs:
                        raise RExpr.Error("unknown function? %s" % name)
                    args = []
                    while True:
                        a, tail = p_expr(tail)
                        args.append(a)
                        if tail.startswith(','):
                            tail = tail[1:].lstrip()
                            continue
                        if not tail.startswith(')'):
                            raise RExpr.Error(
                                    "mismatched parens? %s" % tail)
                        a = RExpr.funcs[name](*args)
                        tail = tail[1:].lstrip()
                        break
                else:
                    a = RExpr.Field(name)

            # strings
            elif (m := str_re.match(expr)):
                a = RExpr.StrLit(m.group()[1:-1])
                tail = expr[m.end():].lstrip()

            # floats
            elif (m := float_re.match(expr)):
                try:
                    a = RExpr.FloatLit(RFloat(m.group()))
                except ValueError:
                    raise RExpr.Error(
                            "invalid float? %s" % m.group()) from None
                tail = expr[m.end():].lstrip()

            # ints
            elif (m := int_re.match(expr)):
                try:
                    a = RExpr.IntLit(RInt(m.group()))
                except ValueError:
                    raise RExpr.Error(
                            "invalid int? %s" % m.group()) from None
                tail = expr[m.end():].lstrip()

            # unary ops
            elif any(expr.startswith(op) for op in RExpr.uops):
                # prefer the longest op to avoid ambiguities
                op = max(
                        (op for op in RExpr.uops if expr.startswith(op)),
                        key=len)
                # unary ops bind tighter than any binary op
                a, tail = p_expr(expr[len(op):].lstrip(), mt.inf)
                a = RExpr.uops[op](a)

            # unknown expr?
            else:
                raise RExpr.Error("unknown expr? %s" % expr)

            # parse tail
            while True:
                # find the longest binary op first, and only then check
                # precedence, otherwise && could be misparsed as & when
                # && doesn't bind tightly enough
                op = max(
                        (op for op in RExpr.bops if tail.startswith(op)),
                        key=len,
                        default=None)

                # binary ops
                if op is not None and prec < RExpr.bprecs[op]:
                    b, tail = p_expr(
                            tail[len(op):].lstrip(),
                            RExpr.bprecs[op])
                    a = RExpr.bops[op](a, b)

                # ternary ops, this is intentionally right associative
                elif tail.startswith('?') and prec <= 1:
                    b, tail = p_expr(tail[1:].lstrip(), 1)
                    if not tail.startswith(':'):
                        raise RExpr.Error("Mismatched ?:? %s" % tail)
                    c, tail = p_expr(tail[1:].lstrip(), 1)
                    a = RExpr.Ife(a, b, c)

                # no tail
                else:
                    return a, tail

        try:
            self.tree, tail = p_expr(self.expr)
            if tail:
                raise RExpr.Error("trailing expr? %s" % tail)
        except RExpr.Error as e:
            self._die(e)

    # report an expr error and exit
    def _die(self, e):
        print('error: in expr: %s' % self.expr,
                file=sys.stderr)
        print('error: %s' % e.reason,
                file=sys.stderr)
        sys.exit(3)

    # recursively find all fields
    def fields(self):
        try:
            return self.tree.fields()
        except RExpr.Error as e:
            self._die(e)

    # recursively find the type
    def type(self, types={}):
        try:
            return self.tree.type(types)
        except RExpr.Error as e:
            self._die(e)

    # recursively find the fold operation
    def fold(self, types={}):
        try:
            return self.tree.fold(types)
        except RExpr.Error as e:
            self._die(e)

    # recursively evaluate the expr
    def eval(self, fields={}):
        try:
            return self.tree.eval(fields)
        except RExpr.Error as e:
            self._die(e)
def openio(path, mode='r', buffering=-1):
    """Open path like open(), treating '-' as stdin/stdout.

    For '-', stdin is used if mode contains 'r', otherwise stdout. The
    underlying file descriptor is duplicated, so closing the returned file
    leaves the original stream open.
    """
    if path != '-':
        return open(path, mode, buffering)

    # allow '-' for stdin/stdout
    stream = sys.stdin if 'r' in mode else sys.stdout
    return os.fdopen(os.dup(stream.fileno()), mode, buffering)
def collect(csv_paths, defines=[]):
    """Collect field names and rows from a list of CSV files.

    csv_paths is an iterable of paths, '-' is accepted for stdin. defines
    is a list of (field, values) pairs, rows are only kept if every listed
    field is present with a value in the matching set.

    Returns (fields, results), where fields is the list of column names in
    first-seen order and results is the list of row dicts. Missing files
    are skipped.
    """
    # collect results from CSV files
    fields = []
    results = []
    for path in csv_paths:
        try:
            with openio(path) as f:
                reader = csv.DictReader(f, restval='')
                # note fieldnames is None if the file is empty
                fields.extend(
                        k for k in reader.fieldnames or []
                        if k not in fields)
                for r in reader:
                    # filter by matching defines
                    if not all(k in r and r[k] in vs for k, vs in defines):
                        continue

                    results.append(r)
        except FileNotFoundError:
            # missing files are intentionally ignored
            pass

    return fields, results
def infer(fields_, results,
        by=None,
        fields=None,
        exprs=[],
        defines=[]):
    """Infer field types and build a Result class for the given results.

    fields_ is the list of available column names and results the list of
    raw row dicts. by and fields select the grouping and value columns, if
    either is None it is guessed as every column not claimed by the other,
    by an expr's dependencies, or by defines. exprs is a list of
    (field, RExpr) pairs, only the last expr for each field is used.

    Each needed field is given the first of RInt, RFloat, RFrac that
    parses every non-empty value, if none match an error is printed and
    the process exits.

    Returns a namedtuple subclass whose value fields are folded lazily on
    attribute access.
    """
    # we only really care about the last expr for each field
    exprs = {k: expr for k, expr in exprs}

    # find all fields our exprs depend on
    fields__ = set(it.chain.from_iterable(
            expr.fields() for _, expr in exprs.items()))

    # if by not specified, guess it's anything not in fields/exprs/defines
    if by is None:
        by = [k for k in fields_
                if k not in (fields or [])
                    and k not in fields__
                    and not any(k == k_ for k_, _ in defines)]

    # if fields not specified, guess it's anything not in by/exprs/defines
    if fields is None:
        fields = [k for k in fields_
                if k not in (by or [])
                    and k not in fields__
                    and not any(k == k_ for k_, _ in defines)]

    # deduplicate by/fields
    by = list(co.OrderedDict.fromkeys(by).keys())
    fields = list(co.OrderedDict.fromkeys(fields).keys())

    # find best type for all fields used by field exprs
    fields__ = set(it.chain.from_iterable(
            exprs[k].fields() if k in exprs else {k}
            for k in fields))
    types = {}
    for k in fields__:
        for t in [RInt, RFloat, RFrac]:
            for r in results:
                if k in r and r[k].strip():
                    try:
                        t(r[k])
                    except ValueError:
                        # this type doesn't fit, try the next one
                        break
            else:
                # every value parsed
                types[k] = t
                break
        else:
            print("error: no type matches field %r?" % k,
                    file=sys.stderr)
            sys.exit(-1)

    # typecheck field exprs, note these may reference input fields
    # with the same name
    types__ = types.copy()
    for k, expr in exprs.items():
        if k in fields:
            types__[k] = expr.type(types)

    # foldcheck field exprs, note each field must be paired with its own
    # inferred type here
    folds = {k: (RSum, t) for k, t in types.items()}
    for k, expr in exprs.items():
        if k in fields:
            folds[k] = expr.fold(types)
    folds = {k: (f(), t) for k, (f, t) in folds.items()}

    # create result class
    def __new__(cls, **r):
        # evaluate types
        r_ = {k: types[k](v) if k in types else v
                for k, v in r.items()}

        # evaluate exprs
        r__ = r_.copy()
        for k, expr in exprs.items():
            r__[k] = expr.eval(r_)

        # return result, each field is stored as a (list, count) pair so
        # folding can be deferred
        return cls.__mro__[1].__new__(cls,
                **{k: r__.get(k, '') for k in by},
                **{k: ([r__[k]], 1) if k in r__ else ([], 0)
                    for k in fields})

    def __add__(self, other):
        # reuse lists if possible
        def extend(a, b):
            if len(a[0]) == a[1]:
                a[0].extend(b[0][:b[1]])
                return (a[0], a[1] + b[1])
            else:
                return (a[0][:a[1]] + b[0][:b[1]], a[1] + b[1])

        # lazily fold results
        return self.__class__.__mro__[1].__new__(self.__class__,
                **{k: getattr(self, k) for k in by},
                **{k: extend(
                        object.__getattribute__(self, k),
                        object.__getattribute__(other, k))
                    for k in fields})

    def __getattribute__(self, k):
        # lazily fold results on demand, this avoids issues with fold
        # operations that depend on the number of results
        if k in fields:
            v = object.__getattribute__(self, k)
            if v[1]:
                return folds[k][0](v[0][:v[1]])
            else:
                return None
        return object.__getattribute__(self, k)

    return type('Result', (co.namedtuple('Result', by + fields),), {
            '__slots__': (),
            '__new__': __new__,
            '__add__': __add__,
            '__getattribute__': __getattribute__,
            '_by': by,
            '_fields': fields,
            '_sort': fields,
            '_types': {k: t for k, (_, t) in folds.items()},
        })
def fold(Result, results, by=None, defines=[]):
    """Merge results that share the same values for the by fields.

    by defaults to Result._by. defines is a list of (field, values)
    pairs, results are dropped unless every listed field has a value in
    the matching set. Every field named in by or defines must exist in
    Result._by or Result._fields, otherwise an error is printed and the
    process exits.

    Returns a list with one summed result per distinct by-tuple, in
    first-seen order.
    """
    if by is None:
        by = Result._by

    # make sure every field we group/filter on actually exists
    for field in it.chain(by or [], (k for k, _ in defines)):
        if field not in Result._by and field not in Result._fields:
            print("error: could not find field %r?" % field,
                    file=sys.stderr)
            sys.exit(-1)

    # filter by matching defines
    if defines:
        results = [
                r for r in results
                if all(getattr(r, k) in vs for k, vs in defines)]

    # organize results into conflicts
    groups = co.OrderedDict()
    for r in results:
        key = tuple(getattr(r, k) for k in by)
        groups.setdefault(key, []).append(r)

    # merge conflicts
    return [sum(rs[1:], start=rs[0]) for rs in groups.values()]
def table(Result, results, diff_results=None, *,
        by=None,
        fields=None,
        sort=None,
        summary=False,
        all=False,
        percent=False,
        **_):
    """Print results, optionally diffed against diff_results, as a table.

    by and fields default to Result._by and Result._fields. sort is a
    list of (field, reverse) pairs applied in order of priority. If
    summary is set only the TOTAL line is shown. When diffing, entries
    without any change are hidden unless all is set, and percent shows
    only the percentage change instead of old/new/diff columns. Any other
    keyword arguments are ignored.
    """
    # the all parameter shadows the builtin, note __builtins__ can't be
    # used here since it's only a module when running as __main__
    import builtins
    all_, all = all, builtins.all

    if by is None:
        by = Result._by
    if fields is None:
        fields = Result._fields
    types = Result._types

    # fold again
    results = fold(Result, results, by=by)
    if diff_results is not None:
        diff_results = fold(Result, diff_results, by=by)

    # organize by name
    table = {
            ','.join(str(getattr(r, k) or '') for k in by): r
            for r in results}
    diff_table = {
            ','.join(str(getattr(r, k) or '') for k in by): r
            for r in diff_results or []}
    names = [name
            for name in table.keys() | diff_table.keys()
            if diff_results is None
                or all_
                or any(
                    types[k].ratio(
                        getattr(table.get(name), k, None),
                        getattr(diff_table.get(name), k, None))
                    for k in fields)]

    # sort again, now with diff info, note that python's sort is stable
    names.sort()
    if diff_results is not None:
        names.sort(
                key=lambda n: tuple(
                    types[k].ratio(
                        getattr(table.get(n), k, None),
                        getattr(diff_table.get(n), k, None))
                    for k in fields),
                reverse=True)
    if sort:
        for k, reverse in reversed(sort):
            names.sort(
                    key=lambda n: tuple(
                        (getattr(table[n], k),)
                            if getattr(table.get(n), k, None) is not None
                            else ()
                        for k in (
                            [k] if k else [
                                k for k in Result._sort
                                if k in fields])),
                    reverse=reverse ^ (not k or k in Result._fields))

    # build up our lines
    lines = []

    # header
    header = ['%s%s' % (
                ','.join(by),
                ' (%d added, %d removed)' % (
                        sum(1 for n in table if n not in diff_table),
                        sum(1 for n in diff_table if n not in table))
                    if diff_results is not None and not percent else '')
            if not summary else '']
    if diff_results is None:
        for k in fields:
            header.append(k)
    elif percent:
        for k in fields:
            header.append(k)
    else:
        # old, new, and diff columns for each field
        for k in fields:
            header.append('o'+k)
        for k in fields:
            header.append('n'+k)
        for k in fields:
            header.append('d'+k)
    lines.append(header)

    # entry helper, each cell is either a string or a (string, notes)
    # tuple
    def table_entry(name, r, diff_r=None):
        entry = [name]
        if diff_results is None:
            for k in fields:
                entry.append(
                        (getattr(r, k).table(),
                            getattr(getattr(r, k), 'notes', lambda: [])())
                        if getattr(r, k, None) is not None
                        else types[k].none)
        elif percent:
            for k in fields:
                entry.append(
                        (getattr(r, k).table()
                            if getattr(r, k, None) is not None
                            else types[k].none,
                        (lambda t: ['+∞%'] if t == +mt.inf
                            else ['-∞%'] if t == -mt.inf
                            else ['%+.1f%%' % (100*t)])(
                            types[k].ratio(
                                getattr(r, k, None),
                                getattr(diff_r, k, None)))))
        else:
            for k in fields:
                entry.append(getattr(diff_r, k).table()
                        if getattr(diff_r, k, None) is not None
                        else types[k].none)
            for k in fields:
                entry.append(getattr(r, k).table()
                        if getattr(r, k, None) is not None
                        else types[k].none)
            for k in fields:
                entry.append(
                        (types[k].diff(
                            getattr(r, k, None),
                            getattr(diff_r, k, None)),
                        (lambda t: ['+∞%'] if t == +mt.inf
                            else ['-∞%'] if t == -mt.inf
                            else ['%+.1f%%' % (100*t)] if t
                            else [])(
                            types[k].ratio(
                                getattr(r, k, None),
                                getattr(diff_r, k, None)))))
        return entry

    # entries
    if not summary:
        for name in names:
            r = table.get(name)
            if diff_results is None:
                diff_r = None
            else:
                diff_r = diff_table.get(name)
            lines.append(table_entry(name, r, diff_r))

    # total
    r = next(iter(fold(Result, results, by=[])), None)
    if diff_results is None:
        diff_r = None
    else:
        diff_r = next(iter(fold(Result, diff_results, by=[])), None)
    lines.append(table_entry('TOTAL', r, diff_r))

    # homogenize
    lines = [
            [x if isinstance(x, tuple) else (x, []) for x in line]
            for line in lines]

    # find the best widths, note that column 0 contains the names and is
    # handled a bit differently
    widths = co.defaultdict(lambda: 7, {0: 23})
    notes = co.defaultdict(lambda: 0)
    for line in lines:
        for i, x in enumerate(line):
            # round widths up to one less than a multiple of 4
            widths[i] = max(widths[i], ((len(x[0])+1+4-1)//4)*4-1)
            notes[i] = max(notes[i], 1+2*len(x[1])+sum(len(n) for n in x[1]))

    # print our table
    for line in lines:
        print('%-*s %s' % (
                widths[0], line[0][0],
                ' '.join('%*s%-*s' % (
                        widths[i], x[0],
                        notes[i], ' (%s)' % ', '.join(x[1]) if x[1] else '')
                    for i, x in enumerate(line[1:], 1))))
def main(csv_paths, *,
        by=None,
        fields=None,
        defines=[],
        sort=None,
        **args):
    """Collect, fold, sort, and output results from CSV files.

    by and fields are lists of (name, expr) pairs where expr is an RExpr
    or None, at least one of the two is required. defines is a list of
    (field, values) filters and sort a list of (field, reverse) pairs.
    The remaining args are the command-line options, of which use,
    output, diff, and quiet are read here, and all are passed on to table.
    """
    # separate out exprs
    exprs = [(k, v)
            for k, v in it.chain(by or [], fields or [])
            if v is not None]
    if by is not None:
        by = [k for k, _ in by]
    if fields is not None:
        fields = [k for k, _ in fields]

    if by is None and fields is None:
        print("error: needs --by or --fields to figure out fields",
                file=sys.stderr)
        sys.exit(-1)

    # use is just an alias
    if args.get('use'):
        csv_paths = csv_paths + [args['use']]

    # find CSV files
    fields_, results = collect(csv_paths, defines)

    # homogenize
    Result = infer(fields_, results,
            by=by,
            fields=fields,
            exprs=exprs,
            defines=defines)
    results_ = []
    for r in results:
        results_.append(Result(**{
                k: v for k, v in r.items() if v.strip()}))
    results = results_

    # fold
    results = fold(Result, results, by=by, defines=defines)

    # sort, note that python's sort is stable
    results.sort()
    if sort:
        for k, reverse in reversed(sort):
            results.sort(
                    key=lambda r: tuple(
                        (getattr(r, k),) if getattr(r, k) is not None else ()
                        for k in ([k] if k else Result._sort)),
                    reverse=reverse ^ (not k or k in Result._fields))

    # write results to CSV
    if args.get('output'):
        with openio(args['output'], 'w') as f:
            writer = csv.DictWriter(f, Result._by + Result._fields)
            writer.writeheader()
            for r in results:
                # note we need to go through getattr to resolve lazy fields
                writer.writerow({
                        k: getattr(r, k)
                        for k in Result._by + Result._fields})

    # find previous results?
    if args.get('diff'):
        # note collect only takes the paths and defines
        _, diff_results = collect([args['diff']], defines)
        diff_results_ = []
        for r in diff_results:
            # skip rows that don't have any of our fields
            if not any(k in r and r[k].strip()
                    for k in Result._fields):
                continue
            try:
                diff_results_.append(Result(**{
                        k: r[k] for k in Result._by + Result._fields
                        if k in r and r[k].strip()}))
            except TypeError:
                pass
        diff_results = diff_results_

        # fold
        diff_results = fold(Result, diff_results, by=by, defines=defines)

    # print table
    if not args.get('quiet'):
        table(Result, results,
                diff_results if args.get('diff') else None,
                by=by,
                fields=fields,
                sort=sort,
                **args)
# command-line entry point, parse args and hand everything to main
if __name__ == "__main__":
    import argparse

    # parse a field or field=expr argument into a (field, RExpr or None)
    # pair
    def field_expr(x):
        k, *v = x.split('=', 1)
        return (k.strip(), RExpr(v[0]) if v else None)

    parser = argparse.ArgumentParser(
            description="Script to manipulate CSV files.",
            allow_abbrev=False)
    parser.add_argument(
            'csv_paths',
            nargs='*',
            help="Input *.csv files.")
    parser.add_argument(
            '-q', '--quiet',
            action='store_true',
            help="Don't show anything, useful with -o.")
    parser.add_argument(
            '-o', '--output',
            help="Specify CSV file to store results.")
    parser.add_argument(
            '-u', '--use',
            help="Don't parse anything, use this CSV file.")
    parser.add_argument(
            '-d', '--diff',
            help="Specify CSV file to diff against.")
    parser.add_argument(
            '-a', '--all',
            action='store_true',
            help="Show all, not just the ones that changed.")
    parser.add_argument(
            '-p', '--percent',
            action='store_true',
            help="Only show percentage change, not a full diff.")
    parser.add_argument(
            '-b', '--by',
            action='append',
            type=field_expr,
            help="Group by this field. Can include an expression of the form "
                "field=expr.")
    parser.add_argument(
            '-f', '--field',
            dest='fields',
            action='append',
            type=field_expr,
            help="Show this field. Can include an expression of the form "
                "field=expr.")
    parser.add_argument(
            '-D', '--define',
            dest='defines',
            action='append',
            type=lambda x: (
                lambda k, vs: (
                    k.strip(),
                    {v.strip() for v in vs.split(',')})
                )(*x.split('=', 1)),
            help="Only include results where this field is this value. May "
                "include comma-separated options.")
    # -s and -S share one list so their relative order is preserved
    class AppendSort(argparse.Action):
        def __call__(self, parser, namespace, value, option):
            if namespace.sort is None:
                namespace.sort = []
            # both the short and long reverse flags need to reverse
            namespace.sort.append(
                    (value, option in {'-S', '--reverse-sort'}))
    parser.add_argument(
            '-s', '--sort',
            nargs='?',
            action=AppendSort,
            help="Sort by this field.")
    parser.add_argument(
            '-S', '--reverse-sort',
            nargs='?',
            action=AppendSort,
            help="Sort by this field, but backwards.")
    parser.add_argument(
            '-Y', '--summary',
            action='store_true',
            help="Only show the total.")
    parser.add_argument(
            '--int',
            action='append',
            help="Treat these fields as ints.")
    parser.add_argument(
            '--float',
            action='append',
            help="Treat these fields as floats.")
    parser.add_argument(
            '--frac',
            action='append',
            help="Treat these fields as fractions.")
    parser.add_argument(
            '--sum',
            action='append',
            help="Add these fields (the default).")
    parser.add_argument(
            '--prod',
            action='append',
            help="Multiply these fields.")
    parser.add_argument(
            '--min',
            action='append',
            help="Take the minimum of these fields.")
    parser.add_argument(
            '--max',
            action='append',
            help="Take the maximum of these fields.")
    parser.add_argument(
            '--avg', '--mean',
            action='append',
            help="Average these fields.")
    parser.add_argument(
            '--stddev',
            action='append',
            help="Find the standard deviation of these fields.")
    parser.add_argument(
            '--gmean',
            action='append',
            help="Find the geometric mean of these fields.")
    parser.add_argument(
            '--gstddev',
            action='append',
            help="Find the geometric standard deviation of these fields.")
    # only pass on the options that were actually given
    sys.exit(main(**{k: v
            for k, v in vars(parser.parse_intermixed_args()).items()
            if v is not None}))