Files
littlefs/scripts/dbgmtree.py
Christopher Haster af0c3967b4 Adopted new tree renderer in dbgmtree, implemented mtree rendering
In addition to plugging in the rbyd and btree renderers in dbgbtree.py,
this required wiring in rbyd trees in the mdirs and mroots.

A bit tricky, but with a more-or-less straightforward implementation thanks
to the common edge description used for the tree renderer.

For example, a relatively small mtree:

  $ ./scripts/dbgmtree.py disk -B4096 -t -i
  mroot 0x{0,1}.45, rev 1, weight 0
  mdir                     ids   tag                     ...
  {0000,0001}: .--------->    -1 magic 8                 ...
               | .------->       config 21               ...
               +-+-+             btree 7                 ...
    0006.000a:     | .-+       0 mdir w1 2               ...
  {0002,0003}:     | | '->   0.0 inlined w1 1024         ...
    0006.000a:     '-+-+       1 mdir w1 2               ...
  {0004,0005}:         '->   1.0 inlined w1 1024         ...
2023-05-30 18:10:32 -05:00

1585 lines
57 KiB
Python
Executable File

#!/usr/bin/env python3
import bisect
import collections as co
import itertools as it
import math as m
import os
import struct
TAG_UNR = 0x0002
TAG_MAGIC = 0x0030
TAG_CONFIG = 0x0040
TAG_MROOT = 0x0110
TAG_NAME = 0x1000
TAG_BRANCH = 0x1000
TAG_REG = 0x1010
TAG_DIR = 0x1020
TAG_STRUCT = 0x3000
TAG_INLINED = 0x3000
TAG_BLOCK = 0x3100
TAG_MDIR = 0x3200
TAG_BTREE = 0x3300
TAG_UATTR = 0x4000
TAG_ALT = 0x0008
TAG_CRC = 0x0004
TAG_FCRC = 0x1004
# parse some rbyd addr encodings
# 0xa -> [0xa]
# 0xa.b -> ([0xa], b)
# 0x{a,b} -> [0xa, 0xb]
def rbydaddr(s):
s = s.strip()
b = 10
if s.startswith('0x') or s.startswith('0X'):
s = s[2:]
b = 16
elif s.startswith('0o') or s.startswith('0O'):
s = s[2:]
b = 8
elif s.startswith('0b') or s.startswith('0B'):
s = s[2:]
b = 2
trunk = None
if '.' in s:
s, s_ = s.split('.', 1)
trunk = int(s_, b)
if s.startswith('{') and '}' in s:
ss = s[1:s.find('}')].split(',')
else:
ss = [s]
addr = []
for s in ss:
if trunk is not None:
addr.append((int(s, b), trunk))
else:
addr.append(int(s, b))
return addr
def crc32c(data, crc=0):
crc ^= 0xffffffff
for b in data:
crc ^= b
for j in range(8):
crc = (crc >> 1) ^ ((crc & 1) * 0x82f63b78)
return 0xffffffff ^ crc
def fromle16(data):
return struct.unpack('<H', data[0:2].ljust(2, b'\0'))[0]
def fromle32(data):
return struct.unpack('<I', data[0:4].ljust(4, b'\0'))[0]
def fromleb128(data):
word = 0
for i, b in enumerate(data):
word |= ((b & 0x7f) << 7*i)
word &= 0xffffffff
if not b & 0x80:
return word, i+1
return word, len(data)
def fromtag(data):
tag = fromle16(data)
weight, d = fromleb128(data[2:])
size, d_ = fromleb128(data[2+d:])
return tag&1, tag&~1, weight, size, 2+d+d_
def frommdir(data):
blocks = []
d = 0
while d < len(data):
block, d_ = fromleb128(data[d:])
blocks.append(block)
d += d_
return blocks
def frombtree(data):
w, d1 = fromleb128(data)
trunk, d2 = fromleb128(data[d1:])
block, d3 = fromleb128(data[d1+d2:])
crc = fromle32(data[d1+d2+d3:])
return w, trunk, block, crc
def popc(x):
return bin(x).count('1')
def xxd(data, width=16, crc=False):
for i in range(0, len(data), width):
yield '%-*s %-*s' % (
3*width,
' '.join('%02x' % b for b in data[i:i+width]),
width,
''.join(
b if b >= ' ' and b <= '~' else '.'
for b in map(chr, data[i:i+width])))
def tagrepr(tag, w, size, off=None):
if (tag & 0xfffe) == TAG_UNR:
return 'unr%s%s' % (
' w%d' % w if w else '',
' %d' % size if size else '')
elif (tag & 0xfffc) == TAG_MAGIC:
return '%smagic%s %d' % (
'rm' if tag & 0x2 else '',
' w%d' % w if w else '',
size)
elif (tag & 0xfffc) == TAG_CONFIG:
return '%sconfig%s %d' % (
'rm' if tag & 0x2 else '',
' w%d' % w if w else '',
size)
elif (tag & 0xfffc) == TAG_MROOT:
return '%smroot%s %d' % (
'rm' if tag & 0x2 else '',
' w%d' % w if w else '',
size)
elif (tag & 0xf00c) == TAG_NAME:
return '%s%s%s %d' % (
'rm' if tag & 0x2 else '',
'bname' if (tag & 0xfffe) == TAG_BRANCH
else 'reg' if (tag & 0xfffe) == TAG_REG
else 'dir' if (tag & 0xfffe) == TAG_DIR
else 'name 0x%02x' % ((tag & 0x0ff0) >> 4),
' w%d' % w if w else '',
size)
elif (tag & 0xf00c) == TAG_STRUCT:
return '%s%s%s %d' % (
'rm' if tag & 0x2 else '',
'inlined' if (tag & 0xfffe) == TAG_INLINED
else 'block' if (tag & 0xfffe) == TAG_BLOCK
else 'mdir' if (tag & 0xfffe) == TAG_MDIR
else 'btree' if (tag & 0xfffe) == TAG_BTREE
else 'struct 0x%02x' % ((tag & 0x0ff0) >> 4),
' w%d' % w if w else '',
size)
elif (tag & 0xf00c) == TAG_UATTR:
return '%suattr 0x%02x%s%s' % (
'rm' if tag & 0x2 else '',
(tag & 0x0ff0) >> 4,
' w%d' % w if w else '',
' %d' % size if not tag & 0x2 or size else '')
elif (tag & 0xf00e) == TAG_CRC:
return 'crc%x%s %d' % (
1 if tag & 0x10 else 0,
' 0x%x' % w if w > 0 else '',
size)
elif (tag & 0xfffe) == TAG_FCRC:
return 'fcrc%s %d' % (
' 0x%x' % w if w > 0 else '',
size)
elif tag & 0x8:
return 'alt%s%s 0x%x w%d %s' % (
'r' if tag & 0x2 else 'b',
'gt' if tag & 0x4 else 'le',
tag & 0xfff0,
w,
'0x%x' % (0xffffffff & (off-size))
if off is not None
else '-%d' % off)
else:
return '0x%04x w%d %d' % (tag, w, size)
class Rbyd:
def __init__(self, block, data, rev, off, trunk, weight):
self.block = block
self.data = data
self.rev = rev
self.off = off
self.trunk = trunk
self.weight = weight
self.other_blocks = []
def addr(self):
if not self.other_blocks:
return '0x%x.%x' % (self.block, self.trunk)
else:
return '0x{%x,%s}.%x' % (
self.block,
','.join('%x' % block for block in self.other_blocks),
self.trunk)
@classmethod
def fetch(cls, f, block_size, blocks, trunk=None):
if isinstance(blocks, int):
blocks = [blocks]
if len(blocks) > 1:
# fetch all blocks
rbyds = [cls.fetch(f, block_size, block, trunk) for block in blocks]
# determine most recent revision
i = 0
for i_, rbyd in enumerate(rbyds):
# compare with sequence arithmetic
if rbyd and (
not ((rbyd.rev - rbyds[i].rev) & 0x80000000)
or (rbyd.rev == rbyds[i].rev
and rbyd.trunk > rbyds[i].trunk)):
i = i_
# keep track of the other blocks
rbyd = rbyds[i]
rbyd.other_blocks = [rbyds[(i+1+j) % len(rbyds)].block
for j in range(len(rbyds)-1)]
return rbyd
else:
# block may encode a trunk
block = blocks[0]
if isinstance(block, tuple):
if trunk is None:
trunk = block[1]
block = block[0]
# seek to the block
f.seek(block * block_size)
data = f.read(block_size)
# fetch the rbyd
rev = fromle32(data[0:4])
crc = 0
crc_ = crc32c(data[0:4])
off = 0
j_ = 4
trunk_ = 0
trunk__ = 0
weight = 0
lower_, upper_ = 0, 0
weight_ = 0
wastrunk = False
trunkoff = None
while j_ < len(data) and (not trunk or off <= trunk):
v, tag, w, size, d = fromtag(data[j_:])
if v != (popc(crc_) & 1):
break
crc_ = crc32c(data[j_:j_+d], crc_)
j_ += d
if not tag & 0x8 and j_ + size > len(data):
break
# take care of crcs
if not tag & 0x8:
if (tag & 0xf00f) != TAG_CRC:
crc_ = crc32c(data[j_:j_+size], crc_)
# found a crc?
else:
crc__ = fromle32(data[j_:j_+4])
if crc_ != crc__:
break
# commit what we have
off = trunkoff if trunkoff else j_ + size
crc = crc_
trunk_ = trunk__
weight = weight_
# evaluate trunks
if (tag & 0xc) != 0x4 and (
not trunk or trunk >= j_-d or wastrunk):
# new trunk?
if not wastrunk:
trunk__ = j_-d
lower_, upper_ = 0, 0
wastrunk = True
# keep track of weight
if tag & 0x8:
if tag & 0x4:
upper_ += w
else:
lower_ += w
else:
weight_ = lower_+upper_+w
wastrunk = False
# keep track of off for best matching trunk
if trunk and j_ + size > trunk:
trunkoff = j_ + size
if not tag & 0x8:
j_ += size
return cls(block, data, rev, off, trunk_, weight)
def lookup(self, id, tag):
if not self:
return True, 0, -1, 0, 0, 0, b'', []
lower = -1
upper = self.weight
path = []
# descend down tree
j = self.trunk
while True:
_, alt, weight_, jump, d = fromtag(self.data[j:])
# found an alt?
if alt & 0x8:
# follow?
if ((id, tag & ~0xf) > (upper-weight_-1, alt & ~0xf)
if alt & 0x4
else ((id, tag & ~0xf) <= (lower+weight_, alt & ~0xf))):
lower += upper-lower-1-weight_ if alt & 0x4 else 0
upper -= upper-lower-1-weight_ if not alt & 0x4 else 0
j = j - jump
# figure out which color
if alt & 0x2:
_, nalt, _, _, _ = fromtag(self.data[j+jump+d:])
if nalt & 0x2:
path.append((j+jump, j, True, 'y'))
else:
path.append((j+jump, j, True, 'r'))
else:
path.append((j+jump, j, True, 'b'))
# stay on path
else:
lower += weight_ if not alt & 0x4 else 0
upper -= weight_ if alt & 0x4 else 0
j = j + d
# figure out which color
if alt & 0x2:
_, nalt, _, _, _ = fromtag(self.data[j:])
if nalt & 0x2:
path.append((j-d, j, False, 'y'))
else:
path.append((j-d, j, False, 'r'))
else:
path.append((j-d, j, False, 'b'))
# found tag
else:
id_ = upper-1
tag_ = alt
w_ = id_-lower
done = (id_, tag_) < (id, tag) or tag_ & 2
return done, id_, tag_, w_, j, d, self.data[j+d:j+d+jump], path
def __bool__(self):
return bool(self.trunk)
def __eq__(self, other):
return self.block == other.block and self.trunk == other.trunk
def __ne__(self, other):
return not self.__eq__(other)
def __iter__(self):
tag = 0
id = -1
while True:
done, id, tag, w, j, d, data, _ = self.lookup(id, tag+0x10)
if done:
break
yield id, tag, w, j, d, data
# create tree representation for debugging
def tree(self):
trunks = co.defaultdict(lambda: (-1, 0))
alts = co.defaultdict(lambda: {})
id, tag = -1, 0
while True:
done, id, tag, w, j, d, data, path = self.lookup(id, tag+0x10)
# found end of tree?
if done:
break
# keep track of trunks/alts
trunks[j] = (id, tag)
for j_, j__, followed, c in path:
if followed:
alts[j_] |= {'f': j__, 'c': c}
else:
alts[j_] |= {'nf': j__, 'c': c}
# prune any alts with unreachable edges
pruned = {}
for j_, alt in alts.items():
if 'f' not in alt:
pruned[j_] = alt['nf']
elif 'nf' not in alt:
pruned[j_] = alt['f']
for j_ in pruned.keys():
del alts[j_]
for j_, alt in alts.items():
while alt['f'] in pruned:
alt['f'] = pruned[alt['f']]
while alt['nf'] in pruned:
alt['nf'] = pruned[alt['nf']]
# find the trunk and depth of each alt, assuming pruned alts
# didn't exist
def rec_trunk(j_):
if j_ not in alts:
return trunks[j_]
else:
if 'nft' not in alts[j_]:
alts[j_]['nft'] = rec_trunk(alts[j_]['nf'])
return alts[j_]['nft']
for j_ in alts.keys():
rec_trunk(j_)
for j_, alt in alts.items():
if alt['f'] in alts:
alt['ft'] = alts[alt['f']]['nft']
else:
alt['ft'] = trunks[alt['f']]
def rec_height(j_):
if j_ not in alts:
return 0
else:
if 'h' not in alts[j_]:
alts[j_]['h'] = max(
rec_height(alts[j_]['f']),
rec_height(alts[j_]['nf'])) + 1
return alts[j_]['h']
for j_ in alts.keys():
rec_height(j_)
t_depth = max((alt['h']+1 for alt in alts.values()), default=0)
# convert to more general tree representation
tree = []
for j, alt in alts.items():
# note all non-trunk edges should be black
tree.append({
'a': alt['nft'],
'b': alt['nft'],
'd': t_depth-1 - alt['h'],
'c': alt['c'],
})
tree.append({
'a': alt['nft'],
'b': alt['ft'],
'd': t_depth-1 - alt['h'],
'c': 'b',
})
return tree, t_depth
# btree lookup with this rbyd as the root
def btree_lookup(self, f, block_size, bid, depth=None):
rbyd = self
rid = bid
depth_ = 1
path = []
# corrupted? return a corrupted block once
if not rbyd:
return bid > 0, bid, 0, rbyd, -1, [], path
while True:
# collect all tags, normally you don't need to do this
# but we are debugging here
name = None
tags = []
branch = None
rid_ = rid
tag = 0
w = 0
for i in it.count():
done, rid__, tag, w_, j, d, data, _ = rbyd.lookup(
rid_, tag+0x10)
if done or (i != 0 and rid__ != rid_):
break
# first tag indicates the branch's weight
if i == 0:
rid_, w = rid__, w_
# catch any branches
if tag == TAG_BTREE:
branch = (tag, j, d, data)
tags.append((tag, j, d, data))
# keep track of path
path.append((bid + (rid_-rid), w, rbyd, rid_, tags))
# descend down branch?
if branch is not None and (
not depth or depth_ < depth):
tag, j, d, data = branch
w_, trunk, block, crc = frombtree(data)
rbyd = Rbyd.fetch(f, block_size, block, trunk)
# corrupted? bail here so we can keep traversing the tree
if not rbyd:
return False, bid + (rid_-rid), w, rbyd, -1, [], path
rid -= (rid_-(w-1))
depth_ += 1
else:
return not tags, bid + (rid_-rid), w, rbyd, rid_, tags, path
# btree rbyd-tree generation for debugging
def btree_tree(self, f, block_size, depth=None, *,
inner=False):
# find the max depth of each layer to nicely align trees
# TODO memoize
bdepths = {}
bid = -1
while True:
done, bid, w, rbyd, rid, tags, path = self.btree_lookup(
f, block_size, bid+1, depth=depth)
if done:
break
for d, (bid, w, rbyd, rid, tags) in enumerate(path):
_, rdepth = rbyd.tree()
bdepths[d] = max(bdepths.get(d, 0), rdepth)
# find all branches
tree = []
root = None
branches = {}
bid = -1
while True:
done, bid, w, rbyd, rid, tags, path = self.btree_lookup(
f, block_size, bid+1, depth=depth)
if done:
break
d_ = 0
leaf = None
for d, (bid, w, rbyd, rid, tags) in enumerate(path):
if not tags:
continue
# map rbyd tree into B-tree space
rtree, rdepth = rbyd.tree()
# note we adjust our bid/rids to be left-leaning,
# this allows a global order and make tree rendering quite
# a bit easier
for i in range(len(rtree)):
a_rid, a_tag = rtree[i]['a']
b_rid, b_tag = rtree[i]['b']
_, _, _, a_w, _, _, _, _ = rbyd.lookup(a_rid, 0)
_, _, _, b_w, _, _, _, _ = rbyd.lookup(b_rid, 0)
rtree[i] = {
'a': (a_rid-(a_w-1), a_tag),
'b': (b_rid-(b_w-1), b_tag),
'd': rtree[i]['d'],
'c': rtree[i]['c'],
}
# connect our branch to the rbyd's root
if leaf is not None:
root = min(rtree,
key=lambda branch: branch['d'],
default=None)
if root is not None:
r_rid, r_tag = root['a']
else:
r_rid, r_tag = rid-(w-1), tags[0][0]
tree.append({
'a': leaf,
'b': (bid-rid+r_rid, d, r_rid, r_tag),
'd': d_-1,
'c': 'b',
})
for branch in rtree:
# map rbyd branches into our btree space
a_rid, a_tag = branch['a']
b_rid, b_tag = branch['b']
tree.append({
'a': (bid-rid+a_rid, d, a_rid, a_tag),
'b': (bid-rid+b_rid, d, b_rid, b_tag),
'd': branch['d'] + d_ + bdepths.get(d, 0)-rdepth,
'c': branch['c'],
})
d_ += max(bdepths.get(d, 0), 1)
leaf = (bid-(w-1), d, rid-(w-1), TAG_BTREE)
# remap branches to leaves if we aren't showing inner branches
if not inner:
# step through each layer backwards
b_depth = max((branch['a'][1]+1 for branch in tree), default=0)
# keep track of the original tree to find the original bids,
# unfortunately because we store the bids in the branches we
# overwrite these
tree_ = tree.copy()
for bd in reversed(range(b_depth-1)):
# find leaf-roots at this level
roots = {}
for branch_, branch in zip(tree_, tree):
bid = branch['b'][0] - branch['b'][2]
# choose the highest node as the root
if (branch_['b'][1] == b_depth-1
and (bid not in roots
or branch_['d'] < roots[bid]['d'])):
roots[bid] = branch_
# remap branches to leaf-roots
tree__ = []
for branch_ in tree_:
if branch_['a'][1] == bd and branch_['a'][0] in roots:
branch_ = {
'a': roots[branch_['a'][0]]['b'],
'b': branch_['b'],
'd': branch_['d'],
'c': branch_['c'],
}
if branch_['b'][1] == bd and branch_['b'][0] in roots:
branch_ = {
'a': branch_['a'],
'b': roots[branch_['b'][0]]['b'],
'd': branch_['d'],
'c': branch_['c'],
}
tree__.append(branch_)
tree_ = tree__
tree = tree_
return tree, max((branch['d']+1 for branch in tree), default=0)
# btree B-tree generation for debugging
def btree_btree(self, f, block_size, depth=None, *,
inner=False):
# find all branches
tree = []
root = None
branches = {}
bid = -1
while True:
done, bid, w, rbyd, rid, tags, path = self.btree_lookup(
f, block_size, bid+1, depth=depth)
if done:
break
# if we're not showing inner nodes, prefer names higher in
# the tree since this avoids showing vestigial names
name = None
if not inner:
name = None
for bid_, w_, rbyd_, rid_, tags_ in reversed(path):
for tag_, j_, d_, data_ in tags_:
if tag_ & 0xf00f == TAG_NAME:
name = (tag_, j_, d_, data_)
if rid_-(w_-1) != 0:
break
a = root
for d, (bid, w, rbyd, rid, tags) in enumerate(path):
if not tags:
continue
b = (bid-(w-1), d, rid-(w-1),
(name if name else tags[0])[0])
# remap branches to leaves if we aren't showing
# inner branches
if not inner:
if b not in branches:
bid, w, rbyd, rid, tags = path[-1]
if not tags:
continue
branches[b] = (
bid-(w-1), len(path)-1, rid-(w-1),
(name if name else tags[0])[0])
b = branches[b]
# found entry point?
if root is None:
root = b
a = root
tree.append({
'a': a,
'b': b,
'd': d,
'c': 'b',
})
a = b
return tree, max((branch['d']+1 for branch in tree), default=0)
def main(disk, mroots=None, *,
block_size=None,
color='auto',
**args):
# figure out what color should be
if color == 'auto':
color = sys.stdout.isatty()
elif color == 'always':
color = True
else:
color = False
# flatten mroots, default to 0x{0,1}
if not mroots:
mroots = [[0,1]]
mroots = [block for mroots_ in mroots for block in mroots_]
# we seek around a bunch, so just keep the disk open
with open(disk, 'rb') as f:
# if block_size is omitted, assume the block device is one big block
if block_size is None:
f.seek(0, os.SEEK_END)
block_size = f.tell()
# before we print, we need to do a pass for a few things:
# - find the actual mroot
# - find the total weight
mweight = 0
rweight = 0
mroot = Rbyd.fetch(f, block_size, mroots)
while True:
# corrupted?
if not mroot:
break
rweight = max(rweight, mroot.weight)
# fetch the next mroot
done, rid, tag, w, j, d, data, _ = mroot.lookup(-1, TAG_MROOT)
if not (not done and rid == -1 and tag == TAG_MROOT):
break
blocks = frommdir(data)
mroot = Rbyd.fetch(f, block_size, blocks)
# fetch the mdir, if there is one
mdir = None
done, rid, tag, w, j, _, data, _ = mroot.lookup(-1, TAG_MDIR)
if not done and rid == -1 and tag == TAG_MDIR:
blocks = frommdir(data)
mdir = Rbyd.fetch(f, block_size, blocks)
# corrupted?
if mdir:
rweight = max(rweight, mdir.weight)
# fetch the actual mtree, if there is one
mtree = None
done, rid, tag, w, j, d, data, _ = mroot.lookup(-1, TAG_BTREE)
if not done and rid == -1 and tag == TAG_BTREE:
w, trunk, block, crc = frombtree(data)
mtree = Rbyd.fetch(f, block_size, block, trunk)
mweight = w
# traverse entries
mid = -1
while True:
done, mid, w, rbyd, rid, tags, path = mtree.btree_lookup(
f, block_size, mid+1, depth=args.get('depth'))
if done:
break
# corrupted?
if not rbyd:
continue
mdir__ = None
if not args.get('depth') or len(path) < args.get('depth'):
mdir__ = next(((tag, j, d, data)
for tag, j, d, data in tags
if tag == TAG_MDIR),
None)
if mdir__:
# fetch the mdir
_, _, _, data = mdir__
blocks = frommdir(data)
mdir_ = Rbyd.fetch(f, block_size, blocks)
# corrupted?
if mdir_:
rweight = max(rweight, mdir_.weight)
# precompute rbyd-tree if requested
t_width = 0
if args.get('tree'):
# compute mroot chain "tree", prefix our actual mtree with this
tree = []
d_ = 0
mroot_ = Rbyd.fetch(f, block_size, mroots)
for d in it.count():
# corrupted?
if not mroot_:
break
# compute the mroots rbyd-tree
rtree, rdepth = mroot_.tree()
# connect branch to our root
if d > 0:
root = min(rtree,
key=lambda branch: branch['d'],
default=None)
if root:
r_rid, r_tag = root['a']
else:
_, r_rid, r_tag, _, _, _, _, _ = mroot_.lookup(-1, 0x10)
tree.append({
'a': (-1, d-1, 0, -1, TAG_MROOT),
'b': (-1, d, 0, r_rid, r_tag),
'd': d_-1,
'c': 'b',
})
# map the tree into our metadata space
for branch in rtree:
a_rid, a_tag = branch['a']
b_rid, b_tag = branch['b']
tree.append({
'a': (-1, d, 0, a_rid, a_tag),
'b': (-1, d, 0, b_rid, b_tag),
'd': d_ + branch['d'],
'c': branch['c'],
})
d_ += rdepth
# fetch the next mroot
done, rid, tag, w, j, _, data, _ = mroot_.lookup(-1, TAG_MROOT)
if not (not done and rid == -1 and tag == TAG_MROOT):
break
blocks = frommdir(data)
mroot_ = Rbyd.fetch(f, block_size, blocks)
# compute mdir's rbyd-tree if there is one
if mdir:
rtree, rdepth = mdir.tree()
# connect branch to our root
root = min(rtree,
key=lambda branch: branch['d'],
default=None)
if root:
r_rid, r_tag = root['a']
else:
_, r_rid, r_tag, _, _, _, _, _ = mdir.lookup(-1, 0x10)
tree.append({
'a': (-1, d, 0, -1, TAG_MDIR),
'b': (0, 0, 0, r_rid, r_tag),
'd': d_-1,
'c': 'b',
})
# map the tree into our metadata space
for branch in rtree:
a_rid, a_tag = branch['a']
b_rid, b_tag = branch['b']
tree.append({
'a': (0, 0, 0, a_rid, a_tag),
'b': (0, 0, 0, b_rid, b_tag),
'd': d_ + branch['d'],
'c': branch['c'],
})
# compute the mtree's rbyd-tree if there is one
if mtree:
tree_, tdepth = mtree.btree_tree(
f, block_size,
depth=args.get('depth'),
inner=args.get('inner'))
# connect a branch to the root of the tree
root = min(tree_, key=lambda branch: branch['d'], default=None)
if root:
r_bid, r_bd, r_rid, r_tag = root['a']
tree.append({
'a': (-1, d, 0, -1, TAG_BTREE),
'b': (r_bid, r_bd, r_rid, 0, r_tag),
'd': d_-1,
'c': 'b',
})
# map the tree into our metadata space
for branch in tree_:
a_bid, a_bd, a_rid, a_tag = branch['a']
b_bid, b_bd, b_rid, b_tag = branch['b']
tree.append({
'a': (a_bid, a_bd, a_rid, 0, a_tag),
'b': (b_bid, b_bd, b_rid, 0, b_tag),
'd': d_ + branch['d'],
'c': branch['c'],
})
# find the max depth of each mdir to nicely align trees
# TODO memoize
mdepth = 0
mid = -1
while True:
done, mid, w, rbyd, rid, tags, path = mtree.btree_lookup(
f, block_size, mid+1, depth=args.get('depth'))
if done:
break
# corrupted?
if not rbyd:
continue
mdir__ = None
if not args.get('depth') or len(path) < args.get('depth'):
mdir__ = next(((tag, j, d, data)
for tag, j, d, data in tags
if tag == TAG_MDIR),
None)
if mdir__:
# fetch the mdir
_, _, _, data = mdir__
blocks = frommdir(data)
mdir_ = Rbyd.fetch(f, block_size, blocks)
rtree, rdepth = mdir_.tree()
mdepth = max(mdepth, rdepth)
# compute the rbyd-tree for each mdir
mid = -1
while True:
done, mid, w, rbyd, rid, tags, path = mtree.btree_lookup(
f, block_size, mid+1, depth=args.get('depth'))
if done:
break
# corrupted?
if not rbyd:
continue
mdir__ = None
if not args.get('depth') or len(path) < args.get('depth'):
mdir__ = next(((tag, j, d, data)
for tag, j, d, data in tags
if tag == TAG_MDIR),
None)
if mdir__:
# fetch the mdir
_, _, _, data = mdir__
blocks = frommdir(data)
mdir_ = Rbyd.fetch(f, block_size, blocks)
rtree, rdepth = mdir_.tree()
# connect the root to the mtree
branch = max(
(branch for branch in tree
if branch['b'][0] == mid-(w-1)),
key=lambda branch: branch['d'],
default=None)
if branch:
root = min(rtree,
key=lambda branch: branch['d'],
default=None)
if root:
r_rid, r_tag = root['a']
else:
_, r_rid, r_tag, _, _, _, _, _ = (
mdir_.lookup(-1, 0x10))
tree.append({
'a': branch['b'],
'b': (mid-(w-1), len(path), 0, r_rid, r_tag),
'd': d_ + tdepth,
'c': 'b',
})
# map the tree into our metadata space
for branch in rtree:
a_rid, a_tag = branch['a']
b_rid, b_tag = branch['b']
tree.append({
'a': (mid-(w-1), len(path), 0, a_rid, a_tag),
'b': (mid-(w-1), len(path), 0, b_rid, b_tag),
'd': (d_ + tdepth + 1
+ branch['d'] + mdepth-rdepth),
'c': branch['c'],
})
# remap branches to leaves if we aren't showing inner branches
if not args.get('inner'):
# step through each layer backwards
b_depth = max((branch['a'][1]+1 for branch in tree),
default=0)
# keep track of the original tree to find the original bids,
# unfortunately because we store the bids in the branches we
# overwrite these
tree_ = tree.copy()
for bd in reversed(range(b_depth-1)):
# find leaf-roots at this level
roots = {}
for branch_, branch in zip(tree_, tree):
bid = branch['b'][0] - branch['b'][2]
# choose the highest node as the root
if (branch_['b'][1] == b_depth-1
and (bid not in roots
or branch_['d'] < roots[bid]['d'])):
roots[bid] = branch_
# remap branches to leaf-roots
tree__ = []
for branch_ in tree_:
# note we ignore mroot branches, we don't collapse
# normally these
if (branch_['a'][0] != -1
and branch_['a'][1] == bd
and branch_['a'][0] in roots):
branch_ = {
'a': roots[branch_['a'][0]]['b'],
'b': branch_['b'],
'd': branch_['d'],
'c': branch_['c'],
}
if (branch_['b'][0] != -1
and branch_['b'][1] == bd
and branch_['b'][0] in roots):
branch_ = {
'a': branch_['a'],
'b': roots[branch_['b'][0]]['b'],
'd': branch_['d'],
'c': branch_['c'],
}
tree__.append(branch_)
tree_ = tree__
tree = tree_
# precompute B-tree if requested
elif args.get('btree'):
# compute mroot chain "tree", prefix our actual mtree with this
tree = []
mroot_ = Rbyd.fetch(f, block_size, mroots)
for d in it.count():
# corrupted?
if not mroot_:
break
# connect branch to our first tag
if d > 0:
done, rid, tag, w, j, _, data, _ = mroot_.lookup(-1, 0x10)
if not done:
tree.append({
'a': (-1, d-1, 0, -1, TAG_MROOT),
'b': (-1, d, 0, rid, tag),
'd': 0,
'c': 'b',
})
# fetch the next mroot
done, rid, tag, w, j, _, data, _ = mroot_.lookup(-1, TAG_MROOT)
if not (not done and rid == -1 and tag == TAG_MROOT):
break
blocks = frommdir(data)
mroot_ = Rbyd.fetch(f, block_size, blocks)
# create a branch to our mdir if there is one
if mdir:
# connect branch to our first tag
done, rid, tag, w, j, _, data, _ = mdir.lookup(-1, 0x10)
if not done:
tree.append({
'a': (-1, d, 0, -1, TAG_MDIR),
'b': (0, 0, 0, rid, tag),
'd': 0,
'c': 'b',
})
# compute the mtree's B-tree if there is one
if mtree:
tree_, tdepth = mtree.btree_btree(
f, block_size,
depth=args.get('depth'),
inner=args.get('inner'))
# connect a branch to the root of the tree
root = min(tree_, key=lambda branch: branch['d'], default=None)
if root:
r_bid, r_bd, r_rid, r_tag = root['a']
tree.append({
'a': (-1, d, 0, -1, TAG_BTREE),
'b': (r_bid, r_bd, r_rid, 0, r_tag),
'd': 0,
'c': 'b',
})
# map the tree into our metadata space
for branch in tree_:
a_bid, a_bd, a_rid, a_tag = branch['a']
b_bid, b_bd, b_rid, b_tag = branch['b']
tree.append({
'a': (a_bid, a_bd, a_rid, 0, a_tag),
'b': (b_bid, b_bd, b_rid, 0, b_tag),
'd': 1 + branch['d'],
'c': branch['c'],
})
# remap branches to leaves if we aren't showing inner branches
if not args.get('inner'):
mid = -1
while True:
done, mid, w, rbyd, rid, tags, path = (
mtree.btree_lookup(
f, block_size, mid+1, depth=args.get('depth')))
if done:
break
# corrupted?
if not rbyd:
continue
mdir__ = None
if (not args.get('depth')
or len(path) < args.get('depth')):
mdir__ = next(((tag, j, d, data)
for tag, j, d, data in tags
if tag == TAG_MDIR),
None)
if mdir__:
# fetch the mdir
_, _, _, data = mdir__
blocks = frommdir(data)
mdir_ = Rbyd.fetch(f, block_size, blocks)
# find the first entry in the mdir, map branches
# to this entry
done, rid, tag, _, j, d, data, _ = (
mdir_.lookup(-1, 0x10))
tree_ = []
for branch in tree:
if branch['a'][0] == mid-(w-1):
a_bid, a_bd, _, _, _ = branch['a']
branch = {
'a': (a_bid, a_bd+1, 0, rid, tag),
'b': branch['b'],
'd': branch['d'],
'c': branch['c'],
}
if branch['b'][0] == mid-(w-1):
b_bid, b_bd, _, _, _ = branch['b']
branch = {
'a': branch['a'],
'b': (b_bid, b_bd+1, 0, rid, tag),
'd': branch['d'],
'c': branch['c'],
}
tree_.append(branch)
tree = tree_
# common tree renderer
if args.get('tree') or args.get('btree'):
# find the max depth from the tree
t_depth = max((branch['d']+1 for branch in tree), default=0)
if t_depth > 0:
t_width = 2*t_depth + 2
def treerepr(mid, w, md, mrid, rid, tag):
if t_depth == 0:
return ''
def branchrepr(x, d, was):
for branch in tree:
if branch['d'] == d and branch['b'] == x:
if any(branch['d'] == d and branch['a'] == x
for branch in tree):
return '+-', branch['c'], branch['c']
elif any(branch['d'] == d
and x > min(branch['a'], branch['b'])
and x < max(branch['a'], branch['b'])
for branch in tree):
return '|-', branch['c'], branch['c']
elif branch['a'] < branch['b']:
return '\'-', branch['c'], branch['c']
else:
return '.-', branch['c'], branch['c']
for branch in tree:
if branch['d'] == d and branch['a'] == x:
return '+ ', branch['c'], None
for branch in tree:
if (branch['d'] == d
and x > min(branch['a'], branch['b'])
and x < max(branch['a'], branch['b'])):
return '| ', branch['c'], was
if was:
return '--', was, was
return ' ', None, None
trunk = []
was = None
for d in range(t_depth):
t, c, was = branchrepr(
(mid-max(w-1, 0), md, mrid-max(w-1, 0), rid, tag),
d, was)
trunk.append('%s%s%s%s' % (
'\x1b[33m' if color and c == 'y'
else '\x1b[31m' if color and c == 'r'
else '\x1b[90m' if color and c == 'b'
else '',
t,
('>' if was else ' ') if d == t_depth-1 else '',
'\x1b[m' if color and c else ''))
return '%s ' % ''.join(trunk)
def dbg_mdir(mdir, mid, md):
for i, (rid, tag, w, j, d, data) in enumerate(mdir):
# show human-readable tag representation
print('%12s %s%-57s' % (
'{%s}:' % ','.join('%04x' % block
for block in it.chain([mdir.block],
mdir.other_blocks))
if i == 0 else '',
treerepr(mid, 1, md, 0, rid, tag)
if args.get('tree') or args.get('btree') else '',
'%*s %-22s%s' % (
w_width, '%d.%d-%d' % (mid, rid-(w-1), rid)
if w > 1 else '%d.%d' % (mid, rid)
if w > 0 else '%d' % mid
if i == 0 else '',
tagrepr(tag, w, len(data), j),
' %s' % next(xxd(data, 8), '')
if not args.get('no_truncate') else '')))
# show in-device representation
if args.get('device'):
print('%11s %*s %s' % (
'',
w_width, '',
'%-22s%s' % (
'%04x %08x %07x' % (tag, w, len(data)),
' %s' % ' '.join(
'%08x' % fromle32(
mdir.data[j+d+i*4
: j+d+min(i*4+4,len(data))])
for i in range(
min(m.ceil(len(data)/4),
3)))[:23]
if not args.get('no_truncate')
and not tag & 0x8 else '')))
# show on-disk encoding of tags
if args.get('raw'):
for o, line in enumerate(xxd(mdir.data[j:j+d])):
print('%11s: %*s %s' % (
'%04x' % (j + o*16),
w_width, '',
line))
if args.get('raw') or args.get('no_truncate'):
if not tag & 0x8:
for o, line in enumerate(xxd(data)):
print('%11s: %*s %s' % (
'%04x' % (j+d + o*16),
w_width, '',
line))
# prbyd here means the last rendered rbyd, we update
# in dbg_branch to always print interleaved addresses
prbyd = None
def dbg_branch(bid, w, rbyd, rid, tags, bd):
nonlocal prbyd
# show human-readable representation
for i, (tag, j, d, data) in enumerate(tags):
print('%12s %s%*s %-22s %s' % (
'%04x.%04x:' % (rbyd.block, rbyd.trunk)
if prbyd is None or rbyd != prbyd
else '',
treerepr(bid, w, bd, rid, 0, tag)
if args.get('tree') or args.get('btree') else '',
w_width, '' if i != 0
else '%d-%d' % (bid-(w-1), bid) if w > 1
else bid if w > 0
else '',
tagrepr(tag, w if i == 0 else 0, len(data), None),
# note we render names a bit different here
''.join(
b if b >= ' ' and b <= '~' else '.'
for b in map(chr, data))
if tag & 0xf00f == TAG_NAME
else next(xxd(data, 8), '')
if not args.get('no_truncate')
else ''))
prbyd = rbyd
# show in-device representation
if args.get('device'):
for i, (tag, j, d, data) in enumerate(tags):
print('%11s %*s%*s %-22s%s' % (
'',
t_width, '',
w_width, '',
'%04x %08x %07x' % (tag, w if i == 0 else 0, len(data)),
' %s' % ' '.join(
'%08x' % fromle32(
rbyd.data[j+d+i*4 : j+d + min(i*4+4,len(data))])
for i in range(min(m.ceil(len(data)/4), 3)))[:23]))
# show on-disk encoding of tags/data
for i, (tag, j, d, data) in enumerate(tags):
if args.get('raw'):
for o, line in enumerate(xxd(rbyd.data[j:j+d])):
print('%11s: %*s%*s %s' % (
'%04x' % (j + o*16),
t_width, '',
w_width, '',
line))
# note we don't render name tags with no_truncate
if args.get('raw') or (
args.get('no_truncate') and tag & 0xf00f != TAG_NAME):
for o, line in enumerate(xxd(data)):
print('%11s: %*s%*s %s' % (
'%04x' % (j+d + o*16),
t_width, '',
w_width, '',
line))
# print the actual mroot
print('mroot %s, rev %d, weight %d' % (
mroot.addr(), mroot.rev, mroot.weight))
# print header
w_width = (m.ceil(m.log10(max(1, mweight)+1))
+ 2*m.ceil(m.log10(max(1, rweight)+1))
+ 2)
print('%-11s %*s%-*s %-22s %s' % (
'mdir',
t_width, '',
w_width, 'ids',
'tag',
'data (truncated)'
if not args.get('no_truncate') else ''))
# show each mroot
prbyd = None
ppath = []
corrupted = False
mroot = Rbyd.fetch(f, block_size, mroots)
for d in it.count():
# corrupted?
if not mroot:
print('{%s}: %s%s%s' % (
','.join('%04x' % block
for block in it.chain([mroot.block],
mroot.other_blocks)),
'\x1b[31m' if color else '',
'(corrupted mroot %s)' % mroot.addr(),
'\x1b[m' if color else ''))
corrupted = True
break
else:
# show the mdir
dbg_mdir(mroot, -1, d)
# fetch the next mroot
done, rid, tag, w, j, _, data, _ = mroot.lookup(-1, TAG_MROOT)
if not (not done and rid == -1 and tag == TAG_MROOT):
break
blocks = frommdir(data)
mroot = Rbyd.fetch(f, block_size, blocks)
# show the mdir, if there is one
done, rid, tag, w, j, _, data, _ = mroot.lookup(-1, TAG_MDIR)
if not done and rid == -1 and tag == TAG_MDIR:
blocks = frommdir(data)
mdir = Rbyd.fetch(f, block_size, blocks)
# corrupted?
if not mdir:
print('{%s}: %s%s%s' % (
','.join('%04x' % block
for block in it.chain([mdir.block],
mdir.other_blocks)),
'\x1b[31m' if color else '',
'(corrupted mdir %s)' % mdir.addr(),
'\x1b[m' if color else ''))
corrupted = True
else:
# show the mdir
dbg_mdir(mdir, 0, 0)
# fetch the actual mtree, if there is one
done, rid, tag, w, j, d, data, _ = mroot.lookup(-1, TAG_BTREE)
if not done and rid == -1 and tag == TAG_BTREE:
w, trunk, block, crc = frombtree(data)
mtree = Rbyd.fetch(f, block_size, block, trunk)
# traverse entries
mid = -1
while True:
done, mid, w, rbyd, rid, tags, path = mtree.btree_lookup(
f, block_size, mid+1, depth=args.get('depth'))
if done:
break
# print inner btree entries if requested
if args.get('inner'):
changed = False
for (x, px) in it.zip_longest(
enumerate(path[:-1]),
enumerate(ppath[:-1])):
if x is None:
break
if not (changed or px is None or x != px):
continue
changed = True
# show the inner entry
d, (mid_, w_, rbyd_, rid_, tags_) = x
dbg_branch(mid_, w_, rbyd_, rid_, tags_, d)
ppath = path
# corrupted? try to keep printing the tree
if not rbyd:
print('%11s: %*s%s%s%s' % (
'%04x.%04x' % (rbyd.block, rbyd.trunk),
t_width, '',
'\x1b[31m' if color else '',
'(corrupted rbyd %s)' % rbyd.addr(),
'\x1b[m' if color else ''))
prbyd = rbyd
corrupted = True
continue
# if we're not showing inner nodes, prefer names higher in the
# tree since this avoids showing vestigial names
if not args.get('inner'):
name = None
for mid_, w_, rbyd_, rid_, tags_ in reversed(path):
for tag_, j_, d_, data_ in tags_:
if tag_ & 0xf00f == TAG_NAME:
name = (tag_, j_, d_, data_)
if rid_-(w_-1) != 0:
break
if name is not None:
tags = [name] + [(tag, j, d, data)
for tag, j, d, data in tags
if tag & 0xf00f != TAG_NAME]
# find mdir in the tags
mdir__ = None
if not args.get('depth') or len(path) < args.get('depth'):
mdir__ = next(((tag, j, d, data)
for tag, j, d, data in tags
if tag == TAG_MDIR),
None)
# print btree entries in certain cases
if args.get('inner') or not mdir__:
dbg_branch(mid, w, rbyd, rid, tags, len(path)-1)
if not mdir__:
continue
# fetch the mdir
_, _, _, data = mdir__
blocks = frommdir(data)
mdir_ = Rbyd.fetch(f, block_size, blocks)
# corrupted?
if not mdir_:
print('{%s}: %*s%s%s%s' % (
','.join('%04x' % block
for block in it.chain([mdir_.block],
mdir_.other_blocks)),
t_width, '',
'\x1b[31m' if color else '',
'(corrupted mdir %s)' % mdir_.addr(),
'\x1b[m' if color else ''))
corrupted = True
else:
# show the mdir
dbg_mdir(mdir_, mid, len(path))
# force next btree entry to be shown
prbyd = None
if args.get('error_on_corrupt') and corrupted:
sys.exit(2)
if __name__ == "__main__":
import argparse
import sys
parser = argparse.ArgumentParser(
description="Debug littlefs's metadata tree.",
allow_abbrev=False)
parser.add_argument(
'disk',
help="File containing the block device.")
parser.add_argument(
'mroots',
nargs='*',
type=rbydaddr,
help="Block address of the mroots. Defaults to 0x{0,1}.")
parser.add_argument(
'-B', '--block-size',
type=lambda x: int(x, 0),
help="Block size in bytes.")
parser.add_argument(
'--color',
choices=['never', 'always', 'auto'],
default='auto',
help="When to use terminal colors. Defaults to 'auto'.")
parser.add_argument(
'-r', '--raw',
action='store_true',
help="Show the raw data including tag encodings.")
parser.add_argument(
'-x', '--device',
action='store_true',
help="Show the device-side representation of tags.")
parser.add_argument(
'-T', '--no-truncate',
action='store_true',
help="Don't truncate, show the full contents.")
parser.add_argument(
'-i', '--inner',
action='store_true',
help="Show inner branches.")
parser.add_argument(
'-t', '--tree',
action='store_true',
help="Show the underlying rbyd trees.")
parser.add_argument(
'-b', '--btree',
action='store_true',
help="Show the underlying B-tree.")
parser.add_argument(
'-Z', '--depth',
type=lambda x: int(x, 0),
help="Depth of tree to show.")
parser.add_argument(
'-e', '--error-on-corrupt',
action='store_true',
help="Error if B-tree is corrupt.")
sys.exit(main(**{k: v
for k, v in vars(parser.parse_intermixed_args()).items()
if v is not None}))