在boltdb中,最核心的数据结构当属B+树了。B+树是数据库或文件系统中常见的数据结构,它的特点是能够保证数据稳定有序,因为每个叶子节点的深度都相同,因此其插入和修改操作拥有较稳定的时间复杂度。
那么boltdb中B+树的节点是如何表示的呢?答案是node
。node
对应page
在内存中的数据结构,node
序列化之后便是page
,page
反序列化之后是node
。因为node
在内存中,且node
唯一对应B+树的一个节点
在概念上,boltdb中的一个Bucket
唯一对应一棵B+树。如下图所示
node
node
分为两种,branch node和leaf node。
branch node如下所示,需要注意的是,boltdb中的branch node与通常的B+树略有不同,通常B+树中branch node的key数量比子节点数量多1, 而boltdb中key和子节点是一一对应的,这里应该是出于简化设计的考虑。因此在boltdb的branch node中,子节点i
子树中的key应当介于key(i)
和key(i+1)
之间,左闭右开。
leaf node如下所示。我们注意到叶子节点的数据除了key
和value
之外,还有一个flags
,它的作用是为了区分普通数据和子Bucket
数据。当flags != 0
时,key为当前Bucket下的子Bucket名,value可反序列化为bucket
结构,其中记录着子Bucket对应B+树的根节点所在的page id。稍后在Bucket
一节中我们将会深入探讨
node
代码如下:
type nodes []*node // inode represents an internal node inside of a node. // It can be used to point to elements in a page or point // to an element which hasn't been added to a page yet. type inode struct { flags uint32 pgid pgid key []byte value []byte } type inodes []inode // node represents an in-memory, deserialized page. type node struct { bucket *Bucket isLeaf bool unbalanced bool spilled bool key []byte pgid pgid parent *node children nodes inodes inodes }
node中的成员变量:
bucket
: 当前节点所属的Bucket(直属bucket, 不包含祖先)
isLeaf
: 当前节点是否为叶子节点
unbalanced
: 表示该节点是否有可能被合并。如果从node
中去掉了某对kv数据, 则unbalance从false变成true。合并策略见
spilled
: 表示该节点是否已经被拆分。在一个写事务中,已拆分的节点不会再被拆分。当node
中插入数据,导致node
中数据大小超过一定阈值时,便会触发node
的拆分,
pgid
: 该node对应page的page id
parent
: 当前节点的父节点
inodes
: 本节点内的key/value数据对。对于叶子节点来说,key和value都存在。对于非叶子节点来说,只有key存在。
key
:inodes
中所有key/value数据中最小的key
children
: 当前节点的所有子节点。如果当前节点是非叶子节点,则len(inodes) = len(children)
; 如果当前节点是叶子节点,则children为空
node
中的成员函数:
minKeys
: node允许存在的最少key数量(leaf node情况下为1。branch node情况下为2)
size
: 返回node序列化成page之后的字节数,boltdb中以此判断当前节点是否需要分裂或合并
sizeLessThan
: 判断node序列化之后大小是否小于给定大小。出于性能考虑,实现中做了提前返回,比直接调用node.size
并判断是否小于给定大小要快
pageElementSize
: 当前节点为leaf node时,返回leafPageElementSize
, 即一个leafPageElement
对象的长度;当前节点为branch node时,返回branchPageElementSize
, 即branchPageElement
对象的长度。
childAt
: 返回第index
个子节点。实现上,首先从inodes
中拿到子节点的page id, 然后从bucket
中根据page id进行索引,最终返回子节点对应的node
对象
childIndex
: 输入child node, 从inodes
数组中二分查找该child node的位置。注意这里之所以能使用二分查找是因为inodes
中所有元素都是按照key进行升序排列的
numChildren
: 返回当前节点的子节点数量
nextSibling
: 当前节点的右兄弟节点,如果没有返回nil。实现上,首先调用node.childIndex
判断当前节点在其父节点中的位置index
,然后返回父节点index+1
位置的子节点
prevSibling
: 当前节点的左兄弟节点,如果没有返回nil。实现上类似nextSibling
。
node的序列化: node -> page
node.write
: 将node对象序列化到page中
序列化步骤:
- 根据
node.isLeaf
设置page.flags
- 检查
inodes
数组长度是否超过上限0xFFFF
,超过则panic(因为B+树的分裂策略,这种情况不可能发生,除非有脏数据)
- 将
inodes
中的数据复制到page.ptr
所指向的缓冲区中。如果node是leaf node, 则page缓冲区中写入的是[]leafPageElement
和所有key/value对。如果node是branch node, 则page缓冲区中写入的是[]branchPageElement
和所有的key值。
for i, item := range n.inodes { _assert(len(item.key) > 0, "write: zero-length inode key") // Write the page element. if n.isLeaf { elem := p.leafPageElement(uint16(i)) elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem))) elem.flags = item.flags elem.ksize = uint32(len(item.key)) elem.vsize = uint32(len(item.value)) } else { elem := p.branchPageElement(uint16(i)) elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem))) elem.ksize = uint32(len(item.key)) elem.pgid = item.pgid _assert(elem.pgid != p.id, "write: circular dependency occurred") } // If the length of key+value is larger than the max allocation size // then we need to reallocate the byte array pointer. // // See: https://github.com/boltdb/bolt/pull/335 klen, vlen := len(item.key), len(item.value) if len(b) < klen+vlen { b = (*[maxAllocSize]byte)(unsafe.Pointer(&b[0]))[:] } // Write data for the element to the end of the page. copy(b[0:], item.key) b = b[klen:] copy(b[0:], item.value) b = b[vlen:] }
node的反序列化:page -> node
node.read
: 从page对象初始化node,即page反序列化成node 反序列化步骤如下:
- 复制page id到node中
- 根据
page.flags
确定node.isLeaf
- 将page缓冲区中的数据复制到
node.inodes
中。branch page还是leaf page两种情况下数据格式略有不同。
// read initializes the node from a page. func (n *node) read(p *page) { n.pgid = p.id n.isLeaf = ((p.flags & leafPageFlag) != 0) n.inodes = make(inodes, int(p.count)) for i := 0; i < int(p.count); i++ { inode := &n.inodes[i] if n.isLeaf { elem := p.leafPageElement(uint16(i)) inode.flags = elem.flags inode.key = elem.key() inode.value = elem.value() } else { elem := p.branchPageElement(uint16(i)) inode.pgid = elem.pgid inode.key = elem.key() } _assert(len(inode.key) > 0, "read: zero-length inode key") } // Save first key so we can find the node in the parent when we spill. if len(n.inodes) > 0 { n.key = n.inodes[0].key _assert(len(n.key) > 0, "read: zero-length node key") } else { n.key = nil } }
插入数据
node.put
: 向当前节点中插入key/value
。pageid
和flags
这两个输入参数是相关的,当flags不为零时,表示这对kv是Bucket数据,此时pageid
是其bucket的root page
。当flags为零时,pageid
没有意义,此时这对kv就是普通数据。
实现上:
- 首先从当前节点的
inodes
二分搜索key/value
对插入位置index
。
- 如果
inodes
中存在等值的key
, 则覆盖其value
- 如果
inodes
中不存在等值key
, 则在index位置处插入一个inode
// put inserts a key/value. func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) { ... // Find insertion index. index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, oldKey) != -1 }) // Add capacity and shift nodes if we don't have an exact match and need to insert. exact := (len(n.inodes) > 0 && index < len(n.inodes) && bytes.Equal(n.inodes[index].key, oldKey)) if !exact { n.inodes = append(n.inodes, inode{}) copy(n.inodes[index+1:], n.inodes[index:]) } inode := &n.inodes[index] inode.flags = flags inode.key = newKey inode.value = value inode.pgid = pgid _assert(len(inode.key) > 0, "put: zero-length inode key") }
删除数据
node.del
: 从当期节点中删除一对kv数据。最后设置unbalanced为true, 表示本节点有可能触发合并
实现上:
- 首先从
inodes
中二分查找key
,对应位置为index
- 从
inodes
中删除index
位置的inode
对象
// del removes a key from the node. func (n *node) del(key []byte) { // Find index of key. index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, key) != -1 }) // Exit if the key isn't found. if index >= len(n.inodes) || !bytes.Equal(n.inodes[index].key, key) { return } // Delete inode from the node. n.inodes = append(n.inodes[:index], n.inodes[index+1:]...) // Mark the node as needing rebalancing. n.unbalanced = true }
解引用
node.dereference
: 将所有inodes
数据复制到Heap
上,避免对DB
中mmap
缓冲区的引用,避免因为boltdb重新mmap
造成原有mmap
缓冲区失效。这个方法是递归的,对当前节点执行dereference
会递归对所有子节点调用dereference
释放
node.free
: 将当前节点对应的page
从freelist
中释放。
Bucket
在Boltdb学习笔记之〇--概述中我们提到,Bucket类似于关系型数据库中的表,区别在于Bucket支持嵌套,一个Bucket中可以包含很多子Bucket。而不管是父Bucket还是子Bucket,每个Bucket都唯一对应一个B+树,B+树中存储着该Bucket的所有数据和children Bucket。
因为Bucket与B+树一一对应,boltdb中所有B+树操作都能在Bucket中找到对应接口。因此接下来我们以Bucket为载体总结B+树相关操作。
以下代码定义了bucket
和Bucket
两种类型。在bucket
中,root
表示该Bucket对应的B+树的根节点page id,sequence
是一个单调递增的id, 唯一对应一个Bucket。
bucket
可序列化为[]byte
类型,作为leaf page中的value值。同样的,在leaf page中,如果某条leafPageElement
数据中flags != 0
, 我们也可读取其value值并将其反序列成bucket
对象。
// bucket represents the on-file representation of a bucket. // This is stored as the "value" of a bucket key. If the bucket is small enough, // then its root page can be stored inline in the "value", after the bucket // header. In the case of inline buckets, the "root" will be 0. type bucket struct { root pgid // page id of the bucket's root-level page sequence uint64 // monotonically incrementing, used by NextSequence() }
Bucket
由bucket
组成,除此之外还有其他一些成员,注意这些字段仅存在于内存中,无法序列化到db文件中。
// Bucket represents a collection of key/value pairs inside the database. type Bucket struct { *bucket tx *Tx // the associated transaction buckets map[string]*Bucket // subbucket cache page *page // inline page reference rootNode *node // materialized node for the root page. nodes map[pgid]*node // node cache // Sets the threshold for filling nodes when they split. By default, // the bucket will fill to 50% but it can be useful to increase this // amount if you know that your write workloads are mostly append-only. // // This is non-persisted across transactions so it must be set in every Tx. FillPercent float64 }
tx
:Bucket
所关联的事务id
buckets
: 缓存当前Bucket
下的子Bucket
page
: 仅当Bucket
为inline
时有效,即本Bucket
下的所有数据都可容纳于一个page中。
rootNode
: 缓存当前Bucket对应的B+树根节点
nodes
: 缓存Bucket中B+树的节点。
FillPercent
: 设置Bucket对应的B+树中的分裂阈值。默认值为50%。
B+树相关操作
查询数据
Bucket.Get
会在当前Bucket
中搜索key
对应的value, 如果B+树中找不到key, 或者key对应的value是子Bucket数据,则返回nil值。否则返回key对应的value值。
// Get retrieves the value for a key in the bucket. // Returns a nil value if the key does not exist or if the key is a nested bucket. // The returned value is only valid for the life of the transaction. func (b *Bucket) Get(key []byte) []byte { k, v, flags := b.Cursor().seek(key) // Return nil if this is a bucket. if (flags & bucketLeafFlag) != 0 { return nil } // If our target node isn't the same key as what's passed in then return nil. if !bytes.Equal(key, k) { return nil } return v }
以上b.Curosr().seek(key)
即完成了B+树中key的查找, Curosr.seek
代码如下
// seek moves the cursor to a given key and returns it. // If the key does not exist then the next key is used. func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) { _assert(c.bucket.tx.db != nil, "tx closed") // Start from root page/node and traverse to correct page. c.stack = c.stack[:0] c.search(seek, c.bucket.root) ref := &c.stack[len(c.stack)-1] // If the cursor is pointing to the end of page/node then return nil. if ref.index >= ref.count() { return nil, nil, 0 } // If this is a bucket then return a nil value. return c.keyValue() }
Cursor.seek
中调用了Cursor.search
, 该函数会从B+树根节点开始递归搜索key = seek
的数据。如下所示,Cursor.search
中
- 首先根据page id获取B+树根节点对应的
page
和node
对象(如果Bucket中已经缓存了该page, 则node
对象有效,否则为空)
- 然后对page类型进行校验,必须是branch page或leaf page类型。换言之,page类型不能是meta page或freelist page
- 接着进行递归搜索:如果
n
有效(对应节点已被Bucket缓存),则执行Cursor.searchNode
;否则执行Cursor.searchPage
- 递归搜索的边界条件是到达B+树叶子节点,此时调用
Cursor.nsearch
在叶子节点中查找key, 记录下第一个大于等于目标key的位置。
// search recursively performs a binary search against a given page/node until it finds a given key. func (c *Cursor) search(key []byte, pgid pgid) { p, n := c.bucket.pageNode(pgid) if p != nil && (p.flags&(branchPageFlag|leafPageFlag)) == 0 { panic(fmt.Sprintf("invalid page type: %d: %x", p.id, p.flags)) } e := elemRef{page: p, node: n} c.stack = append(c.stack, e) // If we're on a leaf page/node then find the specific node. if e.isLeaf() { c.nsearch(key) return } if n != nil { c.searchNode(key, n) return } c.searchPage(key, p) }
Cursor.searchNode
里,在节点n
的inodes
数组中二分查找key
,找到最后一个小于等于key
值的inode
的位置,接着在该位置对应的子节点中继续查找key
func (c *Cursor) searchNode(key []byte, n *node) { var exact bool index := sort.Search(len(n.inodes), func(i int) bool { // TODO(benbjohnson): Optimize this range search. It's a bit hacky right now. // sort.Search() finds the lowest index where f() != -1 but we need the highest index. ret := bytes.Compare(n.inodes[i].key, key) if ret == 0 { exact = true } return ret != -1 }) if !exact && index > 0 { index-- } c.stack[len(c.stack)-1].index = index // Recursively search to the next page. c.search(key, n.inodes[index].pgid) }
插入数据
接口Bucket.Put
用于在当前Bucket
对应的B+树中插入一对key/value数据。如果key已存在,则用新value覆盖老value值,如果key不存在,则新增key/value对。
在查询数据
这一节我们已经见到了Cursor.seek
这个接口,它的功能从Bucket
中搜索目标key。如果目标key存在,返回对应的key, value和flags(用于区分value是普通数据还是子Bucket数据)。如果目标key不存在,则返回大于目标key的最小key, 及其对应的value、flags
// Put sets the value for a key in the bucket. // If the key exist then its previous value will be overwritten. // Supplied value must remain valid for the life of the transaction. // Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large. func (b *Bucket) Put(key []byte, value []byte) error { if b.tx.db == nil { return ErrTxClosed } else if !b.Writable() { return ErrTxNotWritable } else if len(key) == 0 { return ErrKeyRequired } else if len(key) > MaxKeySize { return ErrKeyTooLarge } else if int64(len(value)) > MaxValueSize { return ErrValueTooLarge } // Move cursor to correct position. c := b.Cursor() k, _, flags := c.seek(key) // Return an error if there is an existing key with a bucket value. if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 { return ErrIncompatibleValue } // Insert into node. key = cloneBytes(key) c.node().put(key, key, value, 0, 0) return nil }
所以以上代码中,当Cursor.seek
返回之后,首先校验flags
, 如果为子Bucket数据返回ErrIncompatibleValue
错误。然后将key/value数据插入到Cursor
对象c
当前所在的节点上。
删除数据
Bucket.Delete
从当前Bucket对应的B+树中删除目标key的数据。如果目标key不存在则什么也不做
实现上:
- 校验:事务中db是否为空,当前Bucket是否可写
- 调用
Cursor.seek
寻找目标key在B+树上的位置。
- 在B+树叶子节点上删除目标key
// Delete removes a key from the bucket. // If the key does not exist then nothing is done and a nil error is returned. // Returns an error if the bucket was created from a read-only transaction. func (b *Bucket) Delete(key []byte) error { if b.tx.db == nil { return ErrTxClosed } else if !b.Writable() { return ErrTxNotWritable } // Move cursor to correct position. c := b.Cursor() _, _, flags := c.seek(key) // Return an error if there is already existing bucket value. if (flags & bucketLeafFlag) != 0 { return ErrIncompatibleValue } // Delete the node if we have a matching key. c.node().del(key) return nil }
节点分裂
随着用户不断向Bucket
中插入新数据,B+树的某些叶子节点可能会超过分裂阈值,从而触发分裂。
Bucket.spill
实现了当前Bucket
对应的B+树的分裂,以及所有子Bucket
对应的B+树的分裂(递归的)。其实现如下:
- 首先遍历缓存中的所有子Bucket,对于每个子Bucket
- 如果子Bucket小到可以
inlineable
, 则将子Bucket对应B+树中所有page释放,并将该子Bucket序列化到value中 - 否则对子Bucket递归调用
Bucket.spill
,并将更新后的Bucket header序列化到value中. - 在当前Bucket对应的B+树中更新子Bucket的value, key为子Bucket名
- 然后对当前Bucket对应的B+树调用
node.spill
,执行节点分裂
- 更新当前Bucket对应的B+树的根节点(B+树节点分裂可能会生成新的根节点)
// spill writes all the nodes for this bucket to dirty pages. func (b *Bucket) spill() error { // Spill all child buckets first. for name, child := range b.buckets { // If the child bucket is small enough and it has no child buckets then // write it inline into the parent bucket's page. Otherwise spill it // like a normal bucket and make the parent value a pointer to the page. var value []byte if child.inlineable() { child.free() value = child.write() } else { if err := child.spill(); err != nil { return err } // Update the child bucket header in this bucket. value = make([]byte, unsafe.Sizeof(bucket{})) var bucket = (*bucket)(unsafe.Pointer(&value[0])) *bucket = *child.bucket } // Skip writing the bucket if there are no materialized nodes. if child.rootNode == nil { continue } // Update parent node. var c = b.Cursor() k, _, flags := c.seek([]byte(name)) if !bytes.Equal([]byte(name), k) { panic(fmt.Sprintf("misplaced bucket header: %x -> %x", []byte(name), k)) } if flags&bucketLeafFlag == 0 { panic(fmt.Sprintf("unexpected bucket header flag: %x", flags)) } c.node().put([]byte(name), []byte(name), value, 0, bucketLeafFlag) } // Ignore if there's not a materialized root node. if b.rootNode == nil { return nil } // Spill nodes. if err := b.rootNode.spill(); err != nil { return err } b.rootNode = b.rootNode.root() // Update the root node for this bucket. if b.rootNode.pgid >= b.tx.meta.pgid { panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", b.rootNode.pgid, b.tx.meta.pgid)) } b.root = b.rootNode.pgid return nil }
从上面的分析可以看到B+树分裂的核心逻辑其实在node.spill
中,代码略长这里就不列出了,感兴趣的同学可查看源码。这里仅总结基本思路:
在对一个B+树节点执行分裂操作时:
- 首先将当前节点的所有子节点按照
node.key
升序排序
- 然后当前节点的所有子节点递归调用
node.spill
, 执行可能触发的分裂操作
- 接着对当前节点调用
node.split
,返回一组分裂后的新节点, 放置于当前节点的父节点下,用于替换当前节点
- 最后处理边界条件:如果当前节点是B+树根节点,判断
node.spill
中是否生成了新的根节点,如果是则对新根节点执行node.spill
在node.split
中, 将当前节点根据一定的策略分裂成一组新节点, 策略如下:
- 如果当前节点中
len(n.inodes) <= minKeysPerPage * 2
(分裂后每个page中key的数量不超过minKeysPerPage
),则不必分裂
- 如果当前节点序列化之后的大小小于
pageSize
(该节点可被一个page容纳), 则不必分裂
- 根据
fillPercent
计算出node分裂的大小阈值threshold
:fillPercent * pageSize
- 根据
threshold
,找到当前node的分裂点(超过阈值的临界inode的位置)
- 如果当前node没有parent,即自身就是B+树根节点, 则创建一个新节点作为当前节点的父节点
- 在当前节点分裂点处一分为二,生成两个新节点
left
和right
, 接着对right
节点重复执行1-6步
节点合并
当boltdb中的B+树不断删除数据之后,导致一些节点中的数据越来越小,此时需要对低于一定阈值的节点进行递归合并。代码中由Bucket.reblance
实现.
在该函数中,首先对所有缓存的node调用node.rebalance
执行合并操作,然后对所有的子Bucket递归调用Bucket.rebalance
// rebalance attempts to balance all nodes. func (b *Bucket) rebalance() { for _, n := range b.nodes { n.rebalance() } for _, child := range b.buckets { child.rebalance() } }
从上面的分析看到,节点合并的核心逻辑在node.rebalance
中, 代码较长,这里仅总结其合并策略:
- 如果当前节点的
unbalanced
为false, 说明该节点从在本次事务中未删除任何数据,那么无需对其合并
- 如果当前节点大小大于
pageSize
的1/4, 且其中key的数量大于node.minKeys()
, 那么无需对其合并
- 除了1和2两种情况下,说明当前节点需要合并,合并步骤如下:
- 如果当前节点是B+树的根节点。按照以下两种情况细分
- 如果当前节点非leaf node且子节点个数为1,将子节点的所有子节点放置于当前节点下,并去除子节点。
- 如果当前节点是leaf node,直接返回
- 如果当前节点的子节点个数为零,则将其从父节点的
inodes
和children
中删除,然后对父节点重新执行rebalance
- 如果当前节点是其父节点的第一个子节点,则将右兄弟节点合并到当前节点中, 并对父节点执行
rebalance
- 如果当前节点不是其父节点的第一个子节点,则将当前节点合并到其左兄弟节点中,
- 并对父节点执行
rebalance
Bucket相关操作
创建子Bucket
Bucket.CreateBucket
: 在当前Bucket下创建子Bucket.
- 首先进行校验:db是否有效,当前Bucket是否可写,待新建Bucket的name是否为空
- 如果同名的子Bucket已存在,返回
ErrBucketExists
错误
- 如果同名的key/value数据已存在,返回
ErrIncompatibleValue
错误
- 在对应的B+树叶子节点中插入子Bucket数据,并返回子Bucket
注意:CreateBucket
会产生一个dirty node和一个new Bucket对象,都是在内存中,在事务提交之前不会flush到磁盘,不会影响其他只读事务。
// CreateBucket creates a new bucket at the given key and returns the new bucket. // Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long. // The bucket instance is only valid for the lifetime of the transaction. func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) { if b.tx.db == nil { return nil, ErrTxClosed } else if !b.tx.writable { return nil, ErrTxNotWritable } else if len(key) == 0 { return nil, ErrBucketNameRequired } // Move cursor to correct position. c := b.Cursor() k, _, flags := c.seek(key) // Return an error if there is an existing key. if bytes.Equal(key, k) { if (flags & bucketLeafFlag) != 0 { return nil, ErrBucketExists } return nil, ErrIncompatibleValue } // Create empty, inline bucket. var bucket = Bucket{ bucket: &bucket{}, rootNode: &node{isLeaf: true}, FillPercent: DefaultFillPercent, } var value = bucket.write() // Insert into node. key = cloneBytes(key) c.node().put(key, key, value, 0, bucketLeafFlag) // Since subbuckets are not allowed on inline buckets, we need to // dereference the inline page, if it exists. This will cause the bucket // to be treated as a regular, non-inline bucket for the rest of the tx. b.page = nil return b.Bucket(key), nil }
获取子Bucket
Bucket.Bucket
实现了从当前Bucket中获取名为name
的子Bucket。首先从缓存Bucket.buckets
中查找子Bucket, 有则直接返回。否则从当前Bucket对应的B+树中查找该子Bucket, 如果存在则将子Bucket加入到缓存中,并返回;否则返回nil
// Bucket retrieves a nested bucket by name. // Returns nil if the bucket does not exist. // The bucket instance is only valid for the lifetime of the transaction. func (b *Bucket) Bucket(name []byte) *Bucket { if b.buckets != nil { if child := b.buckets[string(name)]; child != nil { return child } } // Move cursor to key. c := b.Cursor() k, v, flags := c.seek(name) // Return nil if the key doesn't exist or it is not a bucket. if !bytes.Equal(name, k) || (flags&bucketLeafFlag) == 0 { return nil } // Otherwise create a bucket and cache it. var child = b.openBucket(v) if b.buckets != nil { b.buckets[string(name)] = child } return child }
删除子Bucket
Bucket.DeleteBucket
实现了在当前Bucket中删除指定name
的子Bucket. 由于子Bucket可能还嵌套包含自己的子Bucket, 因此在从当前Bucket对应的B+树中删除该子Bucket时,还需递归调用Bucket.DeleteBucket
将子Bucket中的子Bucket删除。