Files
littlefs/scripts/tracebd.py
Christopher Haster 46b78de500 Tweaked tracebd.py in a couple of ways, adopted bdgeom/--off/-n
- Tried to do the rescaling a bit better with truncating divisions, so
  there shouldn't be weird cross-pixel updates when things aren't well
  aligned.

- Adopted optional -B<block_size>x<block_count> flag for explicitly
  specifying the block-device geometry in a way that is compatible with
  other scripts. Should adopt this more places.

- Adopted optional <block>.<off> argument for start of range. This
  should match dbgblock.py.

- Adopted '-' for noop/zero-wear.

- Renamed a few internal things.

- Dropped subscript chars for wear, this didn't really add anything and
  can be accomplished by specifying the --wear-chars explicitly.

Also changed dbgblock.py to match, this mostly affects the --off/-n/--size
flags. For example, these are all the same:

  ./scripts/dbgblock.py disk -B4096 --off=10 --size=5
  ./scripts/dbgblock.py disk -B4096 --off=10 -n5
  ./scripts/dbgblock.py disk -B4096 --off=10,15
  ./scripts/dbgblock.py disk -B4096 -n10,15
  ./scripts/dbgblock.py disk -B4096 0.10 -n5

Also adopted the block-device geometry argument across scripts, where
the -B flag can optionally be a full <block_size>x<block_count> geometry:

  ./scripts/tracebd.py disk -B4096x256

Though this is mostly unused outside of tracebd.py right now. It will be
useful for anything that formats littlefs (littlefs-fuse?) and allowing
the format everywhere is a bit of a nice convenience.
2023-10-30 15:52:20 -05:00

1082 lines
33 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Display operations on block devices based on trace output
#
# Example:
# ./scripts/tracebd.py trace
#
# Copyright (c) 2022, The littlefs authors.
# SPDX-License-Identifier: BSD-3-Clause
#
import collections as co
import functools as ft
import io
import itertools as it
import math as m
import os
import re
import shutil
import threading as th
import time
# default chars/colors for operations, indexed as read, prog, erase, noop
#
# colors are the parameter part of '\x1b[<color>m' escape sequences
CHARS = 'rpe-'
COLORS = ['32', '35', '34', '']
# default chars/colors for wear, looked up by the wear fraction scaled to
# the length of each table
WEAR_CHARS = '-123456789'
WEAR_COLORS = ['', '', '', '', '', '', '', '35', '35', '1;31']
# 1x2 dot chars, indexed by a 2-bit mask of the pixels in a cell
CHARS_DOTS = " .':"
# 2x4 braille chars, indexed by an 8-bit mask of the pixels in a cell
CHARS_BRAILLE = (
'⠀⢀⡀⣀⠠⢠⡠⣠⠄⢄⡄⣄⠤⢤⡤⣤' '⠐⢐⡐⣐⠰⢰⡰⣰⠔⢔⡔⣔⠴⢴⡴⣴'
'⠂⢂⡂⣂⠢⢢⡢⣢⠆⢆⡆⣆⠦⢦⡦⣦' '⠒⢒⡒⣒⠲⢲⡲⣲⠖⢖⡖⣖⠶⢶⡶⣶'
'⠈⢈⡈⣈⠨⢨⡨⣨⠌⢌⡌⣌⠬⢬⡬⣬' '⠘⢘⡘⣘⠸⢸⡸⣸⠜⢜⡜⣜⠼⢼⡼⣼'
'⠊⢊⡊⣊⠪⢪⡪⣪⠎⢎⡎⣎⠮⢮⡮⣮' '⠚⢚⡚⣚⠺⢺⡺⣺⠞⢞⡞⣞⠾⢾⡾⣾'
'⠁⢁⡁⣁⠡⢡⡡⣡⠅⢅⡅⣅⠥⢥⡥⣥' '⠑⢑⡑⣑⠱⢱⡱⣱⠕⢕⡕⣕⠵⢵⡵⣵'
'⠃⢃⡃⣃⠣⢣⡣⣣⠇⢇⡇⣇⠧⢧⡧⣧' '⠓⢓⡓⣓⠳⢳⡳⣳⠗⢗⡗⣗⠷⢷⡷⣷'
'⠉⢉⡉⣉⠩⢩⡩⣩⠍⢍⡍⣍⠭⢭⡭⣭' '⠙⢙⡙⣙⠹⢹⡹⣹⠝⢝⡝⣝⠽⢽⡽⣽'
'⠋⢋⡋⣋⠫⢫⡫⣫⠏⢏⡏⣏⠯⢯⡯⣯' '⠛⢛⡛⣛⠻⢻⡻⣻⠟⢟⡟⣟⠿⢿⡿⣿')
def openio(path, mode='r', buffering=-1):
    """
    Open path like the builtin open(), except that '-' means stdin (when
    mode contains 'r') or stdout (otherwise).

    The standard stream is dup'd first, so closing the returned file does
    not close the process's own stdin/stdout.
    """
    # regular files go straight to open()
    if path != '-':
        return open(path, mode, buffering)

    # '-' maps to a duplicate of the matching standard stream
    stream = sys.stdin if 'r' in mode else sys.stdout
    return os.fdopen(os.dup(stream.fileno()), mode, buffering)
