scripts: csv.py: Adopting full expr parser for field exprs

This is a work-in-progress, but the general idea is to replace the
existing rename mechanic in csv.py with a full expr parser:

  $ ./scripts/csv.py input.csv -ba=x -fb=y+z

I've been putting this off for a while, as it feels like too big a jump
in complexity for what was intended to be a simple script. But
complexity is a bit funny in programming. Even if a full parser is more
difficult to implement, if it's the right grammar for the job, the
resulting script should end up both easier to understand and easier to
extend.

The original intention was that any sufficiently complicated math could
be implemented in ad-hoc Python scripts that operate directly on the CSV
files, but CSV parsing in Python is annoying enough that this never
really worked well.

But I'm probably overselling the complexity. This is classic CS stuff:

  1. build a syntax tree
  2. map symbols to input fields
  3. typecheck, fold, eval, etc

One neat thing is that in addition to providing type and eval
information, our exprs can also provide information on how to "fold" the
field after eval. This kicks in when merging muliple rows when grouping
by -b/--by, and for finding the TOTAL results.

This can be used to merge stack results correctly with max:

  $ ./scripts/csv.py stack.csv \
          -fframe='sum(frame)' -flimit='max(limit)'

Or can be used to find other interesting measurements:

  $ ./scripts/csv.py stack.csv \
          -favg='avg(frame)' -fstddev='stddev(frame)'

These changes also make the eval order of input/output fields much
stricter which is probably a good thing.

This should replace all of the somewhat hacky fake-expr flags in csv.py:

- --int     => -fa='int(b)'
- --float   => -fa='float(b)'
- --frac    => -fa='frac(b)'
- --sum     => -fa='sum(b)'
- --prod    => -fa='prod(b)'
- --min     => -fa='min(b)'
- --max     => -fa='max(b)'
- --avg     => -fa='avg(b)'
- --stddev  => -fa='stddev(b)'
- --gmean   => -fa='gmean(b)'
- --gstddev => -fa='gstddev(b)'

If you squint you might be able to see a pattern.
This commit is contained in:
Christopher Haster
2024-11-10 12:18:32 -06:00
parent 7cfcc1af1d
commit 4061891a02

View File

