Files
littlefs/scripts/dbgtrace.py
Christopher Haster 8cc81aef7d scripts: Adopt __get__ binding for write/writeln methods
This actually binds our custom write/writeln functions as methods to the
file object:

  def writeln(self, s=''):
      self.write(s)
      self.write('\n')
  f.writeln = writeln.__get__(f)

This doesn't really gain us anything, but is a bit more correct and may
be safer if other code messes with the file's internals.
2025-06-27 12:56:03 -05:00

1864 lines
59 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
#
# Render operations on block devices based on trace output
#
# Example:
# ./scripts/dbgtrace.py trace
#
# Copyright (c) 2022, The littlefs authors.
# SPDX-License-Identifier: BSD-3-Clause
#
# prevent local imports
if __name__ == "__main__":
__import__('sys').path.pop(0)
import bisect
import collections as co
import fnmatch
import functools as ft
import io
import itertools as it
import math as mt
import os
import re
import shlex
import shutil
import sys
import threading as th
import time
# assign chars/colors to specific bd operations
CHARS = {
'read': 'r',
'prog': 'p',
'erase': 'e',
'noop': '-',
}
COLORS = {
'read': '32',
'prog': '35',
'erase': '34',
'noop': '1;30',
}
# assign chars/colors to varying levels of wear
WEAR_CHARS = '0123456789'
WEAR_COLORS = ['1;30', '1;30', '1;30', '', '', '', '', '31', '31', '1;31']
# give more interesting operations a higher priority
#
# note that while progs always subset erases, erases are much rarer,
# which is why we give them priority
Z_ORDER = ['erase', 'prog', 'read', 'wear', 'noop']
CHARS_DOTS = " .':"
CHARS_BRAILLE = (
'⠀⢀⡀⣀⠠⢠⡠⣠⠄⢄⡄⣄⠤⢤⡤⣤' '⠐⢐⡐⣐⠰⢰⡰⣰⠔⢔⡔⣔⠴⢴⡴⣴'
'⠂⢂⡂⣂⠢⢢⡢⣢⠆⢆⡆⣆⠦⢦⡦⣦' '⠒⢒⡒⣒⠲⢲⡲⣲⠖⢖⡖⣖⠶⢶⡶⣶'
'⠈⢈⡈⣈⠨⢨⡨⣨⠌⢌⡌⣌⠬⢬⡬⣬' '⠘⢘⡘⣘⠸⢸⡸⣸⠜⢜⡜⣜⠼⢼⡼⣼'
'⠊⢊⡊⣊⠪⢪⡪⣪⠎⢎⡎⣎⠮⢮⡮⣮' '⠚⢚⡚⣚⠺⢺⡺⣺⠞⢞⡞⣞⠾⢾⡾⣾'
'⠁⢁⡁⣁⠡⢡⡡⣡⠅⢅⡅⣅⠥⢥⡥⣥' '⠑⢑⡑⣑⠱⢱⡱⣱⠕⢕⡕⣕⠵⢵⡵⣵'
'⠃⢃⡃⣃⠣⢣⡣⣣⠇⢇⡇⣇⠧⢧⡧⣧' '⠓⢓⡓⣓⠳⢳⡳⣳⠗⢗⡗⣗⠷⢷⡷⣷'
'⠉⢉⡉⣉⠩⢩⡩⣩⠍⢍⡍⣍⠭⢭⡭⣭' '⠙⢙⡙⣙⠹⢹⡹⣹⠝⢝⡝⣝⠽⢽⡽⣽'
'⠋⢋⡋⣋⠫⢫⡫⣫⠏⢏⡏⣏⠯⢯⡯⣯' '⠛⢛⡛⣛⠻⢻⡻⣻⠟⢟⡟⣟⠿⢿⡿⣿')
# open with '-' for stdin/stdout
def openio(path, mode='r', buffering=-1):
import os
if path == '-':
if 'r' in mode:
return os.fdopen(os.dup(sys.stdin.fileno()), mode, buffering)
else:
return os.fdopen(os.dup(sys.stdout.fileno()), mode, buffering)
else:
return open(path, mode, buffering)
# some ways of block geometry representations
# 512 -> 512
# 512x16 -> (512, 16)
# 0x200x10 -> (512, 16)
def bdgeom(s):
s = s.strip()
b = 10
if s.startswith('0x') or s.startswith('0X'):
s = s[2:]
b = 16
elif s.startswith('0o') or s.startswith('0O'):
s = s[2:]
b = 8
elif s.startswith('0b') or s.startswith('0B'):
s = s[2:]
b = 2
if 'x' in s:
s, s_ = s.split('x', 1)
return (int(s, b), int(s_, b))
else:
return int(s, b)
# parse some rbyd addr encodings
# 0xa -> (0xa,)
# 0xa.c -> ((0xa, 0xc),)
# 0x{a,b} -> (0xa, 0xb)
# 0x{a,b}.c -> ((0xa, 0xc), (0xb, 0xc))
def rbydaddr(s):
s = s.strip()
b = 10
if s.startswith('0x') or s.startswith('0X'):
s = s[2:]
b = 16
elif s.startswith('0o') or s.startswith('0O'):
s = s[2:]
b = 8
elif s.startswith('0b') or s.startswith('0B'):
s = s[2:]
b = 2
trunk = None
if '.' in s:
s, s_ = s.split('.', 1)
trunk = int(s_, b)
if s.startswith('{') and '}' in s:
ss = s[1:s.find('}')].split(',')
else:
ss = [s]
addr = []
for s in ss:
if trunk is not None:
addr.append((int(s, b), trunk))
else:
addr.append(int(s, b))
return tuple(addr)
# a pseudo-stdout ring buffer
class RingIO:
def __init__(self, maxlen=None, head=False):
self.maxlen = maxlen
self.head = head
self.lines = co.deque(
maxlen=max(maxlen, 0) if maxlen is not None else None)
self.tail = io.StringIO()
# trigger automatic sizing
self.resize(self.maxlen)
@property
def width(self):
# just fetch this on demand, we don't actually use width
return shutil.get_terminal_size((80, 5))[0]
@property
def height(self):
# calculate based on terminal height?
if self.maxlen is None or self.maxlen <= 0:
return max(
shutil.get_terminal_size((80, 5))[1]
+ (self.maxlen or 0),
0)
# limit to maxlen
else:
return self.maxlen
def resize(self, maxlen):
self.maxlen = maxlen
if maxlen is not None and maxlen <= 0:
maxlen = self.height
if maxlen != self.lines.maxlen:
self.lines = co.deque(self.lines, maxlen=maxlen)
def __len__(self):
return len(self.lines)
def write(self, s):
# note using split here ensures the trailing string has no newline
lines = s.split('\n')
if len(lines) > 1 and self.tail.getvalue():
self.tail.write(lines[0])
lines[0] = self.tail.getvalue()
self.tail = io.StringIO()
self.lines.extend(lines[:-1])
if lines[-1]:
self.tail.write(lines[-1])
# keep track of maximum drawn canvas
canvas_lines = 1
def draw(self):
# did terminal size change?
self.resize(self.maxlen)
# copy lines
lines = self.lines.copy()
# pad to fill any existing canvas, but truncate to terminal size
h = shutil.get_terminal_size((80, 5))[1]
lines.extend('' for _ in range(
len(lines),
min(RingIO.canvas_lines, h)))
while len(lines) > h:
if self.head:
lines.pop()
else:
lines.popleft()
# build up the redraw in memory first and render in a single
# write call, this minimizes flickering caused by the cursor
# jumping around
canvas = []
# hide the cursor
canvas.append('\x1b[?25l')
# give ourself a canvas
while RingIO.canvas_lines < len(lines):
canvas.append('\n')
RingIO.canvas_lines += 1
# write lines from top to bottom so later lines overwrite earlier
# lines, note xA/xB stop at terminal boundaries
for i, line in enumerate(lines):
# move to col 0
canvas.append('\r')
# move up to line
if len(lines)-1-i > 0:
canvas.append('\x1b[%dA' % (len(lines)-1-i))
# clear line
canvas.append('\x1b[K')
# disable line wrap
canvas.append('\x1b[?7l')
# print the line
canvas.append(line)
# enable line wrap
canvas.append('\x1b[?7h') # enable line wrap
# move back down
if len(lines)-1-i > 0:
canvas.append('\x1b[%dB' % (len(lines)-1-i))
# show the cursor again
canvas.append('\x1b[?25h')
# write to stdout and flush
sys.stdout.write(''.join(canvas))
sys.stdout.flush()
# a representation of optionally key-mapped attrs
class CsvAttr:
def __init__(self, attrs, defaults=None):
if attrs is None:
attrs = []
if isinstance(attrs, dict):
attrs = attrs.items()
# normalize
self.attrs = []
self.keyed = co.OrderedDict()
for attr in attrs:
if not isinstance(attr, tuple):
attr = ((), attr)
if attr[0] in {None, (), (None,), ('*',)}:
attr = ((), attr[1])
if not isinstance(attr[0], tuple):
attr = ((attr[0],), attr[1])
self.attrs.append(attr)
if attr[0] not in self.keyed:
self.keyed[attr[0]] = []
self.keyed[attr[0]].append(attr[1])
# create attrs object for defaults
if isinstance(defaults, CsvAttr):
self.defaults = defaults
elif defaults is not None:
self.defaults = CsvAttr(defaults)
else:
self.defaults = None
def __repr__(self):
if self.defaults is None:
return 'CsvAttr(%r)' % (
[(','.join(attr[0]), attr[1])
for attr in self.attrs])
else:
return 'CsvAttr(%r, %r)' % (
[(','.join(attr[0]), attr[1])
for attr in self.attrs],
[(','.join(attr[0]), attr[1])
for attr in self.defaults.attrs])
def __iter__(self):
if () in self.keyed:
return it.cycle(self.keyed[()])
elif self.defaults is not None:
return iter(self.defaults)
else:
return iter(())
def __bool__(self):
return bool(self.attrs)
def __getitem__(self, key):
if isinstance(key, tuple):
if len(key) > 0 and not isinstance(key[0], str):
i, key = key
if not isinstance(key, tuple):
key = (key,)
else:
i, key = 0, key
elif isinstance(key, str):
i, key = 0, (key,)
else:
i, key = key, ()
# try to lookup by key
best = None
for ks, vs in self.keyed.items():
prefix = []
for j, k in enumerate(ks):
if j < len(key) and fnmatch.fnmatchcase(key[j], k):
prefix.append(k)
else:
prefix = None
break
if prefix is not None and (
best is None or len(prefix) >= len(best[0])):
best = (prefix, vs)
if best is not None:
# cycle based on index
return best[1][i % len(best[1])]
# fallback to defaults?
if self.defaults is not None:
return self.defaults[i, key]
raise KeyError(i, key)
def get(self, key, default=None):
try:
return self.__getitem__(key)
except KeyError:
return default
def __contains__(self, key):
try:
self.__getitem__(key)
return True
except KeyError:
return False
# get all results for a given key
def getall(self, key, default=None):
if not isinstance(key, tuple):
key = (key,)
# try to lookup by key
best = None
for ks, vs in self.keyed.items():
prefix = []
for j, k in enumerate(ks):
if j < len(key) and fnmatch.fnmatchcase(key[j], k):
prefix.append(k)
else:
prefix = None
break
if prefix is not None and (
best is None or len(prefix) >= len(best[0])):
best = (prefix, vs)
if best is not None:
return best[1]
# fallback to defaults?
if self.defaults is not None:
return self.defaults.getall(key, default)
raise default
# a key function for sorting by key order
def key(self, key):
if not isinstance(key, tuple):
key = (key,)
best = None
for i, ks in enumerate(self.keyed.keys()):
prefix = []
for j, k in enumerate(ks):
if j < len(key) and (not k or key[j] == k):
prefix.append(k)
else:
prefix = None
break
if prefix is not None and (
best is None or len(prefix) >= len(best[0])):
best = (prefix, i)
if best is not None:
return best[1]
# fallback to defaults?
if self.defaults is not None:
return len(self.keyed) + self.defaults.key(key)
return len(self.keyed)
# parse %-escaped strings
#
# attrs can override __getitem__ for lazy attr generation
def punescape(s, attrs=None):
pattern = re.compile(
'%[%n]'
'|' '%x..'
'|' '%u....'
'|' '%U........'
'|' '%\((?P<field>[^)]*)\)'
'(?P<format>[+\- #0-9\.]*[sdboxXfFeEgG])')
def unescape(m):
if m.group()[1] == '%': return '%'
elif m.group()[1] == 'n': return '\n'
elif m.group()[1] == 'x': return chr(int(m.group()[2:], 16))
elif m.group()[1] == 'u': return chr(int(m.group()[2:], 16))
elif m.group()[1] == 'U': return chr(int(m.group()[2:], 16))
elif m.group()[1] == '(':
if attrs is not None:
try:
v = attrs[m.group('field')]
except KeyError:
return m.group()
else:
return m.group()
f = m.group('format')
if f[-1] in 'dboxX':
if isinstance(v, str):
v = dat(v, 0)
v = int(v)
elif f[-1] in 'fFeEgG':
if isinstance(v, str):
v = dat(v, 0)
v = float(v)
else:
f = ('<' if '-' in f else '>') + f.replace('-', '')
v = str(v)
# note we need Python's new format syntax for binary
return ('{:%s}' % f).format(v)
else: assert False
return re.sub(pattern, unescape, s)
# split %-escaped strings into chars
def psplit(s):
pattern = re.compile(
'%[%n]'
'|' '%x..'
'|' '%u....'
'|' '%U........'
'|' '%\((?P<field>[^)]*)\)'
'(?P<format>[+\- #0-9\.]*[sdboxXfFeEgG])')
return [m.group() for m in re.finditer(pattern.pattern + '|.', s)]
# a little ascii renderer
class Canvas:
def __init__(self, width, height, *,
color=False,
dots=False,
braille=False):
# scale if we're printing with dots or braille
if braille:
xscale, yscale = 2, 4
elif dots:
xscale, yscale = 1, 2
else:
xscale, yscale = 1, 1
self.width_ = width
self.height_ = height
self.width = xscale*width
self.height = yscale*height
self.xscale = xscale
self.yscale = yscale
self.color_ = color
self.dots = dots
self.braille = braille
# create initial canvas
self.chars = [0] * (width*height)
self.colors = [''] * (width*height)
def char(self, x, y, char=None):
# ignore out of bounds
if x < 0 or y < 0 or x >= self.width or y >= self.height:
return False
x_ = x // self.xscale
y_ = y // self.yscale
if char is not None:
c = self.chars[x_ + y_*self.width_]
# mask in sub-char pixel?
if isinstance(char, bool):
if not isinstance(c, int):
c = 0
self.chars[x_ + y_*self.width_] = (c
| (1
<< ((y%self.yscale)*self.xscale
+ (self.xscale-1)-(x%self.xscale))))
else:
self.chars[x_ + y_*self.width_] = char
else:
c = self.chars[x_ + y_*self.width_]
if isinstance(c, int):
return ((c
>> ((y%self.yscale)*self.xscale
+ (self.xscale-1)-(x%self.xscale)))
& 1) == 1
else:
return c
def color(self, x, y, color=None):
# ignore out of bounds
if x < 0 or y < 0 or x >= self.width or y >= self.height:
return ''
x_ = x // self.xscale
y_ = y // self.yscale
if color is not None:
self.colors[x_ + y_*self.width_] = color
else:
return self.colors[x_ + y_*self.width_]
def __getitem__(self, xy):
x, y = xy
return self.char(x, y)
def __setitem__(self, xy, char):
x, y = xy
self.char(x, y, char)
def point(self, x, y, *,
char=True,
color=''):
self.char(x, y, char)
self.color(x, y, color)
def line(self, x1, y1, x2, y2, *,
char=True,
color=''):
# incremental error line algorithm
ex = abs(x2 - x1)
ey = -abs(y2 - y1)
dx = +1 if x1 < x2 else -1
dy = +1 if y1 < y2 else -1
e = ex + ey
while True:
self.point(x1, y1, char=char, color=color)
e2 = 2*e
if x1 == x2 and y1 == y2:
break
if e2 > ey:
e += ey
x1 += dx
if x1 == x2 and y1 == y2:
break
if e2 < ex:
e += ex
y1 += dy
self.point(x2, y2, char=char, color=color)
def rect(self, x, y, w, h, *,
char=True,
color=''):
for j in range(h):
for i in range(w):
self.point(x+i, y+j, char=char, color=color)
def label(self, x, y, label, width=None, height=None, *,
color=''):
x_ = x
y_ = y
for char in label:
if char == '\n':
x_ = x
y_ -= self.yscale
else:
if ((width is None or x_ < x+width)
and (height is None or y_ > y-height)):
self.point(x_, y_, char=char, color=color)
x_ += self.xscale
def draw(self, row):
y_ = self.height_-1 - row
row_ = []
for x_ in range(self.width_):
# char?
c = self.chars[x_ + y_*self.width_]
if isinstance(c, int):
if self.braille:
assert c < 256
c = CHARS_BRAILLE[c]
elif self.dots:
assert c < 4
c = CHARS_DOTS[c]
else:
assert c < 2
c = '.' if c else ' '
# color?
if self.color_:
color = self.colors[x_ + y_*self.width_]
if color:
c = '\x1b[%sm%s\x1b[m' % (color, c)
row_.append(c)
return ''.join(row_)
# naive space filling curve (the default)
def naive_curve(width, height):
for y in range(height):
for x in range(width):
yield x, y
# space filling Hilbert-curve
def hilbert_curve(width, height):
# based on generalized Hilbert curves:
# https://github.com/jakubcerveny/gilbert
#
def hilbert_(x, y, a_x, a_y, b_x, b_y):
w = abs(a_x+a_y)
h = abs(b_x+b_y)
a_dx = -1 if a_x < 0 else +1 if a_x > 0 else 0
a_dy = -1 if a_y < 0 else +1 if a_y > 0 else 0
b_dx = -1 if b_x < 0 else +1 if b_x > 0 else 0
b_dy = -1 if b_y < 0 else +1 if b_y > 0 else 0
# trivial row
if h == 1:
for _ in range(w):
yield x, y
x, y = x+a_dx, y+a_dy
return
# trivial column
if w == 1:
for _ in range(h):
yield x, y
x, y = x+b_dx, y+b_dy
return
a_x_, a_y_ = a_x//2, a_y//2
b_x_, b_y_ = b_x//2, b_y//2
w_ = abs(a_x_+a_y_)
h_ = abs(b_x_+b_y_)
if 2*w > 3*h:
# prefer even steps
if w_ % 2 != 0 and w > 2:
a_x_, a_y_ = a_x_+a_dx, a_y_+a_dy
# split in two
yield from hilbert_(
x, y,
a_x_, a_y_, b_x, b_y)
yield from hilbert_(
x+a_x_, y+a_y_,
a_x-a_x_, a_y-a_y_, b_x, b_y)
else:
# prefer even steps
if h_ % 2 != 0 and h > 2:
b_x_, b_y_ = b_x_+b_dx, b_y_+b_dy
# split in three
yield from hilbert_(
x, y,
b_x_, b_y_, a_x_, a_y_)
yield from hilbert_(
x+b_x_, y+b_y_,
a_x, a_y, b_x-b_x_, b_y-b_y_)
yield from hilbert_(
x+(a_x-a_dx)+(b_x_-b_dx), y+(a_y-a_dy)+(b_y_-b_dy),
-b_x_, -b_y_, -(a_x-a_x_), -(a_y-a_y_))
if width >= height:
yield from hilbert_(0, 0, +width, 0, 0, +height)
else:
yield from hilbert_(0, 0, 0, +height, +width, 0)
# space filling Z-curve/Lebesgue-curve
def lebesgue_curve(width, height):
# we create a truncated Z-curve by simply filtering out the
# points that are outside our region
for i in range(2**(2*mt.ceil(mt.log2(max(width, height))))):
# we just operate on binary strings here because it's easier
b = '{:0{}b}'.format(i, 2*mt.ceil(mt.log2(i+1)/2))
x = int(b[1::2], 2) if b[1::2] else 0
y = int(b[0::2], 2) if b[0::2] else 0
if x < width and y < height:
yield x, y
# a mergable range set type
class RangeSet:
def __init__(self, ranges=None):
self._ranges = []
if ranges is not None:
# using add here makes sure all ranges are merged/sorted
# correctly
for r in ranges:
self.add(r)
def __repr__(self):
return 'RangeSet(%r)' % self._ranges
def __contains__(self, k):
i = bisect.bisect(self._ranges, k,
key=lambda r: r.start) - 1
if i > -1:
return k in self._ranges[i]
else:
return False
def __bool__(self):
return bool(self._ranges)
def ranges(self):
yield from self._ranges
def __iter__(self):
for r in self._ranges:
yield from r
def add(self, r):
assert isinstance(r, range)
# trivial range?
if not r:
return
# find earliest possible merge point
ranges = self._ranges
i = bisect.bisect_left(ranges, r.start,
key=lambda r: r.stop)
# copy ranges < merge
merged = ranges[:i]
# merge ranges and append
while i < len(ranges) and ranges[i].start <= r.stop:
r = range(
min(ranges[i].start, r.start),
max(ranges[i].stop, r.stop))
i += 1
merged.append(r)
# copy ranges > merge
merged.extend(ranges[i:])
self._ranges = merged
def remove(self, r):
assert isinstance(r, range)
# trivial range?
if not r:
return
# find earliest possible carve point
ranges = self._ranges
i = bisect.bisect_left(ranges, r.start,
key=lambda r: r.stop)
# copy ranges < carve
carved = ranges[:i]
# carve overlapping ranges, note this can split ranges
while i < len(ranges) and ranges[i].start <= r.stop:
if ranges[i].start < r.start:
carved.append(range(ranges[i].start, r.start))
if ranges[i].stop > r.stop:
carved.append(range(r.stop, ranges[i].stop))
i += 1
# copy ranges > carve
carved.extend(ranges[i:])
self._ranges = carved
@property
def start(self):
if not self._ranges:
return 0
else:
return self._ranges[0].start
@property
def stop(self):
if not self._ranges:
return 0
else:
return self._ranges[-1].stop
def __len__(self):
return self.stop
def copy(self):
# create a shallow copy
ranges = RangeSet()
ranges._ranges = self._ranges.copy()
return ranges
def __getitem__(self, slice_):
assert isinstance(slice_, slice)
# create a copy
ranges = self.copy()
# just use remove to do the carving, it's good enough probably
if slice_.stop is not None:
ranges.remove(range(slice_.stop, len(self)))
if slice_.start is not None:
ranges.remove(range(0, slice_.start))
ranges._ranges = [range(
r.start - slice_.start,
r.stop - slice_.start)
for r in ranges._ranges]
return ranges
def __ior__(self, other):
for r in other.ranges():
self.add(r)
return self
def __or__(self, other):
ranges = self.copy()
ranges |= other
return ranges
# an abstract block representation
class TraceBlock:
def __init__(self, block, *,
readed=None, proged=None, erased=None, wear=0,
x=None, y=None, width=None, height=None):
self.block = block
self.readed = readed if readed is not None else RangeSet()
self.proged = proged if proged is not None else RangeSet()
self.erased = erased if erased is not None else RangeSet()
self.wear = wear
self.x = x
self.y = y
self.width = width
self.height = height
def __repr__(self):
return 'TraceBlock(0x%x, x=%s, y=%s, width=%s, height=%s)' % (
self.block,
self.x, self.y, self.width, self.height)
def __eq__(self, other):
return self.block == other.block
def __ne__(self, other):
return self.block != other.block
def __hash__(self):
return hash(self.block)
def __lt__(self, other):
return self.block < other.block
def __le__(self, other):
return self.block <= other.block
def __gt__(self, other):
return self.block > other.block
def __ge__(self, other):
return self.block >= other.block
# align to pixel boundaries
def align(self):
# this extra +0.1 and using points instead of width/height is
# to help minimize rounding errors
x0 = int(self.x+0.1)
y0 = int(self.y+0.1)
x1 = int(self.x+self.width+0.1)
y1 = int(self.y+self.height+0.1)
self.x = x0
self.y = y0
self.width = x1 - x0
self.height = y1 - y0
# generate attrs for punescaping
@ft.cached_property
def attrs(self):
# really the only reasonable attrs are block and wear
return {
'block': self.block,
'wear': self.wear,
}
# some simulated bd operations
def read(self, off, size):
self.readed.add(range(off, off+size))
def prog(self, off, size):
self.proged.add(range(off, off+size))
def erase(self, off, size, *,
wear=0):
self.erased.add(range(off, off+size))
self.wear += wear
def clear(self):
self.readed = RangeSet()
self.proged = RangeSet()
self.erased = RangeSet()
def main(path='-', *,
block_size=None,
block_count=None,
blocks=None,
reads=False,
progs=False,
erases=False,
wear=False,
wear_only=False,
block_cycles=None,
volatile=False,
chars=[],
wear_chars=[],
colors=[],
wear_colors=[],
color='auto',
dots=False,
braille=False,
width=None,
height=None,
block_cols=None,
block_rows=None,
block_ratio=None,
no_header=False,
hilbert=False,
lebesgue=False,
contiguous=False,
to_scale=None,
to_ratio=1/1,
tiny=False,
title=None,
lines=None,
head=False,
cat=False,
coalesce=None,
sleep=None,
keep_open=False,
**args):
# figure out what color should be
if color == 'auto':
color = sys.stdout.isatty()
elif color == 'always':
color = True
else:
color = False
# if not specified default to all ops
if not reads and not progs and not erases and not wear_only:
progs = True
reads = True
erases = True
# wear_only implies only wear
if wear_only:
wear = True
# block_cycles implies wear
if block_cycles is not None:
wear = True
# tiny mode?
if tiny:
if block_ratio is None:
block_ratio = 1
if to_scale is None:
to_scale = 1
no_header = True
if block_ratio is None:
# try to align block_ratio to chars, even in braille/dots
# mode (we can't color sub-chars)
if braille or dots:
block_ratio = 1/2
else:
block_ratio = 1
# what chars/colors/labels to use?
chars_ = []
for char in chars:
if isinstance(char, tuple):
chars_.extend((char[0], c) for c in psplit(char[1]))
else:
chars_.extend(psplit(char))
chars_ = CsvAttr(chars_,
defaults=[True] if braille or dots else CHARS)
wear_chars_ = []
for char in wear_chars:
if isinstance(char, tuple):
wear_chars_.extend((char[0], c) for c in psplit(char[1]))
else:
wear_chars_.extend(psplit(char))
wear_chars_ = CsvAttr(wear_chars_,
defaults=[True] if braille or dots else WEAR_CHARS)
colors_ = CsvAttr(colors, defaults=COLORS)
wear_colors_ = CsvAttr(wear_colors, defaults=WEAR_COLORS)
# is bd geometry specified?
if isinstance(block_size, tuple):
block_size, block_count_ = block_size
if block_count is None:
block_count = block_count_
# keep track of block_size/block_count and block map state
block_size_ = block_size
block_count_ = block_count
bmap = None
# keep track of some extra info
readed = 0
proged = 0
erased = 0
def bmap_init(block_size__, block_count__):
nonlocal block_size_
nonlocal block_count_
nonlocal bmap
nonlocal readed
nonlocal proged
nonlocal erased
# keep track of block_size/block_count
if block_size is None:
block_size_ = block_size__
if block_count is None:
block_count_ = block_count__
# flatten blocks, default to all blocks
blocks_ = list(
range(blocks.start or 0, blocks.stop or block_count_)
if isinstance(blocks, slice)
else range(blocks, blocks+1)
if blocks
else range(block_count_))
# create a new block map?
if bmap is None or volatile:
bmap = {b: TraceBlock(b) for b in blocks_}
readed = 0
proged = 0
erased = 0
# just resize block map
else:
bmap = {b: bmap[b] if b in bmap else TraceBlock(b)
for b in blocks_}
# if we know block_count, go ahead and flatten blocks + create
# a block map, otherwise we need to wait for first bd init
if block_size is not None and block_count is not None:
bmap_init(block_size, block_count)
## trace parser
# precompute trace regexes
init_pattern = re.compile(
'^(?P<file>[^ :]*):(?P<line>[0-9]+):trace:'
'.*?bd_createcfg\('
'\s*(?P<ctx>\w+)'
'(?:'
'block_size=(?P<block_size>\w+)'
'|' 'block_count=(?P<block_count>\w+)'
'|' '.*?' ')*' '\)')
read_pattern = re.compile(
'^(?P<file>[^ :]*):(?P<line>[0-9]+):trace:'
'.*?bd_read\('
'\s*(?P<ctx>\w+)' '\s*,'
'\s*(?P<block>\w+)' '\s*,'
'\s*(?P<off>\w+)' '\s*,'
'\s*(?P<buffer>\w+)' '\s*,'
'\s*(?P<size>\w+)' '\s*\)')
prog_pattern = re.compile(
'^(?P<file>[^ :]*):(?P<line>[0-9]+):trace:'
'.*?bd_prog\('
'\s*(?P<ctx>\w+)' '\s*,'
'\s*(?P<block>\w+)' '\s*,'
'\s*(?P<off>\w+)' '\s*,'
'\s*(?P<buffer>\w+)' '\s*,'
'\s*(?P<size>\w+)' '\s*\)')
erase_pattern = re.compile(
'^(?P<file>[^ :]*):(?P<line>[0-9]+):trace:'
'.*?bd_erase\('
'\s*(?P<ctx>\w+)' '\s*,'
'\s*(?P<block>\w+)'
'(?:\s*\(\s*(?P<size>\w+)\s*\))?' '\s*\)')
sync_pattern = re.compile(
'^(?P<file>[^ :]*):(?P<line>[0-9]+):trace:'
'.*?bd_sync\('
'\s*(?P<ctx>\w+)' '\s*\)')
def trace__(line):
nonlocal readed
nonlocal proged
nonlocal erased
# string searching is much faster than the regex here, this
# actually has a big impact given the sheer quantity of how much
# trace output we have to deal with
if ('trace' not in line
# ignore return trace statements
or '->' in line):
return False
# note we can't do most ops until we know block_count/block_size
# bd init?
if 'bd_createcfg(' in line:
m = init_pattern.match(line)
if not m:
return False
# block_size/block_count missing?
if not m.group('block_size') or not m.group('block_count'):
return False
block_size__ = int(m.group('block_size'), 0)
block_count__ = int(m.group('block_count'), 0)
bmap_init(block_size__, block_count__)
return True
# bd read?
elif reads and bmap is not None and 'bd_read(' in line:
m = read_pattern.match(line)
if not m:
return False
block = int(m.group('block'), 0)
off = int(m.group('off'), 0)
size = int(m.group('size'), 0)
if block not in bmap:
return False
else:
bmap[block].read(off, size)
readed += size
return True
# bd prog?
elif progs and bmap is not None and 'bd_prog(' in line:
m = prog_pattern.match(line)
if not m:
return False
block = int(m.group('block'), 0)
off = int(m.group('off'), 0)
size = int(m.group('size'), 0)
if block not in bmap:
return False
else:
bmap[block].prog(off, size)
proged += size
return True
# bd erase?
elif (erases or wear) and bmap is not None and 'bd_erase(' in line:
m = erase_pattern.match(line)
if not m:
return False
block = int(m.group('block'), 0)
if block not in bmap:
return False
else:
bmap[block].erase(0, block_size_,
wear=+1 if wear else 0)
erased += block_count_
return True
else:
return False
## bmap renderer
# these curves are expensive to calculate, so memoize these
if hilbert:
curve = ft.lru_cache(16)(lambda w, h: list(hilbert_curve(w, h)))
elif lebesgue:
curve = ft.lru_cache(16)(lambda w, h: list(lebesgue_curve(w, h)))
else:
curve = ft.lru_cache(16)(lambda w, h: list(naive_curve(w, h)))
def draw__(ring, width, height):
nonlocal bmap
# still waiting on bd init
if bmap is None:
return
# compute total ops
total = readed + proged + erased
# if we're showing wear, find min/max/avg/etc
if wear:
wear_min = min(b.wear for b in bmap.values())
wear_max = max(b.wear for b in bmap.values())
wear_avg = (sum(b.wear for b in bmap.values())
/ max(len(bmap), 1))
wear_stddev = mt.sqrt(
sum((b.wear - wear_avg)**2 for b in bmap.values())
/ max(len(bmap), 1))
# if block_cycles isn't provided or is zero, scale based on
# max wear
if block_cycles:
block_cycles_ = block_cycles
else:
block_cycles_ = wear_max
# build a title
if title:
title_ = punescape(title, {
'geometry': '%sx%s' % (block_size_, block_count_),
'block_size': block_size_,
'block_count': block_count_,
'total': total,
'read': readed,
'read_percent': 100*readed / max(total, 1),
'prog': proged,
'prog_percent': 100*proged / max(total, 1),
'erase': erased,
'erase_percent': 100*erased / max(total, 1),
'wear_min': wear_min if wear else '?',
'wear_min_percent':
100*wear_min / max(block_cycles_, 1) if wear else '?',
'wear_max': wear_max if wear else '?',
'wear_max_percent':
100*wear_max / max(block_cycles_, 1) if wear else '?',
'wear_avg': wear_avg if wear else '?',
'wear_avg_percent':
100*wear_avg / max(block_cycles_, 1) if wear else '?',
'wear_stddev': wear_stddev if wear else '?',
'wear_stddev_percent':
100*wear_stddev / max(block_cycles_, 1) if wear else '?',
})
else:
title_ = ('bd %dx%d%s%s%s%s' % (
block_size_, block_count_,
', %s read' % ('%.1f%%' % (100*readed / max(total, 1)))
if reads else '',
', %s prog' % ('%.1f%%' % (100*proged / max(total, 1)))
if progs else '',
', %s erase' % ('%.1f%%' % (100*erased / max(total, 1)))
if erases else '',
', %s wear' % (
'%.1f%% +-%.1fσ' % (
100*wear_avg / max(block_cycles_, 1),
100*wear_stddev / max(block_cycles_, 1)))
if wear else ''))
# give ring a writeln function
def writeln(self, s=''):
self.write(s)
self.write('\n')
ring.writeln = writeln.__get__(ring)
# figure out width/height
if width is None:
width_ = min(80, shutil.get_terminal_size((80, 5))[0])
elif width > 0:
width_ = width
else:
width_ = max(0, shutil.get_terminal_size((80, 5))[0] + width)
if height is None:
height_ = 2 if not no_header else 1
elif height > 0:
height_ = height
else:
height_ = max(0, shutil.get_terminal_size((80, 5))[1] + height)
# scale width/height if requested
if (to_scale is not None
and (width is None or height is None)):
# don't include header in scale
width__ = width_
height__ = height_ - (1 if not no_header else 0)
# scale width only
if height is not None:
width__ = mt.ceil((len(bmap) * to_scale) / max(height__, 1))
# scale height only
elif width is not None:
height__ = mt.ceil((len(bmap) * to_scale) / max(width__, 1))
# scale based on aspect-ratio
else:
width__ = mt.ceil(mt.sqrt(len(bmap) * to_scale * to_ratio))
height__ = mt.ceil((len(bmap) * to_scale) / max(width__, 1))
width_ = width__
height_ = height__ + (1 if not no_header else 0)
# create a canvas
canvas = Canvas(
width_,
height_ - (1 if not no_header else 0),
color=color,
dots=dots,
braille=braille)
# if contiguous, compute the global curve
if contiguous:
global_block = min(bmap.keys(), default=0)
global_curve = list(curve(canvas.width, canvas.height))
# if blocky, figure out block sizes/locations
else:
# figure out block_cols_/block_rows_
if block_cols is not None and block_rows is not None:
block_cols_ = block_cols
block_rows_ = block_rows
elif block_rows is not None:
block_cols_ = mt.ceil(len(bmap) / block_rows)
block_rows_ = block_rows
elif block_cols is not None:
block_cols_ = block_cols
block_rows_ = mt.ceil(len(bmap) / block_cols)
else:
# divide by 2 until we hit our target ratio, this works
# well for things that are often powers-of-two
block_cols_ = 1
block_rows_ = len(bmap)
while (abs(((canvas.width/(block_cols_*2))
/ max(canvas.height/mt.ceil(block_rows_/2), 1))
- block_ratio)
< abs(((canvas.width/block_cols_)
/ max(canvas.height/block_rows_, 1)))
- block_ratio):
block_cols_ *= 2
block_rows_ = mt.ceil(block_rows_ / 2)
block_width_ = canvas.width / block_cols_
block_height_ = canvas.height / block_rows_
# assign block locations based on block_rows_/block_cols_ and
# the requested space filling curve
for (x, y), b in zip(
curve(block_cols_, block_rows_),
sorted(bmap.values())):
b.x = x * block_width_
b.y = y * block_height_
b.width = block_width_
b.height = block_height_
# align to pixel boundaries
b.align()
# bump up to at least one pixel for every block, dont't
# worry about out-of-bounds, Canvas handles this for us
b.width = max(b.width, 1)
b.height = max(b.height, 1)
# assign chars based on op + block
for b in bmap.values():
b.chars = {}
for op in ((['read'] if reads else [])
+ (['prog'] if progs else [])
+ (['erase'] if erases else [])
+ ['noop']):
char__ = chars_.get((b.block, (op, '0x%x' % b.block)))
if char__ is not None:
if isinstance(char__, str):
# don't punescape unless we have to
if '%' in char__:
char__ = punescape(char__, b.attrs)
char__ = char__[0] # limit to 1 char
b.chars[op] = char__
# assign colors based on op + block
for b in bmap.values():
b.colors = {}
for op in ((['read'] if reads else [])
+ (['prog'] if progs else [])
+ (['erase'] if erases else [])
+ ['noop']):
color__ = colors_.get((b.block, (op, '0x%x' % b.block)))
if color__ is not None:
# don't punescape unless we have to
if '%' in color__:
color__ = punescape(color__, b.attrs)
b.colors[op] = color__
# assign wear chars based on block
if wear:
for b in bmap.values():
b.wear_chars = []
for char__ in wear_chars_.getall((b.block, '0x%x' % b.block)):
if isinstance(char__, str):
# don't punescape unless we have to
if '%' in char__:
char__ = punescape(char__, b.attrs)
char__ = char__[0] # limit to 1 char
b.wear_chars.append(char__)
# assign wear colors based on block
if wear:
for b in bmap.values():
b.wear_colors = []
for color__ in wear_colors_.getall((b.block, '0x%x' % b.block)):
# don't punescape unless we have to
if '%' in color__:
color__ = punescape(color__, b.attrs)
b.wear_colors.append(color__)
# render to canvas in a specific z-order that prioritizes
# interesting ops
for op in reversed(Z_ORDER):
# don't render noops in braille/dots mode
if (braille or dots) and op == 'noop':
continue
# skip ops we're not interested in
if ((not reads and op == 'read')
or (not progs and op == 'prog')
or (not erases and op == 'erase')
or (not wear and op == 'wear')):
continue
for b in bmap.values():
if op == 'read':
ranges__ = b.readed
char__ = b.chars['read']
color__ = b.colors['read']
elif op == 'prog':
ranges__ = b.proged
char__ = b.chars['prog']
color__ = b.colors['prog']
elif op == 'erase':
ranges__ = b.erased
char__ = b.chars['erase']
color__ = b.colors['erase']
elif op == 'wear':
# _no_ wear?
if b.wear == 0:
continue
ranges__ = RangeSet([range(block_size_)])
# scale char/color based on either block_cycles
# or wear_avg
if block_cycles:
wear__ = min(b.wear / max(block_cycles, 1), 1.0)
else:
wear__ = min(b.wear / max(2*wear_avg, 1), 1.0)
char__ = b.wear_chars[int(wear__*(len(b.wear_chars)-1))]
color__ = b.wear_colors[int(wear__*(len(b.wear_colors)-1))]
else:
ranges__ = RangeSet([range(block_size_)])
char__ = b.chars['noop']
color__ = b.colors['noop']
if not ranges__:
continue
# contiguous?
if contiguous:
for range__ in ranges__.ranges():
# where are we in the curve?
block__ = b.block - global_block
range__ = range(
mt.floor(((block__*block_size_ + range__.start)
/ (block_size_ * len(bmap)))
* len(global_curve)),
mt.ceil(((block__*block_size_ + range__.stop)
/ (block_size_ * len(bmap)))
* len(global_curve)))
# map to global curve
for i in range__:
if i >= len(global_curve):
continue
x__, y__ = global_curve[i]
# flip y
y__ = canvas.height - (y__+1)
canvas.point(x__, y__,
char=char__,
color=color__)
# blocky?
else:
x__ = b.x
y__ = b.y
width__ = b.width
height__ = b.height
# flip y
y__ = canvas.height - (y__+height__)
for range__ in ranges__.ranges():
# scale from bytes -> pixels
range__ = range(
mt.floor((range__.start/block_size_)
* (width__*height__)),
mt.ceil((range__.stop/block_size_)
* (width__*height__)))
# map to in-block curve
for i, (dx, dy) in enumerate(curve(width__, height__)):
if i in range__:
# flip y
canvas.point(x__+dx, y__+(height__-(dy+1)),
char=char__,
color=color__)
# print some summary info
if not no_header:
ring.writeln(title_)
# draw canvas
for row in range(canvas.height//canvas.yscale):
line = canvas.draw(row)
ring.writeln(line)
# clear bmap
for b in bmap.values():
b.clear()
## main loop
lock = th.Lock()
event = th.Event()
def main_():
try:
while True:
with openio(path) as f:
count = 0
for line in f:
with lock:
count += trace__(line)
# always redraw if we're sleeping, otherwise
# wait for coalesce number of operations
if sleep is not None or count >= (coalesce or 1):
event.set()
count = 0
if not keep_open:
break
# don't just flood open calls
time.sleep(sleep or 2)
except FileNotFoundError as e:
print("error: file not found %r" % path,
file=sys.stderr)
sys.exit(-1)
except KeyboardInterrupt:
pass
# keep track of history if lines specified
if lines is not None:
ring = RingIO(lines+1
if not no_header and lines > 0
else lines)
def draw_():
# cat? write directly to stdout
if cat:
draw__(sys.stdout,
width=width,
# make space for shell prompt
height=-1 if height is ... else height)
# not cat? write to a bounded ring
else:
ring_ = RingIO(head=head)
draw__(ring_,
width=width,
height=0 if height is ... else height)
# no history? draw immediately
if lines is None:
ring_.draw()
# history? merge with previous lines
else:
# write header separately?
if not no_header:
if not ring.lines:
ring.lines.append('')
ring.lines.extend(it.islice(ring_.lines, 1, None))
ring.lines[0] = ring_.lines[0]
else:
ring.lines.extend(ring_.lines)
ring.draw()
# print in a background thread
done = False
def background():
while not done:
event.wait()
event.clear()
with lock:
draw_()
# sleep a minimum amount of time to avoid flickering
time.sleep(sleep or 0.01)
th.Thread(target=background, daemon=True).start()
main_()
done = True
lock.acquire() # avoids https://bugs.python.org/issue42717
if not cat:
# give ourselves one last draw, helps if background is
# never triggered
draw_()
sys.stdout.write('\n')
if __name__ == "__main__":
import sys
import argparse
parser = argparse.ArgumentParser(
description="Render operations on block devices based on "
"trace output.",
allow_abbrev=False)
parser.add_argument(
'path',
nargs='?',
help="Path to read from.")
parser.add_argument(
'-b', '--block-size',
type=bdgeom,
help="Block size/geometry in bytes. Accepts <size>x<count>.")
parser.add_argument(
'--block-count',
type=lambda x: int(x, 0),
help="Block count in blocks.")
parser.add_argument(
'-@', '--blocks',
type=lambda x: (
slice(*(int(x, 0) if x.strip() else None
for x in x.split(',', 1)))
if ',' in x
else int(x, 0)),
help="Show a specific block, may be a range.")
parser.add_argument(
'--reads',
action='store_true',
help="Render reads.")
parser.add_argument(
'--progs',
action='store_true',
help="Render progs.")
parser.add_argument(
'--erases',
action='store_true',
help="Render erases.")
parser.add_argument(
'--wear',
action='store_true',
help="Render wear.")
parser.add_argument(
'--wear-only',
action='store_true',
help="Only render wear, don't render bd ops. Implies --wear.")
parser.add_argument(
'-w', '--block-cycles',
nargs='?',
type=lambda x: int(x, 0),
const=0,
help="Assumed maximum number of erase cycles when measuring "
"wear. Defaults to the maximum wear on any single block. "
"Implies --wear.")
parser.add_argument(
'--volatile',
action='store_true',
help="Reset wear on block device initialization.")
parser.add_argument(
'-.', '--add-char', '--chars',
dest='chars',
action='append',
type=lambda x: (
lambda ks, v: (
tuple(k.strip() for k in ks.split(',')),
v.strip())
)(*x.split('=', 1))
if '=' in x else x.strip(),
help="Add characters to use. Can be assigned to a specific "
"operation/block. Accepts %% modifiers.")
parser.add_argument(
'-,', '--add-wear-char', '--wear-chars',
dest='wear_chars',
action='append',
type=lambda x: (
lambda ks, v: (
tuple(k.strip() for k in ks.split(',')),
v.strip())
)(*x.split('=', 1))
if '=' in x else x.strip(),
help="Add wear characters to use. Can be assigned to a specific "
"operation/block. Accepts %% modifiers.")
parser.add_argument(
'-C', '--add-color',
dest='colors',
action='append',
type=lambda x: (
lambda ks, v: (
tuple(k.strip() for k in ks.split(',')),
v.strip())
)(*x.split('=', 1))
if '=' in x else x.strip(),
help="Add a color to use. Can be assigned to a specific "
"block type/block. Accepts %% modifiers.")
parser.add_argument(
'-G', '--add-wear-color',
dest='wear_colors',
action='append',
type=lambda x: (
lambda ks, v: (
tuple(k.strip() for k in ks.split(',')),
v.strip())
)(*x.split('=', 1))
if '=' in x else x.strip(),
help="Add a wear color to use. Can be assigned to a specific "
"operation/block. Accepts %% modifiers.")
parser.add_argument(
'--color',
choices=['never', 'always', 'auto'],
default='auto',
help="When to use terminal colors. Defaults to 'auto'.")
parser.add_argument(
'-:', '--dots',
action='store_true',
help="Use 1x2 ascii dot characters.")
parser.add_argument(
'-⣿', '--braille',
action='store_true',
help="Use 2x4 unicode braille characters. Note that braille "
"characters sometimes suffer from inconsistent widths.")
parser.add_argument(
'-W', '--width',
nargs='?',
type=lambda x: int(x, 0),
const=0,
help="Width in columns. <=0 uses the terminal width. Defaults "
"to min(terminal, 80).")
parser.add_argument(
'-H', '--height',
nargs='?',
type=lambda x: int(x, 0),
const=..., # handles shell prompt spacing, which is a bit subtle
help="Height in rows. <=0 uses the terminal height. Defaults "
"to 1.")
parser.add_argument(
'-X', '--block-cols',
type=lambda x: int(x, 0),
help="Number of blocks on the x-axis. Guesses from --block-count "
"and --block-ratio by default.")
parser.add_argument(
'-Y', '--block-rows',
type=lambda x: int(x, 0),
help="Number of blocks on the y-axis. Guesses from --block-count "
"and --block-ratio by default.")
parser.add_argument(
'--block-ratio',
dest='block_ratio',
type=lambda x: (
(lambda a, b: a / b)(*(float(v) for v in x.split(':', 1)))
if ':' in x else float(x)),
help="Target ratio for block sizes. Defaults to 1:1 or 1:2 "
"for -:/--dots and -⣿/--braille.")
parser.add_argument(
'--no-header',
action='store_true',
help="Don't show the header.")
parser.add_argument(
'-U', '--hilbert',
action='store_true',
help="Render as a space-filling Hilbert curve.")
parser.add_argument(
'-Z', '--lebesgue',
action='store_true',
help="Render as a space-filling Z-curve.")
parser.add_argument(
'-u', '--contiguous',
action='store_true',
help="Render as one contiguous curve instead of organizing by "
"blocks first.")
parser.add_argument(
'--to-scale',
nargs='?',
type=lambda x: (
(lambda a, b: a / b)(*(float(v) for v in x.split(':', 1)))
if ':' in x else float(x)),
const=1,
help="Scale the resulting map such that 1 char ~= 1/scale "
"blocks. Defaults to scale=1. ")
parser.add_argument(
'--to-ratio',
type=lambda x: (
(lambda a, b: a / b)(*(float(v) for v in x.split(':', 1)))
if ':' in x else float(x)),
help="Aspect ratio to use with --to-scale. Defaults to 1:1.")
parser.add_argument(
'-t', '--tiny',
action='store_true',
help="Tiny mode, alias for --block-ratio=1, --to-scale=1, "
"and --no-header.")
parser.add_argument(
'--title',
help="Add a title. Accepts %% modifiers.")
parser.add_argument(
'-n', '--lines',
nargs='?',
type=lambda x: int(x, 0),
const=0,
help="Show this many lines of history. <=0 uses the terminal "
"height. Defaults to 1.")
parser.add_argument(
'-^', '--head',
action='store_true',
help="Show the first n lines.")
parser.add_argument(
'-c', '--cat',
action='store_true',
help="Pipe directly to stdout.")
parser.add_argument(
'-+', '--coalesce',
type=lambda x: int(x, 0),
help="Number of operations to coalesce together.")
parser.add_argument(
'-~', '--sleep',
type=float,
help="Seconds to sleep between draws, coalescing operations "
"in between.")
parser.add_argument(
'-k', '--keep-open',
action='store_true',
help="Reopen the pipe on EOF, useful when multiple "
"processes are writing.")
sys.exit(main(**{k: v
for k, v in vars(parser.parse_intermixed_args()).items()
if v is not None}))