# some ways of block geometry representations
# 512      -> 512
# 512x16   -> (512, 16)
# 0x200x10 -> (512, 16)
def bdgeom(s):
    """
    Parse a block-device geometry string.

    Returns an int block size, or a (block_size, block_count) tuple when
    the string contains an 'x' separator. An optional leading 0x/0o/0b
    prefix selects the base for both numbers.
    """
    text = s.strip()

    # a single leading prefix chooses the base for everything that follows
    base = 10
    for prefix, radix in (('0x', 16), ('0o', 8), ('0b', 2)):
        if text.startswith((prefix, prefix.upper())):
            text, base = text[2:], radix
            break

    # split on the first 'x', if any
    size, sep, count = text.partition('x')
    if sep:
        return (int(size, base), int(count, base))
    return int(size, base)
# parse some rbyd addr encodings
# 0xa       -> [0xa]
# 0xa.c     -> [(0xa, 0xc)]
# 0x{a,b}   -> [0xa, 0xb]
# 0x{a,b}.c -> [(0xa, 0xc), (0xb, 0xc)]
def rbydaddr(s):
    """
    Parse an rbyd address string into a list of addresses.

    Each entry is an int block, or a (block, trunk) tuple when the string
    has a '.<trunk>' suffix. An optional leading 0x/0o/0b prefix selects
    the base for every number in the string.
    """
    text = s.strip()

    # a single leading prefix chooses the base for everything that follows
    base = 10
    for prefix, radix in (('0x', 16), ('0o', 8), ('0b', 2)):
        if text.startswith((prefix, prefix.upper())):
            text, base = text[2:], radix
            break

    # optional trunk after the first '.'
    blocks, dot, trunk_ = text.partition('.')
    trunk = int(trunk_, base) if dot else None

    # optional {a,b,...} set of blocks
    if blocks.startswith('{') and '}' in blocks:
        fields = blocks[1:blocks.index('}')].split(',')
    else:
        fields = [blocks]

    if trunk is None:
        return [int(field, base) for field in fields]
    return [(int(field, base), trunk) for field in fields]
class LinesIO:
    """
    A write-only, file-like ring buffer of the last maxlen lines that can
    redraw itself in place on the terminal.

    maxlen=None keeps every line, maxlen=0 tracks the terminal height.
    """
    # number of terminal lines reserved for drawing so far, note this is
    # shared by all instances
    canvas_lines = 1

    def __init__(self, maxlen=None):
        self.maxlen = maxlen
        self.lines = co.deque(maxlen=maxlen)
        # text after the last newline, not yet a complete line
        self.tail = io.StringIO()

        # trigger automatic sizing
        if maxlen == 0:
            self.resize(0)

    def write(self, s):
        """Append text, completed lines move into the ring buffer."""
        # note using split here ensures the trailing piece has no newline
        *complete, partial = s.split('\n')

        # the first completed line also finishes any pending partial line
        if complete and self.tail.getvalue():
            complete[0] = self.tail.getvalue() + complete[0]
            self.tail = io.StringIO()

        self.lines.extend(complete)
        if partial:
            self.tail.write(partial)

    def resize(self, maxlen):
        """Change the line limit, 0 means use the terminal height."""
        self.maxlen = maxlen
        if maxlen == 0:
            maxlen = shutil.get_terminal_size((80, 5))[1]
        if maxlen != self.lines.maxlen:
            self.lines = co.deque(self.lines, maxlen=maxlen)

    def draw(self):
        """Redraw the buffered lines over the previously drawn canvas."""
        # did terminal size change?
        if self.maxlen == 0:
            self.resize(0)

        out = sys.stdout.write
        count = len(self.lines)

        # first thing first, give ourself a canvas
        while LinesIO.canvas_lines < count:
            out('\n')
            LinesIO.canvas_lines += 1

        # clear the bottom of the canvas if we shrink
        shrink = LinesIO.canvas_lines - count
        if shrink > 0:
            for up in reversed(range(shrink)):
                out('\r')
                if up > 0:
                    out('\x1b[%dA' % up)
                out('\x1b[K')
                if up > 0:
                    out('\x1b[%dB' % up)
            out('\x1b[%dA' % shrink)
            LinesIO.canvas_lines = count

        # up is how far each line sits above the cursor's resting row
        for up, line in zip(reversed(range(count)), self.lines):
            # move cursor, clear line, disable/reenable line wrapping
            out('\r')
            if up > 0:
                out('\x1b[%dA' % up)
            out('\x1b[K')
            out('\x1b[?7l')
            out(line)
            out('\x1b[?7h')
            if up > 0:
                out('\x1b[%dB' % up)
        sys.stdout.flush()
