区块链教程Fabric1.0源代码分析Ledger historydb历史数据库

简介:

1、historydb概述

historydb,用于存储所有块读写集中写集的内容。
代码分布在core/ledger/kvledger/history/historydb目录下,目录结构如下:

  • historydb.go,定义核心接口HistoryDBProvider和HistoryDB。
  • histmgr_helper.go,historydb工具函数。
  • historyleveldb目录,historydb基于leveldb的实现。
        * historyleveldb.go,HistoryDBProvider和HistoryDB接口实现,即historyleveldb.HistoryDBProvider和historyleveldb.historyDB结构体及方法。

    * historyleveldb_query_executer.go,定义LevelHistoryDBQueryExecutor和historyScanner结构体及方法。

2、核心接口定义

HistoryDBProvider接口定义:

type HistoryDBProvider interface {
    GetDBHandle(id string) (HistoryDB, error) //获取HistoryDB
    Close() //关闭所有HistoryDB
}
//代码在core/ledger/kvledger/history/historydb/historydb.go

HistoryDB接口定义:

type HistoryDB interface {
    //构造 LevelHistoryDBQueryExecutor
    NewHistoryQueryExecutor(blockStore blkstorage.BlockStore) (ledger.HistoryQueryExecutor, error)
    //提交Block入historyDB
    Commit(block *common.Block) error
    //获取savePointKey,即version.Height
    GetLastSavepoint() (*version.Height, error)
    //是否应该恢复,比较lastAvailableBlock和Savepoint
    ShouldRecover(lastAvailableBlock uint64) (bool, uint64, error)
    //提交丢失的块
    CommitLostBlock(block *common.Block) error
}
//代码在core/ledger/kvledger/history/historydb/historydb.go

补充ledger.HistoryQueryExecutor接口定义:执行历史记录查询。

type HistoryQueryExecutor interface {
    GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error) //按key查历史记录
}
//代码在core/ledger/ledger_interface.go

3、historydb工具函数

//构造复合HistoryKey,ns 0x00 key 0x00 blocknum trannum
func ConstructCompositeHistoryKey(ns string, key string, blocknum uint64, trannum uint64) []byte
//构造部分复合HistoryKey,ns 0x00 key 0x00 0xff
func ConstructPartialCompositeHistoryKey(ns string, key string, endkey bool) []byte 
//按分隔符separator,分割bytesToSplit
func SplitCompositeHistoryKey(bytesToSplit []byte, separator []byte) ([]byte, []byte) 
//代码在core/ledger/kvledger/history/historydb/histmgr_helper.go

4、HistoryDB接口实现

HistoryDB接口实现,即historyleveldb.historyDB结构体及方法。historyDB结构体定义如下:

type historyDB struct {
    db     *leveldbhelper.DBHandle //leveldb
    dbName string //dbName
}
//代码在core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb.go

涉及方法如下:

//构造historyDB
func newHistoryDB(db *leveldbhelper.DBHandle, dbName string) *historyDB
//do nothing
func (historyDB *historyDB) Open() error
//do nothing
func (historyDB *historyDB) Close()
//提交Block入historyDB,将读写集中写集入库,并更新savePointKey
func (historyDB *historyDB) Commit(block *common.Block) error
//构造 LevelHistoryDBQueryExecutor
func (historyDB *historyDB) NewHistoryQueryExecutor(blockStore blkstorage.BlockStore) (ledger.HistoryQueryExecutor, error)
获取savePointKey,即version.Height
func (historyDB *historyDB) GetLastSavepoint() (*version.Height, error)
//是否应该恢复,比较lastAvailableBlock和Savepoint
func (historyDB *historyDB) ShouldRecover(lastAvailableBlock uint64) (bool, uint64, error)
//提交丢失的块,即调用historyDB.Commit(block)
func (historyDB *historyDB) CommitLostBlock(block *common.Block) error
//代码在core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb.go

func (historyDB historyDB) Commit(block common.Block) error代码如下:

blockNo := block.Header.Number //区块编号
var tranNo uint64 //交易编号,初始化值为0
dbBatch := leveldbhelper.NewUpdateBatch() //leveldb批量更新