@@ -22,32 +22,10 @@ import itertools as it
import math as mt
import os
import re
import sys
# supported merge operations
#
# this is a terrible way to express these
#
OPS = {
'sum': lambda xs: sum(xs[1:], start=xs[0]),
'prod': lambda xs: mt.prod(xs[1:], start=xs[0]),
'min': min,
'max': max,
'avg': lambda xs: RFloat(sum(float(x) for x in xs) / len(xs)),
'stddev': lambda xs: (
lambda avg: RFloat(
mt.sqrt(sum((float(x) - avg)**2 for x in xs) / len(xs)))
)(sum(float(x) for x in xs) / len(xs)),
'gmean': lambda xs: RFloat(mt.prod(float(x) for x in xs)**(1/len(xs))),
'gstddev': lambda xs: (
lambda gmean: RFloat(
mt.exp(mt.sqrt(
sum(mt.log(float(x)/gmean)**2 for x in xs)
/ len(xs)))
if gmean else mt.inf)
)(mt.prod(float(x) for x in xs)**(1/len(xs))),
}
# various field types
# integer fields
class RInt(co.namedtuple('RInt', 'x')):
@@ -247,12 +225,519 @@ class RFrac(co.namedtuple('RFrac', 'a,b')):
def __ge__(self, other):
return not self.__lt__(other)
# available types
TYPES = co.OrderedDict([
('int', RInt),
('float', RFloat),
('frac', RFrac)
])
# various fold operations
class RSum:
def __call__(self, xs):
return sum(xs[1:], start=xs[0])
class RProd:
def __call__(self, xs):
return mt.prod(xs[1:], start=xs[0])
class RMin:
def __call__(self, xs):
return min(xs)
class RMax:
def __call__(self, xs):
return max(xs)
class RAvg:
def __call__(self, xs):
return RFloat(sum(float(x) for x in xs) / len(xs))
class RStddev:
def __call__(self, xs):
avg = sum(float(x) for x in xs) / len(xs)
return RFloat(mt.sqrt(sum((float(x) - avg)**2 for x in xs) / len(xs)))
class RGMean:
def __call__(self, xs):
return RFloat(mt.prod(float(x) for x in xs)**(1/len(xs)))
class RGStddev:
def __call__(self, xs):
gmean = mt.prod(float(x) for x in xs)**(1/len(xs))
return RFloat(
mt.exp(mt.sqrt(
sum(mt.log(float(x)/gmean)**2 for x in xs) / len(xs)))
if gmean else mt.inf)
# a lazily-evaluated field expression
class RExpr:
# expr parsing/typechecking/etc errors
class Error(Exception):
def __init__(self, reason):
self.reason = reason
# expr nodes
class Expr:
def __init__(self, *args):
for k, v in zip('abcdefghijklmnopqrstuvwxyz', args):
setattr(self, k, v)
def __repr__(self):
return '%s(%s)' % (
self.__class__.__name__,
','.join(
repr(getattr(self, k))
for k in it.takewhile(
lambda k: hasattr(self, k),
'abcdefghijklmnopqrstuvwxyz')))
def fields(self):
return set(it.chain.from_iterable(
getattr(self, k).fields()
for k in it.takewhile(
lambda k: hasattr(self, k),
'abcdefghijklmnopqrstuvwxyz')))
def type(self, types={}):
return self.a.type(types)
def fold(self, types={}):
return self.a.fold(types)
def eval(self, fields={}):
return self.a.eval(fields)
class Field(Expr):
def fields(self):
return {self.a}
def type(self, types={}):
if self.a not in types:
raise RExpr.Error("untyped field? %s" % self.a)
return types[self.a]
def fold(self, types={}):
if self.a not in types:
raise RExpr.Error("unfoldable field? %s" % self.a)
return RSum, types[self.a]
def eval(self, fields={}):
if self.a not in fields:
raise RExpr.Error("unknown field? %s" % self.a)
return fields[self.a]
class StrLit(Expr):
def fields(self):
return set()
def eval(self, fields={}):
return self.a
class IntLit(Expr):
def fields(self):
return set()
def type(self, types={}):
return RInt
def fold(self, types={}):
return RSum, RInt
def eval(self, fields={}):
return self.a
class FloatLit(Expr):
def fields(self):
return set()
def type(self, types={}):
return RFloat
def fold(self, types={}):
return RSum, RFloat
def eval(self, fields={}):
return self.a
class Int(Expr):
def type(self, types={}):
return RInt
def eval(self, fields={}):
return RInt(self.a.eval(fields))
class Float(Expr):
def type(self, types={}):
return RFloat
def eval(self, fields={}):
return RFloat(self.a.eval(fields))
class Frac(Expr):
def type(self, types={}):
return RFrac
def eval(self, fields={}):
return RFrac(self.a.eval(fields), self.b.eval(fields))
class Sum(Expr):
def fold(self, types={}):
return RSum, self.a.type(types)
class Prod(Expr):
def fold(self, types={}):
return RProd, self.a.type(types)
class Min(Expr):
def fold(self, types={}):
return RMin, self.a.type(types)
class Max(Expr):
def fold(self, types={}):
return RMax, self.a.type(types)
class Avg(Expr):
def fold(self, types={}):
return RAvg, RFloat
class Stddev(Expr):
def fold(self, types={}):
return RStddev, RFloat
class GMean(Expr):
def fold(self, types={}):
return RGMean, RFloat
class GStddev(Expr):
def fold(self, types={}):
return RGStddev, RFloat
class Ratio(Expr):
pass
class Total(Expr):
pass
class Ceil(Expr):
pass
class Floor(Expr):
pass
class Log(Expr):
pass
class Pow(Expr):
pass
class Sqrt(Expr):
pass
funcs = {
# types
'int': Int,
'float': Float,
'frac': Frac,
# functions
'ratio': Ratio,
'total': Total,
'ceil': Ceil,
'floor': Floor,
'log': Log,
'pow': Pow,
'sqrt': Sqrt,
# mergers
'sum': Sum,
'prod': Prod,
'min': Min,
'max': Max,
'avg': Avg,
'stddev': Stddev,
'gmean': GMean,
'gstddev': GStddev,
}
class Pos(Expr):
pass
class Neg(Expr):
pass
class Not(Expr):
pass
class Notnot(Expr):
pass
class Mul(Expr):
pass
class Div(Expr):
pass
class Mod(Expr):
pass
class Add(Expr):
pass
class Sub(Expr):
pass
class Shl(Expr):
pass
class Shr(Expr):
pass
class And(Expr):
pass
class Xor(Expr):
pass
class Or(Expr):
pass
class Lt(Expr):
pass
class Le(Expr):
pass
class Gt(Expr):
pass
class Ge(Expr):
pass
class Ne(Expr):
pass
class Eq(Expr):
pass
class Andand(Expr):
pass
class Oror(Expr):
pass
class Ife(Expr):
def type(self, types={}):
return self.b.type(types)
def fold(self, types={}):
return self.b.fold(types)
# parse and expr
def __init__(self, expr):
self.expr = expr.strip()
# parse the expression into a tree
def p_expr(expr, prec=0):
if expr.startswith('('):
a, tail = p_expr(expr[1:].lstrip())
if not tail.startswith(')'):
raise RExpr.Error("mismatched parens? %s" % tail)
tail = tail[1:].lstrip()
elif re.match('[_a-zA-Z][_a-zA-Z0-9]*', expr):
m = re.match('[_a-zA-Z][_a-zA-Z0-9]*', expr)
tail = expr[len(m.group()):].lstrip()
if tail.startswith('('):
tail = tail[1:].lstrip()
if m.group() not in RExpr.funcs:
raise RExpr.Error("unknown function? %s" % m.group())
args = []
while True:
a, tail = p_expr(tail)
args.append(a)
if tail.startswith(','):
tail = tail[1:].lstrip()
continue
else:
if not tail.startswith(')'):
raise RExpr.Error(
"mismatched parens? %s" % tail)
a = RExpr.funcs[m.group()](*args)
tail = tail[1:].lstrip()
break
else:
a = RExpr.Field(m.group())
elif re.match('(?:"(?:\\.|[^"])*"|\'(?:\\.|[^\'])\')', expr):
m = re.match('(?:"(?:\\.|[^"])*"|\'(?:\\.|[^\'])\')', expr)
a = RExpr.StrLit(m.group()[1:-1])
tail = expr[len(m.group()):].lstrip()
elif re.match('[+-]?[_0-9]*\.[_0-9eE]', expr):
m = re.match('[+-]?[_0-9]*\.[_0-9eE]', expr)
a = RExpr.FloatLit(RFloat(m.group()))
tail = expr[len(m.group()):].lstrip()
elif re.match('[+-]?(?:(?:0[bBoOxX])?[_0-9a-fA-F]+|∞|inf)', expr):
m = re.match('[+-]?(?:(?:0[bBoOxX])?[_0-9a-fA-F]+|∞|inf)', expr)
a = RExpr.IntLit(RInt(m.group()))
tail = expr[len(m.group()):].lstrip()
elif expr.startswith('+'):
a, tail = p_expr(expr[1:].lstrip(), 12)
a = RExpr.Pos(a)
elif expr.startswith('-'):
a, tail = p_expr(expr[1:].lstrip(), 12)
a = RExpr.Neg(a)
elif expr.startswith('~'):
a, tail = p_expr(expr[1:].lstrip(), 12)
a = RExpr.Not(a)
elif expr.startswith('!'):
a, tail = p_expr(expr[1:].lstrip(), 4)
a = RExpr.Notnot(a)
else:
raise RExpr.Error("unknown expr? %s" % expr)
while True:
if tail.startswith('*') and prec < 11:
b, tail = p_expr(tail[1:].lstrip(), 11)
a = RExpr.Mul(a, b)
elif tail.startswith('/') and prec < 11:
b, tail = p_expr(tail[1:].lstrip(), 11)
a = RExpr.Div(a, b)
elif tail.startswith('%') and prec < 11:
b, tail = p_expr(tail[1:].lstrip(), 11)
a = RExpr.Mod(a, b)
elif tail.startswith('+') and prec < 10:
b, tail = p_expr(tail[1:].lstrip(), 10)
a = RExpr.Add(a, b)
elif tail.startswith('-') and prec < 10:
b, tail = p_expr(tail[1:].lstrip(), 10)
a = RExpr.Sub(a, b)
elif tail.startswith('<<') and prec < 9:
b, tail = p_expr(tail[2:].lstrip(), 9)
a = RExpr.Shl(a, b)
elif tail.startswith('>>') and prec < 9:
b, tail = p_expr(tail[2:].lstrip(), 9)
a = RExpr.Shr(a, b)
elif tail.startswith('&') and prec < 8:
b, tail = p_expr(tail[1:].lstrip(), 8)
a = RExpr.And(a, b)
elif tail.startswith('^') and prec < 7:
b, tail = p_expr(tail[1:].lstrip(), 7)
a = RExpr.Xor(a, b)
elif tail.startswith('|') and prec < 6:
b, tail = p_expr(tail[1:].lstrip(), 6)
a = RExpr.Or(a, b)
elif tail.startswith('<') and prec < 5:
b, tail = p_expr(tail[1:].lstrip(), 5)
a = RExpr.Lt(a, b)
elif tail.startswith('<=') and prec < 5:
b, tail = p_expr(tail[2:].lstrip(), 5)
a = RExpr.Le(a, b)
elif tail.startswith('>') and prec < 5:
b, tail = p_expr(tail[1:].lstrip(), 5)
a = RExpr.Gt(a, b)
elif tail.startswith('>=') and prec < 5:
b, tail = p_expr(tail[2:].lstrip(), 5)
a = RExpr.Ge(a, b)
elif tail.startswith('!=') and prec < 5:
b, tail = p_expr(tail[2:].lstrip(), 5)
a = RExpr.Ne(a, b)
elif tail.startswith('==') and prec < 5:
b, tail = p_expr(tail[2:].lstrip(), 5)
a = RExpr.Eq(a, b)
elif tail.startswith('&&') and prec < 3:
b, tail = p_expr(tail[2:].lstrip(), 3)
a = RExpr.Andand(a, b)
elif tail.startswith('||') and prec < 2:
b, tail = p_expr(tail[2:].lstrip(), 2)
a = RExpr.Oror(a, b)
elif tail.startswith('?') and prec <= 1:
b, tail = p_expr(tail[1:].lstrip(), 1)
if not tail.startswith(':'):
raise RExpr.Error("Mismatched ?:? %s" % tail)
c, tail = p_expr(tail[1:].lstrip(), 1)
a = RExpr.Ife(a, b, c)
else:
return a, tail
try:
self.tree, tail = p_expr(self.expr)
if tail:
raise RExpr.Error("trailing expr? %s" % tail)
except RExpr.Error as e:
print('error: in expr: %s' % self.expr,
file=sys.stderr)
print('error: %s' % e.reason,
file=sys.stderr)
sys.exit(3)
# recursively find all fields
def fields(self):
try:
return self.tree.fields()
except RExpr.Error as e:
print('error: in expr: %s' % self.expr,
file=sys.stderr)
print('error: %s' % e.reason,
file=sys.stderr)
sys.exit(3)
# recursively find the type
def type(self, types={}):
try:
return self.tree.type(types)
except RExpr.Error as e:
print('error: in expr: %s' % self.expr,
file=sys.stderr)
print('error: %s' % e.reason,
file=sys.stderr)
sys.exit(3)
# recursively find the fold operation
def fold(self, types={}):
try:
return self.tree.fold(types)
except RExpr.Error as e:
print('error: in expr: %s' % self.expr,
file=sys.stderr)
print('error: %s' % e.reason,
file=sys.stderr)
sys.exit(3)
# recursive evaluate the expr
def eval(self, fields={}):
try:
return self.tree.eval(fields)
except RExpr.Error as e:
print('error: in expr: %s' % self.expr,
file=sys.stderr)
print('error: %s' % e.reason,
file=sys.stderr)
sys.exit(3)
def openio(path, mode='r', buffering=-1):
@@ -265,7 +750,7 @@ def openio(path, mode='r', buffering=-1):
else:
return open(path, mode, buffering)
def collect(csv_paths, renames=[], defines=[]):
def collect(csv_paths, defines=[]):
# collect results from CSV files
fields = []
results = []
@@ -277,15 +762,6 @@ def collect(csv_paths, renames=[], defines=[]):
k for k in reader.fieldnames
if k not in fields)
for r in reader:
# apply any renames
if renames:
# make a copy so renames can overlap
r_ = {}
for new_k, old_k in renames:
if old_k in r:
r_[new_k] = r[old_k]
r.update(r_)
# filter by matching defines
if not all(k in r and r[k] in vs for k, vs in defines):
continue
@@ -299,63 +775,82 @@ def collect(csv_paths, renames=[], defines=[]):
def infer(fields_, results,
by=None,
fields=None,
types={},
ops={},
renames=[],
exprs=[],
defines=[]):
# if by not specified, guess it's anything not in fields/renames/defines
# we only really care about the last expr for each field
exprs = {k: expr for k, expr in exprs}
# find all fields our exprs depend on
fields__ = set(it.chain.from_iterable(
expr.fields() for _, expr in exprs.items()))
# if by not specified, guess it's anything not in fields/exprs/defines
if by is None:
by = [k for k in fields_
if k not in (fields or [])
and not any(k == old_k for _, old_k in renames)
and k not in fields__
and not any(k == k_ for k_, _ in defines)]
# if fields not specified, guess it's anything not in by/renames/defines
# if fields not specified, guess it's anything not in by/exprs/defines
if fields is None:
fields = [k for k in fields_
if k not in (by or [])
and not any(k == old_k for _, old_k in renames)
and k not in fields__
and not any(k == k_ for k_, _ in defines)]
# deduplicate by/fields
by = list(co.OrderedDict.fromkeys(by).keys())
fields = list(co.OrderedDict.fromkeys(fields).keys())
# find best type for all fields
types_ = {}
for k in fields:
if k in types:
types_[k] = types[k]
else:
for t in TYPES.values():
for r in results:
if k in r and r[k].strip():
try:
t(r[k])
except ValueError:
break
else:
types_[k] = t
break
# find best type for all fields used by field exprs
fields__ = set(it.chain.from_iterable(
exprs[k].fields() if k in exprs else {k}
for k in fields))
types = {}
for k in fields__:
for t in [RInt, RFloat, RFrac]:
for r in results:
if k in r and r[k].strip():
try:
t(r[k])
except ValueError:
break
else:
print("error: no type matches field %r?" % k,
file=sys.stderr)
sys.exit(-1)
types = types_
types[k] = t
break
else:
print("error: no type matches field %r?" % k,
file=sys.stderr)
sys.exit(-1)
# does folding change the type?
types_ = {}
for k, t in types.items():
types_[k] = ops.get(k, OPS['sum'])([t()]).__class__
# typecheck field exprs, note these may reference input fields
# with the same name
types__ = types.copy()
for k, expr in exprs.items():
if k in fields:
types__[k] = expr.type(types)
# foldcheck field exprs
folds = {k: (RSum, t) for k, v in types.items()}
for k, expr in exprs.items():
if k in fields:
folds[k] = expr.fold(types)
folds = {k: (f(), t) for k, (f, t) in folds.items()}
# create result class
def __new__(cls, **r):
# evaluate types
r_ = {k: types[k](v) if k in types else v
for k, v in r.items()}
# evaluate exprs
r__ = r_.copy()
for k, expr in exprs.items():
r__[k] = expr.eval(r_)
# return result
return cls.__mro__[1].__new__(cls,
**{k: r.get(k, '') for k in by},
**{k: r[k] if k in r and isinstance(r[k], tuple)
else ([types[k](r[k])], 1) if k in r
else ([], 0)
**{k: r__.get(k, '') for k in by},
**{k: ([r__[k]], 1) if k in r__ else ([], 0)
for k in fields})
def __add__(self, other):
@@ -367,7 +862,8 @@ def infer(fields_, results,
else:
return (a[0][:a[1]] + b[0][:b[1]], a[1] + b[1])
return self.__class__(
# lazily fold results
return self.__class__.__mro__[1].__new__(self.__class__,
**{k: getattr(self, k) for k in by},
**{k: extend(
object.__getattribute__(self, k),
@@ -375,10 +871,12 @@ def infer(fields_, results,
for k in fields})
def __getattribute__(self, k):
# lazily fold results on demand, this avoids issues with fold
# operations that depend on the number of results
if k in fields:
v = object.__getattribute__(self, k)
if v[1]:
return ops.get(k, OPS['sum'])(v[0][:v[1]])
return folds[k][0](v[0][:v[1]])
else:
return None
return object.__getattribute__(self, k)
@@ -391,7 +889,7 @@ def infer(fields_, results,
'_by': by,
'_fields': fields,
'_sort': fields,
'_types': types_,
'_types': {k: t for k, (_, t) in folds.items()},
})
@@ -609,49 +1107,15 @@ def main(csv_paths, *,
defines=[],
sort=None,
**args):
# separate out renames
renames = list(it.chain.from_iterable(
((k, v) for v in vs)
for k, vs in it.chain(by or [], fields or [])))
# separate out exprs
exprs = [(k, v)
for k, v in it.chain(by or [], fields or [])
if v is not None]
if by is not None:
by = [k for k, _ in by]
if fields is not None:
fields = [k for k, _ in fields]
# figure out types
types = {}
for t in TYPES.keys():
for k in args.get(t, []):
if k in types:
print("error: conflicting type for field %r?" % k,
file=sys.stderr)
sys.exit(-1)
types[k] = TYPES[t]
# rename types?
if renames:
types_ = {}
for new_k, old_k in renames:
if old_k in types:
types_[new_k] = types[old_k]
types.update(types_)
# figure out merge operations
ops = {}
for o in OPS.keys():
for k in args.get(o, []):
if k in ops:
print("error: conflicting op for field %r?" % k,
file=sys.stderr)
sys.exit(-1)
ops[k] = OPS[o]
# rename ops?
if renames:
ops_ = {}
for new_k, old_k in renames:
if old_k in ops:
ops_[new_k] = ops[old_k]
ops.update(ops_)
if by is None and fields is None:
print("error: needs --by or --fields to figure out fields",
file=sys.stderr)
@@ -662,27 +1126,18 @@ def main(csv_paths, *,
csv_paths = csv_paths + [args['use']]
# find CSV files
fields_, results = collect(csv_paths, renames, defines)
fields_, results = collect(csv_paths, defines)
# homogenize
Result = infer(fields_, results,
by=by,
fields=fields,
types=types,
ops=ops,
renames=renames,
exprs=exprs,
defines=defines)
results_ = []
for r in results:
if not any(k in r and r[k].strip()
for k in Result._fields):
continue
try:
results_.append(Result(**{
k: r[k] for k in Result._by + Result._fields
if k in r and r[k].strip()}))
except TypeError:
pass
results_.append(Result(**{
k: v for k, v in r.items() if v.strip()}))
results = results_
# fold
@@ -773,25 +1228,23 @@ if __name__ == "__main__":
'-b', '--by',
action='append',
type=lambda x: (
lambda k, vs=None: (
lambda k, v=None: (
k.strip(),
tuple(v.strip() for v in vs.split(','))
if vs is not None else ())
RExpr(v) if v is not None else None)
)(*x.split('=', 1)),
help="Group by this field. Can rename fields with "
"new_name=old_name.")
help="Group by this field. Can include an expression of the form "
"field=expr.")
parser.add_argument(
'-f', '--field',
dest='fields',
action='append',
type=lambda x: (
lambda k, vs=None: (
lambda k, v=None: (
k.strip(),
tuple(v.strip() for v in vs.split(','))
if vs is not None else ())
RExpr(v) if v is not None else None)
)(*x.split('=', 1)),
help="Show this field. Can rename fields with "
"new_name=old_name.")
help="Show this field. Can include an expression of the form "
"field=expr.")
parser.add_argument(
'-D', '--define',
dest='defines',