# space filling Hilbert-curve
#
# note we memoize the last curve since this is a bit expensive
#
@ft.lru_cache(1)
def hilbert_curve(width, height):
    """
    Return the list of (x, y) points visited by a generalized Hilbert
    curve over a width x height rectangle, in curve order.
    """
    # based on generalized Hilbert curves:
    # https://github.com/jakubcerveny/gilbert
    #
    def sign(v):
        return (v > 0) - (v < 0)

    # (ax, ay) is the major axis vector of the sub-rectangle starting at
    # (x, y), (bx, by) is the orthogonal axis vector
    def walk(x, y, ax, ay, bx, by):
        w = abs(ax+ay)
        h = abs(bx+by)
        dax, day = sign(ax), sign(ay)
        dbx, dby = sign(bx), sign(by)

        # trivial row
        if h == 1:
            for i in range(w):
                yield (x + i*dax, y + i*day)
            return

        # trivial column
        if w == 1:
            for i in range(h):
                yield (x + i*dbx, y + i*dby)
            return

        ax2, ay2 = ax//2, ay//2
        bx2, by2 = bx//2, by//2

        if 2*w > 3*h:
            # prefer even steps
            if abs(ax2+ay2) % 2 != 0 and w > 2:
                ax2, ay2 = ax2+dax, ay2+day

            # split in two
            yield from walk(x, y, ax2, ay2, bx, by)
            yield from walk(x+ax2, y+ay2, ax-ax2, ay-ay2, bx, by)
        else:
            # prefer even steps
            if abs(bx2+by2) % 2 != 0 and h > 2:
                bx2, by2 = bx2+dbx, by2+dby

            # split in three
            yield from walk(x, y, bx2, by2, ax2, ay2)
            yield from walk(x+bx2, y+by2, ax, ay, bx-bx2, by-by2)
            yield from walk(
                x+(ax-dax)+(bx2-dbx), y+(ay-day)+(by2-dby),
                -bx2, -by2, -(ax-ax2), -(ay-ay2))

    # the longer side becomes the major axis
    if width >= height:
        return list(walk(0, 0, width, 0, 0, height))
    return list(walk(0, 0, 0, height, width, 0))
# space filling Z-curve/Lebesgue-curve
#
# note we memoize the last curve since this is a bit expensive
#
@ft.lru_cache(1)
def lebesgue_curve(width, height):
    """
    Return the list of (x, y) points visited by a Z-curve over a
    width x height rectangle, in curve order.
    """
    # we create a truncated Z-curve by walking the smallest enclosing
    # power-of-two square and filtering out the points that are outside
    # our region
    order = m.ceil(m.log2(max(width, height)))

    points = []
    for i in range(2**(2*order)):
        # de-interleave i, even bits form x, odd bits form y
        x = y = 0
        for bit in range(order):
            x |= ((i >> (2*bit)) & 1) << bit
            y |= ((i >> (2*bit+1)) & 1) << bit

        if x < width and y < height:
            points.append((x, y))

    return points