//交易验证代码,type TxValidationFlags []uint8
//交易筛选器
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
if len(txsFilter) == 0 {
    txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
    block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
for _, envBytes := range block.Data.Data {
    if txsFilter.IsInvalid(int(tranNo)) { //检查指定的交易是否有效
        tranNo++
        continue
    }
    //[]byte反序列化为Envelope
    env, err := putils.GetEnvelopeFromBlock(envBytes)
    payload, err := putils.GetPayload(env) //e.Payload反序列化为Payload
    //[]byte反序列化为ChannelHeader
    chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)

    if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION { //背书交易,type HeaderType int32
        respPayload, err := putils.GetActionFromEnvelope(envBytes) //获取ChaincodeAction
        txRWSet := &rwsetutil.TxRwSet{}
        err = txRWSet.FromProtoBytes(respPayload.Results) //[]byte反序列化后构造NsRwSet,加入txRWSet.NsRwSets
        for _, nsRWSet := range txRWSet.NsRwSets {
            ns := nsRWSet.NameSpace
            for _, kvWrite := range nsRWSet.KvRwSet.Writes {
                writeKey := kvWrite.Key
                //txRWSet中写集入库
                compositeHistoryKey := historydb.ConstructCompositeHistoryKey(ns, writeKey, blockNo, tranNo)
                dbBatch.Put(compositeHistoryKey, emptyValue)
            }
        }
    } else {
        logger.Debugf("Skipping transaction [%d] since it is not an endorsement transaction\n", tranNo)
    }
    tranNo++
}

height := version.NewHeight(blockNo, tranNo)
dbBatch.Put(savePointKey, height.ToBytes())
err := historyDB.db.WriteBatch(dbBatch, true)
//代码在core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb.go

Tx(Transaction 交易)相关更详细内容,参考:Fabric 1.0源代码笔记 之 Tx(Transaction 交易)

5、HistoryDBProvider接口实现

HistoryDBProvider接口实现,即historyleveldb.HistoryDBProvider结构体和方法。

type HistoryDBProvider struct {
    dbProvider *leveldbhelper.Provider
}

//构造HistoryDBProvider
func NewHistoryDBProvider() *HistoryDBProvider
//获取HistoryDB
func (provider *HistoryDBProvider) GetDBHandle(dbName string) (historydb.HistoryDB, error)
//关闭所有HistoryDB句柄,调取provider.dbProvider.Close()
func (provider *HistoryDBProvider) Close()
//代码在core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb.go

6、LevelHistoryDBQueryExecutor和historyScanner结构体及方法

LevelHistoryDBQueryExecutor结构体及方法:实现ledger.HistoryQueryExecutor接口。

type LevelHistoryDBQueryExecutor struct {
    historyDB  *historyDB
    blockStore blkstorage.BlockStore //用于传递给historyScanner
}
//按key查historyDB,调用q.historyDB.db.GetIterator(compositeStartKey, compositeEndKey)
func (q *LevelHistoryDBQueryExecutor) GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error) 
//代码在core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb_query_executer.go

historyScanner结构体及方法:实现ledger.ResultsIterator接口。

type historyScanner struct {
    compositePartialKey []byte //ns 0x00 key 0x00
    namespace           string
    key                 string
    dbItr               iterator.Iterator //leveldb迭代器
    blockStore          blkstorage.BlockStore
}

//构造historyScanner
func newHistoryScanner(compositePartialKey []byte, namespace string, key string, dbItr iterator.Iterator, blockStore blkstorage.BlockStore) *historyScanner
//按迭代器中key取blockNum和tranNum,再按blockNum和tranNum从blockStore中取Envelope,然后从Envelope的txRWSet.NsRwSets中按key查找并构造queryresult.KeyModification
func (scanner *historyScanner) Next() (commonledger.QueryResult, error)
func (scanner *historyScanner) Close() //scanner.dbItr.Release()
从Envelope的txRWSet.NsRwSets中按key查找并构造queryresult.KeyModification
func getKeyModificationFromTran(tranEnvelope *common.Envelope, namespace string, key string) (commonledger.QueryResult, error)
//代码在core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb_query_executer.go

补充queryresult.KeyModification:

type KeyModification struct {
    TxId      string //交易ID,ChannelHeader.TxId
    Value     []byte //读写集中Value,KVWrite.Value
    Timestamp *google_protobuf.Timestamp //ChannelHeader.Timestamp
    IsDelete  bool //KVWrite.IsDelete
}
//代码在protos/ledger/queryresult/kv_query_result.pb.go

感谢关注兄弟连区块链教程分享!

