Files
littlefs/scripts/dbgrbyd.py
Christopher Haster 6d9c077261 Reordered LFSR_TAG_NAMELIMIT/FILELIMIT
Not sure why, but this just seems more intuitive/correct. Maybe because
LFSR_TAG_NAME is always the first tag in a file's attr set:

  LFSR_TAG_NAMELIMIT    0x0039  v--- ---- --11 1--1
  LFSR_TAG_FILELIMIT    0x003a  v--- ---- --11 1-1-

Seeing as several parts of the codebase still use the previous order,
it seems reasonable to switch back to that.

No code changes.
2025-05-24 21:51:06 -05:00

1850 lines
60 KiB
Python
Executable File

#!/usr/bin/env python3
# prevent local imports
if __name__ == "__main__":
__import__('sys').path.pop(0)
import bisect
import collections as co
import functools as ft
import itertools as it
import math as mt
import os
import struct
try:
import crc32c as crc32c_lib
except ModuleNotFoundError:
crc32c_lib = None
COLORS = [
'34', # blue
'31', # red
'32', # green
'35', # purple
'33', # yellow
'36', # cyan
]
TAG_NULL = 0x0000 ## 0x0000 v--- ---- ---- ----
TAG_CONFIG = 0x0000 ## 0x00tt v--- ---- -ttt tttt
TAG_MAGIC = 0x0031 # 0x003r v--- ---- --11 --rr
TAG_VERSION = 0x0034 # 0x0034 v--- ---- --11 -1--
TAG_RCOMPAT = 0x0035 # 0x0035 v--- ---- --11 -1-1
TAG_WCOMPAT = 0x0036 # 0x0036 v--- ---- --11 -11-
TAG_OCOMPAT = 0x0037 # 0x0037 v--- ---- --11 -111
TAG_GEOMETRY = 0x0038 # 0x0038 v--- ---- --11 1---
TAG_NAMELIMIT = 0x0039 # 0x0039 v--- ---- --11 1--1
TAG_FILELIMIT = 0x003a # 0x003a v--- ---- --11 1-1-
TAG_GDELTA = 0x0100 ## 0x01tt v--- ---1 -ttt ttrr
TAG_GRMDELTA = 0x0100 # 0x0100 v--- ---1 ---- ----
TAG_NAME = 0x0200 ## 0x02tt v--- --1- -ttt tttt
TAG_BNAME = 0x0200 # 0x0200 v--- --1- ---- ----
TAG_REG = 0x0201 # 0x0201 v--- --1- ---- ---1
TAG_DIR = 0x0202 # 0x0202 v--- --1- ---- --1-
TAG_STICKYNOTE = 0x0203 # 0x0203 v--- --1- ---- --11
TAG_BOOKMARK = 0x0204 # 0x0204 v--- --1- ---- -1--
TAG_MNAME = 0x0220 # 0x0220 v--- --1- --1- ----
TAG_STRUCT = 0x0300 ## 0x03tt v--- --11 -ttt ttrr
TAG_BRANCH = 0x0300 # 0x030r v--- --11 ---- --rr
TAG_DATA = 0x0304 # 0x0304 v--- --11 ---- -1--
TAG_BLOCK = 0x0308 # 0x0308 v--- --11 ---- 1err
TAG_DID = 0x0314 # 0x0314 v--- --11 ---1 -1--
TAG_BSHRUB = 0x0318 # 0x0318 v--- --11 ---1 1---
TAG_BTREE = 0x031c # 0x031c v--- --11 ---1 11rr
TAG_MROOT = 0x0321 # 0x032r v--- --11 --1- --rr
TAG_MDIR = 0x0325 # 0x0324 v--- --11 --1- -1rr
TAG_MTREE = 0x032c # 0x032c v--- --11 --1- 11rr
TAG_ATTR = 0x0400 ## 0x04aa v--- -1-a -aaa aaaa
TAG_UATTR = 0x0400 # 0x04aa v--- -1-- -aaa aaaa
TAG_SATTR = 0x0500 # 0x05aa v--- -1-1 -aaa aaaa
TAG_SHRUB = 0x1000 ## 0x1kkk v--1 kkkk -kkk kkkk
TAG_ALT = 0x4000 ## 0x4kkk v1cd kkkk -kkk kkkk
TAG_B = 0x0000
TAG_R = 0x2000
TAG_LE = 0x0000
TAG_GT = 0x1000
TAG_CKSUM = 0x3000 ## 0x300p v-11 ---- ---- -pqq
TAG_PHASE = 0x0003
TAG_PERTURB = 0x0004
TAG_NOTE = 0x3100 ## 0x3100 v-11 ---1 ---- ----
TAG_ECKSUM = 0x3200 ## 0x3200 v-11 --1- ---- ----
TAG_GCKSUMDELTA = 0x3300 ## 0x3300 v-11 --11 ---- ----
# some ways of block geometry representations
# 512 -> 512
# 512x16 -> (512, 16)
# 0x200x10 -> (512, 16)
def bdgeom(s):
s = s.strip()
b = 10
if s.startswith('0x') or s.startswith('0X'):
s = s[2:]
b = 16
elif s.startswith('0o') or s.startswith('0O'):
s = s[2:]
b = 8
elif s.startswith('0b') or s.startswith('0B'):
s = s[2:]
b = 2
if 'x' in s:
s, s_ = s.split('x', 1)
return (int(s, b), int(s_, b))
else:
return int(s, b)
# parse some rbyd addr encodings
# 0xa -> (0xa,)
# 0xa.c -> ((0xa, 0xc),)
# 0x{a,b} -> (0xa, 0xb)
# 0x{a,b}.c -> ((0xa, 0xc), (0xb, 0xc))
def rbydaddr(s):
s = s.strip()
b = 10
if s.startswith('0x') or s.startswith('0X'):
s = s[2:]
b = 16
elif s.startswith('0o') or s.startswith('0O'):
s = s[2:]
b = 8
elif s.startswith('0b') or s.startswith('0B'):
s = s[2:]
b = 2
trunk = None
if '.' in s:
s, s_ = s.split('.', 1)
trunk = int(s_, b)
if s.startswith('{') and '}' in s:
ss = s[1:s.find('}')].split(',')
else:
ss = [s]
addr = []
for s in ss:
if trunk is not None:
addr.append((int(s, b), trunk))
else:
addr.append(int(s, b))
return tuple(addr)
def crc32c(data, crc=0):
if crc32c_lib is not None:
return crc32c_lib.crc32c(data, crc)
else:
crc ^= 0xffffffff
for b in data:
crc ^= b
for j in range(8):
crc = (crc >> 1) ^ ((crc & 1) * 0x82f63b78)
return 0xffffffff ^ crc
def popc(x):
return bin(x).count('1')
def parity(x):
return popc(x) & 1
def fromle32(data, j=0):
return struct.unpack('<I', data[j:j+4].ljust(4, b'\0'))[0]
def fromleb128(data, j=0):
word = 0
d = 0
while j+d < len(data):
b = data[j+d]
word |= (b & 0x7f) << 7*d
word &= 0xffffffff
if not b & 0x80:
return word, d+1
d += 1
return word, d
def fromtag(data, j=0):
d = 0
tag = struct.unpack('>H', data[j:j+2].ljust(2, b'\0'))[0]; d += 2
weight, d_ = fromleb128(data, j+d); d += d_
size, d_ = fromleb128(data, j+d); d += d_
return tag>>15, tag&0x7fff, weight, size, d
def xxd(data, width=16):
for i in range(0, len(data), width):
yield '%-*s %-*s' % (
3*width,
' '.join('%02x' % b for b in data[i:i+width]),
width,
''.join(
b if b >= ' ' and b <= '~' else '.'
for b in map(chr, data[i:i+width])))
# human readable tag repr
def tagrepr(tag, weight=None, size=None, *,
global_=False,
toff=None):
# null tags
if (tag & 0x6fff) == TAG_NULL:
return '%snull%s%s' % (
'shrub' if tag & TAG_SHRUB else '',
' w%d' % weight if weight else '',
' %d' % size if size else '')
# config tags
elif (tag & 0x6f00) == TAG_CONFIG:
return '%s%s%s%s' % (
'shrub' if tag & TAG_SHRUB else '',
'magic' if (tag & 0xfff) == TAG_MAGIC
else 'version' if (tag & 0xfff) == TAG_VERSION
else 'rcompat' if (tag & 0xfff) == TAG_RCOMPAT
else 'wcompat' if (tag & 0xfff) == TAG_WCOMPAT
else 'ocompat' if (tag & 0xfff) == TAG_OCOMPAT
else 'geometry' if (tag & 0xfff) == TAG_GEOMETRY
else 'namelimit' if (tag & 0xfff) == TAG_NAMELIMIT
else 'filelimit' if (tag & 0xfff) == TAG_FILELIMIT
else 'config 0x%02x' % (tag & 0xff),
' w%d' % weight if weight else '',
' %s' % size if size is not None else '')
# global-state delta tags
elif (tag & 0x6f00) == TAG_GDELTA:
if global_:
return '%s%s%s%s' % (
'shrub' if tag & TAG_SHRUB else '',
'grm' if (tag & 0xfff) == TAG_GRMDELTA
else 'gstate 0x%02x' % (tag & 0xff),
' w%d' % weight if weight else '',
' %s' % size if size is not None else '')
else:
return '%s%s%s%s' % (
'shrub' if tag & TAG_SHRUB else '',
'grmdelta' if (tag & 0xfff) == TAG_GRMDELTA
else 'gdelta 0x%02x' % (tag & 0xff),
' w%d' % weight if weight else '',
' %s' % size if size is not None else '')
# name tags, includes file types
elif (tag & 0x6f00) == TAG_NAME:
return '%s%s%s%s' % (
'shrub' if tag & TAG_SHRUB else '',
'bname' if (tag & 0xfff) == TAG_BNAME
else 'reg' if (tag & 0xfff) == TAG_REG
else 'dir' if (tag & 0xfff) == TAG_DIR
else 'stickynote' if (tag & 0xfff) == TAG_STICKYNOTE
else 'bookmark' if (tag & 0xfff) == TAG_BOOKMARK
else 'mname' if (tag & 0xfff) == TAG_MNAME
else 'name 0x%02x' % (tag & 0xff),
' w%d' % weight if weight else '',
' %s' % size if size is not None else '')
# structure tags
elif (tag & 0x6f00) == TAG_STRUCT:
return '%s%s%s%s' % (
'shrub' if tag & TAG_SHRUB else '',
'branch' if (tag & 0xfff) == TAG_BRANCH
else 'data' if (tag & 0xfff) == TAG_DATA
else 'block' if (tag & 0xfff) == TAG_BLOCK
else 'did' if (tag & 0xfff) == TAG_DID
else 'bshrub' if (tag & 0xfff) == TAG_BSHRUB
else 'btree' if (tag & 0xfff) == TAG_BTREE
else 'mroot' if (tag & 0xfff) == TAG_MROOT
else 'mdir' if (tag & 0xfff) == TAG_MDIR
else 'mtree' if (tag & 0xfff) == TAG_MTREE
else 'struct 0x%02x' % (tag & 0xff),
' w%d' % weight if weight else '',
' %s' % size if size is not None else '')
# custom attributes
elif (tag & 0x6e00) == TAG_ATTR:
return '%s%sattr 0x%02x%s%s' % (
'shrub' if tag & TAG_SHRUB else '',
's' if tag & 0x100 else 'u',
((tag & 0x100) >> 1) + (tag & 0xff),
' w%d' % weight if weight else '',
' %s' % size if size is not None else '')
# alt pointers
elif tag & TAG_ALT:
return 'alt%s%s 0x%03x%s%s' % (
'r' if tag & TAG_R else 'b',
'gt' if tag & TAG_GT else 'le',
tag & 0x0fff,
' w%d' % weight if weight is not None else '',
' 0x%x' % (0xffffffff & (toff-size))
if size and toff is not None
else ' -%d' % size if size
else '')
# checksum tags
elif (tag & 0x7f00) == TAG_CKSUM:
return 'cksum%s%s%s%s%s' % (
'q%d' % (tag & 0x3),
'p' if tag & TAG_PERTURB else '',
' 0x%02x' % (tag & 0xff) if tag & 0xf8 else '',
' w%d' % weight if weight else '',
' %s' % size if size is not None else '')
# note tags
elif (tag & 0x7f00) == TAG_NOTE:
return 'note%s%s%s' % (
' 0x%02x' % (tag & 0xff) if tag & 0xff else '',
' w%d' % weight if weight else '',
' %s' % size if size is not None else '')
# erased-state checksum tags
elif (tag & 0x7f00) == TAG_ECKSUM:
return 'ecksum%s%s%s' % (
' 0x%02x' % (tag & 0xff) if tag & 0xff else '',
' w%d' % weight if weight else '',
' %s' % size if size is not None else '')
# global-checksum delta tags
elif (tag & 0x7f00) == TAG_GCKSUMDELTA:
if global_:
return 'gcksum%s%s%s' % (
' 0x%02x' % (tag & 0xff) if tag & 0xff else '',
' w%d' % weight if weight else '',
' %s' % size if size is not None else '')
else:
return 'gcksumdelta%s%s%s' % (
' 0x%02x' % (tag & 0xff) if tag & 0xff else '',
' w%d' % weight if weight else '',
' %s' % size if size is not None else '')
# unknown tags
else:
return '0x%04x%s%s' % (
tag,
' w%d' % weight if weight is not None else '',
' %d' % size if size is not None else '')
# a simple wrapper over an open file with bd geometry
class Bd:
def __init__(self, f, block_size=None, block_count=None):
self.f = f
self.block_size = block_size
self.block_count = block_count
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, self.repr())
def repr(self):
return 'bd %sx%s' % (self.block_size, self.block_count)
def read(self, block, off, size):
self.f.seek(block*self.block_size + off)
return self.f.read(size)
def readblock(self, block):
self.f.seek(block*self.block_size)
return self.f.read(self.block_size)
# tagged data in an rbyd
class Rattr:
def __init__(self, tag, weight, blocks, toff, tdata, data):
self.tag = tag
self.weight = weight
if isinstance(blocks, int):
self.blocks = (blocks,)
else:
self.blocks = blocks
self.toff = toff
self.tdata = tdata
self.data = data
@property
def block(self):
return self.blocks[0]
@property
def tsize(self):
return len(self.tdata)
@property
def off(self):
return self.toff + len(self.tdata)
@property
def size(self):
return len(self.data)
def __bytes__(self):
return self.data
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, self.repr())
def repr(self):
return tagrepr(self.tag, self.weight, self.size)
def __iter__(self):
return iter((self.tag, self.weight, self.data))
def __eq__(self, other):
return ((self.tag, self.weight, self.data)
== (other.tag, other.weight, other.data))
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash((self.tag, self.weight, self.data))
# convenience for did/name access
def _parse_name(self):
# note we return a null name for non-name tags, this is so
# vestigial names in btree nodes act as a catch-all
if (self.tag & 0xff00) != TAG_NAME:
did = 0
name = b''
else:
did, d = fromleb128(self.data)
name = self.data[d:]
# cache both
self.did = did
self.name = name
@ft.cached_property
def did(self):
self._parse_name()
return self.did
@ft.cached_property
def name(self):
self._parse_name()
return self.name
class Ralt:
def __init__(self, tag, weight, blocks, toff, tdata, jump,
color=None, followed=None):
self.tag = tag
self.weight = weight
if isinstance(blocks, int):
self.blocks = (blocks,)
else:
self.blocks = blocks
self.toff = toff
self.tdata = tdata
self.jump = jump
if color is not None:
self.color = color
else:
self.color = 'r' if tag & TAG_R else 'b'
self.followed = followed
@property
def block(self):
return self.blocks[0]
@property
def tsize(self):
return len(self.tdata)
@property
def off(self):
return self.toff + len(self.tdata)
@property
def joff(self):
return self.toff - self.jump
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, self.repr())
def repr(self):
return tagrepr(self.tag, self.weight, self.jump, toff=self.toff)
def __iter__(self):
return iter((self.tag, self.weight, self.jump))
def __eq__(self, other):
return ((self.tag, self.weight, self.jump)
== (other.tag, other.weight, other.jump))
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash((self.tag, self.weight, self.jump))
# our core rbyd type
class Rbyd:
def __init__(self, blocks, trunk, weight, rev, eoff, cksum, data, *,
shrub=False,
gcksumdelta=None,
redund=0):
if isinstance(blocks, int):
self.blocks = (blocks,)
else:
self.blocks = blocks
self.trunk = trunk
self.weight = weight
self.rev = rev
self.eoff = eoff
self.cksum = cksum
self.data = data
self.shrub = shrub
self.gcksumdelta = gcksumdelta
self.redund = redund
@property
def block(self):
return self.blocks[0]
@property
def corrupt(self):
# use redund=-1 to indicate corrupt rbyds
return self.redund >= 0
def addr(self):
if len(self.blocks) == 1:
return '0x%x.%x' % (self.block, self.trunk)
else:
return '0x{%s}.%x' % (
','.join('%x' % block for block in self.blocks),
self.trunk)
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, self.repr())
def repr(self):
return 'rbyd %s w%s' % (self.addr(), self.weight)
def __bool__(self):
# use redund=-1 to indicate corrupt rbyds
return self.redund >= 0
def __eq__(self, other):
return ((frozenset(self.blocks), self.trunk)
== (frozenset(other.blocks), other.trunk))
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash((frozenset(self.blocks), self.trunk))
@classmethod
def _fetch(cls, data, block, trunk=None):
# fetch the rbyd
rev = fromle32(data, 0)
cksum = 0
cksum_ = crc32c(data[0:4])
cksum__ = cksum_
perturb = False
eoff = 0
eoff_ = None
j_ = 4
trunk_ = 0
trunk__ = 0
trunk___ = 0
weight = 0
weight_ = 0
weight__ = 0
gcksumdelta = None
gcksumdelta_ = None
while j_ < len(data) and (not trunk or eoff <= trunk):
# read next tag
v, tag, w, size, d = fromtag(data, j_)
if v != parity(cksum__):
break
cksum__ ^= 0x00000080 if v else 0
cksum__ = crc32c(data[j_:j_+d], cksum__)
j_ += d
if not tag & TAG_ALT and j_ + size > len(data):
break
# take care of cksums
if not tag & TAG_ALT:
if (tag & 0xff00) != TAG_CKSUM:
cksum__ = crc32c(data[j_:j_+size], cksum__)
# found a gcksumdelta?
if (tag & 0xff00) == TAG_GCKSUMDELTA:
gcksumdelta_ = Rattr(tag, w, block, j_-d,
data[j_-d:j_],
data[j_:j_+size])
# found a cksum?
else:
# check cksum
cksum___ = fromle32(data, j_)
if cksum__ != cksum___:
break
# commit what we have
eoff = eoff_ if eoff_ else j_ + size
cksum = cksum_
trunk_ = trunk__
weight = weight_
gcksumdelta = gcksumdelta_
gcksumdelta_ = None
# update perturb bit
perturb = bool(tag & TAG_PERTURB)
# revert to data cksum and perturb
cksum__ = cksum_ ^ (0xfca42daf if perturb else 0)
# evaluate trunks
if (tag & 0xf000) != TAG_CKSUM:
if not (trunk and j_-d > trunk and not trunk___):
# new trunk?
if not trunk___:
trunk___ = j_-d
weight__ = 0
# keep track of weight
weight__ += w
# end of trunk?
if not tag & TAG_ALT:
# update trunk/weight unless we found a shrub or an
# explicit trunk (which may be a shrub) is requested
if not tag & TAG_SHRUB or trunk___ == trunk:
trunk__ = trunk___
weight_ = weight__
# keep track of eoff for best matching trunk
if trunk and j_ + size > trunk:
eoff_ = j_ + size
eoff = eoff_
cksum = cksum__ ^ (
0xfca42daf if perturb else 0)
trunk_ = trunk__
weight = weight_
gcksumdelta = gcksumdelta_
trunk___ = 0
# update canonical checksum, xoring out any perturb state
cksum_ = cksum__ ^ (0xfca42daf if perturb else 0)
if not tag & TAG_ALT:
j_ += size
return cls(block, trunk_, weight, rev, eoff, cksum, data,
gcksumdelta=gcksumdelta,
redund=0 if trunk_ else -1)
@classmethod
def fetch(cls, bd, blocks, trunk=None):
# multiple blocks?
if not isinstance(blocks, int):
# fetch all blocks
rbyds = [cls.fetch(bd, block, trunk) for block in blocks]
# determine most recent revision/trunk
rev, trunk = None, None
for rbyd in rbyds:
# compare with sequence arithmetic
if rbyd and (
rev is None
or not ((rbyd.rev - rev) & 0x80000000)
or (rbyd.rev == rev and rbyd.trunk > trunk)):
rev, trunk = rbyd.rev, rbyd.trunk
# sort for reproducibility
rbyds.sort(key=lambda rbyd: (
# prioritize valid redund blocks
0 if rbyd and rbyd.rev == rev and rbyd.trunk == trunk
else 1,
# default to sorting by block
rbyd.block))
# choose an active rbyd
rbyd = rbyds[0]
# keep track of the other blocks
rbyd.blocks = tuple(rbyd.block for rbyd in rbyds)
# keep track of how many redund blocks are valid
rbyd.redund = -1 + sum(1 for rbyd in rbyds
if rbyd and rbyd.rev == rev and rbyd.trunk == trunk)
# and patch the gcksumdelta if we have one
if rbyd.gcksumdelta is not None:
rbyd.gcksumdelta.blocks = rbyd.blocks
return rbyd
# seek/read the block
block = blocks
data = bd.readblock(block)
# fetch the rbyd
return cls._fetch(data, block, trunk)
@classmethod
def fetchck(cls, bd, blocks, trunk, weight, cksum):
# try to fetch the rbyd normally
rbyd = cls.fetch(bd, blocks, trunk)
# cksum mismatch? trunk/weight mismatch?
if (rbyd.cksum != cksum
or rbyd.trunk != trunk
or rbyd.weight != weight):
# mark as corrupt and keep track of expected trunk/weight
rbyd.redund = -1
rbyd.trunk = trunk
rbyd.weight = weight
return rbyd
@classmethod
def fetchshrub(cls, rbyd, trunk):
# steal the original rbyd's data
#
# this helps avoid race conditions with cksums and stuff
shrub = cls._fetch(rbyd.data, rbyd.block, trunk)
shrub.blocks = rbyd.blocks
shrub.shrub = True
return shrub
def lookupnext(self, rid, tag=None, *,
path=False):
if not self or rid >= self.weight:
if path:
return None, None, []
else:
return None, None
tag = max(tag or 0, 0x1)
lower = 0
upper = self.weight
path_ = []
# descend down tree
j = self.trunk
while True:
_, alt, w, jump, d = fromtag(self.data, j)
# found an alt?
if alt & TAG_ALT:
# follow?
if ((rid, tag & 0xfff) > (upper-w-1, alt & 0xfff)
if alt & TAG_GT
else ((rid, tag & 0xfff)
<= (lower+w-1, alt & 0xfff))):
lower += upper-lower-w if alt & TAG_GT else 0
upper -= upper-lower-w if not alt & TAG_GT else 0
j = j - jump
if path:
# figure out which color
if alt & TAG_R:
_, nalt, _, _, _ = fromtag(self.data, j+jump+d)
if nalt & TAG_R:
color = 'y'
else:
color = 'r'
else:
color = 'b'
path_.append(Ralt(
alt, w, self.blocks, j+jump,
self.data[j+jump:j+jump+d], jump,
color=color,
followed=True))
# stay on path
else:
lower += w if not alt & TAG_GT else 0
upper -= w if alt & TAG_GT else 0
j = j + d
if path:
# figure out which color
if alt & TAG_R:
_, nalt, _, _, _ = fromtag(self.data, j)
if nalt & TAG_R:
color = 'y'
else:
color = 'r'
else:
color = 'b'
path_.append(Ralt(
alt, w, self.blocks, j-d,
self.data[j-d:j], jump,
color=color,
followed=False))
# found tag
else:
rid_ = upper-1
tag_ = alt
w_ = upper-lower
if not tag_ or (rid_, tag_) < (rid, tag):
if path:
return None, None, path_
else:
return None, None
rattr_ = Rattr(tag_, w_, self.blocks, j,
self.data[j:j+d],
self.data[j+d:j+d+jump])
if path:
return rid_, rattr_, path_
else:
return rid_, rattr_
def lookup(self, rid, tag=None, mask=None, *,
path=False):
if tag is None:
tag, mask = 0, 0xffff
if mask is None:
mask = 0
r = self.lookupnext(rid, tag & ~mask,
path=path)
if path:
rid_, rattr_, path_ = r
else:
rid_, rattr_ = r
if (rid_ is None
or rid_ != rid
or (rattr_.tag & ~mask & 0xfff)
!= (tag & ~mask & 0xfff)):
if path:
return None, path_
else:
return None
if path:
return rattr_, path_
else:
return rattr_
def rids(self, *,
path=False):
rid = -1
while True:
r = self.lookupnext(rid,
path=path)
if path:
rid, name, path_ = r
else:
rid, name = r
# found end of tree?
if rid is None:
break
if path:
yield rid, name, path_
else:
yield rid, name
rid += 1
def rattrs(self, rid=None, tag=None, mask=None, *,
path=False):
if rid is None:
rid, tag = -1, 0
while True:
r = self.lookupnext(rid, tag+0x1,
path=path)
if path:
rid, rattr, path_ = r
else:
rid, rattr = r
# found end of tree?
if rid is None:
break
if path:
yield rid, rattr, path_
else:
yield rid, rattr
tag = rattr.tag
else:
if tag is None:
tag, mask = 0, 0xffff
if mask is None:
mask = 0
tag_ = max((tag & ~mask) - 1, 0)
while True:
r = self.lookupnext(rid, tag_+0x1,
path=path)
if path:
rid_, rattr_, path_ = r
else:
rid_, rattr_ = r
# found end of tree?
if (rid_ is None
or rid_ != rid
or (rattr_.tag & ~mask & 0xfff)
!= (tag & ~mask & 0xfff)):
break
if path:
yield rattr_, path_
else:
yield rattr_
tag_ = rattr_.tag
# lookup by name
def namelookup(self, did, name):
# binary search
best = None, None
lower = 0
upper = self.weight
while lower < upper:
rid, name_ = self.lookupnext(
lower + (upper-1-lower)//2)
if rid is None:
break
# bisect search space
if (name_.did, name_.name) > (did, name):
upper = rid-(name_.weight-1)
elif (name_.did, name_.name) < (did, name):
lower = rid + 1
# keep track of best match
best = rid, name_
else:
# found a match
return rid, name_
return best
# jump renderer
class JumpArt:
# abstract thing for jump rendering
class Jump(co.namedtuple('Jump', ['a', 'b', 'x', 'color'])):
__slots__ = ()
def __new__(cls, a, b, x=0, color='b'):
return super().__new__(cls, a, b, x, color)
def __repr__(self):
return '%s(%s, %s, %s, %s)' % (
self.__class__.__name__,
self.a,
self.b,
self.x,
self.color)
# don't include color in branch comparisons, or else our tree
# renderings can end up with inconsistent colors between runs
def __eq__(self, other):
return (self.a, self.b, self.x) == (other.a, other.b, other.x)
def __ne__(self, other):
return (self.a, self.b, self.x) != (other.a, other.b, other.x)
def __hash__(self):
return hash((self.a, self.b, self.x))
def __init__(self, jumps):
self.jumps = jumps
self.width = 2*max((x for _, _, x, _ in jumps), default=0)
@classmethod
def collide(cls, jumps):
# figure out x-offsets to avoid collisions between jumps
for j in range(len(jumps)):
a, b, _, c = jumps[j]
x = 0
while any(
max(a, b) >= min(a_, b_)
and max(a_, b_) >= min(a, b)
and x == x_
for a_, b_, x_, _ in jumps[:j]):
x += 1
jumps[j] = cls.Jump(a, b, x, c)
return jumps
@classmethod
def fromrbyd(cls, rbyd, all=False):
import builtins
all_, all = all, builtins.all
jumps = []
j_ = 4
while j_ < (len(rbyd.data) if all_ else rbyd.eoff):
j = j_
v, tag, w, size, d = fromtag(rbyd.data, j_)
j_ += d
if not tag & TAG_ALT:
j_ += size
if tag & TAG_ALT and size:
# figure out which alt color
if tag & TAG_R:
_, ntag, _, _, _ = fromtag(rbyd.data, j_)
if ntag & TAG_R:
jumps.append(cls.Jump(j, j-size, 0, 'y'))
else:
jumps.append(cls.Jump(j, j-size, 0, 'r'))
else:
jumps.append(cls.Jump(j, j-size, 0, 'b'))
jumps = cls.collide(jumps)
return cls(jumps)
def repr(self, j, color=False):
# render jumps
chars = {}
for a, b, x, c in self.jumps:
c_start = (
'\x1b[33m' if color and c == 'y'
else '\x1b[31m' if color and c == 'r'
else '\x1b[1;30m' if color
else '')
c_stop = '\x1b[m' if color else ''
if j == a:
for x_ in range(2*x+1):
chars[x_] = '%s-%s' % (c_start, c_stop)
chars[2*x+1] = '%s\'%s' % (c_start, c_stop)
elif j == b:
for x_ in range(2*x+1):
chars[x_] = '%s-%s' % (c_start, c_stop)
chars[2*x+1] = '%s.%s' % (c_start, c_stop)
chars[0] = '%s<%s' % (c_start, c_stop)
elif j >= min(a, b) and j <= max(a, b):
chars[2*x+1] = '%s|%s' % (c_start, c_stop)
return ''.join(chars.get(x, ' ')
for x in range(max(chars.keys(), default=0)+1))
# lifetime renderer
class LifetimeArt:
# abstract things for lifetime rendering
class Lifetime(co.namedtuple('Lifetime', ['id', 'origin', 'tags'])):
__slots__ = ()
def __new__(cls, id, origin, tags=None):
return super().__new__(cls, id, origin,
set(tags) if tags is not None else set())
def __repr__(self):
return '%s(%s, %s, %s)' % (
self.__class__.__name__,
self.id,
self.origin,
self.tags)
def add(self, j):
self.tags.add(j)
def __bool__(self):
return bool(self.tags)
# define equality by id
def __eq__(self, other):
return self.id == other.id
def __ne__(self, other):
return self.id != other.id
def __hash__(self):
return hash((self.id))
def __lt__(self, other):
return self.id < other.id
def __le__(self, other):
return self.id <= other.id
def __gt__(self, other):
return self.id > other.id
def __ge__(self, other):
return self.id >= other.id
class Checkpoint(co.namedtuple('Checkpoint', [
'j', 'weights', 'lifetimes', 'grows', 'shrinks', 'tags'])):
__slots__ = ()
def __new__(cls, j, weights, lifetimes,
grows=None, shrinks=None, tags=None):
return super().__new__(cls, j,
# note we rely on tuple making frozen copies here
tuple(weights),
tuple(lifetimes),
frozenset(grows) if grows is not None else frozenset(),
frozenset(shrinks) if shrinks is not None else frozenset(),
frozenset(tags) if tags is not None else frozenset())
# define equality by checkpoint offset
def __eq__(self, other):
return self.j == other.j
def __ne__(self, other):
return self.j != other.j
def __hash__(self):
return hash((self.j))
def __lt__(self, other):
return self.j < other.j
def __le__(self, other):
return self.j <= other.j
def __gt__(self, other):
return self.j > other.j
def __ge__(self, other):
return self.j >= other.j
def __init__(self, checkpoints):
self.lifetimes = sorted(set(
lifetime
for checkpoint in checkpoints
for lifetime in checkpoint.lifetimes))
self.checkpoints = checkpoints
self.width = 2*max(
(sum(1 for lifetime in checkpoint.lifetimes if lifetime)
for checkpoint in checkpoints),
default=0)
@staticmethod
def index(weights, rid):
for i, w in enumerate(weights):
if rid < w:
return i, rid
rid -= w
return len(weights), 0
@classmethod
def fromrbyd(cls, rbyd, all=False):
import builtins
all_, all = all, builtins.all
# first figure out where each rid comes from
id = 0
weights = []
lifetimes = []
checkpoints = [cls.Checkpoint(0, [], [])]
lower_, upper_ = 0, 0
weight_ = 0
trunk_ = 0
j_ = 4
while j_ < (len(rbyd.data) if all_ else rbyd.eoff):
j = j_
v, tag, w, size, d = fromtag(rbyd.data, j_)
j_ += d
if not tag & TAG_ALT:
j_ += size
# evaluate trunks
if (tag & 0xf000) != TAG_CKSUM:
if not trunk_:
trunk_ = j_-d
lower_, upper_ = 0, 0
if tag & TAG_ALT and not tag & TAG_GT:
lower_ += w
else:
upper_ += w
if not tag & TAG_ALT:
# derive the current tag's rid from alt weights
delta = (lower_+upper_) - weight_
weight_ = lower_+upper_
rid = lower_ + w-1
trunk_ = 0
if (tag & 0xf000) != TAG_CKSUM and not tag & TAG_ALT:
# note we ignore out-of-bounds here for debugging
if delta > 0:
# grow lifetimes
l = cls.Lifetime(id, j)
id += 1
i, rid_ = cls.index(weights, lower_)
if rid_ > 0:
weights[i:i+1] = [rid_, delta, weights[i]-rid_]
lifetimes[i:i+1] = [lifetimes[i], l, lifetimes[i]]
else:
weights[i:i] = [delta]
lifetimes[i:i] = [l]
checkpoints.append(cls.Checkpoint(
j, weights, lifetimes,
grows={i},
tags={i}))
elif delta < 0:
# shrink lifetimes
i, rid_ = cls.index(weights, lower_)
delta_ = -delta
weights_ = weights.copy()
lifetimes_ = lifetimes.copy()
shrinks = set()
while delta_ > 0 and i < len(weights_):
if weights_[i] > delta_:
delta__ = min(delta_, weights_[i]-rid_)
delta_ -= delta__
weights_[i] -= delta__
i += 1
rid_ = 0
else:
delta_ -= weights_[i]
weights_[i:i+1] = []
lifetimes_[i:i+1] = []
shrinks.add(i + len(shrinks))
checkpoints.append(cls.Checkpoint(
j, weights, lifetimes,
shrinks=shrinks,
tags={i}))
weights = weights_
lifetimes = lifetimes_
if rid >= 0:
# attach tag to lifetime
i, rid_ = cls.index(weights, rid)
if i < len(weights):
lifetimes[i].add(j)
if delta == 0:
checkpoints.append(cls.Checkpoint(
j, weights, lifetimes,
tags={i}))
return cls(checkpoints)
def repr(self, j, color=False):
i = bisect.bisect(self.checkpoints, j,
key=lambda checkpoint: checkpoint.j) - 1
j_, weights, lifetimes, grows, shrinks, tags = self.checkpoints[i]
reprs = []
colors = []
was = None
for i, (w, lifetime) in enumerate(zip(weights, lifetimes)):
# skip lifetimes with no tags and shrinks
if not lifetime or (j != j_ and i in shrinks):
if i in grows or i in shrinks or i in tags:
tags = tags | {i+1}
continue
if j == j_ and i in grows:
reprs.append('.')
was = 'grow'
elif j == j_ and i in shrinks:
reprs.append('\'')
was = 'shrink'
elif j == j_ and i in tags:
reprs.append('* ')
elif was == 'grow':
reprs.append('\\ ')
elif was == 'shrink':
reprs.append('/ ')
else:
reprs.append('| ')
colors.append(COLORS[lifetime.id % len(COLORS)])
return '%s%*s' % (
''.join('%s%s%s' % (
'\x1b[%sm' % c if color else '',
r,
'\x1b[m' if color else '')
for r, c in zip(reprs, colors)),
self.width - sum(len(r) for r in reprs), '')
# tree renderer
class TreeArt:
# tree branches are an abstract thing for tree rendering
class Branch(co.namedtuple('Branch', ['a', 'b', 'z', 'color'])):
__slots__ = ()
def __new__(cls, a, b, z=0, color='b'):
# a and b are context specific
return super().__new__(cls, a, b, z, color)
def __repr__(self):
return '%s(%s, %s, %s, %s)' % (
self.__class__.__name__,
self.a,
self.b,
self.z,
self.color)
# don't include color in branch comparisons, or else our tree
# renderings can end up with inconsistent colors between runs
def __eq__(self, other):
return (self.a, self.b, self.z) == (other.a, other.b, other.z)
def __ne__(self, other):
return (self.a, self.b, self.z) != (other.a, other.b, other.z)
def __hash__(self):
return hash((self.a, self.b, self.z))
# also order by z first, which can be useful for reproducibly
# prioritizing branches when simplifying trees
def __lt__(self, other):
return (self.z, self.a, self.b) < (other.z, other.a, other.b)
def __le__(self, other):
return (self.z, self.a, self.b) <= (other.z, other.a, other.b)
def __gt__(self, other):
return (self.z, self.a, self.b) > (other.z, other.a, other.b)
def __ge__(self, other):
return (self.z, self.a, self.b) >= (other.z, other.a, other.b)
# apply a function to a/b while trying to avoid copies
def map(self, filter_, map_=None):
if map_ is None:
filter_, map_ = None, filter_
a = self.a
if filter_ is None or filter_(a):
a = map_(a)
b = self.b
if filter_ is None or filter_(b):
b = map_(b)
if a != self.a or b != self.b:
return self.__class__(
a if a != self.a else self.a,
b if b != self.b else self.b,
self.z,
self.color)
else:
return self
def __init__(self, tree):
self.tree = tree
self.depth = max((t.z+1 for t in tree), default=0)
if self.depth > 0:
self.width = 2*self.depth + 2
else:
self.width = 0
def __iter__(self):
return iter(self.tree)
def __bool__(self):
return bool(self.tree)
def __len__(self):
return len(self.tree)
# render an rbyd rbyd tree for debugging
@classmethod
def _fromrbydrtree(cls, rbyd, **args):
trunks = co.defaultdict(lambda: (-1, 0))
alts = co.defaultdict(lambda: {})
for rid, rattr, path in rbyd.rattrs(path=True):
# keep track of trunks/alts
trunks[rattr.toff] = (rid, rattr.tag)
for ralt in path:
if ralt.followed:
alts[ralt.toff] |= {'f': ralt.joff, 'c': ralt.color}
else:
alts[ralt.toff] |= {'nf': ralt.off, 'c': ralt.color}
if args.get('tree_rbyd'):
# treat unreachable alts as converging paths
for j_, alt in alts.items():
if 'f' not in alt:
alt['f'] = alt['nf']
elif 'nf' not in alt:
alt['nf'] = alt['f']
else:
# prune any alts with unreachable edges
pruned = {}
for j, alt in alts.items():
if 'f' not in alt:
pruned[j] = alt['nf']
elif 'nf' not in alt:
pruned[j] = alt['f']
for j in pruned.keys():
del alts[j]
for j, alt in alts.items():
while alt['f'] in pruned:
alt['f'] = pruned[alt['f']]
while alt['nf'] in pruned:
alt['nf'] = pruned[alt['nf']]
# find the trunk and depth of each alt
def rec_trunk(j):
if j not in alts:
return trunks[j]
else:
if 'nft' not in alts[j]:
alts[j]['nft'] = rec_trunk(alts[j]['nf'])
return alts[j]['nft']
for j in alts.keys():
rec_trunk(j)
for j, alt in alts.items():
if alt['f'] in alts:
alt['ft'] = alts[alt['f']]['nft']
else:
alt['ft'] = trunks[alt['f']]
def rec_height(j):
if j not in alts:
return 0
else:
if 'h' not in alts[j]:
alts[j]['h'] = max(
rec_height(alts[j]['f']),
rec_height(alts[j]['nf'])) + 1
return alts[j]['h']
for j in alts.keys():
rec_height(j)
t_depth = max((alt['h']+1 for alt in alts.values()), default=0)
# convert to more general tree representation
tree = set()
for j, alt in alts.items():
# note all non-trunk edges should be colored black
tree.add(cls.Branch(
alt['nft'],
alt['nft'],
t_depth-1 - alt['h'],
alt['c']))
if alt['ft'] != alt['nft']:
tree.add(cls.Branch(
alt['nft'],
alt['ft'],
t_depth-1 - alt['h'],
'b'))
return cls(tree)
# render an rbyd btree tree for debugging
@classmethod
def _fromrbydbtree(cls, rbyd, **args):
# for rbyds this is just a pointer to every rid
tree = set()
root = None
for rid, name in rbyd.rids():
b = (rid, name.tag)
if root is None:
root = b
tree.add(cls.Branch(root, b))
return cls(tree)
# render an rbyd tree for debugging
@classmethod
def fromrbyd(cls, rbyd, **args):
if args.get('tree_btree'):
return cls._fromrbydbtree(rbyd, **args)
else:
return cls._fromrbydrtree(rbyd, **args)
# render some nice ascii trees
def repr(self, x, color=False):
if self.depth == 0:
return ''
def branchrepr(tree, x, d, was):
for t in tree:
if t.z == d and t.b == x:
if any(t.z == d and t.a == x
for t in tree):
return '+-', t.color, t.color
elif any(t.z == d
and x > min(t.a, t.b)
and x < max(t.a, t.b)
for t in tree):
return '|-', t.color, t.color
elif t.a < t.b:
return '\'-', t.color, t.color
else:
return '.-', t.color, t.color
for t in tree:
if t.z == d and t.a == x:
return '+ ', t.color, None
for t in tree:
if (t.z == d
and x > min(t.a, t.b)
and x < max(t.a, t.b)):
return '| ', t.color, was
if was:
return '--', was, was
return ' ', None, None
trunk = []
was = None
for d in range(self.depth):
t, c, was = branchrepr(self.tree, x, d, was)
trunk.append('%s%s%s%s' % (
'\x1b[33m' if color and c == 'y'
else '\x1b[31m' if color and c == 'r'
else '\x1b[1;30m' if color and c == 'b'
else '',
t,
('>' if was else ' ') if d == self.depth-1 else '',
'\x1b[m' if color and c else ''))
return '%s ' % ''.join(trunk)
# show the rbyd log
def dbg_log(rbyd, *,
color=False,
**args):
data = rbyd.data
# preprocess jumps
if args.get('jumps'):
jumpart = JumpArt.fromrbyd(rbyd,
all=args.get('all'))
# preprocess lifetimes
l_width = 0
if args.get('lifetimes'):
lifetimeart = LifetimeArt.fromrbyd(rbyd, all=args.get('all'))
l_width = lifetimeart.width
# dynamically size the id field
#
# we need to do an additional pass to find this since our rbyd weight
# does not include any shrub trees
data = rbyd.data
weight_ = 0
weight__ = 0
trunk_ = 0
j_ = 4
while j_ < (len(data) if args.get('all') else rbyd.eoff):
j = j_
v, tag, w, size, d = fromtag(data, j_)
j_ += d
if not tag & TAG_ALT:
j_ += size
# evaluate trunks
if (tag & 0xf000) != TAG_CKSUM:
if not trunk_:
trunk_ = j_-d
weight__ = 0
weight__ += w
if not tag & TAG_ALT:
# found new weight?
weight_ = max(weight_, weight__)
trunk_ = 0
w_width = mt.ceil(mt.log10(max(1, weight_)+1))
# print revision count
if args.get('raw'):
print('%8s: %*s%*s %s' % (
'%04x' % 0,
l_width, '',
2*w_width+1, '',
next(xxd(data[0:4]))))
# print tags
cksum = crc32c(data[0:4])
cksum_ = cksum
perturb = False
lower_, upper_ = 0, 0
trunk_ = 0
j_ = 4
while j_ < (len(data) if args.get('all') else rbyd.eoff):
notes = []
# read next tag
j = j_
v, tag, w, size, d = fromtag(data, j_)
# note if parity doesn't match
if v != parity(cksum_):
notes.append('v!=%x' % parity(cksum_))
cksum_ ^= 0x00000080 if v else 0
cksum_ = crc32c(data[j_:j_+d], cksum_)
j_ += d
# take care of cksums
if not tag & TAG_ALT:
if (tag & 0xff00) != TAG_CKSUM:
cksum_ = crc32c(data[j_:j_+size], cksum_)
# found a cksum?
else:
# note if phase doesn't match
if (tag & 0x3) != (rbyd.block & 0x3):
notes.append('q!=%x' % (rbyd.block & 0x3))
# check cksum
cksum__ = fromle32(data, j_)
# note if cksum doesn't match
if cksum_ != cksum__:
notes.append('cksum!=%08x' % cksum__)
# update perturb bit
perturb = bool(tag & TAG_PERTURB)
# revert to data cksum and perturb
cksum_ = cksum ^ (0xfca42daf if perturb else 0)
j_ += size
# evaluate trunks
if (tag & 0xf000) != TAG_CKSUM:
if not trunk_:
trunk_ = j_-d
lower_, upper_ = 0, 0
if tag & TAG_ALT and not tag & TAG_GT:
lower_ += w
else:
upper_ += w
# end of trunk?
if not tag & TAG_ALT:
# derive the current tag's rid from alt weights
rid = lower_ + w-1
trunk_ = 0
# update canonical checksum, xoring out any perturb state
cksum = cksum_ ^ (0xfca42daf if perturb else 0)
# show human-readable tag representation
print('%s%08x:%s %*s%s%*s %-*s%s%s%s' % (
'\x1b[1;30m' if color and j >= rbyd.eoff else '',
j,
'\x1b[m' if color and j >= rbyd.eoff else '',
l_width, lifetimeart.repr(j, color)
if args.get('lifetimes')
else '',
'\x1b[1;30m' if color and j >= rbyd.eoff else '',
2*w_width+1, '' if (tag & 0xe000) != 0x0000
else '%d-%d' % (rid-(w-1), rid) if w > 1
else rid,
56+w_width, '%-*s %s' % (
21+w_width, tagrepr(tag, w, size, toff=j),
next(xxd(data[j+d:j+d+min(size, 8)], 8), '')
if not args.get('raw')
and not args.get('no_truncate')
and not tag & TAG_ALT
else ''),
' (%s)' % ', '.join(notes) if notes else '',
'\x1b[m' if color and j >= rbyd.eoff else '',
' %s' % jumpart.repr(j, color)
if args.get('jumps') and not notes
else ''))
# show on-disk encoding of tags
if args.get('raw'):
for o, line in enumerate(xxd(data[j:j+d])):
print('%s%8s: %*s%*s %s%s' % (
'\x1b[1;30m' if color and j >= rbyd.eoff else '',
'%04x' % (j + o*16),
l_width, '',
2*w_width+1, '',
line,
'\x1b[m' if color and j >= rbyd.eoff else ''))
if args.get('raw') or args.get('no_truncate'):
if not tag & TAG_ALT:
for o, line in enumerate(xxd(data[j+d:j+d+size])):
print('%s%8s: %*s%*s %s%s' % (
'\x1b[1;30m' if color and j >= rbyd.eoff else '',
'%04x' % (j+d + o*16),
l_width, '',
2*w_width+1, '',
line,
'\x1b[m' if color and j >= rbyd.eoff else ''))
# show the rbyd tree
def dbg_tree(rbyd, *,
color=False,
**args):
if not rbyd:
return
# precompute tree renderings
t_width = 0
if (args.get('tree')
or args.get('tree_rbyd')
or args.get('tree_btree')):
tree = TreeArt.fromrbyd(rbyd, **args)
t_width = tree.width
# dynamically size the id field
w_width = mt.ceil(mt.log10(max(1, rbyd.weight)+1))
for i, (rid, rattr) in enumerate(rbyd.rattrs()):
# show human-readable tag representation
print('%08x: %s%*s %-*s %s' % (
rattr.toff,
tree.repr((rid, rattr.tag), color)
if (args.get('tree')
or args.get('tree_rbyd')
or args.get('tree_btree'))
else '',
2*w_width+1, '%d-%d' % (rid-(rattr.weight-1), rid)
if rattr.weight > 1
else rid if rattr.weight > 0 or i == 0
else '',
21+w_width, rattr.repr(),
next(xxd(rattr.data[:8], 8), '')
if not args.get('raw')
and not args.get('no_truncate')
and not rattr.tag & TAG_ALT
else ''))
# show on-disk encoding of tags
if args.get('raw'):
for o, line in enumerate(xxd(rattr.tdata)):
print('%8s: %*s%*s %s' % (
'%04x' % (rattr.toff + o*16),
t_width, '',
2*w_width+1, '',
line))
if args.get('raw') or args.get('no_truncate'):
if not rattr.tag & TAG_ALT:
for o, line in enumerate(xxd(rattr.data)):
print('%8s: %*s%*s %s' % (
'%04x' % (rattr.off + o*16),
t_width, '',
2*w_width+1, '',
line))
def main(disk, blocks=None, *,
trunk=None,
block_size=None,
block_count=None,
quiet=False,
color='auto',
**args):
# figure out what color should be
if color == 'auto':
color = sys.stdout.isatty()
elif color == 'always':
color = True
else:
color = False
# is bd geometry specified?
if isinstance(block_size, tuple):
block_size, block_count_ = block_size
if block_count is None:
block_count = block_count_
# flatten blocks, default to block 0
blocks = list(it.chain.from_iterable(blocks)) if blocks else [0]
# blocks may also encode trunks
blocks, trunk = (
[block[0] if isinstance(block, tuple)
else block
for block in blocks],
trunk if trunk is not None
else ft.reduce(
lambda x, y: y,
(block[1] for block in blocks
if isinstance(block, tuple)),
None))
with open(disk, 'rb') as f:
# if block_size is omitted, assume the block device is one big block
if block_size is None:
f.seek(0, os.SEEK_END)
block_size = f.tell()
# fetch the rbyd
bd = Bd(f, block_size, block_count)
rbyd = Rbyd.fetch(bd, blocks, trunk)
# print some information about the rbyd
if not quiet:
print('rbyd %s w%d, rev %08x, size %d, cksum %08x' % (
rbyd.addr(),
rbyd.weight,
rbyd.rev,
rbyd.eoff,
rbyd.cksum))
if args.get('log') and not quiet:
dbg_log(rbyd,
color=color,
**args)
elif not quiet:
dbg_tree(rbyd,
color=color,
**args)
if args.get('error_on_corrupt') and not rbyd:
sys.exit(2)
if __name__ == "__main__":
import argparse
import sys
parser = argparse.ArgumentParser(
description="Debug rbyd metadata.",
allow_abbrev=False)
parser.add_argument(
'disk',
help="File containing the block device.")
parser.add_argument(
'blocks',
nargs='*',
type=rbydaddr,
help="Block address of metadata blocks.")
parser.add_argument(
'--trunk',
type=lambda x: int(x, 0),
help="Use this offset as the trunk of the tree.")
parser.add_argument(
'-b', '--block-size',
type=bdgeom,
help="Block size/geometry in bytes. Accepts <size>x<count>.")
parser.add_argument(
'--block-count',
type=lambda x: int(x, 0),
help="Block count in blocks.")
parser.add_argument(
'-q', '--quiet',
action='store_true',
help="Don't show anything, useful when checking for errors.")
parser.add_argument(
'--color',
choices=['never', 'always', 'auto'],
default='auto',
help="When to use terminal colors. Defaults to 'auto'.")
parser.add_argument(
'-a', '--all',
action='store_true',
help="Don't stop parsing on bad commits.")
parser.add_argument(
'-l', '--log',
action='store_true',
help="Show the raw tags as they appear in the log.")
parser.add_argument(
'-x', '--raw',
action='store_true',
help="Show the raw data including tag encodings.")
parser.add_argument(
'-T', '--no-truncate',
action='store_true',
help="Don't truncate, show the full contents.")
parser.add_argument(
'-t', '--tree',
action='store_true',
help="Show the rbyd tree.")
parser.add_argument(
'-R', '--tree-rbyd',
action='store_true',
help="Show the full rbyd tree.")
parser.add_argument(
'-B', '--tree-btree',
action='store_true',
help="Show a simplified btree tree.")
parser.add_argument(
'-j', '--jumps',
action='store_true',
help="Show alt pointer jumps in the margin.")
parser.add_argument(
'-g', '--lifetimes',
action='store_true',
help="Show inserts/deletes of ids in the margin.")
parser.add_argument(
'-e', '--error-on-corrupt',
action='store_true',
help="Error if no valid commit is found.")
sys.exit(main(**{k: v
for k, v in vars(parser.parse_intermixed_args()).items()
if v is not None}))