class Pixel(int):
    """
    The state of one rendered pixel, packed into a single immutable int.

    Bit 0 is readed, bit 1 is proged, bit 2 is erased, and the remaining
    high bits (self >> 3) hold the wear, which goes up by one per erase().
    """
    __slots__ = ()

    def __new__(cls, state=0, *,
            wear=0,
            readed=False,
            proged=False,
            erased=False):
        flags = 0
        if readed:
            flags |= 1
        if proged:
            flags |= 2
        if erased:
            flags |= 4
        return super().__new__(cls, state | (wear << 3) | flags)

    @property
    def wear(self):
        return int(self) >> 3

    @property
    def readed(self):
        return bool(int(self) & 1)

    @property
    def proged(self):
        return bool(int(self) & 2)

    @property
    def erased(self):
        return bool(int(self) & 4)

    # pixels are immutable, so every op returns a new Pixel
    def read(self):
        return Pixel(int(self) | 1)

    def prog(self):
        return Pixel(int(self) | 2)

    def erase(self):
        # set the erased flag and bump the wear by one
        return Pixel((int(self) | 4) + 8)

    def clear(self):
        # drop the read/prog/erase flags but keep the wear
        return Pixel(int(self) & ~7)

    def __or__(self, other):
        """Merge two pixels, union of the flags and max of the wear."""
        flags = (int(self) | int(other)) & 7
        return Pixel(flags, wear=max(self.wear, other.wear))

    def worn(self, max_wear, *,
            block_cycles=None,
            wear_chars=None,
            **_):
        """
        Return the wear as a fraction of block_cycles if given, otherwise
        of max(max_wear, len(wear_chars)).
        """
        if wear_chars is None:
            wear_chars = WEAR_CHARS

        if block_cycles:
            return self.wear / block_cycles
        return self.wear / max(max_wear, len(wear_chars))

    def draw(self, max_wear, char=None, *,
            read=True,
            prog=True,
            erase=True,
            wear=False,
            block_cycles=None,
            color=True,
            dots=False,
            braille=False,
            chars=None,
            wear_chars=None,
            colors=None,
            wear_colors=None,
            **_):
        """
        Render this pixel as a string of one char, wrapped in color escape
        sequences if color is set.

        prog takes priority over erase, which takes priority over read. If
        char is given it replaces the char, but not the colors.
        """
        # fallback to default chars/colors, padding short tables
        if chars is None:
            chars = CHARS
        if len(chars) < len(CHARS):
            chars = chars + CHARS[len(chars):]

        if colors is None:
            colors = COLORS
        if len(colors) < len(COLORS):
            colors = colors + COLORS[len(colors):]

        if wear_chars is None:
            wear_chars = WEAR_CHARS
        if wear_colors is None:
            wear_colors = WEAR_COLORS

        # start from the noop char/color
        glyph = chars[3]
        codes = [colors[3]]

        # wear goes underneath any operation
        if wear:
            frac = min(
                self.worn(
                    max_wear,
                    block_cycles=block_cycles,
                    wear_chars=wear_chars),
                1)
            glyph = wear_chars[int(frac * (len(wear_chars)-1))]
            codes.append(wear_colors[int(frac * (len(wear_colors)-1))])

        # which operation, if any, wins?
        if prog and self.proged:
            op = 1
        elif erase and self.erased:
            op = 2
        elif read and self.readed:
            op = 0
        else:
            op = None

        if op is not None:
            glyph = chars[op]
            codes.append(colors[op])

        # override char?
        if char:
            glyph = char

        # apply colors
        if codes and color:
            prefix = ''.join('\x1b[%sm' % code for code in codes)
            glyph = '%s%s\x1b[m' % (prefix, glyph)

        return glyph