相关文章
|
5天前
|
存储 数据挖掘 数据处理
2600 万表流计算分析如何做到? 时序数据库 TDengine 助力数百家超市智能化转型
在生鲜超市的高效运营中,实时数据分析至关重要。万象云鼎的“云鲜生”通过智能秤+网关+软件系统的组合,实现了销售数据的精准管理与优化。而在数据处理方面,TDengine 的流计算能力成为了这一方案的核心支撑。本文详细分享了“云鲜生”如何利用 TDengine 高效存储和分析海量销售数据,在优化超市运营、提升用户体验的同时,解决高基数分组、高并发查询等技术挑战。
21 1
|
19天前
|
存储 关系型数据库 分布式数据库
PolarDB 开源基础教程系列 8 数据库生态
PolarDB是一款开源的云原生分布式数据库,源自阿里云商业产品。为降低使用门槛,PolarDB携手伙伴打造了完整的开源生态,涵盖操作系统、芯片、存储、集成管控、监控、审计、开发者工具、数据同步、超融合计算、ISV软件、开源插件、人才培养、社区合作及大型用户合作等领域。通过这些合作伙伴,PolarDB提供了丰富的功能和服务,支持多种硬件和软件环境,满足不同用户的需求。更多信息请访问[PolarDB开源官方网站](https://openpolardb.com/home)。
53 4
|
2月前
|
关系型数据库 MySQL API
新手教程:数据库操作(使用PDO或MySQLi扩展)
本文为新手介绍如何使用PDO和MySQLi扩展连接与操作MySQL数据库。PDO更现代灵活,支持多种数据库,适合大多数应用;MySQLi提供面向过程和面向对象两种API,适合直接控制数据库操作。教程涵盖安装配置、创建连接、执行查询(查询、插入、更新、删除)及错误处理等内容。希望这篇教程能帮助你快速上手PHP中的数据库操作!
163 32
|
2月前
|
关系型数据库 分布式数据库 数据库
瑶池数据库大讲堂|PolarDB HTAP:为在线业务插上实时分析的翅膀
瑶池数据库大讲堂介绍PolarDB HTAP,为在线业务提供实时分析能力。内容涵盖MySQL在线业务的分析需求与现有解决方案、PolarDB HTAP架构优化、针对分析型负载的优化(如向量化执行、多核并行处理)及近期性能改进和用户体验提升。通过这些优化,PolarDB HTAP实现了高效的数据处理和查询加速,帮助用户更好地应对复杂业务场景。
|
4月前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
5月前
|
SQL 关系型数据库 MySQL
Vanna使用ollama分析本地数据库
这篇文章详细介绍了如何使用Vanna和Ollama框架来分析本地数据库,实现自然语言查询转换为SQL语句并与数据库交互的过程。
1179 7
Vanna使用ollama分析本地数据库
|
4月前
|
存储 Java 关系型数据库
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接创建、分配、复用和释放等操作,并通过电商应用实例展示了如何选择合适的连接池库(如HikariCP)和配置参数,实现高效、稳定的数据库连接管理。
99 2
|
4月前
|
存储 机器学习/深度学习 监控
南大通用GBase 8s数据库onbar基础使用教程
数据备份与恢复是确保数据安全和业务连续性的关键。onbar作为GBase 8s数据库的备份工具,需配合存储管理器使用,通过配置BAR_BSALIB_PATH等参数,实现数据的备份与恢复。本文详细介绍了onbar的配置、备份、恢复及监控流程,帮助数据库管理员构建高效的数据保护方案。
|
5月前
|
tengine 关系型数据库 MySQL
Tengine、Nginx安装MySQL数据库命令教程
本指南详细介绍了在Linux系统上安装与配置MySQL数据库的步骤。首先通过下载并安装MySQL社区版本,接着启动MySQL服务,使用`systemctl start mysqld.service`命令。若启动失败,可尝试使用`sudo /etc/init.d/mysqld start`。利用`systemctl status mysqld.service`检查MySQL的服务状态,确保其处于运行中。通过日志文件获取初始密码,使用该密码登录数据库,并按要求更改初始密码以增强安全性。随后创建一个名为`tengine`的数据库,最后验证数据库创建是否成功以及完成整个设置流程。
|
5月前
|
存储 SQL 关系型数据库
【入门级教程】MySQL:从零开始的数据库之旅
本教程面向零基础用户,采用通俗易懂的语言和丰富的示例,帮助你快速掌握MySQL的基础知识和操作技巧。内容涵盖SQL语言基础(SELECT、INSERT、UPDATE、DELETE等常用语句)、使用索引提高查询效率、存储过程等。适合学生、开发者及数据库爱好者。
128 0
【入门级教程】MySQL:从零开始的数据库之旅