#!/usr/bin/env python3 # # Inspired by d3 and brendangregg's flamegraph svg: # - https://d3js.org # - https://github.com/brendangregg/FlameGraph # # prevent local imports if __name__ == "__main__": __import__('sys').path.pop(0) import bisect import collections as co import fnmatch import functools as ft import itertools as it import json import math as mt import os import re import shlex import struct try: import crc32c as crc32c_lib except ModuleNotFoundError: crc32c_lib = None TAG_NULL = 0x0000 ## 0x0000 v--- ---- ---- ---- TAG_CONFIG = 0x0000 ## 0x00tt v--- ---- -ttt tttt TAG_MAGIC = 0x0031 # 0x003r v--- ---- --11 --rr TAG_VERSION = 0x0034 # 0x0034 v--- ---- --11 -1-- TAG_RCOMPAT = 0x0035 # 0x0035 v--- ---- --11 -1-1 TAG_WCOMPAT = 0x0036 # 0x0036 v--- ---- --11 -11- TAG_OCOMPAT = 0x0037 # 0x0037 v--- ---- --11 -111 TAG_GEOMETRY = 0x0038 # 0x0038 v--- ---- --11 1--- TAG_NAMELIMIT = 0x0039 # 0x0039 v--- ---- --11 1--1 TAG_FILELIMIT = 0x003a # 0x003a v--- ---- --11 1-1- TAG_GDELTA = 0x0100 ## 0x01tt v--- ---1 -ttt ttrr TAG_GRMDELTA = 0x0100 # 0x0100 v--- ---1 ---- ---- TAG_NAME = 0x0200 ## 0x02tt v--- --1- -ttt tttt TAG_BNAME = 0x0200 # 0x0200 v--- --1- ---- ---- TAG_REG = 0x0201 # 0x0201 v--- --1- ---- ---1 TAG_DIR = 0x0202 # 0x0202 v--- --1- ---- --1- TAG_STICKYNOTE = 0x0203 # 0x0203 v--- --1- ---- --11 TAG_BOOKMARK = 0x0204 # 0x0204 v--- --1- ---- -1-- TAG_MNAME = 0x0220 # 0x0220 v--- --1- --1- ---- TAG_STRUCT = 0x0300 ## 0x03tt v--- --11 -ttt ttrr TAG_BRANCH = 0x0300 # 0x030r v--- --11 ---- --rr TAG_DATA = 0x0304 # 0x0304 v--- --11 ---- -1-- TAG_BLOCK = 0x0308 # 0x0308 v--- --11 ---- 1err TAG_DID = 0x0314 # 0x0314 v--- --11 ---1 -1-- TAG_BSHRUB = 0x0318 # 0x0318 v--- --11 ---1 1--- TAG_BTREE = 0x031c # 0x031c v--- --11 ---1 11rr TAG_MROOT = 0x0321 # 0x032r v--- --11 --1- --rr TAG_MDIR = 0x0325 # 0x0324 v--- --11 --1- -1rr TAG_MTREE = 0x032c # 0x032c v--- --11 --1- 11rr TAG_ATTR = 0x0400 ## 0x04aa v--- -1-a -aaa aaaa TAG_UATTR = 0x0400 # 0x04aa v--- -1-- -aaa aaaa TAG_SATTR = 0x0500 # 0x05aa v--- -1-1 -aaa aaaa TAG_SHRUB = 0x1000 ## 0x1kkk v--1 kkkk -kkk kkkk TAG_ALT = 0x4000 ## 0x4kkk v1cd kkkk -kkk kkkk TAG_B = 0x0000 TAG_R = 0x2000 TAG_LE = 0x0000 TAG_GT = 0x1000 TAG_CKSUM = 0x3000 ## 0x300p v-11 ---- ---- -pqq TAG_PHASE = 0x0003 TAG_PERTURB = 0x0004 TAG_NOTE = 0x3100 ## 0x3100 v-11 ---1 ---- ---- TAG_ECKSUM = 0x3200 ## 0x3200 v-11 --1- ---- ---- TAG_GCKSUMDELTA = 0x3300 ## 0x3300 v-11 --11 ---- ---- # assign colors to specific filesystem objects # # some nicer colors borrowed from Seaborn # note these include a non-opaque alpha # # COLORS = [ # '#7995c4', # was '#4c72b0bf', # blue # '#e6a37d', # was '#dd8452bf', # orange # '#80be8e', # was '#55a868bf', # green # '#d37a7d', # was '#c44e52bf', # red # '#a195c6', # was '#8172b3bf', # purple # '#ae9a88', # was '#937860bf', # brown # '#e3a8d2', # was '#da8bc3bf', # pink # '#a9a9a9', # was '#8c8c8cbf', # gray # '#d9cb97', # was '#ccb974bf', # yellow # '#8bc8da', # was '#64b5cdbf', # cyan # ] # COLORS_DARK = [ # '#7997b7', # was '#a1c9f4bf', # blue # '#bf8761', # was '#ffb482bf', # orange # '#6aac79', # was '#8de5a1bf', # green # '#bf7774', # was '#ff9f9bbf', # red # '#9c8cbf', # was '#d0bbffbf', # purple # '#a68c74', # was '#debb9bbf', # brown # '#bb84ab', # was '#fab0e4bf', # pink # '#9b9b9b', # was '#cfcfcfbf', # gray # '#bfbe7a', # was '#fffea3bf', # yellow # '#8bb5b4', # was '#b9f2f0bf', # cyan # ] # COLORS = { 'mdir': '#d9cb97', # was '#ccb974bf', # yellow 'btree': '#7995c4', # was '#4c72b0bf', # blue 'data': '#80be8e', # was '#55a868bf', # green 'corrupt': '#d37a7d', # was '#c44e52bf', # red 'conflict': '#d37a7d', # was '#c44e52bf', # red 'unused': '#e5e5e5', # light gray } COLORS_DARK = { 'mdir': '#bfbe7a', # was '#fffea3bf', # yellow 'btree': '#7997b7', # was '#a1c9f4bf', # blue 'data': '#6aac79', # was '#8de5a1bf', # green 'corrupt': '#bf7774', # was '#ff9f9bbf', # red 'conflict': '#bf7774', # was '#ff9f9bbf', # red 'unused': '#333333', # dark gray } WIDTH = 750 HEIGHT = 350 FONT = ['sans-serif'] FONT_SIZE = 10 # open with '-' for stdin/stdout def openio(path, mode='r', buffering=-1): import os if path == '-': if 'r' in mode: return os.fdopen(os.dup(sys.stdin.fileno()), mode, buffering) else: return os.fdopen(os.dup(sys.stdout.fileno()), mode, buffering) else: return open(path, mode, buffering) # some ways of block geometry representations # 512 -> 512 # 512x16 -> (512, 16) # 0x200x10 -> (512, 16) def bdgeom(s): s = s.strip() b = 10 if s.startswith('0x') or s.startswith('0X'): s = s[2:] b = 16 elif s.startswith('0o') or s.startswith('0O'): s = s[2:] b = 8 elif s.startswith('0b') or s.startswith('0B'): s = s[2:] b = 2 if 'x' in s: s, s_ = s.split('x', 1) return (int(s, b), int(s_, b)) else: return int(s, b) # parse some rbyd addr encodings # 0xa -> (0xa,) # 0xa.c -> ((0xa, 0xc),) # 0x{a,b} -> (0xa, 0xb) # 0x{a,b}.c -> ((0xa, 0xc), (0xb, 0xc)) def rbydaddr(s): s = s.strip() b = 10 if s.startswith('0x') or s.startswith('0X'): s = s[2:] b = 16 elif s.startswith('0o') or s.startswith('0O'): s = s[2:] b = 8 elif s.startswith('0b') or s.startswith('0B'): s = s[2:] b = 2 trunk = None if '.' in s: s, s_ = s.split('.', 1) trunk = int(s_, b) if s.startswith('{') and '}' in s: ss = s[1:s.find('}')].split(',') else: ss = [s] addr = [] for s in ss: if trunk is not None: addr.append((int(s, b), trunk)) else: addr.append(int(s, b)) return tuple(addr) def crc32c(data, crc=0): if crc32c_lib is not None: return crc32c_lib.crc32c(data, crc) else: crc ^= 0xffffffff for b in data: crc ^= b for j in range(8): crc = (crc >> 1) ^ ((crc & 1) * 0x82f63b78) return 0xffffffff ^ crc def pmul(a, b): r = 0 while b: if b & 1: r ^= a a <<= 1 b >>= 1 return r def crc32cmul(a, b): r = pmul(a, b) for _ in range(31): r = (r >> 1) ^ ((r & 1) * 0x82f63b78) return r def crc32ccube(a): return crc32cmul(crc32cmul(a, a), a) def popc(x): return bin(x).count('1') def parity(x): return popc(x) & 1 def fromle32(data, j=0): return struct.unpack('H', data[j:j+2].ljust(2, b'\0'))[0]; d += 2 weight, d_ = fromleb128(data, j+d); d += d_ size, d_ = fromleb128(data, j+d); d += d_ return tag>>15, tag&0x7fff, weight, size, d def frombranch(data, j=0): d = 0 block, d_ = fromleb128(data, j+d); d += d_ trunk, d_ = fromleb128(data, j+d); d += d_ cksum = fromle32(data, j+d); d += 4 return block, trunk, cksum, d def frombtree(data, j=0): d = 0 w, d_ = fromleb128(data, j+d); d += d_ block, trunk, cksum, d_ = frombranch(data, j+d); d += d_ return w, block, trunk, cksum, d def frommdir(data, j=0): blocks = [] d = 0 while j+d < len(data): block, d_ = fromleb128(data, j+d) blocks.append(block) d += d_ return tuple(blocks), d def fromshrub(data, j=0): d = 0 weight, d_ = fromleb128(data, j+d); d += d_ trunk, d_ = fromleb128(data, j+d); d += d_ return weight, trunk, d def frombptr(data, j=0): d = 0 size, d_ = fromleb128(data, j+d); d += d_ block, d_ = fromleb128(data, j+d); d += d_ off, d_ = fromleb128(data, j+d); d += d_ cksize, d_ = fromleb128(data, j+d); d += d_ cksum = fromle32(data, j+d); d += 4 return size, block, off, cksize, cksum, d def xxd(data, width=16): for i in range(0, len(data), width): yield '%-*s %-*s' % ( 3*width, ' '.join('%02x' % b for b in data[i:i+width]), width, ''.join( b if b >= ' ' and b <= '~' else '.' for b in map(chr, data[i:i+width]))) # human readable tag repr def tagrepr(tag, weight=None, size=None, *, global_=False, toff=None): # null tags if (tag & 0x6fff) == TAG_NULL: return '%snull%s%s' % ( 'shrub' if tag & TAG_SHRUB else '', ' w%d' % weight if weight else '', ' %d' % size if size else '') # config tags elif (tag & 0x6f00) == TAG_CONFIG: return '%s%s%s%s' % ( 'shrub' if tag & TAG_SHRUB else '', 'magic' if (tag & 0xfff) == TAG_MAGIC else 'version' if (tag & 0xfff) == TAG_VERSION else 'rcompat' if (tag & 0xfff) == TAG_RCOMPAT else 'wcompat' if (tag & 0xfff) == TAG_WCOMPAT else 'ocompat' if (tag & 0xfff) == TAG_OCOMPAT else 'geometry' if (tag & 0xfff) == TAG_GEOMETRY else 'namelimit' if (tag & 0xfff) == TAG_NAMELIMIT else 'filelimit' if (tag & 0xfff) == TAG_FILELIMIT else 'config 0x%02x' % (tag & 0xff), ' w%d' % weight if weight else '', ' %s' % size if size is not None else '') # global-state delta tags elif (tag & 0x6f00) == TAG_GDELTA: if global_: return '%s%s%s%s' % ( 'shrub' if tag & TAG_SHRUB else '', 'grm' if (tag & 0xfff) == TAG_GRMDELTA else 'gstate 0x%02x' % (tag & 0xff), ' w%d' % weight if weight else '', ' %s' % size if size is not None else '') else: return '%s%s%s%s' % ( 'shrub' if tag & TAG_SHRUB else '', 'grmdelta' if (tag & 0xfff) == TAG_GRMDELTA else 'gdelta 0x%02x' % (tag & 0xff), ' w%d' % weight if weight else '', ' %s' % size if size is not None else '') # name tags, includes file types elif (tag & 0x6f00) == TAG_NAME: return '%s%s%s%s' % ( 'shrub' if tag & TAG_SHRUB else '', 'bname' if (tag & 0xfff) == TAG_BNAME else 'reg' if (tag & 0xfff) == TAG_REG else 'dir' if (tag & 0xfff) == TAG_DIR else 'stickynote' if (tag & 0xfff) == TAG_STICKYNOTE else 'bookmark' if (tag & 0xfff) == TAG_BOOKMARK else 'mname' if (tag & 0xfff) == TAG_MNAME else 'name 0x%02x' % (tag & 0xff), ' w%d' % weight if weight else '', ' %s' % size if size is not None else '') # structure tags elif (tag & 0x6f00) == TAG_STRUCT: return '%s%s%s%s' % ( 'shrub' if tag & TAG_SHRUB else '', 'branch' if (tag & 0xfff) == TAG_BRANCH else 'data' if (tag & 0xfff) == TAG_DATA else 'block' if (tag & 0xfff) == TAG_BLOCK else 'did' if (tag & 0xfff) == TAG_DID else 'bshrub' if (tag & 0xfff) == TAG_BSHRUB else 'btree' if (tag & 0xfff) == TAG_BTREE else 'mroot' if (tag & 0xfff) == TAG_MROOT else 'mdir' if (tag & 0xfff) == TAG_MDIR else 'mtree' if (tag & 0xfff) == TAG_MTREE else 'struct 0x%02x' % (tag & 0xff), ' w%d' % weight if weight else '', ' %s' % size if size is not None else '') # custom attributes elif (tag & 0x6e00) == TAG_ATTR: return '%s%sattr 0x%02x%s%s' % ( 'shrub' if tag & TAG_SHRUB else '', 's' if tag & 0x100 else 'u', ((tag & 0x100) >> 1) + (tag & 0xff), ' w%d' % weight if weight else '', ' %s' % size if size is not None else '') # alt pointers elif tag & TAG_ALT: return 'alt%s%s 0x%03x%s%s' % ( 'r' if tag & TAG_R else 'b', 'gt' if tag & TAG_GT else 'le', tag & 0x0fff, ' w%d' % weight if weight is not None else '', ' 0x%x' % (0xffffffff & (toff-size)) if size and toff is not None else ' -%d' % size if size else '') # checksum tags elif (tag & 0x7f00) == TAG_CKSUM: return 'cksum%s%s%s%s%s' % ( 'q%d' % (tag & 0x3), 'p' if tag & TAG_PERTURB else '', ' 0x%02x' % (tag & 0xff) if tag & 0xf8 else '', ' w%d' % weight if weight else '', ' %s' % size if size is not None else '') # note tags elif (tag & 0x7f00) == TAG_NOTE: return 'note%s%s%s' % ( ' 0x%02x' % (tag & 0xff) if tag & 0xff else '', ' w%d' % weight if weight else '', ' %s' % size if size is not None else '') # erased-state checksum tags elif (tag & 0x7f00) == TAG_ECKSUM: return 'ecksum%s%s%s' % ( ' 0x%02x' % (tag & 0xff) if tag & 0xff else '', ' w%d' % weight if weight else '', ' %s' % size if size is not None else '') # global-checksum delta tags elif (tag & 0x7f00) == TAG_GCKSUMDELTA: if global_: return 'gcksum%s%s%s' % ( ' 0x%02x' % (tag & 0xff) if tag & 0xff else '', ' w%d' % weight if weight else '', ' %s' % size if size is not None else '') else: return 'gcksumdelta%s%s%s' % ( ' 0x%02x' % (tag & 0xff) if tag & 0xff else '', ' w%d' % weight if weight else '', ' %s' % size if size is not None else '') # unknown tags else: return '0x%04x%s%s' % ( tag, ' w%d' % weight if weight is not None else '', ' %d' % size if size is not None else '') # compute the difference between two paths, returning everything # in a after the paths diverge, as well as the relevant index def pathdelta(a, b): if not isinstance(a, list): a = list(a) i = 0 for a_, b_ in zip(a, b): try: if type(a_) == type(b_) and a_ == b_: i += 1 else: break # treat exceptions here as failure to match, most likely # the compared types are incompatible, it's the caller's # problem except Exception: break return [(i+j, a_) for j, a_ in enumerate(a[i:])] # a simple wrapper over an open file with bd geometry class Bd: def __init__(self, f, block_size=None, block_count=None): self.f = f self.block_size = block_size self.block_count = block_count def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.repr()) def repr(self): return 'bd %sx%s' % (self.block_size, self.block_count) def read(self, block, off, size): self.f.seek(block*self.block_size + off) return self.f.read(size) def readblock(self, block): self.f.seek(block*self.block_size) return self.f.read(self.block_size) # tagged data in an rbyd class Rattr: def __init__(self, tag, weight, blocks, toff, tdata, data): self.tag = tag self.weight = weight if isinstance(blocks, int): self.blocks = (blocks,) else: self.blocks = blocks self.toff = toff self.tdata = tdata self.data = data @property def block(self): return self.blocks[0] @property def tsize(self): return len(self.tdata) @property def off(self): return self.toff + len(self.tdata) @property def size(self): return len(self.data) def __bytes__(self): return self.data def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.repr()) def repr(self): return tagrepr(self.tag, self.weight, self.size) def __iter__(self): return iter((self.tag, self.weight, self.data)) def __eq__(self, other): return ((self.tag, self.weight, self.data) == (other.tag, other.weight, other.data)) def __ne__(self, other): return not self.__eq__(other) def __hash__(self): return hash((self.tag, self.weight, self.data)) # convenience for did/name access def _parse_name(self): # note we return a null name for non-name tags, this is so # vestigial names in btree nodes act as a catch-all if (self.tag & 0xff00) != TAG_NAME: did = 0 name = b'' else: did, d = fromleb128(self.data) name = self.data[d:] # cache both self.did = did self.name = name @ft.cached_property def did(self): self._parse_name() return self.did @ft.cached_property def name(self): self._parse_name() return self.name class Ralt: def __init__(self, tag, weight, blocks, toff, tdata, jump, color=None, followed=None): self.tag = tag self.weight = weight if isinstance(blocks, int): self.blocks = (blocks,) else: self.blocks = blocks self.toff = toff self.tdata = tdata self.jump = jump if color is not None: self.color = color else: self.color = 'r' if tag & TAG_R else 'b' self.followed = followed @property def block(self): return self.blocks[0] @property def tsize(self): return len(self.tdata) @property def off(self): return self.toff + len(self.tdata) @property def joff(self): return self.toff - self.jump def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.repr()) def repr(self): return tagrepr(self.tag, self.weight, self.jump, toff=self.toff) def __iter__(self): return iter((self.tag, self.weight, self.jump)) def __eq__(self, other): return ((self.tag, self.weight, self.jump) == (other.tag, other.weight, other.jump)) def __ne__(self, other): return not self.__eq__(other) def __hash__(self): return hash((self.tag, self.weight, self.jump)) # our core rbyd type class Rbyd: def __init__(self, blocks, trunk, weight, rev, eoff, cksum, data, *, shrub=False, gcksumdelta=None, redund=0): if isinstance(blocks, int): self.blocks = (blocks,) else: self.blocks = blocks self.trunk = trunk self.weight = weight self.rev = rev self.eoff = eoff self.cksum = cksum self.data = data self.shrub = shrub self.gcksumdelta = gcksumdelta self.redund = redund @property def block(self): return self.blocks[0] @property def corrupt(self): # use redund=-1 to indicate corrupt rbyds return self.redund >= 0 def addr(self): if len(self.blocks) == 1: return '0x%x.%x' % (self.block, self.trunk) else: return '0x{%s}.%x' % ( ','.join('%x' % block for block in self.blocks), self.trunk) def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.repr()) def repr(self): return 'rbyd %s w%s' % (self.addr(), self.weight) def __bool__(self): # use redund=-1 to indicate corrupt rbyds return self.redund >= 0 def __eq__(self, other): return ((frozenset(self.blocks), self.trunk) == (frozenset(other.blocks), other.trunk)) def __ne__(self, other): return not self.__eq__(other) def __hash__(self): return hash((frozenset(self.blocks), self.trunk)) @classmethod def _fetch(cls, data, block, trunk=None): # fetch the rbyd rev = fromle32(data, 0) cksum = 0 cksum_ = crc32c(data[0:4]) cksum__ = cksum_ perturb = False eoff = 0 eoff_ = None j_ = 4 trunk_ = 0 trunk__ = 0 trunk___ = 0 weight = 0 weight_ = 0 weight__ = 0 gcksumdelta = None gcksumdelta_ = None while j_ < len(data) and (not trunk or eoff <= trunk): # read next tag v, tag, w, size, d = fromtag(data, j_) if v != parity(cksum__): break cksum__ ^= 0x00000080 if v else 0 cksum__ = crc32c(data[j_:j_+d], cksum__) j_ += d if not tag & TAG_ALT and j_ + size > len(data): break # take care of cksums if not tag & TAG_ALT: if (tag & 0xff00) != TAG_CKSUM: cksum__ = crc32c(data[j_:j_+size], cksum__) # found a gcksumdelta? if (tag & 0xff00) == TAG_GCKSUMDELTA: gcksumdelta_ = Rattr(tag, w, block, j_-d, data[j_-d:j_], data[j_:j_+size]) # found a cksum? else: # check cksum cksum___ = fromle32(data, j_) if cksum__ != cksum___: break # commit what we have eoff = eoff_ if eoff_ else j_ + size cksum = cksum_ trunk_ = trunk__ weight = weight_ gcksumdelta = gcksumdelta_ gcksumdelta_ = None # update perturb bit perturb = bool(tag & TAG_PERTURB) # revert to data cksum and perturb cksum__ = cksum_ ^ (0xfca42daf if perturb else 0) # evaluate trunks if (tag & 0xf000) != TAG_CKSUM: if not (trunk and j_-d > trunk and not trunk___): # new trunk? if not trunk___: trunk___ = j_-d weight__ = 0 # keep track of weight weight__ += w # end of trunk? if not tag & TAG_ALT: # update trunk/weight unless we found a shrub or an # explicit trunk (which may be a shrub) is requested if not tag & TAG_SHRUB or trunk___ == trunk: trunk__ = trunk___ weight_ = weight__ # keep track of eoff for best matching trunk if trunk and j_ + size > trunk: eoff_ = j_ + size eoff = eoff_ cksum = cksum__ ^ ( 0xfca42daf if perturb else 0) trunk_ = trunk__ weight = weight_ gcksumdelta = gcksumdelta_ trunk___ = 0 # update canonical checksum, xoring out any perturb state cksum_ = cksum__ ^ (0xfca42daf if perturb else 0) if not tag & TAG_ALT: j_ += size return cls(block, trunk_, weight, rev, eoff, cksum, data, gcksumdelta=gcksumdelta, redund=0 if trunk_ else -1) @classmethod def fetch(cls, bd, blocks, trunk=None): # multiple blocks? if not isinstance(blocks, int): # fetch all blocks rbyds = [cls.fetch(bd, block, trunk) for block in blocks] # determine most recent revision/trunk rev, trunk = None, None for rbyd in rbyds: # compare with sequence arithmetic if rbyd and ( rev is None or not ((rbyd.rev - rev) & 0x80000000) or (rbyd.rev == rev and rbyd.trunk > trunk)): rev, trunk = rbyd.rev, rbyd.trunk # sort for reproducibility rbyds.sort(key=lambda rbyd: ( # prioritize valid redund blocks 0 if rbyd and rbyd.rev == rev and rbyd.trunk == trunk else 1, # default to sorting by block rbyd.block)) # choose an active rbyd rbyd = rbyds[0] # keep track of the other blocks rbyd.blocks = tuple(rbyd.block for rbyd in rbyds) # keep track of how many redund blocks are valid rbyd.redund = -1 + sum(1 for rbyd in rbyds if rbyd and rbyd.rev == rev and rbyd.trunk == trunk) # and patch the gcksumdelta if we have one if rbyd.gcksumdelta is not None: rbyd.gcksumdelta.blocks = rbyd.blocks return rbyd # seek/read the block block = blocks data = bd.readblock(block) # fetch the rbyd return cls._fetch(data, block, trunk) @classmethod def fetchck(cls, bd, blocks, trunk, weight, cksum): # try to fetch the rbyd normally rbyd = cls.fetch(bd, blocks, trunk) # cksum mismatch? trunk/weight mismatch? if (rbyd.cksum != cksum or rbyd.trunk != trunk or rbyd.weight != weight): # mark as corrupt and keep track of expected trunk/weight rbyd.redund = -1 rbyd.trunk = trunk rbyd.weight = weight return rbyd @classmethod def fetchshrub(cls, rbyd, trunk): # steal the original rbyd's data # # this helps avoid race conditions with cksums and stuff shrub = cls._fetch(rbyd.data, rbyd.block, trunk) shrub.blocks = rbyd.blocks shrub.shrub = True return shrub def lookupnext(self, rid, tag=None, *, path=False): if not self or rid >= self.weight: if path: return None, None, [] else: return None, None tag = max(tag or 0, 0x1) lower = 0 upper = self.weight path_ = [] # descend down tree j = self.trunk while True: _, alt, w, jump, d = fromtag(self.data, j) # found an alt? if alt & TAG_ALT: # follow? if ((rid, tag & 0xfff) > (upper-w-1, alt & 0xfff) if alt & TAG_GT else ((rid, tag & 0xfff) <= (lower+w-1, alt & 0xfff))): lower += upper-lower-w if alt & TAG_GT else 0 upper -= upper-lower-w if not alt & TAG_GT else 0 j = j - jump if path: # figure out which color if alt & TAG_R: _, nalt, _, _, _ = fromtag(self.data, j+jump+d) if nalt & TAG_R: color = 'y' else: color = 'r' else: color = 'b' path_.append(Ralt( alt, w, self.blocks, j+jump, self.data[j+jump:j+jump+d], jump, color=color, followed=True)) # stay on path else: lower += w if not alt & TAG_GT else 0 upper -= w if alt & TAG_GT else 0 j = j + d if path: # figure out which color if alt & TAG_R: _, nalt, _, _, _ = fromtag(self.data, j) if nalt & TAG_R: color = 'y' else: color = 'r' else: color = 'b' path_.append(Ralt( alt, w, self.blocks, j-d, self.data[j-d:j], jump, color=color, followed=False)) # found tag else: rid_ = upper-1 tag_ = alt w_ = upper-lower if not tag_ or (rid_, tag_) < (rid, tag): if path: return None, None, path_ else: return None, None rattr_ = Rattr(tag_, w_, self.blocks, j, self.data[j:j+d], self.data[j+d:j+d+jump]) if path: return rid_, rattr_, path_ else: return rid_, rattr_ def lookup(self, rid, tag=None, mask=None, *, path=False): if tag is None: tag, mask = 0, 0xffff if mask is None: mask = 0 r = self.lookupnext(rid, tag & ~mask, path=path) if path: rid_, rattr_, path_ = r else: rid_, rattr_ = r if (rid_ is None or rid_ != rid or (rattr_.tag & ~mask & 0xfff) != (tag & ~mask & 0xfff)): if path: return None, path_ else: return None if path: return rattr_, path_ else: return rattr_ def rids(self, *, path=False): rid = -1 while True: r = self.lookupnext(rid, path=path) if path: rid, name, path_ = r else: rid, name = r # found end of tree? if rid is None: break if path: yield rid, name, path_ else: yield rid, name rid += 1 def rattrs(self, rid=None, tag=None, mask=None, *, path=False): if rid is None: rid, tag = -1, 0 while True: r = self.lookupnext(rid, tag+0x1, path=path) if path: rid, rattr, path_ = r else: rid, rattr = r # found end of tree? if rid is None: break if path: yield rid, rattr, path_ else: yield rid, rattr tag = rattr.tag else: if tag is None: tag, mask = 0, 0xffff if mask is None: mask = 0 tag_ = max((tag & ~mask) - 1, 0) while True: r = self.lookupnext(rid, tag_+0x1, path=path) if path: rid_, rattr_, path_ = r else: rid_, rattr_ = r # found end of tree? if (rid_ is None or rid_ != rid or (rattr_.tag & ~mask & 0xfff) != (tag & ~mask & 0xfff)): break if path: yield rattr_, path_ else: yield rattr_ tag_ = rattr_.tag # lookup by name def namelookup(self, did, name): # binary search best = None, None lower = 0 upper = self.weight while lower < upper: rid, name_ = self.lookupnext( lower + (upper-1-lower)//2) if rid is None: break # bisect search space if (name_.did, name_.name) > (did, name): upper = rid-(name_.weight-1) elif (name_.did, name_.name) < (did, name): lower = rid + 1 # keep track of best match best = rid, name_ else: # found a match return rid, name_ return best # our rbyd btree type class Btree: def __init__(self, bd, rbyd): self.bd = bd self.rbyd = rbyd @property def block(self): return self.rbyd.block @property def blocks(self): return self.rbyd.blocks @property def trunk(self): return self.rbyd.trunk @property def weight(self): return self.rbyd.weight @property def rev(self): return self.rbyd.rev @property def cksum(self): return self.rbyd.cksum @property def shrub(self): return self.rbyd.shrub def addr(self): return self.rbyd.addr() def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.repr()) def repr(self): return 'btree %s w%s' % (self.addr(), self.weight) def __eq__(self, other): return self.rbyd == other.rbyd def __ne__(self, other): return not self.__eq__(other) def __hash__(self): return hash(self.rbyd) @classmethod def fetch(cls, bd, blocks, trunk=None): # rbyd fetch does most of the work here rbyd = Rbyd.fetch(bd, blocks, trunk) return cls(bd, rbyd) @classmethod def fetchck(cls, bd, blocks, trunk, weight, cksum): # rbyd fetchck does most of the work here rbyd = Rbyd.fetchck(bd, blocks, trunk, weight, cksum) return cls(bd, rbyd) @classmethod def fetchshrub(cls, bd, rbyd, trunk): shrub = Rbyd.fetchshrub(rbyd, trunk) return cls(bd, shrub) def lookupleaf(self, bid, *, path=False, depth=None): if not self or bid >= self.weight: if path: return None, None, None, None, [] else: return None, None, None, None rbyd = self.rbyd rid = bid depth_ = 1 path_ = [] while True: # corrupt branch? if not rbyd: if path: return bid, rbyd, rid, None, path_ else: return bid, rbyd, rid, None # first tag indicates the branch's weight rid_, name_ = rbyd.lookupnext(rid) if rid_ is None: if path: return None, None, None, None, path_ else: return None, None, None, None # keep track of path if path: path_.append((bid + (rid_-rid), rbyd, rid_, name_)) # find branch tag if there is one branch_ = rbyd.lookup(rid_, TAG_BRANCH, 0x3) # descend down branch? if branch_ is not None and ( not depth or depth_ < depth): block, trunk, cksum, _ = frombranch(branch_.data) rbyd = Rbyd.fetchck(self.bd, block, trunk, name_.weight, cksum) rid -= (rid_-(name_.weight-1)) depth_ += 1 else: if path: return bid + (rid_-rid), rbyd, rid_, name_, path_ else: return bid + (rid_-rid), rbyd, rid_, name_ # the non-leaf variants discard the rbyd info, these can be a bit # more convenient, but at a performance cost def lookupnext(self, bid, *, path=False, depth=None): # just discard the rbyd info r = self.lookupleaf(bid, path=path, depth=depth) if path: bid, rbyd, rid, name, path_ = r else: bid, rbyd, rid, name = r if path: return bid, name, path_ else: return bid, name def lookup(self, bid, tag=None, mask=None, *, path=False, depth=None): # lookup rbyd in btree # # note this function expects bid to be known, use lookupnext # first if you don't care about the exact bid (or better yet, # lookupleaf and call lookup on the returned rbyd) # # this matches rbyd's lookup behavior, which needs a known rid # to avoid a double lookup r = self.lookupleaf(bid, path=path, depth=depth) if path: bid_, rbyd_, rid_, name_, path_ = r else: bid_, rbyd_, rid_, name_ = r if bid_ is None or bid_ != bid: if path: return None, path_ else: return None # lookup tag in rbyd rattr_ = rbyd_.lookup(rid_, tag, mask) if rattr_ is None: if path: return None, path_ else: return None if path: return rattr_, path_ else: return rattr_ # note leaves only iterates over leaf rbyds, whereas traverse # traverses all rbyds def leaves(self, *, path=False, depth=None): # include our root rbyd even if the weight is zero if self.weight == 0: if path: yield -1, self.rbyd, [] else: yield -1, self.rbyd return bid = 0 while True: r = self.lookupleaf(bid, path=path, depth=depth) if r: bid, rbyd, rid, name, path_ = r else: bid, rbyd, rid, name = r if bid is None: break if path: yield (bid-rid + (rbyd.weight-1), rbyd, # path tail is usually redundant unless corrupt path_[:-1] if path_ and path_[-1][1] == rbyd else path_) else: yield bid-rid + (rbyd.weight-1), rbyd bid += rbyd.weight - rid + 1 def traverse(self, *, path=False, depth=None): ptrunk_ = [] for bid, rbyd, path_ in self.leaves( path=True, depth=depth): # we only care about the rbyds here trunk_ = ([(bid_-rid_ + (rbyd_.weight-1), rbyd_) for bid_, rbyd_, rid_, name_ in path_] + [(bid, rbyd)]) for d, (bid_, rbyd_) in pathdelta( trunk_, ptrunk_): # but include branch rids in the path if requested if path: yield bid_, rbyd_, path_[:d] else: yield bid_, rbyd_ ptrunk_ = trunk_ # note bids/rattrs do _not_ include corrupt btree nodes! def bids(self, *, leaves=False, path=False, depth=None): for r in self.leaves( path=path, depth=depth): if path: bid, rbyd, path_ = r else: bid, rbyd = r for rid, name in rbyd.rids(): bid_ = bid-(rbyd.weight-1) + rid if leaves: if path: yield (bid_, rbyd, rid, name, path_+[(bid_, rbyd, rid, name)]) else: yield bid_, rbyd, rid, name else: if path: yield (bid_, name, path_+[(bid_, rbyd, rid, name)]) else: yield bid_, name def rattrs(self, bid=None, tag=None, mask=None, *, leaves=False, path=False, depth=None): if bid is None: for r in self.leaves( path=path, depth=depth): if path: bid, rbyd, path_ = r else: bid, rbyd = r for rid, name in rbyd.rids(): bid_ = bid-(rbyd.weight-1) + rid for rattr in rbyd.rattrs(rid): if leaves: if path: yield (bid_, rbyd, rid, rattr, path_+[(bid_, rbyd, rid, name)]) else: yield bid_, rbyd, rid, rattr else: if path: yield (bid_, rattr, path_+[(bid_, rbyd, rid, name)]) else: yield bid_, rattr else: r = self.lookupleaf(bid, path=path, depth=depth) if path: bid, rbyd, rid, name, path_ = r else: bid, rbyd, rid, name = r if bid is None: return for rattr in rbyd.rattrs(rid, tag, mask): if leaves: if path: yield rbyd, rid, rattr, path_ else: yield rbyd, rid, rattr else: if path: yield rattr, path_ else: yield rattr # lookup by name def namelookupleaf(self, did, name, *, path=False, depth=None): rbyd = self.rbyd bid = 0 depth_ = 1 path_ = [] while True: # corrupt branch? if not rbyd: bid_ = bid+(rbyd.weight-1) if path: return bid_, rbyd, rbyd.weight-1, None, path_ else: return bid_, rbyd, rbyd.weight-1, None rid_, name_ = rbyd.namelookup(did, name) # keep track of path if path: path_.append((bid + rid_, rbyd, rid_, name_)) # find branch tag if there is one branch_ = rbyd.lookup(rid_, TAG_BRANCH, 0x3) # found another branch if branch_ is not None and ( not depth or depth_ < depth): block, trunk, cksum, _ = frombranch(branch_.data) rbyd = Rbyd.fetchck(self.bd, block, trunk, name_.weight, cksum) # update our bid bid += rid_ - (name_.weight-1) depth_ += 1 # found best match else: if path: return bid + rid_, rbyd, rid_, name_, path_ else: return bid + rid_, rbyd, rid_, name_ def namelookup(self, bid, *, path=False, depth=None): # just discard the rbyd info r = self.namelookupleaf(did, name, path=path, depth=depth) if path: bid, rbyd, rid, name, path_ = r else: bid, rbyd, rid, name = r if path: return bid, name, path_ else: return bid, name # a metadata id, this includes mbits for convenience class Mid: def __init__(self, mbid, mrid=None, *, mbits=None): # we need one of these to figure out mbits if mbits is not None: self.mbits = mbits elif isinstance(mbid, Mid): self.mbits = mbid.mbits else: assert mbits is not None, "mbits?" # accept other mids which can be useful for changing mrids if isinstance(mbid, Mid): mbid = mbid.mbid # accept either merged mid or separate mbid+mrid if mrid is None: mid = mbid mbid = mid | ((1 << self.mbits) - 1) mrid = mid & ((1 << self.mbits) - 1) # map mrid=-1 if mrid == ((1 << self.mbits) - 1): mrid = -1 self.mbid = mbid self.mrid = mrid @property def mid(self): return ((self.mbid & ~((1 << self.mbits) - 1)) | (self.mrid & ((1 << self.mbits) - 1))) def mbidrepr(self): return str(self.mbid >> self.mbits) def mridrepr(self): return str(self.mrid) def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.repr()) def repr(self): return '%s.%s' % (self.mbidrepr(), self.mridrepr()) def __iter__(self): return iter((self.mbid, self.mrid)) # note this is slightly different from mid order when mrid=-1 def __eq__(self, other): if isinstance(other, Mid): return (self.mbid, self.mrid) == (other.mbid, other.mrid) else: return self.mid == other def __ne__(self, other): if isinstance(other, Mid): return (self.mbid, self.mrid) != (other.mbid, other.mrid) else: return self.mid != other def __hash__(self): return hash((self.mbid, self.mrid)) def __lt__(self, other): return (self.mbid, self.mrid) < (other.mbid, other.mrid) def __le__(self, other): return (self.mbid, self.mrid) <= (other.mbid, other.mrid) def __gt__(self, other): return (self.mbid, self.mrid) > (other.mbid, other.mrid) def __ge__(self, other): return (self.mbid, self.mrid) >= (other.mbid, other.mrid) # mdirs, the gooey atomic center of littlefs # # really the only difference between this and our rbyd class is the # implicit mbid associated with the mdir class Mdir: def __init__(self, mid, rbyd, *, mbits=None): # we need one of these to figure out mbits if mbits is not None: self.mbits = mbits elif isinstance(mid, Mid): self.mbits = mid.mbits elif isinstance(rbyd, Mdir): self.mbits = rbyd.mbits else: assert mbits is not None, "mbits?" # strip mrid, bugs will happen if caller relies on mrid here self.mid = Mid(mid, -1, mbits=self.mbits) # accept either another mdir or rbyd if isinstance(rbyd, Mdir): self.rbyd = rbyd.rbyd else: self.rbyd = rbyd @property def data(self): return self.rbyd.data @property def block(self): return self.rbyd.block @property def blocks(self): return self.rbyd.blocks @property def trunk(self): return self.rbyd.trunk @property def weight(self): return self.rbyd.weight @property def rev(self): return self.rbyd.rev @property def eoff(self): return self.rbyd.eoff @property def cksum(self): return self.rbyd.cksum @property def gcksumdelta(self): return self.rbyd.gcksumdelta @property def corrupt(self): return self.rbyd.corrupt @property def redund(self): return self.rbyd.redund def addr(self): if len(self.blocks) == 1: return '0x%x' % self.block else: return '0x{%s}' % ( ','.join('%x' % block for block in self.blocks)) def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.repr()) def repr(self): return 'mdir %s %s w%s' % ( self.mid.mbidrepr(), self.addr(), self.weight) def __bool__(self): return bool(self.rbyd) # we _don't_ care about mid for equality, or trunk even def __eq__(self, other): return frozenset(self.blocks) == frozenset(other.blocks) def __ne__(self, other): return not self.__eq__(other) def __hash__(self): return hash(frozenset(self.blocks)) @classmethod def fetch(cls, bd, mid, blocks, trunk=None): rbyd = Rbyd.fetch(bd, blocks, trunk) return cls(mid, rbyd, mbits=Mtree.mbits_(bd)) def lookupnext(self, mid, tag=None, *, path=False): # this is similar to rbyd lookupnext, we just error if # lookupnext changes mids if not isinstance(mid, Mid): mid = Mid(mid, mbits=self.mbits) r = self.rbyd.lookupnext(mid.mrid, tag, path=path) if path: rid, rattr, path_ = r else: rid, rattr = r if rid != mid.mrid: if path: return None, path_ else: return None if path: return rattr, path_ else: return rattr def lookup(self, mid, tag=None, mask=None, *, path=False): if not isinstance(mid, Mid): mid = Mid(mid, mbits=self.mbits) return self.rbyd.lookup(mid.mrid, tag, mask, path=path) def mids(self, *, path=False): for r in self.rbyd.rids( path=path): if path: rid, name, path_ = r else: rid, name = r mid = Mid(self.mid, rid) if path: yield mid, name, path_ else: yield mid, name def rattrs(self, mid=None, tag=None, mask=None, *, path=False): if mid is None: for r in self.rbyd.rattrs( path=path): if path: rid, rattr, path_ = r else: rid, rattr = r mid = Mid(self.mid, rid) if path: yield mid, rattr, path_ else: yield mid, rattr else: if not isinstance(mid, Mid): mid = Mid(mid, mbits=self.mbits) yield from self.rbyd.rattrs(mid.mrid, tag, mask, path=path) # lookup by name def namelookup(self, did, name): # unlike rbyd namelookup, we need an exact match here rid, name_ = self.rbyd.namelookup(did, name) if rid is None or (name_.did, name_.name) != (did, name): return None, None return Mid(self.mid, rid), name_ # the mtree, the skeletal structure of littlefs class Mtree: def __init__(self, bd, mrootchain, mtree, *, mrootpath=False, mtreepath=False, mbits=None): if isinstance(mrootchain, Mdir): mrootchain = [Mdir] # we at least need the mrootanchor, even if it is corrupt assert len(mrootchain) >= 1 self.bd = bd if mbits is not None: self.mbits = mbits else: self.mbits = Mtree.mbits_(self.bd) self.mrootchain = mrootchain self.mrootanchor = mrootchain[0] self.mroot = mrootchain[-1] self.mtree = mtree # mbits is a static value derived from the block_size @staticmethod def mbits_(block_size): if isinstance(block_size, Bd): block_size = block_size.block_size return mt.ceil(mt.log2(block_size)) - 3 # convenience function for creating mbits-dependent mids def mid(self, mbid, mrid=None): return Mid(mbid, mrid, mbits=self.mbits) @property def block(self): return self.mroot.block @property def blocks(self): return self.mroot.blocks @property def trunk(self): return self.mroot.trunk @property def weight(self): if self.mtree is None: return 0 else: return self.mtree.weight @property def mbweight(self): return self.weight @property def mrweight(self): return 1 << self.mbits def mbweightrepr(self): return str(self.mbweight >> self.mbits) def mrweightrepr(self): return str(self.mrweight) @property def rev(self): return self.mroot.rev @property def cksum(self): return self.mroot.cksum def addr(self): return self.mroot.addr() def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.repr()) def repr(self): return 'mtree %s w%s.%s' % ( self.addr(), self.mbweightrepr(), self.mrweightrepr()) def __eq__(self, other): return self.mrootanchor == other.mrootanchor def __ne__(self, other): return not self.__eq__(other) def __hash__(self): return hash(self.mrootanchor) @classmethod def fetch(cls, bd, blocks=None, trunk=None, *, depth=None): # default to blocks 0x{0,1} if blocks is None: blocks = [0, 1] # figure out mbits mbits = Mtree.mbits_(bd) # fetch the mrootanchor mrootanchor = Mdir.fetch(bd, -1, blocks, trunk) # follow the mroot chain to try to find the active mroot mroot = mrootanchor mrootchain = [mrootanchor] mrootseen = set() while True: # corrupted? if not mroot: break # cycle detected? if mroot in mrootseen: break mrootseen.add(mroot) # stop here? if depth and len(mrootchain) >= depth: break # fetch the next mroot rattr_ = mroot.lookup(-1, TAG_MROOT, 0x3) if rattr_ is None: break blocks_, _ = frommdir(rattr_.data) mroot = Mdir.fetch(bd, -1, blocks_) mrootchain.append(mroot) # fetch the actual mtree, if there is one mtree = None if not depth or len(mrootchain) < depth: rattr_ = mroot.lookup(-1, TAG_MTREE, 0x3) if rattr_ is not None: w_, block_, trunk_, cksum_, _ = frombtree(rattr_.data) mtree = Btree.fetchck(bd, block_, trunk_, w_, cksum_) return cls(bd, mrootchain, mtree, mbits=mbits) def _lookupleaf(self, mid, *, path=False, depth=None): if not isinstance(mid, Mid): mid = self.mid(mid) if path or depth: # iterate over mrootchain path_ = [] for mroot in self.mrootchain: # stop here? if depth and len(path_) >= depth: if path: return mroot, path_ else: return mroot name = mroot.lookup(-1, TAG_MAGIC) path_.append((mroot.mid, mroot, name)) # no mtree? must be inlined in mroot if self.mtree is None: if mid.mbid != -1: if path: return None, path_ else: return None if path: return self.mroot, path_ else: return self.mroot # mtree? lookup in mtree else: # need to do two steps here in case lookupleaf stops early r = self.mtree.lookupleaf(mid.mid, path=path or depth, depth=depth-len(path_) if depth else None) if path or depth: bid_, rbyd_, rid_, name_, path__ = r path_.extend(path__) else: bid_, rbyd_, rid_, name_ = r if bid_ is None: if path: return None, path_ else: return None # corrupt btree node? if not rbyd_: if path: return (bid_, rbyd_, rid_), path_ else: return (bid_, rbyd_, rid_) # stop here? it's not an mdir, but we only return btree nodes # if explicitly requested if depth and len(path_) >= depth: if path: return (bid_, rbyd_, rid_), path_ else: return (bid_, rbyd_, rid_) # fetch the mdir rattr_ = rbyd_.lookup(rid_, TAG_MDIR, 0x3) # mdir tag missing? weird if rattr_ is None: if path: return (bid_, rbyd_, rid_), path_ else: return (bid_, rbyd_, rid_) blocks_, _ = frommdir(rattr_.data) mdir = Mdir.fetch(self.bd, mid, blocks_) if path: return mdir, path_ else: return mdir def lookupleaf(self, mid, *, mdirs_only=True, path=False, depth=None): # most of the logic is in _lookupleaf, this just helps # deduplicate the mdirs_only logic r = self._lookupleaf(mid, path=path, depth=depth) if path: mdir, path_ = r else: mdir = r if mdir is None or ( mdirs_only and not isinstance(mdir, Mdir)): if path: return None, path_ else: return None if path: return mdir, path_ else: return mdir def lookup(self, mid, *, path=False, depth=None): if not isinstance(mid, Mid): mid = self.mid(mid) # lookup the relevant mdir r = self.lookupleaf(mid, path=path, depth=depth) if path: mdir, path_ = r else: mdir = r if mdir is None: if path: return None, None, path_ else: return None, None # not in mdir? if mid.mrid >= mdir.weight: if path: return None, None, path_ else: return None, None # lookup mid in mdir rattr = mdir.lookup(mid) if path: return mdir, rattr, path_+[(mid, mdir, rattr)] else: return mdir, rattr # iterate over all mdirs, this includes the mrootchain def _leaves(self, *, path=False, depth=None): # iterate over mrootchain if path or depth: path_ = [] for mroot in self.mrootchain: if path: yield mroot, path_ else: yield mroot if path or depth: # stop here? if depth and len(path_) >= depth: return name = mroot.lookup(-1, TAG_MAGIC) path_.append((mroot.mid, mroot, name)) # do we even have an mtree? if self.mtree is not None: # include the mtree root even if the weight is zero if self.mtree.weight == 0: if path: yield -1, self.mtree.rbyd, path_ else: yield -1, self.mtree.rbyd return mid = self.mid(0) while True: r = self.lookupleaf(mid, mdirs_only=False, path=path, depth=depth) if path: mdir, path_ = r else: mdir = r if mdir is None: break # mdir? if isinstance(mdir, Mdir): if path: yield mdir, path_ else: yield mdir mid = self.mid(mid.mbid+1) # btree node? else: bid, rbyd, rid = mdir if path: yield ((bid-rid + (rbyd.weight-1), rbyd), # path tail is usually redundant unless corrupt path_[:-1] if path_ and isinstance(path_[-1][1], Rbyd) and path_[-1][1] == rbyd else path_) else: yield (bid-rid + (rbyd.weight-1), rbyd) mid = self.mid(bid-rid + (rbyd.weight-1) + 1) def leaves(self, *, mdirs_only=False, path=False, depth=None): for r in self._leaves( path=path, depth=depth): if path: mdir, path_ = r else: mdir = r if mdirs_only and not isinstance(mdir, Mdir): continue if path: yield mdir, path_ else: yield mdir # traverse over all mdirs and btree nodes # - mdir => Mdir # - btree node => (bid, rbyd) def _traverse(self, *, path=False, depth=None): ptrunk_ = [] for mdir, path_ in self.leaves( path=True, depth=depth): # we only care about the mdirs/rbyds here trunk_ = ([(lambda mid_, mdir_, name_: mdir_)(*p) if isinstance(p[1], Mdir) else (lambda bid_, rbyd_, rid_, name_: (bid_-rid_ + (rbyd_.weight-1), rbyd_))(*p) for p in path_] + [mdir]) for d, mdir in pathdelta( trunk_, ptrunk_): # but include branch mids/rids in the path if requested if path: yield mdir, path_[:d] else: yield mdir ptrunk_ = trunk_ def traverse(self, *, mdirs_only=False, path=False, depth=None): for r in self._traverse( path=path, depth=depth): if path: mdir, path_ = r else: mdir = r if mdirs_only and not isinstance(mdir, Mdir): continue if path: yield mdir, path_ else: yield mdir # these are just aliases # the difference between mdirs and leaves is mdirs defaults to only # mdirs, leaves can include btree nodes if corrupt def mdirs(self, *, mdirs_only=True, path=False, depth=None): return self.leaves( mdirs_only=mdirs_only, path=path, depth=depth) # note mids/rattrs do _not_ include corrupt btree nodes! def mids(self, *, mdirs_only=True, path=False, depth=None): for r in self.mdirs( mdirs_only=mdirs_only, path=path, depth=depth): if path: mdir, path_ = r else: mdir = r if isinstance(mdir, Mdir): for mid, name in mdir.mids(): if path: yield (mid, mdir, name, path_+[(mid, mdir, name)]) else: yield mid, mdir, name else: bid, rbyd = mdir for rid, name in rbyd.rids(): bid_ = bid-(rbyd.weight-1) + rid mid_ = self.mid(bid_) mdir_ = (bid_, rbyd, rid) if path: yield (mid_, mdir_, name, path_+[(bid_, rbyd, rid, name)]) else: yield mid_, mdir_, name def rattrs(self, mid=None, tag=None, mask=None, *, mdirs_only=True, path=False, depth=None): if mid is None: for r in self.mdirs( mdirs_only=mdirs_only, path=path, depth=depth): if path: mdir, path_ = r else: mdir = r if isinstance(mdir, Mdir): for mid, rattr in mdir.rattrs(): if path: yield (mid, mdir, rattr, path_+[(mid, mdir, mdir.lookup(mid))]) else: yield mid, mdir, rattr else: bid, rbyd = mdir for rid, name in rbyd.rids(): bid_ = bid-(rbyd.weight-1) + rid mid_ = self.mid(bid_) mdir_ = (bid_, rbyd, rid) for rattr in rbyd.rattrs(rid): if path: yield (mid_, mdir_, rattr, path_+[(bid_, rbyd, rid, name)]) else: yield mid_, mdir_, rattr else: if not isinstance(mid, Mid): mid = self.mid(mid) r = self.lookupleaf(mid, path=path, depth=depth) if path: mdir, path_ = r else: mdir = r if mdir is None or ( mdirs_only and not isinstance(mdir, Mdir)): return if isinstance(mdir, Mdir): for rattr in mdir.rattrs(mid, tag, mask): if path: yield rattr, path_ else: yield rattr else: bid, rbyd, rid = mdir for rattr in rbyd.rattrs(rid, tag, mask): if path: yield rattr, path_ else: yield rattr # lookup by name def _namelookupleaf(self, did, name, *, path=False, depth=None): if path or depth: # iterate over mrootchain path_ = [] for mroot in self.mrootchain: # stop here? if depth and len(path_) >= depth: if path: return mroot, path_ else: return mroot name = mroot.lookup(-1, TAG_MAGIC) path_.append((mroot.mid, mroot, name)) # no mtree? must be inlined in mroot if self.mtree is None: if path: return self.mroot, path_ else: return self.mroot # mtree? find name in mtree else: # need to do two steps here in case namelookupleaf stops early r = self.mtree.namelookupleaf(did, name, path=path or depth, depth=depth-len(path_) if depth else None) if path or depth: bid_, rbyd_, rid_, name_, path__ = r path_.extend(path__) else: bid_, rbyd_, rid_, name_ = r if bid_ is None: if path: return None, path_ else: return None # corrupt btree node? if not rbyd_: if path: return (bid_, rbyd_, rid_), path_ else: return (bid_, rbyd_, rid_) # stop here? it's not an mdir, but we only return btree nodes # if explicitly requested if depth and len(path_) >= depth: if path: return (bid_, rbyd_, rid_), path_ else: return (bid_, rbyd_, rid_) # fetch the mdir rattr_ = rbyd_.lookup(rid_, TAG_MDIR, 0x3) # mdir tag missing? weird if rattr_ is None: if path: return (bid_, rbyd_, rid_), path_ else: return (bid_, rbyd_, rid_) blocks_, _ = frommdir(rattr_.data) mdir = Mdir.fetch(self.bd, self.mid(bid_), blocks_) if path: return mdir, path_ else: return mdir def namelookupleaf(self, did, name, *, mdirs_only=True, path=False, depth=None): # most of the logic is in _namelookupleaf, this just helps # deduplicate the mdirs_only logic r = self._namelookupleaf(did, name, path=path, depth=depth) if path: mdir, path_ = r else: mdir = r if mdir is None or ( mdirs_only and not isinstance(mdir, Mdir)): if path: return None, path_ else: return None if path: return mdir, path_ else: return mdir def namelookup(self, did, name, *, path=False, depth=None): # lookup the relevant mdir r = self.namelookupleaf(did, name, path=path, depth=depth) if path: mdir, path_ = r else: mdir = r if mdir is None: if path: return None, None, None, path_ else: return None, None, None # find name in mdir mid_, name_ = mdir.namelookup(did, name) if mid_ is None: if path: return None, None, None, path_ else: return None, None, None if path: return mid_, mdir, name_, path_+[(mid_, mdir, name_)] else: return mid_, mdir, name_ # in-btree block pointers class Bptr: def __init__(self, rattr, block, off, size, cksize, cksum, ckdata, *, corrupt=False): self.rattr = rattr self.block = block self.off = off self.size = size self.cksize = cksize self.cksum = cksum self.ckdata = ckdata self.corrupt = corrupt @property def tag(self): return self.rattr.tag @property def weight(self): return self.rattr.weight # this is just for consistency with btrees, rbyds, etc @property def blocks(self): return [self.block] # try to avoid unnecessary allocations @ft.cached_property def data(self): return self.ckdata[self.off:self.off+self.size] def addr(self): return '0x%x.%x' % (self.block, self.off) def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.repr()) def repr(self): return '%sblock %s w%s %s' % ( 'shrub' if self.tag & TAG_SHRUB else '', self.addr(), self.weight, self.size) # lazily check the cksum @ft.cached_property def corrupt(self): cksum_ = crc32c(self.ckdata) return (cksum_ != self.cksum) @property def redund(self): return -1 if self.corrupt else 0 def __bool__(self): return not self.corrupt @classmethod def fetch(cls, bd, rattr, block, off, size, cksize, cksum): # seek/read cksize bytes from the block, the actual data should # always be a subset of cksize ckdata = bd.read(block, 0, cksize) return cls(rattr, block, off, size, cksize, cksum, ckdata) @classmethod def fetchck(cls, bd, rattr, blocks, off, size, cksize, cksum): # fetch the bptr normally bptr = cls.fetch(bd, rattr, blocks, off, size, cksize, cksum) # bit of a hack, but this exposes the lazy cksum checker del bptr.corrupt return bptr # yeah, so, this doesn't catch mismatched cksizes, but at least the # underlying data should be identical assuming no mutation def __eq__(self, other): return ((self.block, self.off, self.size) == (other.block, other.off, other.size)) def __ne__(self, other): return ((self.block, self.off, self.size) != (other.block, other.off, other.size)) def __hash__(self): return hash((self.block, self.off, self.size)) # lazy config object class Config: def __init__(self, mroot): self.mroot = mroot # lookup a specific tag def lookup(self, tag=None, mask=None): rattr = self.mroot.rbyd.lookup(-1, tag, mask) if rattr is None: return None return self._parse(rattr.tag, rattr) def __getitem__(self, key): if not isinstance(key, tuple): key = (key,) return self.lookup(*key) def __contains__(self, key): if not isinstance(key, tuple): key = (key,) return self.lookup(*key) is not None def __iter__(self): for rattr in self.mroot.rbyd.rattrs(-1, TAG_CONFIG, 0xff): yield self._parse(rattr.tag, rattr) # common config operations class Config: tag = None mask = None def __init__(self, mroot, tag, rattr): # replace tag with what we find self.tag = tag # and keep track of rattr self.rattr = rattr @property def block(self): return self.rattr.block @property def blocks(self): return self.rattr.blocks @property def toff(self): return self.rattr.toff @property def tdata(self): return self.rattr.data @property def off(self): return self.rattr.off @property def data(self): return self.rattr.data @property def size(self): return self.rattr.size def __bytes__(self): return self.data def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.repr()) def repr(self): return self.rattr.repr() def __iter__(self): return iter((self.tag, self.data)) def __eq__(self, other): return (self.tag, self.data) == (other.tag, other.data) def __ne__(self, other): return (self.tag, self.data) != (other.tag, other.data) def __hash__(self): return hash((self.tag, self.data)) # marker class for unknown config class Unknown(Config): pass # special handling for known configs # the filesystem magic string class Magic(Config): tag = TAG_MAGIC mask = 0x3 def repr(self): return 'magic \"%s\"' % ( ''.join(b if b >= ' ' and b <= '~' else '.' for b in map(chr, self.data))) # version tuple class Version(Config): tag = TAG_VERSION def __init__(self, mroot, tag, rattr): super().__init__(mroot, tag, rattr) d = 0 self.major, d_ = fromleb128(self.data, d); d += d_ self.minor, d_ = fromleb128(self.data, d); d += d_ @property def tuple(self): return (self.major, self.minor) def repr(self): return 'version v%s.%s' % (self.major, self.minor) # compat flags class Rcompat(Config): tag = TAG_RCOMPAT def repr(self): return 'rcompat 0x%s' % ( ''.join('%02x' % f for f in reversed(self.data))) class Wcompat(Config): tag = TAG_WCOMPAT def repr(self): return 'wcompat 0x%s' % ( ''.join('%02x' % f for f in reversed(self.data))) class Ocompat(Config): tag = TAG_OCOMPAT def repr(self): return 'ocompat 0x%s' % ( ''.join('%02x' % f for f in reversed(self.data))) # block device geometry class Geometry(Config): tag = TAG_GEOMETRY def __init__(self, mroot, tag, rattr): super().__init__(mroot, tag, rattr) d = 0 block_size, d_ = fromleb128(self.data, d); d += d_ block_count, d_ = fromleb128(self.data, d); d += d_ # these are offset by 1 to avoid overflow issues self.block_size = block_size + 1 self.block_count = block_count + 1 def repr(self): return 'geometry %sx%s' % (self.block_size, self.block_count) # file name limit class NameLimit(Config): tag = TAG_NAMELIMIT def __init__(self, mroot, tag, rattr): super().__init__(mroot, tag, rattr) self.limit, _ = fromleb128(self.data) def __int__(self): return self.limit def repr(self): return 'namelimit %s' % self.limit # file size limit class FileLimit(Config): tag = TAG_FILELIMIT def __init__(self, mroot, tag, rattr): super().__init__(mroot, tag, rattr) self.limit, _ = fromleb128(self.data) def __int__(self): return self.limit def repr(self): return 'filelimit %s' % self.limit # keep track of known configs _known = [c for c in Config.__subclasses__() if c.tag is not None] # parse if known def _parse(self, tag, rattr): # known config? for c in self._known: if (c.tag & ~(c.mask or 0)) == (tag & ~(c.mask or 0)): return c(self.mroot, tag, rattr) # otherwise return a marker class else: return self.Unknown(self.mroot, tag, rattr) # create cached accessors for known config def _parser(c): def _parser(self): return self.lookup(c.tag, c.mask) return _parser for c in _known: locals()[c.__name__.lower()] = ft.cached_property(_parser(c)) # lazy gstate object class Gstate: def __init__(self, mtree): self.mtree = mtree # lookup a specific tag def lookup(self, tag=None, mask=None): # collect relevant gdeltas in the mtree gdeltas = [] for mdir in self.mtree.mdirs(): # gcksumdelta is a bit special since it's outside the # rbyd tree if tag == TAG_GCKSUMDELTA: gdelta = mdir.gcksumdelta else: gdelta = mdir.rbyd.lookup(-1, tag, mask) if gdelta is not None: gdeltas.append((mdir.mid, gdelta)) # xor to find gstate return self._parse(tag, gdeltas) def __getitem__(self, key): if not isinstance(key, tuple): key = (key,) return self.lookup(*key) def __contains__(self, key): # note gstate doesn't really "not exist" like normal attrs, # missing gstate is equivalent to zero gstate, but we can # still test if there are any gdeltas that match the given # tag here if not isinstance(key, tuple): key = (key,) return any( (mdir.gcksumdelta if tag == TAG_GCKSUMDELTA else mdir.rbyd.lookup(-1, *key)) is not None for mdir in self.mtree.mdirs()) def __iter__(self): # first figure out what gstate tags actually exist in the # filesystem gtags = set() for mdir in self.mtree.mdirs(): if mdir.gcksumdelta is not None: gtags.add(TAG_GCKSUMDELTA) for rattr in mdir.rbyd.rattrs(-1): if (rattr.tag & 0xff00) == TAG_GDELTA: gtags.add(rattr.tag) # sort to keep things stable, moving gcksum to the front gtags = sorted(gtags, key=lambda t: (-(t & 0xf000), t)) # compute all gstate in one pass (well, two technically) gdeltas = {tag: [] for tag in gtags} for mdir in self.mtree.mdirs(): for tag in gtags: # gcksumdelta is a bit special since it's outside the # rbyd tree if tag == TAG_GCKSUMDELTA: gdelta = mdir.gcksumdelta else: gdelta = mdir.rbyd.lookup(-1, tag) if gdelta is not None: gdeltas[tag].append((mdir.mid, gdelta)) for tag in gtags: # xor to find gstate yield self._parse(tag, gdeltas[tag]) # common gstate operations class Gstate: tag = None mask = None def __init__(self, mtree, tag, gdeltas): # replace tag with what we find self.tag = tag # keep track of gdeltas for debugging self.gdeltas = gdeltas # xor together to build our gstate data = bytes() for mid, gdelta in gdeltas: data = bytes( a^b for a, b in it.zip_longest( data, gdelta.data, fillvalue=0)) self.data = data @property def size(self): return len(self.data) def __bytes__(self): return self.data def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.repr()) def repr(self): return tagrepr(self.tag, 0, self.size, global_=True) def __iter__(self): return iter((self.tag, self.data)) def __eq__(self, other): return (self.tag, self.data) == (other.tag, other.data) def __ne__(self, other): return (self.tag, self.data) != (other.tag, other.data) def __hash__(self): return hash((self.tag, self.data)) # marker class for unknown gstate class Unknown(Gstate): pass # special handling for known gstate # the global-checksum, cubed class Gcksum(Gstate): tag = TAG_GCKSUMDELTA def __init__(self, mtree, tag, gdeltas): super().__init__(mtree, tag, gdeltas) self.gcksum = fromle32(self.data) def __int__(self): return self.gcksum def repr(self): return 'gcksum %08x' % self.gcksum # any global-removes class Grm(Gstate): tag = TAG_GRMDELTA def __init__(self, mtree, tag, gdeltas): super().__init__(mtree, tag, gdeltas) queue = [] d = 0 for _ in range(2): mid, d_ = fromleb128(self.data, d); d += d_ # a null mid (mid=0.0) terminates the grm queue if not mid: break mid = mtree.mid(mid) # map mbids -> -1 if mroot-inlined if mtree.mtree is None: mid = mtree.mid(-1, mid.mrid) queue.append(mid) self.queue = queue def repr(self): return 'grm [%s]' % ', '.join(mid.repr() for mid in self.queue) # keep track of known gstate _known = [g for g in Gstate.__subclasses__() if g.tag is not None] # parse if known def _parse(self, tag, gdeltas): # known config? for g in self._known: if (g.tag & ~(g.mask or 0)) == (tag & ~(g.mask or 0)): return g(self.mtree, tag, gdeltas) # otherwise return a marker class else: return Unknown(self.mtree, tag, gdeltas) # create cached accessors for known gstate def _parser(g): def _parser(self): return self.lookup(g.tag, g.mask) return _parser for g in _known: locals()[g.__name__.lower()] = ft.cached_property(_parser(g)) # high-level littlefs representation class Lfs3: def __init__(self, bd, mtree, config=None, gstate=None, cksum=None, *, corrupt=False): self.bd = bd self.mtree = mtree # create lazy config/gstate objects self.config = config or Config(self.mroot) self.gstate = gstate or Gstate(self.mtree) # go ahead and fetch some expected fields self.version = self.config.version self.rcompat = self.config.rcompat self.wcompat = self.config.wcompat self.ocompat = self.config.ocompat if self.config.geometry is not None: self.block_count = self.config.geometry.block_count self.block_size = self.config.geometry.block_size else: self.block_count = self.bd.block_count self.block_size = self.bd.block_size # calculate on-disk gcksum if cksum is None: cksum = 0 for mdir in self.mtree.mdirs(): cksum ^= mdir.cksum self.cksum = cksum # is the filesystem corrupt? self.corrupt = corrupt # create the root directory, this is a bit of a special case self.root = self.Root(self) # mbits is a static value derived from the block_size @staticmethod def mbits_(block_size): return Mtree.mbits_(block_size) @property def mbits(self): return self.mtree.mbits # convenience function for creating mbits-dependent mids def mid(self, mbid, mrid=None): return self.mtree.mid(mbid, mrid) # most of our fields map to the mtree @property def block(self): return self.mroot.block @property def blocks(self): return self.mroot.blocks @property def trunk(self): return self.mroot.trunk @property def rev(self): return self.mroot.rev @property def weight(self): return self.mtree.weight @property def mbweight(self): return self.mtree.mbweight @property def mrweight(self): return self.mtree.mrweight def mbweightrepr(self): return self.mtree.mbweightrepr() def mrweightrepr(self): return self.mtree.mrweightrepr() @property def mrootchain(self): return self.mtree.mrootchain @property def mrootanchor(self): return self.mtree.mrootanchor @property def mroot(self): return self.mtree.mroot def addr(self): return self.mroot.addr() def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.repr()) def repr(self): return 'littlefs v%s.%s %sx%s %s w%s.%s' % ( self.version.major if self.version is not None else '?', self.version.minor if self.version is not None else '?', self.block_size if self.block_size is not None else '?', self.block_count if self.block_count is not None else '?', self.addr(), self.mbweightrepr(), self.mrweightrepr()) def __bool__(self): return not self.corrupt def __eq__(self, other): return self.mrootanchor == other.mrootanchor def __ne__(self, other): return self.mrootanchor != other.mrootanchor def __hash__(self): return hash(self.mrootanchor) @classmethod def fetch(cls, bd, blocks=None, trunk=None, *, depth=None, no_ck=False, no_ckmroot=False, no_ckmagic=False, no_ckgcksum=False): # Mtree does most of the work here mtree = Mtree.fetch(bd, blocks, trunk, depth=depth) # create lfs object lfs = cls(bd, mtree) # don't check anything? if no_ck: return lfs # check mroot if (not no_ckmroot and not lfs.corrupt and not lfs.ckmroot()): lfs.corrupt = True # check magic if (not no_ckmagic and not lfs.corrupt and not lfs.ckmagic()): lfs.corrupt = True # check gcksum if (not no_ckgcksum and not lfs.corrupt and not lfs.ckgcksum()): lfs.corrupt = True return lfs # check that the mroot is valid def ckmroot(self): return bool(self.mroot) # check that the magic string is littlefs def ckmagic(self): if self.config.magic is None: return False return self.config.magic.data == b'littlefs' # check that the gcksum checks out def ckgcksum(self): return crc32ccube(self.cksum) == int(self.gstate.gcksum) # read custom attrs def uattrs(self): return self.mroot.rattrs(-1, TAG_UATTR, 0xff) def sattrs(self): return self.mroot.rattrs(-1, TAG_SATTR, 0xff) def attrs(self): yield from self.uattrs() yield from self.sattrs() # is file in grm queue? def grmed(self, mid): if not isinstance(mid, Mid): mid = self.mid(mid) return mid in self.gstate.grm.queue # lookup operations def lookup(self, mid, mdir=None, *, all=False): import builtins all_, all = all, builtins.all # is this mid grmed? if not all_ and self.grmed(mid): return None if mdir is None: mdir, name = self.mtree.lookup(mid) if mdir is None: return None else: name = mdir.lookup(mid) # stickynote? if not all_ and name.tag == TAG_STICKYNOTE: return None return self._open(mid, mdir, name.tag, name) def namelookup(self, did, name, *, all=False): import builtins all_, all = all, builtins.all mid_, mdir_, name_ = self.mtree.namelookup(did, name) if mid_ is None: return None # is this mid grmed? if not all_ and self.grmed(mid_): return None # stickynote? if not all_ and name_.tag == TAG_STICKYNOTE: return None return self._open(mid_, mdir_, name_.tag, name_) class PathError(Exception): pass # split a path into its components # # note this follows littlefs's internal logic, so dots and dotdot # entries get resolved _before_ walking the path @staticmethod def pathsplit(path): path_ = path if isinstance(path_, str): path_ = path_.encode('utf8') # empty path? if path_ == b'': raise Lfs3.PathError("invalid path: %r" % path) path__ = [] for p in path_.split(b'/'): # skip multiple slashes and dots if p == b'' or p == b'.': continue path__.append(p) path_ = path__ # resolve dotdots path__ = [] dotdots = 0 for p in reversed(path_): if p == b'..': dotdots += 1 elif dotdots: dotdots -= 1 else: path__.append(p) if dotdots: raise Lfs3.PathError("invalid path: %r" % path) path__.reverse() path_ = path__ return path_ def pathlookup(self, did, path_=None, *, all=False, path=False, depth=None): import builtins all_, all = all, builtins.all # default to the root directory if path_ is None: did, path_ = 0, did # parse/split the path if isinstance(path_, (bytes, str)): path_ = self.pathsplit(path_) # start at the root dir dir = self.root did = did if path or depth: path__ = [] for p in path_: # lookup the next file file = self.namelookup(did, p, all=all_) if file is None: if path: return None, path__ else: return None # file? done? if not file.recursable: if path: return file, path__ else: return file # recurse down the file tree dir = file did = dir.did if path or depth: path__.append(dir) # stop here? if depth and len(path__) >= depth: if path: return None, path__ else: return None if path: return dir, path__ else: return dir def files(self, did=None, *, all=False, path=False, depth=None): import builtins all_, all = all, builtins.all # default to the root directory did = did or self.root.did # start with the bookmark entry mid, mdir, name = self.mtree.namelookup(did, b'') # no bookmark? weird if mid is None: return # iterate over files until we find a different did while name.did == did: # yield file, hiding grms, stickynotes, etc, by default if all_ or (not self.grmed(mid) and not name.tag == TAG_BOOKMARK and not name.tag == TAG_STICKYNOTE): file = self._open(mid, mdir, name.tag, name) if path: yield file, [] else: yield file # recurse? if (file.recursable and depth is not None and (depth == 0 or depth > 1)): for r in self.files(file.did, all=all_, path=path, depth=depth-1 if depth else 0): if path: file_, path_ = r yield file_, [file]+path_ else: file_ = r yield file_ # increment mid and find the next mdir if needed mbid, mrid = mid.mbid, mid.mrid + 1 if mrid == mdir.weight: mbid, mrid = mbid + (1 << self.mbits), 0 mdir = self.mtree.lookupleaf(mbid) if mdir is None: break # lookup name and adjust rid if necessary, you don't # normally need to do this, but we don't want the iteration # to terminate early on a corrupt filesystem mrid, name = mdir.rbyd.lookupnext(mrid) if mrid is None: break mid = self.mid(mbid, mrid) def orphans(self, all=False): import builtins all_, all = all, builtins.all # first find all reachable dids dids = {self.root.did} for file in self.files(depth=mt.inf): if file.recursable: dids.add(file.did) # then iterate over all dids and yield any that aren't reachable for mid, mdir, name in self.mtree.mids(): # is this mid grmed? if not all_ and self.grmed(mid): continue # stickynote? if not all_ and name.tag == TAG_STICKYNOTE: continue # unreachable? note this lazily parses the did if name.did not in dids: file = self._open(mid, mdir, name.tag, name) # mark as orphaned file.orphaned = True yield file # traverse the filesystem def traverse(self, *, mtree_only=False, shrubs=False, fragments=False, path=False): # traverse the mtree for r in self.mtree.traverse( path=path): if path: mdir, path_ = r else: mdir = r # mdir? if isinstance(mdir, Mdir): if path: yield mdir, path_ else: yield mdir # btree node? we only care about the rbyd for simplicity else: bid, rbyd = mdir if path: yield rbyd, path_ else: yield rbyd # traverse file bshrubs/btrees if not mtree_only and isinstance(mdir, Mdir): for mid, name in mdir.mids(): file = self._open(mid, mdir, name.tag, name) for r in file.traverse( path=path): if path: pos, data, path__ = r path__ = [(mid, mdir, name)]+path__ else: pos, data = r # inlined data? we usually ignore these if isinstance(data, Rattr): if fragments: if path: yield data, path_+path__ else: yield data # block pointer? elif isinstance(data, Bptr): if path: yield data, path_+path__ else: yield data # bshrub/btree node? we only care about the rbyd # for simplicity, we also usually ignore shrubs # since these live the the parent mdir else: if shrubs or not data.shrub: if path: yield data, path_+path__ else: yield data # common file operations, note Reg extends this for regular files class File: tag = None mask = None internal = False recursable = False grmed = False orphaned = False def __init__(self, lfs, mid, mdir, tag, name): self.lfs = lfs self.mid = mid self.mdir = mdir # replace tag with what we find self.tag = tag self.name = name # fetch the file structure if there is one self.struct = mdir.lookup(mid, TAG_STRUCT, 0xff) # bshrub/btree? self.bshrub = None if (self.struct is not None and (self.struct.tag & ~0x3) == TAG_BSHRUB): weight, trunk, _ = fromshrub(self.struct.data) self.bshrub = Btree.fetchshrub(lfs.bd, mdir.rbyd, trunk) elif (self.struct is not None and (self.struct.tag & ~0x3) == TAG_BTREE): weight, block, trunk, cksum, _ = frombtree(self.struct.data) self.bshrub = Btree.fetchck( lfs.bd, block, trunk, weight, cksum) # did? self.did = None if (self.struct is not None and self.struct.tag == TAG_DID): self.did, _ = fromleb128(self.struct.data) # some other info that is useful for scripts # mark as grmed if grmed if lfs.grmed(mid): self.grmed = True @property def size(self): if self.bshrub is not None: return self.bshrub.weight else: return 0 def structrepr(self): if self.struct is not None: # inlined bshrub? if (self.struct.tag & ~0x3) == TAG_BSHRUB: return 'bshrub %s' % self.bshrub.addr() # btree? elif (self.struct.tag & ~0x3) == TAG_BTREE: return 'btree %s' % self.bshrub.addr() # btree? else: return self.struct.repr() else: return '' def __repr__(self): return '<%s %s.%s %s>' % ( self.__class__.__name__, self.mid.mbidrepr(), self.mid.mridrepr(), self.repr()) def repr(self): return 'type 0x%02x%s' % ( self.tag & 0xff, ', %s' % self.structrepr() if self.struct is not None else '') def __eq__(self, other): return self.mid == other.mid def __ne__(self, other): return self.mid != other.mid def __hash__(self): return hash(self.mid) # read attrs, note this includes _all_ attrs def rattrs(self): return self.mdir.rattrs(self.mid) # read custom attrs def uattrs(self): return self.mdir.rattrs(self.mid, TAG_UATTR, 0xff) def sattrs(self): return self.mdir.rattrs(self.mid, TAG_SATTR, 0xff) def attrs(self): yield from self.uattrs() yield from self.sattrs() # lookup data in the underlying bshrub def _lookupleaf(self, pos, *, path=False, depth=None): # no bshrub? if self.bshrub is None: if path: return None, None, [] else: return None, None # lookup data in our bshrub r = self.bshrub.lookupleaf(pos, path=path or depth, depth=depth) if path or depth: bid, rbyd, rid, rattr, path_ = r else: bid, rbyd, rid, rattr = r if bid is None: if path: return None, None, path_ else: return None, None # corrupt btree node? if not rbyd: if path: return bid-(rbyd.weight-1), rbyd, path_ else: return bid-(rbyd.weight-1), rbyd # stop here? if depth and len(path_) >= depth: if path: return bid-(rattr.weight-1), rbyd, path_ else: return bid-(rattr.weight-1), rbyd # inlined data? if (rattr.tag & ~0x1003) == TAG_DATA: if path: return bid-(rattr.weight-1), rattr, path_ else: return bid-(rattr.weight-1), rattr # block pointer? elif (rattr.tag & ~0x1003) == TAG_BLOCK: size, block, off, cksize, cksum, _ = frombptr(rattr.data) bptr = Bptr.fetchck(self.lfs.bd, rattr, block, off, size, cksize, cksum) if path: return bid-(rattr.weight-1), bptr, path_ else: return bid-(rattr.weight-1), bptr # uh oh, something is broken else: if path: return bid-(rattr.weight-1), rattr, path_ else: return bid-(rattr.weight-1), rattr def lookupleaf(self, pos, *, data_only=True, path=False, depth=None): r = self._lookupleaf(pos, path=path, depth=depth) if path: pos, data, path_ = r else: pos, data = r if pos is None or ( data_only and not isinstance(data, (Rattr, Bptr))): if path: return None, None, path_ else: return None, None if path: return pos, data, path_ else: return pos, data def _leaves(self, *, path=False, depth=None): pos = 0 while True: r = self.lookupleaf(pos, data_only=False, path=path, depth=depth) if path: pos, data, path_ = r else: pos, data = r if pos is None: break # data? if isinstance(data, (Rattr, Bptr)): if path: yield pos, data, path_ else: yield pos, data pos += data.weight # btree node? else: rbyd = data if path: yield (pos, rbyd, # path tail is usually redundant unless corrupt path_[:-1] if path_ and path_[-1][1] == rbyd else path_) else: yield pos, rbyd pos += rbyd.weight def leaves(self, *, data_only=False, path=False, depth=None): for r in self._leaves( path=path, depth=depth): if path: pos, data, path_ = r else: pos, data = r if data_only and not isinstance(data, (Rattr, Bptr)): continue if path: yield pos, data, path_ else: yield pos, data def _traverse(self, *, path=False, depth=None): ptrunk_ = [] for pos, data, path_ in self.leaves( path=True, depth=depth): # we only care about the data/rbyds here trunk_ = ([(bid_-rid_, rbyd_) for bid_, rbyd_, rid_, name_ in path_] + [(pos, data)]) for d, (pos, data) in pathdelta( trunk_, ptrunk_): # but include branch rids in path if requested if path: yield pos, data, path_[:d] else: yield pos, data ptrunk_ = trunk_ def traverse(self, *, data_only=False, path=False, depth=None): for r in self._traverse( path=path, depth=depth): if path: pos, data, path_ = r else: pos, data = r if data_only and not isinstance(data, (Rattr, Bptr)): continue if path: yield pos, data, path_ else: yield pos, data def datas(self, *, data_only=True, path=False, depth=None): return self.leaves( data_only=data_only, path=path, depth=depth) # some convience operations for reading data def bytes(self, *, depth=None): for pos, data in self.datas(depth=depth): if data.size > 0: yield data.data if data.weight > data.size: yield b'\0' * (data.weight-data.size) def read(self, *, depth=None): return b''.join(self.bytes()) # bleh, with that out of the way, here are our known file types # regular files class Reg(File): tag = TAG_REG def repr(self): return 'reg %s%s' % ( self.size, ', %s' % self.structrepr() if self.struct is not None else '') # directories class Dir(File): tag = TAG_DIR def __init__(self, lfs, mid, mdir, tag, name): super().__init__(lfs, mid, mdir, tag, name) # we're recursable if we're a non-grmed directory with a did if (isinstance(self, Lfs3.Dir) and not self.grmed and self.did is not None): self.recursable = True def repr(self): return 'dir %s%s' % ( '0x%x' % self.did if self.did is not None else '?', ', %s' % self.structrepr() if self.struct is not None and self.struct.tag != TAG_DID else '') # provide some convenient filesystem access relative to our did def namelookup(self, name, **args): if self.did is None: return None return self.lfs.namelookup(self.did, name, **args) def pathlookup(self, path_, **args): if self.did is None: if args.get('path'): return None, [] else: return None return self.lfs.pathlookup(self.did, path_, **args) def files(self, **args): if self.did is None: return iter(()) return self.lfs.files(self.did, **args) # root is a bit special class Root(Dir): tag = None def __init__(self, lfs): # root always has mid=-1 and did=0 super().__init__(lfs, lfs.mid(-1), lfs.mroot, TAG_DIR, None) self.did = 0 self.recursable = True def repr(self): return 'root' # bookmarks keep track of where directories start class Bookmark(File): tag = TAG_BOOKMARK internal = True def repr(self): return 'bookmark %s%s' % ( '0x%x' % self.name.did if self.name.did is not None else '?', ', %s' % self.structrepr() if self.struct is not None else '') # stickynotes, i.e. uncommitted files, behave the same as files # for the most part class Stickynote(File): tag = TAG_STICKYNOTE internal = True def repr(self): return 'stickynote%s' % ( ' %s, %s' % (self.size, self.structrepr()) if self.struct is not None else '') # marker class for unknown file types class Unknown(File): pass # keep track of known file types _known = [f for f in File.__subclasses__() if f.tag is not None] # fetch/parse state if known def _open(self, mid, mdir, tag, name): # known file type? tag = name.tag for f in self._known: if (f.tag & ~(f.mask or 0)) == (tag & ~(f.mask or 0)): return f(self, mid, mdir, tag, name) # otherwise return a marker class else: return self.Unknown(self, mid, mdir, tag, name) # a representation of optionally key-mapped attrs class CsvAttr: def __init__(self, attrs, defaults=None): if attrs is None: attrs = [] if isinstance(attrs, dict): attrs = attrs.items() # normalize self.attrs = [] self.keyed = co.OrderedDict() for attr in attrs: if not isinstance(attr, tuple): attr = ((), attr) if attr[0] in {None, (), (None,), ('*',)}: attr = ((), attr[1]) if not isinstance(attr[0], tuple): attr = ((attr[0],), attr[1]) self.attrs.append(attr) if attr[0] not in self.keyed: self.keyed[attr[0]] = [] self.keyed[attr[0]].append(attr[1]) # create attrs object for defaults if isinstance(defaults, CsvAttr): self.defaults = defaults elif defaults is not None: self.defaults = CsvAttr(defaults) else: self.defaults = None def __repr__(self): if self.defaults is None: return 'CsvAttr(%r)' % ( [(','.join(attr[0]), attr[1]) for attr in self.attrs]) else: return 'CsvAttr(%r, %r)' % ( [(','.join(attr[0]), attr[1]) for attr in self.attrs], [(','.join(attr[0]), attr[1]) for attr in self.defaults.attrs]) def __iter__(self): if () in self.keyed: return it.cycle(self.keyed[()]) elif self.defaults is not None: return iter(self.defaults) else: return iter(()) def __bool__(self): return bool(self.attrs) def __getitem__(self, key): if isinstance(key, tuple): if len(key) > 0 and not isinstance(key[0], str): i, key = key if not isinstance(key, tuple): key = (key,) else: i, key = 0, key elif isinstance(key, str): i, key = 0, (key,) else: i, key = key, () # try to lookup by key best = None for ks, vs in self.keyed.items(): prefix = [] for j, k in enumerate(ks): if j < len(key) and fnmatch.fnmatchcase(key[j], k): prefix.append(k) else: prefix = None break if prefix is not None and ( best is None or len(prefix) >= len(best[0])): best = (prefix, vs) if best is not None: # cycle based on index return best[1][i % len(best[1])] # fallback to defaults? if self.defaults is not None: return self.defaults[i, key] raise KeyError(i, key) def get(self, key, default=None): try: return self.__getitem__(key) except KeyError: return default def __contains__(self, key): try: self.__getitem__(key) return True except KeyError: return False # get all results for a given key def getall(self, key, default=None): if not isinstance(key, tuple): key = (key,) # try to lookup by key best = None for ks, vs in self.keyed.items(): prefix = [] for j, k in enumerate(ks): if j < len(key) and fnmatch.fnmatchcase(key[j], k): prefix.append(k) else: prefix = None break if prefix is not None and ( best is None or len(prefix) >= len(best[0])): best = (prefix, vs) if best is not None: return best[1] # fallback to defaults? if self.defaults is not None: return self.defaults.getall(key, default) raise default # a key function for sorting by key order def key(self, key): if not isinstance(key, tuple): key = (key,) best = None for i, ks in enumerate(self.keyed.keys()): prefix = [] for j, k in enumerate(ks): if j < len(key) and (not k or key[j] == k): prefix.append(k) else: prefix = None break if prefix is not None and ( best is None or len(prefix) >= len(best[0])): best = (prefix, i) if best is not None: return best[1] # fallback to defaults? if self.defaults is not None: return len(self.keyed) + self.defaults.key(key) return len(self.keyed) # parse %-escaped strings # # attrs can override __getitem__ for lazy attr generation def punescape(s, attrs=None): pattern = re.compile( '%[%n]' '|' '%x..' '|' '%u....' '|' '%U........' '|' '%\((?P[^)]*)\)' '(?P[+\- #0-9\.]*[sdboxXfFeEgG])') def unescape(m): if m.group()[1] == '%': return '%' elif m.group()[1] == 'n': return '\n' elif m.group()[1] == 'x': return chr(int(m.group()[2:], 16)) elif m.group()[1] == 'u': return chr(int(m.group()[2:], 16)) elif m.group()[1] == 'U': return chr(int(m.group()[2:], 16)) elif m.group()[1] == '(': if attrs is not None: try: v = attrs[m.group('field')] except KeyError: return m.group() else: return m.group() f = m.group('format') if f[-1] in 'dboxX': if isinstance(v, str): v = dat(v, 0) v = int(v) elif f[-1] in 'fFeEgG': if isinstance(v, str): v = dat(v, 0) v = float(v) else: f = ('<' if '-' in f else '>') + f.replace('-', '') v = str(v) # note we need Python's new format syntax for binary return ('{:%s}' % f).format(v) else: assert False return re.sub(pattern, unescape, s) # naive space filling curve (the default) def naive_curve(width, height): for y in range(height): for x in range(width): yield x, y # space filling Hilbert-curve def hilbert_curve(width, height): # based on generalized Hilbert curves: # https://github.com/jakubcerveny/gilbert # def hilbert_(x, y, a_x, a_y, b_x, b_y): w = abs(a_x+a_y) h = abs(b_x+b_y) a_dx = -1 if a_x < 0 else +1 if a_x > 0 else 0 a_dy = -1 if a_y < 0 else +1 if a_y > 0 else 0 b_dx = -1 if b_x < 0 else +1 if b_x > 0 else 0 b_dy = -1 if b_y < 0 else +1 if b_y > 0 else 0 # trivial row if h == 1: for _ in range(w): yield x, y x, y = x+a_dx, y+a_dy return # trivial column if w == 1: for _ in range(h): yield x, y x, y = x+b_dx, y+b_dy return a_x_, a_y_ = a_x//2, a_y//2 b_x_, b_y_ = b_x//2, b_y//2 w_ = abs(a_x_+a_y_) h_ = abs(b_x_+b_y_) if 2*w > 3*h: # prefer even steps if w_ % 2 != 0 and w > 2: a_x_, a_y_ = a_x_+a_dx, a_y_+a_dy # split in two yield from hilbert_( x, y, a_x_, a_y_, b_x, b_y) yield from hilbert_( x+a_x_, y+a_y_, a_x-a_x_, a_y-a_y_, b_x, b_y) else: # prefer even steps if h_ % 2 != 0 and h > 2: b_x_, b_y_ = b_x_+b_dx, b_y_+b_dy # split in three yield from hilbert_( x, y, b_x_, b_y_, a_x_, a_y_) yield from hilbert_( x+b_x_, y+b_y_, a_x, a_y, b_x-b_x_, b_y-b_y_) yield from hilbert_( x+(a_x-a_dx)+(b_x_-b_dx), y+(a_y-a_dy)+(b_y_-b_dy), -b_x_, -b_y_, -(a_x-a_x_), -(a_y-a_y_)) if width >= height: yield from hilbert_(0, 0, +width, 0, 0, +height) else: yield from hilbert_(0, 0, 0, +height, +width, 0) # space filling Z-curve/Lebesgue-curve def lebesgue_curve(width, height): # we create a truncated Z-curve by simply filtering out the # points that are outside our region for i in range(2**(2*mt.ceil(mt.log2(max(width, height))))): # we just operate on binary strings here because it's easier b = '{:0{}b}'.format(i, 2*mt.ceil(mt.log2(i+1)/2)) x = int(b[1::2], 2) if b[1::2] else 0 y = int(b[0::2], 2) if b[0::2] else 0 if x < width and y < height: yield x, y # an abstract block representation class BmapBlock: def __init__(self, block, type='unused', value=None, usage=range(0), *, siblings=None, children=None, x=None, y=None, width=None, height=None): self.block = block self.type = type self.value = value self.usage = usage self.siblings = siblings if siblings is not None else set() self.children = children if children is not None else set() self.x = x self.y = y self.width = width self.height = height def __repr__(self): return 'BmapBlock(0x%x, %r, x=%s, y=%s, width=%s, height=%s)' % ( self.block, self.type, self.x, self.y, self.width, self.height) def __eq__(self, other): return self.block == other.block def __ne__(self, other): return self.block != other.block def __hash__(self): return hash(self.block) def __lt__(self, other): return self.block < other.block def __le__(self, other): return self.block <= other.block def __gt__(self, other): return self.block > other.block def __ge__(self, other): return self.block >= other.block # align to pixel boundaries def align(self): # this extra +0.1 and using points instead of width/height is # to help minimize rounding errors x0 = int(self.x+0.1) y0 = int(self.y+0.1) x1 = int(self.x+self.width+0.1) y1 = int(self.y+self.height+0.1) self.x = x0 self.y = y0 self.width = x1 - x0 self.height = y1 - y0 # generate a label @ft.cached_property def label(self): if self.type == 'mdir': return '%s %s %s w%s\ncksum %08x' % ( self.type, self.value.mid.mbidrepr(), self.value.addr(), self.value.weight, self.value.cksum) elif self.type == 'btree': return '%s %s w%s\ncksum %08x' % ( self.type, self.value.addr(), self.value.weight, self.value.cksum) elif self.type == 'data': return '%s %s %s\ncksize %s\ncksum %08x' % ( self.type, '0x%x.%x' % (self.block, self.value.off), self.value.size, self.value.cksize, self.value.cksum) elif self.type != 'unused': return '%s\n%s' % ( self.type, '0x%x' % self.block) else: return '' # generate attrs for punescaping @ft.cached_property def attrs(self): if self.type == 'mdir': return { 'block': self.block, 'type': self.type, 'addr': self.value.addr(), 'trunk': self.value.trunk, 'weight': self.value.weight, 'cksum': self.value.cksum, 'usage': len(self.usage), } elif self.type == 'btree': return { 'block': self.block, 'type': self.type, 'addr': self.value.addr(), 'trunk': self.value.trunk, 'weight': self.value.weight, 'cksum': self.value.cksum, 'usage': len(self.usage), } elif self.type == 'data': return { 'block': self.block, 'type': self.type, 'addr': self.value.addr(), 'off': self.value.off, 'size': self.value.size, 'cksize': self.value.cksize, 'cksum': self.value.cksum, 'usage': len(self.usage), } else: return { 'block': self.block, 'type': self.type, 'usage': len(self.usage), } def main(disk, output, mroots=None, *, trunk=None, block_size=None, block_count=None, blocks=None, no_ckmeta=False, no_ckdata=False, mtree_only=False, quiet=False, labels=[], colors=[], width=None, height=None, block_cols=None, block_rows=None, block_ratio=None, no_header=False, no_mode=False, hilbert=False, lebesgue=False, no_javascript=False, mode_tree=False, mode_branches=False, mode_references=False, mode_redund=False, to_scale=None, to_ratio=1/1, tiny=False, title=None, title_littlefs=False, title_usage=False, padding=None, no_label=False, dark=False, font=FONT, font_size=FONT_SIZE, background=None, **args): # tiny mode? if tiny: if block_ratio is None: block_ratio = 1 if to_scale is None: to_scale = 1 if padding is None: padding = 0 no_header = True no_label = True no_javascript = True if block_ratio is None: # golden ratio block_ratio = 1 / ((1 + mt.sqrt(5))/2) if padding is None: padding = 1 # default to all modes if (not mode_tree and not mode_branches and not mode_references and not mode_redund): mode_tree = True mode_branches = True mode_references = True mode_redund = True # what colors/labels to use? colors_ = CsvAttr(colors, defaults=COLORS_DARK if dark else COLORS) labels_ = CsvAttr(labels) if background is not None: background_ = background elif dark: background_ = '#000000' else: background_ = '#ffffff' # figure out width/height if width is not None: width_ = width else: width_ = WIDTH if height is not None: height_ = height else: height_ = HEIGHT # is bd geometry specified? if isinstance(block_size, tuple): block_size, block_count_ = block_size if block_count is None: block_count = block_count_ # flatten mroots, default to 0x{0,1} mroots = list(it.chain.from_iterable(mroots)) if mroots else [0, 1] # mroots may also encode trunks mroots, trunk = ( [block[0] if isinstance(block, tuple) else block for block in mroots], trunk if trunk is not None else ft.reduce( lambda x, y: y, (block[1] for block in mroots if isinstance(block, tuple)), None)) # we seek around a bunch, so just keep the disk open with open(disk, 'rb') as f: # if block_size is omitted, assume the block device is one big block if block_size is None: f.seek(0, os.SEEK_END) block_size = f.tell() # fetch the filesystem bd = Bd(f, block_size, block_count) lfs = Lfs3.fetch(bd, mroots, trunk) corrupted = not bool(lfs) # if we can't figure out the block_count, guess block_size_ = block_size block_count_ = block_count if block_count is None: if lfs.config.geometry is not None: block_count_ = lfs.config.geometry.block_count else: f.seek(0, os.SEEK_END) block_count_ = mt.ceil(f.tell() / block_size) # flatten blocks, default to all blocks blocks_ = list( range(blocks.start or 0, blocks.stop or block_count_) if isinstance(blocks, slice) else range(blocks, blocks+1) if blocks else range(block_count_)) # traverse the filesystem and create a block map bmap = {b: BmapBlock(b, 'unused') for b in blocks_} mdir_count = 0 btree_count = 0 data_count = 0 total_count = 0 for child, path in lfs.traverse( mtree_only=mtree_only, path=True): # track each block in our window for b in child.blocks: if b not in bmap: continue # mdir? if isinstance(child, Mdir): type = 'mdir' if b in child.blocks[:1+child.redund]: usage = range(child.eoff) else: usage = range(0) mdir_count += 1 total_count += 1 # btree node? elif isinstance(child, Rbyd): type = 'btree' if b in child.blocks[:1+child.redund]: usage = range(child.eoff) else: usage = range(0) btree_count += 1 total_count += 1 # bptr? elif isinstance(child, Bptr): type = 'data' usage = range(child.off, child.off+child.size) data_count += 1 total_count += 1 else: assert False, "%r?" % b # check for some common issues # block conflict? # # note we can't compare more than types due to different # trunks, slicing, etc if (b in bmap and bmap[b].type != 'unused' and bmap[b].type != type): if bmap[b].type == 'conflict': bmap[b].value.append(child) else: bmap[b] = BmapBlock(b, 'conflict', [bmap[b].value, child], range(block_size_)) corrupted = True # corrupt metadata? elif (not no_ckmeta and isinstance(child, (Mdir, Rbyd)) and not child): bmap[b] = BmapBlock(b, 'corrupt', child, range(block_size_)) corrupted = True # corrupt data? elif (not no_ckdata and isinstance(child, Bptr) and not child): bmap[b] = BmapBlock(b, 'corrupt', child, range(block_size_)) corrupted = True # normal block else: bmap[b] = BmapBlock(b, type, child, usage) # keep track of siblings bmap[b].siblings.update( b_ for b_ in child.blocks if b_ != b and b_ in bmap) # update parents with children if path: parent = path[-1][1] for b in parent.blocks: if b in bmap: bmap[b].children.update( b_ for b_ in child.blocks if b_ in bmap) # one last thing, build a title if title: title_ = punescape(title, { 'magic': 'littlefs%s' % ( '' if lfs.ckmagic() else '?'), 'version': 'v%s.%s' % ( lfs.version.major if lfs.version is not None else '?', lfs.version.minor if lfs.version is not None else '?'), 'version_major': lfs.version.major if lfs.version is not None else '?', 'version_minor': lfs.version.minor if lfs.version is not None else '?', 'geometry': '%sx%s' % ( lfs.block_size if lfs.block_size is not None else '?', lfs.block_count if lfs.block_count is not None else '?'), 'block_size': lfs.block_size if lfs.block_size is not None else '?', 'block_count': lfs.block_count if lfs.block_count is not None else '?', 'addr': lfs.addr(), 'weight': 'w%s.%s' % (lfs.mbweightrepr(), lfs.mrweightrepr()), 'mbweight': lfs.mbweightrepr(), 'mrweight': lfs.mrweightrepr(), 'rev': '%08x' % lfs.rev, 'cksum': '%08x%s' % ( lfs.cksum, '' if lfs.ckgcksum() else '?'), 'total': total_count, 'total_percent': 100*total_count / max(len(bmap), 1), 'mdir': mdir_count, 'mdir_percent': 100*mdir_count / max(len(bmap), 1), 'btree': btree_count, 'btree_percent': 100*btree_count / max(len(bmap), 1), 'data': data_count, 'data_percent': 100*data_count / max(len(bmap), 1), }) elif not title_usage: title_ = ('littlefs%s v%s.%s %sx%s %s w%s.%s, ' 'rev %08x, ' 'cksum %08x%s' % ( '' if lfs.ckmagic() else '?', lfs.version.major if lfs.version is not None else '?', lfs.version.minor if lfs.version is not None else '?', lfs.block_size if lfs.block_size is not None else '?', lfs.block_count if lfs.block_count is not None else '?', lfs.addr(), lfs.mbweightrepr(), lfs.mrweightrepr(), lfs.rev, lfs.cksum, '' if lfs.ckgcksum() else '?')) else: title_ = ('bd %sx%s, %s mdir, %s btree, %s data' % ( lfs.block_size if lfs.block_size is not None else '?', lfs.block_count if lfs.block_count is not None else '?', '%.1f%%' % (100*mdir_count / max(len(bmap), 1)), '%.1f%%' % (100*btree_count / max(len(bmap), 1)), '%.1f%%' % (100*data_count / max(len(bmap), 1)))) # scale width/height if requested if (to_scale is not None and (width is None or height is None)): # don't include header in scale width__ = width_ height__ = height_ if not no_header: height__ -= mt.ceil(FONT_SIZE * 1.3) # scale width only if height is not None: width__ = mt.ceil((len(bmap) * to_scale) / max(height__, 1)) # scale height only elif width is not None: height__ = mt.ceil((len(bmap) * to_scale) / max(width__, 1)) # scale based on aspect-ratio else: width__ = mt.ceil(mt.sqrt(len(bmap) * to_scale * to_ratio)) height__ = mt.ceil((len(bmap) * to_scale) / max(width__, 1)) if not no_header: height__ += mt.ceil(FONT_SIZE * 1.3) width_ = width__ height_ = height__ # create space for header x__ = 0 y__ = 0 width__ = width_ height__ = height_ if not no_header: y__ += mt.ceil(FONT_SIZE * 1.3) height__ -= min(mt.ceil(FONT_SIZE * 1.3), height__) # figure out block_cols_/block_rows_ if block_cols is not None and block_rows is not None: block_cols_ = block_cols block_rows_ = block_rows elif block_rows is not None: block_cols_ = mt.ceil(len(bmap) / block_rows) block_rows_ = block_rows elif block_cols is not None: block_cols_ = block_cols block_rows_ = mt.ceil(len(bmap) / block_cols) else: # divide by 2 until we hit our target ratio, this works # well for things that are often powers-of-two # # also prioritize rows at low resolution block_cols_ = 1 block_rows_ = len(bmap) while (block_rows_ > height__ or abs(((width__/(block_cols_*2)) / max(height__/mt.ceil(block_rows_/2), 1)) - block_ratio) < abs(((width__/block_cols_) / max(height__/block_rows_, 1))) - block_ratio): block_cols_ *= 2 block_rows_ = mt.ceil(block_rows_ / 2) block_width_ = width__ / block_cols_ block_height_ = height__ / block_rows_ # assign block locations based on block_rows_/block_cols_ and # the requested space filling curve for (x, y), b in zip( (hilbert_curve if hilbert else lebesgue_curve if lebesgue else naive_curve)(block_cols_, block_rows_), sorted(bmap.values())): b.x = x__ + (x * block_width_) b.y = y__ + (y * block_height_) b.width = block_width_ b.height = block_height_ # apply top padding if x == 0: b.x += padding b.width -= min(padding, b.width) if y == 0: b.y += padding b.height -= min(padding, b.height) # apply bottom padding b.width -= min(padding, b.width) b.height -= min(padding, b.height) # align to pixel boundaries b.align() # bump up to at least one pixel for every block b.width = max(b.width, 1) b.height = max(b.height, 1) # assign colors based on block type for b in bmap.values(): color__ = colors_.get((b.block, (b.type, '0x%x' % b.block))) if color__ is not None: if '%' in color__: color__ = punescape(color__, b.attrs) b.color = color__ # assign labels for b in bmap.values(): label__ = labels_.get((b.block, (b.type, '0x%x' % b.block))) if label__ is not None: if '%' in label__: label__ = punescape(label__, b.attrs) b.label = label__ # create svg file with openio(output, 'w') as f: def writeln(self, s=''): self.write(s) self.write('\n') f.writeln = writeln.__get__(f) # yes this is svg f.write('' % dict( width=width_, height=height_, font=','.join(font), font_size=font_size, background=background_, user_select='none' if not no_javascript else 'auto')) # create header if not no_header: f.write('' % dict( js= 'cursor="pointer" ' 'onclick="click_header(this,event)">' if not no_javascript else '')) # add an invisible rect to make things more clickable f.write('' % dict( x=0, y=0, width=width_, height=y__)) f.write('') f.write('' % dict( color='#ffffff' if dark else '#000000')) f.write('') f.write(title_) f.write('') if not no_mode and not no_javascript: f.write('' % dict( x=width_-3)) f.write('mode: %s' % ( 'tree' if mode_tree else 'branches' if mode_branches else 'references' if mode_references else 'redund')) f.write('') f.write('') f.write('') # create block tiles for b in bmap.values(): # skip anything with zero weight/height after aligning things if b.width == 0 or b.height == 0: continue f.write('' % dict( block=b.block, type=b.type, x=b.x, y=b.y, js= 'data-block="%(block)d" ' # precompute x/y for javascript, svg makes this # weirdly difficult to figure out post-transform 'data-x="%(x)d" ' 'data-y="%(y)d" ' 'data-width="%(width)d" ' 'data-height="%(height)d" ' 'onmouseenter="enter_block(this,event)" ' 'onmouseleave="leave_block(this,event)" ' 'onclick="click_block(this,event)">' % dict( block=b.block, x=b.x, y=b.y, width=b.width, height=b.height) if not no_javascript else '')) # add an invisible rect to make things more clickable f.write('' % dict( width=b.width + padding, height=b.height + padding)) f.write('') f.write('') f.write(b.label) f.write('') f.write('' % dict( block=b.block, color=b.color, width=b.width, height=b.height)) f.write('') if not no_label: f.write('' % b.block) f.write('' % b.block) f.write('') f.write('') f.write('' % b.block) for j, l in enumerate(b.label.split('\n')): if j == 0: f.write('') f.write(l) f.write('') else: f.write('') f.write(l) f.write('') f.write('') f.write('') if not no_javascript: # arrowhead for arrows f.write('') f.write('' % dict( color='#000000' if dark else '#555555')) f.write('') f.write('') f.write('') # javascript for arrows # # why tf does svg support javascript? f.write('') f.write('') # print some summary info if not quiet: print('updated %s, ' 'littlefs%s v%s.%s %sx%s %s w%s.%s, ' 'cksum %08x%s' % ( output, '' if lfs.ckmagic() else '?', lfs.version.major if lfs.version is not None else '?', lfs.version.minor if lfs.version is not None else '?', lfs.block_size if lfs.block_size is not None else '?', lfs.block_count if lfs.block_count is not None else '?', lfs.addr(), lfs.mbweightrepr(), lfs.mrweightrepr(), lfs.cksum, '' if lfs.ckgcksum() else '?')) if args.get('error_on_corrupt') and corrupted: sys.exit(2) if __name__ == "__main__": import argparse import sys parser = argparse.ArgumentParser( description="Render currently used blocks in a littlefs image " "as an interactive SVG block map.", allow_abbrev=False) parser.add_argument( 'disk', help="File containing the block device.") parser.add_argument( 'mroots', nargs='*', type=rbydaddr, help="Block address of the mroots. Defaults to 0x{0,1}.") parser.add_argument( '-o', '--output', required=True, help="Output *.svg file.") parser.add_argument( '--trunk', type=lambda x: int(x, 0), help="Use this offset as the trunk of the mroots.") parser.add_argument( '-b', '--block-size', type=bdgeom, help="Block size/geometry in bytes. Accepts x.") parser.add_argument( '--block-count', type=lambda x: int(x, 0), help="Block count in blocks.") parser.add_argument( '-@', '--blocks', type=lambda x: ( slice(*(int(x, 0) if x.strip() else None for x in x.split(',', 1))) if ',' in x else int(x, 0)), help="Show a specific block, may be a range.") parser.add_argument( '--no-ckmeta', action='store_true', help="Don't check metadata blocks for errors.") parser.add_argument( '--no-ckdata', action='store_true', help="Don't check metadata + data blocks for errors.") parser.add_argument( '--mtree-only', action='store_true', help="Only traverse the mtree.") parser.add_argument( '-q', '--quiet', action='store_true', help="Don't print info.") parser.add_argument( '-L', '--add-label', dest='labels', action='append', type=lambda x: ( lambda ks, v: ( tuple(k.strip() for k in ks.split(',')), v.strip()) )(*x.split('=', 1)) if '=' in x else x.strip(), help="Add a label to use. Can be assigned to a specific " "block type/block. Accepts %% modifiers.") parser.add_argument( '-C', '--add-color', dest='colors', action='append', type=lambda x: ( lambda ks, v: ( tuple(k.strip() for k in ks.split(',')), v.strip()) )(*x.split('=', 1)) if '=' in x else x.strip(), help="Add a color to use. Can be assigned to a specific " "block type/block. Accepts %% modifiers.") parser.add_argument( '-W', '--width', type=lambda x: int(x, 0), help="Width in pixels. Defaults to %r." % WIDTH) parser.add_argument( '-H', '--height', type=lambda x: int(x, 0), help="Height in pixels. Defaults to %r." % HEIGHT) parser.add_argument( '-X', '--block-cols', type=lambda x: int(x, 0), help="Number of blocks on the x-axis. Guesses from --block-count " "and --block-ratio by default.") parser.add_argument( '-Y', '--block-rows', type=lambda x: int(x, 0), help="Number of blocks on the y-axis. Guesses from --block-count " "and --block-ratio by default.") parser.add_argument( '--block-ratio', dest='block_ratio', type=lambda x: ( (lambda a, b: a / b)(*(float(v) for v in x.split(':', 1))) if ':' in x else float(x)), help="Target ratio for block sizes. Defaults to the golden ratio.") parser.add_argument( '--no-header', action='store_true', help="Don't show the header.") parser.add_argument( '--no-mode', action='store_true', help="Don't show the mode state.") parser.add_argument( '-U', '--hilbert', action='store_true', help="Render as a space-filling Hilbert curve.") parser.add_argument( '-Z', '--lebesgue', action='store_true', help="Render as a space-filling Z-curve.") parser.add_argument( '-J', '--no-javascript', action='store_true', help="Don't add javascript for interactability.") parser.add_argument( '--mode-tree', action='store_true', help="Include the tree rendering mode.") parser.add_argument( '--mode-branches', action='store_true', help="Include the branches rendering mode.") parser.add_argument( '--mode-references', action='store_true', help="Include the references rendering mode.") parser.add_argument( '--mode-redund', action='store_true', help="Include the redund rendering mode.") parser.add_argument( '--to-scale', nargs='?', type=lambda x: ( (lambda a, b: a / b)(*(float(v) for v in x.split(':', 1))) if ':' in x else float(x)), const=1, help="Scale the resulting treemap such that 1 pixel ~= 1/scale " "blocks. Defaults to scale=1. ") parser.add_argument( '--to-ratio', type=lambda x: ( (lambda a, b: a / b)(*(float(v) for v in x.split(':', 1))) if ':' in x else float(x)), help="Aspect ratio to use with --to-scale. Defaults to 1:1.") parser.add_argument( '-t', '--tiny', action='store_true', help="Tiny mode, alias for --block-ratio=1, --to-scale=1, " "--padding=0, --no-header, --no-label, and --no-javascript.") parser.add_argument( '--title', help="Add a title. Accepts %% modifiers.") parser.add_argument( '--title-littlefs', action='store_true', help="Use the littlefs mount string as the title. This is the " "default.") parser.add_argument( '--title-usage', action='store_true', help="Use the mdir/btree/data usage as the title.") parser.add_argument( '--padding', type=float, help="Padding to add to each block. Defaults to 1.") parser.add_argument( '--no-label', action='store_true', help="Don't render any labels.") parser.add_argument( '--dark', action='store_true', help="Use the dark style.") parser.add_argument( '--font', type=lambda x: [x.strip() for x in x.split(',')], help="Font family to use.") parser.add_argument( '--font-size', help="Font size to use. Defaults to %r." % FONT_SIZE) parser.add_argument( '--background', help="Background color to use. Note #00000000 can make the " "background transparent.") parser.add_argument( '-e', '--error-on-corrupt', action='store_true', help="Error if the filesystem is corrupt.") sys.exit(main(**{k: v for k, v in vars(parser.parse_intermixed_args()).items() if v is not None}))