class Bd:
    """
    A block device of block_count blocks of block_size bytes, rendered as
    a flat list of width*height Pixels.

    Byte addresses are scaled onto the pixel list with truncating
    divisions, so one pixel usually represents many bytes.
    """
    def __init__(self, *,
            block_size=1,
            block_count=1,
            width=None,
            height=1,
            pixels=None):
        if width is None:
            width = block_count

        if pixels is None:
            self.pixels = [Pixel() for _ in range(width*height)]
        else:
            self.pixels = pixels
        self.block_size = block_size
        self.block_count = block_count
        self.width = width
        self.height = height

    def _op(self, f, block=None, off=None, size=None):
        """
        Replace every pixel covered by the given range with f(pixel).

        With no block, this covers every pixel. With only a block, the
        whole block. With a block and one number, that number is the size
        starting at offset 0. At least one pixel is always touched.
        """
        if block is None:
            targets = range(len(self.pixels))
        else:
            if off is None:
                off, size = 0, self.block_size
            elif size is None:
                off, size = 0, off

            # update our geometry?
            if off+size > self.block_size or block >= self.block_count:
                self.resize(
                    block_size=max(self.block_size, off+size),
                    block_count=max(self.block_count, block+1))

            # map to our pixel space, note this must come after any
            # resize since the geometry and pixels may have changed
            total = self.block_size*self.block_count
            npixels = len(self.pixels)
            base = block*self.block_size
            start = ((base+off)*npixels) // total
            stop = ((base+off+size)*npixels) // total
            targets = range(start, max(stop, start+1))

        # apply the op
        for i in targets:
            self.pixels[i] = f(self.pixels[i])

    def read(self, block=None, off=None, size=None):
        self._op(Pixel.read, block, off, size)

    def prog(self, block=None, off=None, size=None):
        self._op(Pixel.prog, block, off, size)

    def erase(self, block=None, off=None, size=None):
        self._op(Pixel.erase, block, off, size)

    def clear(self, block=None, off=None, size=None):
        self._op(Pixel.clear, block, off, size)

    def copy(self):
        """Return a new Bd with the same geometry and a copied pixel list."""
        return Bd(
            pixels=self.pixels.copy(),
            block_size=self.block_size,
            block_count=self.block_count,
            width=self.width,
            height=self.height)

    def resize(self, *,
            block_size=None,
            block_count=None,
            width=None,
            height=None):
        """
        Change the geometry and/or pixel dimensions, rebuilding the pixel
        list. Anything left as None keeps its current value, and nothing
        happens if nothing changes.
        """
        if block_size is None:
            block_size = self.block_size
        if block_count is None:
            block_count = self.block_count
        if width is None:
            width = self.width
        if height is None:
            height = self.height

        if (block_size == self.block_size
                and block_count == self.block_count
                and width == self.width
                and height == self.height):
            return

        # transform our pixels
        #
        # NOTE(review): start/stop are offsets in the new byte space but
        # are used to slice the old pixel list, which only lines up when
        # there was one old pixel per byte -- TODO confirm the intended
        # mapping, the logic is left as it was
        total = block_size*block_count
        npixels = width*height
        pixels = []
        for x in range(npixels):
            # map into our old bd space
            start = (x*total) // npixels
            stop = ((x+1)*total) // npixels
            stop = max(stop, start+1)

            # aggregate state
            pixels.append(ft.reduce(
                Pixel.__or__,
                self.pixels[start:stop],
                Pixel()))

        self.block_size = block_size
        self.block_count = block_count
        self.width = width
        self.height = height
        self.pixels = pixels

    def draw(self, row, *,
            read=False,
            prog=False,
            erase=False,
            wear=False,
            hilbert=False,
            lebesgue=False,
            dots=False,
            braille=False,
            **args):
        """
        Render one row of output as a string.

        With dots/braille each output row packs 2/4 pixel rows. If the bd
        is shorter than that, frames are buffered across calls and None is
        returned until enough have accumulated to fill a row. Any extra
        keyword args are forwarded to Pixel.draw/Pixel.worn.
        """
        # find max wear?
        max_wear = None
        if wear:
            max_wear = max(p.wear for p in self.pixels)

        # fold via a curve?
        if hilbert or lebesgue:
            curve = hilbert_curve if hilbert else lebesgue_curve
            grid = [None]*(self.width*self.height)
            for (x, y), p in zip(
                    curve(self.width, self.height),
                    self.pixels):
                grid[x + y*self.width] = p
        else:
            grid = self.pixels

        # need to wait for more trace output before rendering
        #
        # this is sort of a hack that knows the output is going to a terminal
        if (braille and self.height < 4) or (dots and self.height < 2):
            needed_height = 4 if braille else 2

            self.history = getattr(self, 'history', [])
            # snapshot the frame, grid may be self.pixels itself, which is
            # updated in place by later ops and would otherwise turn every
            # buffered frame into an alias of the current state
            self.history.append(list(grid))

            if len(self.history)*self.height < needed_height:
                # skip for now
                return None

            grid = list(it.chain.from_iterable(
                # did we resize? pad/truncate each frame to the current size
                it.islice(it.chain(h, it.repeat(Pixel())),
                    self.width*self.height)
                for h in self.history))
            self.history = []

        # does this pixel light up a dot in dots/braille mode?
        def lit(p):
            return ((read and p.readed)
                or (prog and p.proged)
                or (erase and p.erased)
                or (not read and not prog and not erase
                    and wear and p.worn(max_wear, **args) >= 0.7))

        line = []
        if braille or dots:
            # braille cells are 2x4 pixels, dots cells are 1x2 pixels
            if braille:
                cell_w, cell_h = 2, 4
                table = CHARS_BRAILLE
                mode = {'braille': True}
            else:
                cell_w, cell_h = 1, 2
                table = CHARS_DOTS
                mode = {'dots': True}

            for x in range(0, self.width, cell_w):
                # encode the cell into a bitmask, bit 0 is the cell's
                # bottom-right pixel
                mask = 0
                best_p = Pixel()
                for i in range(cell_w*cell_h):
                    dx = cell_w-1 - (i % cell_w)
                    dy = cell_h-1 - (i // cell_w)
                    p = grid[x+dx + (row*cell_h + dy)*self.width]
                    best_p |= p
                    if lit(p):
                        mask |= 1 << i

                line.append(best_p.draw(
                    max_wear,
                    table[mask],
                    read=read,
                    prog=prog,
                    erase=erase,
                    wear=wear,
                    **mode,
                    **args))
        else:
            for x in range(self.width):
                line.append(grid[x + row*self.width].draw(
                    max_wear,
                    read=read,
                    prog=prog,
                    erase=erase,
                    wear=wear,
                    **args))

        return ''.join(line)
def main(path='-', block=None, *,
        off=None,
        size=None,
        block_size=None,
        block_count=None,
        block_cycles=None,
        read=False,
        prog=False,
        erase=False,
        wear=False,
        reset=False,
        color='auto',
        dots=False,
        braille=False,
        width=None,
        height=None,
        lines=None,
        cat=False,
        hilbert=False,
        lebesgue=False,
        coalesce=None,
        sleep=None,
        keep_open=False,
        **args):
    """
    Read bd trace output from path and render the operations.

    path is opened with openio, so '-' reads stdin. block/off/size
    optionally restrict rendering to a window of the block device, each
    may be a single value or a (start, stop) tuple. block_size may be an
    int or a (block_size, block_count) tuple. If none of read/prog/erase/
    wear are set, read, prog and erase are all rendered. Output goes to a
    LinesIO ring buffer redrawn in place, or straight to stdout if cat is
    set. Any extra keyword args are forwarded to Bd.draw.

    Exits the process with -1 if path does not exist or more than one
    block address is given.
    """
    # figure out what color should be
    if color == 'auto':
        color = sys.stdout.isatty()
    elif color == 'always':
        color = True
    else:
        color = False

    # exclusive wear or read/prog/erase by default
    if not read and not prog and not erase and not wear:
        read = True
        prog = True
        erase = True

    # assume a reasonable lines/height if not specified
    #
    # note that we let height = None if neither hilbert or lebesgue
    # are specified, this is a bit special as the default may be less
    # than one character in height.
    if height is None and (hilbert or lebesgue):
        if lines is not None:
            height = lines
        else:
            height = 5

    if lines is None:
        if height is not None:
            lines = height
        else:
            lines = 5

    # is bd geometry specified?
    if isinstance(block_size, tuple):
        block_size, block_count_ = block_size
        if block_count is None:
            block_count = block_count_

    # allow ranges for blocks/offs
    if not isinstance(block, tuple):
        block = (block,)
    if any(isinstance(b, list) and len(b) > 1 for b in block):
        print("error: More than one block address?")
        sys.exit(-1)
    block = tuple(
        b[0] if isinstance(b, list) else b
        for b in block)
    if not isinstance(off, tuple):
        off = (off,)
    if not isinstance(size, tuple):
        size = (size,)

    # resolve the window we're showing, block_stop/off_stop may end up
    # None, in which case the window is unbounded on that side
    block_start = (
        block[0][0] if isinstance(block[0], tuple)
        else block[0] if block[0] is not None
        else 0)
    block_stop = (
        block[1][0] if len(block) > 1 and isinstance(block[1], tuple)
        else block[1] if len(block) > 1 and block[1] is not None
        else block_start+1 if len(block) == 1 and block[0] is not None
        else block_count)
    off_start = (
        off[0] if off[0] is not None
        else block[0][1] if isinstance(block[0], tuple)
        else block[1][1] if len(block) > 1 and isinstance(block[1], tuple)
        else size[0] if len(size) > 1 and size[0] is not None
        else 0)
    off_stop = (
        off_start + size[0] if len(size) == 1 and size[0] is not None
        else off[1] if len(off) > 1 and off[1] is not None
        else size[1] if len(size) > 1 and size[1] is not None
        else block_size)

    # create a block device representation
    bd = Bd()

    def resize(*, block_size=None, block_count=None):
        # size may be overridden by cli args
        if off_stop is not None:
            block_size = off_stop-off_start
        if block_stop is not None:
            block_count = block_stop-block_start

        # figure out best width/height
        if width is None:
            width_ = min(80, shutil.get_terminal_size((80, 5))[0])
        elif width:
            width_ = width
        else:
            width_ = shutil.get_terminal_size((80, 5))[0]

        if height is None:
            height_ = 0
        elif height:
            height_ = height
        else:
            height_ = shutil.get_terminal_size((80, 5))[1]

        bd.resize(
            block_size=block_size,
            block_count=block_count,
            # scale if we're printing with dots or braille
            width=2*width_ if braille else width_,
            height=max(1,
                4*height_ if braille
                else 2*height_ if dots
                else height_))
    resize()

    # parse a line of trace output
    #
    # note these need to be raw strings, \w, \s, \( and \) are not valid
    # string escapes
    pattern = re.compile(
        r'^(?P<file>[^:]*):(?P<line>[0-9]+):trace:.*?bd_(?:'
            r'(?P<create>create\w*)\('
                r'(?:'
                    r'block_size=(?P<block_size>\w+)'
                    r'|' r'block_count=(?P<block_count>\w+)'
                    r'|' r'.*?' r')*' r'\)'
            r'|' r'(?P<read>read)\('
                r'\s*(?P<read_ctx>\w+)' r'\s*,'
                r'\s*(?P<read_block>\w+)' r'\s*,'
                r'\s*(?P<read_off>\w+)' r'\s*,'
                r'\s*(?P<read_buffer>\w+)' r'\s*,'
                r'\s*(?P<read_size>\w+)' r'\s*\)'
            r'|' r'(?P<prog>prog)\('
                r'\s*(?P<prog_ctx>\w+)' r'\s*,'
                r'\s*(?P<prog_block>\w+)' r'\s*,'
                r'\s*(?P<prog_off>\w+)' r'\s*,'
                r'\s*(?P<prog_buffer>\w+)' r'\s*,'
                r'\s*(?P<prog_size>\w+)' r'\s*\)'
            r'|' r'(?P<erase>erase)\('
                r'\s*(?P<erase_ctx>\w+)' r'\s*,'
                r'\s*(?P<erase_block>\w+)'
                r'\s*\(\s*(?P<erase_size>\w+)\s*\)' r'\s*\)'
            r'|' r'(?P<sync>sync)\('
                r'\s*(?P<sync_ctx>\w+)' r'\s*\)' r')\s*$')

    # clip an operation to the window we're showing, returns the
    # (block, off, size) relative to the window, or None if the operation
    # is entirely outside of it
    def clip(block, off, size):
        if ((block_stop is not None and block >= block_stop)
                or block < block_start
                or (off_stop is not None and off >= off_stop)
                or off+size <= off_start):
            return None

        start = max(off, off_start)
        stop = min(off+size, off_stop) if off_stop is not None else off+size
        # the bd only covers the window, so offsets need to be translated
        # into window coordinates the same way blocks are
        return (block-block_start, start-off_start, stop-start)

    def parse(line):
        nonlocal bd

        # string searching is much faster than the regex here, and this
        # actually has a big impact given how much trace output comes
        # through here
        if 'trace' not in line or 'bd' not in line:
            return False
        found = pattern.match(line)
        if not found:
            return False

        if found.group('create'):
            # update our block size/count
            block_size = int(found.group('block_size'), 0)
            block_count = int(found.group('block_count'), 0)

            resize(block_size=block_size, block_count=block_count)
            if reset:
                bd = Bd(
                    block_size=bd.block_size,
                    block_count=bd.block_count,
                    width=bd.width,
                    height=bd.height)
            return True

        if found.group('read') and read:
            op = bd.read
            block = int(found.group('read_block'), 0)
            off = int(found.group('read_off'), 0)
            size = int(found.group('read_size'), 0)
        elif found.group('prog') and prog:
            op = bd.prog
            block = int(found.group('prog_block'), 0)
            off = int(found.group('prog_off'), 0)
            size = int(found.group('prog_size'), 0)
        elif found.group('erase') and (erase or wear):
            # erases always start at the beginning of the block
            op = bd.erase
            block = int(found.group('erase_block'), 0)
            off = 0
            size = int(found.group('erase_size'), 0)
        else:
            return False

        clipped = clip(block, off, size)
        if clipped is None:
            return False
        op(*clipped)
        return True

    # print trace output
    def draw(f):
        def writeln(s=''):
            f.write(s)
            f.write('\n')
        f.writeln = writeln

        # don't forget we've scaled this for braille/dots!
        for row in range(
                m.ceil(bd.height/4) if braille
                else m.ceil(bd.height/2) if dots
                else bd.height):
            line = bd.draw(row,
                read=read,
                prog=prog,
                erase=erase,
                wear=wear,
                block_cycles=block_cycles,
                color=color,
                dots=dots,
                braille=braille,
                hilbert=hilbert,
                lebesgue=lebesgue,
                **args)
            if line:
                f.writeln(line)

        # forget read/prog/erase state for the next frame, and pick up
        # any terminal size changes
        bd.clear()
        resize()

    # read/parse/coalesce operations
    if cat:
        ring = sys.stdout
    else:
        ring = LinesIO(lines)

    # if sleep print in background thread to avoid getting stuck in a read call
    event = th.Event()
    lock = th.Lock()
    if sleep:
        done = False
        def background():
            while not done:
                event.wait()
                event.clear()
                with lock:
                    draw(ring)
                    if not cat:
                        ring.draw()
                time.sleep(sleep or 0.01)
        th.Thread(target=background, daemon=True).start()

    try:
        while True:
            with openio(path) as f:
                changed = 0
                for line in f:
                    with lock:
                        changed += parse(line)

                        # need to redraw?
                        if changed and (not coalesce or changed >= coalesce):
                            if sleep:
                                event.set()
                            else:
                                draw(ring)
                                if not cat:
                                    ring.draw()
                            changed = 0

            if not keep_open:
                break
            # don't just flood open calls
            time.sleep(sleep or 0.1)
    except FileNotFoundError:
        print("error: file not found %r" % path)
        sys.exit(-1)
    except KeyboardInterrupt:
        pass

    if sleep:
        done = True
        lock.acquire() # avoids https://bugs.python.org/issue42717
    if not cat:
        sys.stdout.write('\n')
if __name__ == "__main__":
    import sys
    import argparse

    # shared argument converters
    #
    # note these are intentionally kept as lambdas, argparse includes the
    # converter's __name__ in its "invalid ... value" error messages
    parse_int = lambda s: int(s, 0)
    parse_int_range = lambda s: tuple(
        int(t, 0) if t.strip() else None
        for t in s.split(','))
    parse_block_range = lambda s: tuple(
        rbydaddr(t) if t.strip() else None
        for t in s.split(','))
    parse_list = lambda s: [t.strip() for t in s.split(',')]

    parser = argparse.ArgumentParser(
        description="Display operations on block devices based on "
            "trace output.",
        allow_abbrev=False)
    arg = parser.add_argument

    # what to read and which part of the bd to show
    arg('path',
        nargs='?',
        help="Path to read from.")
    arg('block',
        nargs='?',
        type=parse_block_range,
        help="Optional block to show, may be a range.")
    arg('-B', '--block-size',
        type=bdgeom,
        help="Block size/geometry in bytes.")
    arg('--block-count',
        type=parse_int,
        help="Block count in blocks.")
    arg('-C', '--block-cycles',
        type=parse_int,
        help="Assumed maximum number of erase cycles when measuring wear.")
    arg('--off',
        type=parse_int_range,
        help="Show a specific offset, may be a range.")
    arg('--size',
        type=parse_int_range,
        help="Show this many bytes, may be a range.")

    # which operations to render
    arg('-r', '--read',
        action='store_true',
        help="Render reads.")
    arg('-p', '--prog',
        action='store_true',
        help="Render progs.")
    arg('-e', '--erase',
        action='store_true',
        help="Render erases.")
    arg('-w', '--wear',
        action='store_true',
        help="Render wear.")
    arg('-R', '--reset',
        action='store_true',
        help="Reset wear on block device initialization.")

    # how to render them
    arg('--color',
        choices=['never', 'always', 'auto'],
        default='auto',
        help="When to use terminal colors. Defaults to 'auto'.")
    arg('-:', '--dots',
        action='store_true',
        help="Use 1x2 ascii dot characters.")
    arg('-⣿', '--braille',
        action='store_true',
        help="Use 2x4 unicode braille characters. Note that braille characters "
            "sometimes suffer from inconsistent widths.")
    arg('--chars',
        help="Characters to use for read, prog, erase, noop operations.")
    arg('--wear-chars',
        help="Characters to use for showing wear.")
    arg('--colors',
        type=parse_list,
        help="Colors to use for read, prog, erase, noop operations.")
    arg('--wear-colors',
        type=parse_list,
        help="Colors to use for showing wear.")

    # output dimensions/layout
    arg('-W', '--width',
        nargs='?',
        type=parse_int,
        const=0,
        help="Width in columns. 0 uses the terminal width. Defaults to "
            "min(terminal, 80).")
    arg('-H', '--height',
        nargs='?',
        type=parse_int,
        const=0,
        help="Height in rows. 0 uses the terminal height. Defaults to 1.")
    arg('-n', '--lines',
        nargs='?',
        type=parse_int,
        const=0,
        help="Show this many lines of history. 0 uses the terminal height. "
            "Defaults to 5.")
    arg('-z', '--cat',
        action='store_true',
        help="Pipe directly to stdout.")
    arg('-U', '--hilbert',
        action='store_true',
        help="Render as a space-filling Hilbert curve.")
    arg('-Z', '--lebesgue',
        action='store_true',
        help="Render as a space-filling Z-curve.")

    # input pacing
    arg('-c', '--coalesce',
        type=parse_int,
        help="Number of operations to coalesce together.")
    arg('-s', '--sleep',
        type=float,
        help="Time in seconds to sleep between reads, coalescing operations.")
    arg('-k', '--keep-open',
        action='store_true',
        help="Reopen the pipe on EOF, useful when multiple "
            "processes are writing.")

    # drop unset options so main's own defaults apply
    opts = {
        k: v
        for k, v in vars(parser.parse_intermixed_args()).items()
        if v is not None}
    sys.exit(main(**opts))