区块链教程Fabric1.0源代码分析Ledger historydb历史数据库

简介:

1、historydb概述

historydb,用于存储所有块读写集中写集的内容。
代码分布在core/ledger/kvledger/history/historydb目录下,目录结构如下:

  • historydb.go,定义核心接口HistoryDBProvider和HistoryDB。
  • histmgr_helper.go,historydb工具函数。
  • historyleveldb目录,historydb基于leveldb的实现。
        * historyleveldb.go,HistoryDBProvider和HistoryDB接口实现,即historyleveldb.HistoryDBProvider和historyleveldb.historyDB结构体及方法。

    * historyleveldb_query_executer.go,定义LevelHistoryDBQueryExecutor和historyScanner结构体及方法。

2、核心接口定义

HistoryDBProvider接口定义:

type HistoryDBProvider interface {
    GetDBHandle(id string) (HistoryDB, error) //获取HistoryDB
    Close() //关闭所有HistoryDB
}
//代码在core/ledger/kvledger/history/historydb/historydb.go

HistoryDB接口定义:

type HistoryDB interface {
    //构造 LevelHistoryDBQueryExecutor
    NewHistoryQueryExecutor(blockStore blkstorage.BlockStore) (ledger.HistoryQueryExecutor, error)
    //提交Block入historyDB
    Commit(block *common.Block) error
    //获取savePointKey,即version.Height
    GetLastSavepoint() (*version.Height, error)
    //是否应该恢复,比较lastAvailableBlock和Savepoint
    ShouldRecover(lastAvailableBlock uint64) (bool, uint64, error)
    //提交丢失的块
    CommitLostBlock(block *common.Block) error
}
//代码在core/ledger/kvledger/history/historydb/historydb.go

补充ledger.HistoryQueryExecutor接口定义:执行历史记录查询。

type HistoryQueryExecutor interface {
    GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error) //按key查历史记录
}
//代码在core/ledger/ledger_interface.go

3、historydb工具函数

//构造复合HistoryKey,ns 0x00 key 0x00 blocknum trannum
func ConstructCompositeHistoryKey(ns string, key string, blocknum uint64, trannum uint64) []byte
//构造部分复合HistoryKey,ns 0x00 key 0x00 0xff
func ConstructPartialCompositeHistoryKey(ns string, key string, endkey bool) []byte 
//按分隔符separator,分割bytesToSplit
func SplitCompositeHistoryKey(bytesToSplit []byte, separator []byte) ([]byte, []byte) 
//代码在core/ledger/kvledger/history/historydb/histmgr_helper.go

4、HistoryDB接口实现

HistoryDB接口实现,即historyleveldb.historyDB结构体及方法。historyDB结构体定义如下:

type historyDB struct {
    db     *leveldbhelper.DBHandle //leveldb
    dbName string //dbName
}
//代码在core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb.go

涉及方法如下:

//构造historyDB
func newHistoryDB(db *leveldbhelper.DBHandle, dbName string) *historyDB
//do nothing
func (historyDB *historyDB) Open() error
//do nothing
func (historyDB *historyDB) Close()
//提交Block入historyDB,将读写集中写集入库,并更新savePointKey
func (historyDB *historyDB) Commit(block *common.Block) error
//构造 LevelHistoryDBQueryExecutor
func (historyDB *historyDB) NewHistoryQueryExecutor(blockStore blkstorage.BlockStore) (ledger.HistoryQueryExecutor, error)
获取savePointKey,即version.Height
func (historyDB *historyDB) GetLastSavepoint() (*version.Height, error)
//是否应该恢复,比较lastAvailableBlock和Savepoint
func (historyDB *historyDB) ShouldRecover(lastAvailableBlock uint64) (bool, uint64, error)
//提交丢失的块,即调用historyDB.Commit(block)
func (historyDB *historyDB) CommitLostBlock(block *common.Block) error
//代码在core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb.go

func (historyDB historyDB) Commit(block common.Block) error代码如下:

blockNo := block.Header.Number //区块编号
var tranNo uint64 //交易编号,初始化值为0
dbBatch := leveldbhelper.NewUpdateBatch() //leveldb批量更新

//交易验证代码,type TxValidationFlags []uint8
//交易筛选器
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
if len(txsFilter) == 0 {
    txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
    block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
for _, envBytes := range block.Data.Data {
    if txsFilter.IsInvalid(int(tranNo)) { //检查指定的交易是否有效
        tranNo++
        continue
    }
    //[]byte反序列化为Envelope
    env, err := putils.GetEnvelopeFromBlock(envBytes)
    payload, err := putils.GetPayload(env) //e.Payload反序列化为Payload
    //[]byte反序列化为ChannelHeader
    chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)

    if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION { //背书交易,type HeaderType int32
        respPayload, err := putils.GetActionFromEnvelope(envBytes) //获取ChaincodeAction
        txRWSet := &rwsetutil.TxRwSet{}
        err = txRWSet.FromProtoBytes(respPayload.Results) //[]byte反序列化后构造NsRwSet,加入txRWSet.NsRwSets
        for _, nsRWSet := range txRWSet.NsRwSets {
            ns := nsRWSet.NameSpace
            for _, kvWrite := range nsRWSet.KvRwSet.Writes {
                writeKey := kvWrite.Key
                //txRWSet中写集入库
                compositeHistoryKey := historydb.ConstructCompositeHistoryKey(ns, writeKey, blockNo, tranNo)
                dbBatch.Put(compositeHistoryKey, emptyValue)
            }
        }
    } else {
        logger.Debugf("Skipping transaction [%d] since it is not an endorsement transaction\n", tranNo)
    }
    tranNo++
}

height := version.NewHeight(blockNo, tranNo)
dbBatch.Put(savePointKey, height.ToBytes())
err := historyDB.db.WriteBatch(dbBatch, true)
//代码在core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb.go

Tx(Transaction 交易)相关更详细内容,参考:Fabric 1.0源代码笔记 之 Tx(Transaction 交易)

5、HistoryDBProvider接口实现

HistoryDBProvider接口实现,即historyleveldb.HistoryDBProvider结构体和方法。

type HistoryDBProvider struct {
    dbProvider *leveldbhelper.Provider
}

//构造HistoryDBProvider
func NewHistoryDBProvider() *HistoryDBProvider
//获取HistoryDB
func (provider *HistoryDBProvider) GetDBHandle(dbName string) (historydb.HistoryDB, error)
//关闭所有HistoryDB句柄,调取provider.dbProvider.Close()
func (provider *HistoryDBProvider) Close()
//代码在core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb.go

6、LevelHistoryDBQueryExecutor和historyScanner结构体及方法

LevelHistoryDBQueryExecutor结构体及方法:实现ledger.HistoryQueryExecutor接口。

type LevelHistoryDBQueryExecutor struct {
    historyDB  *historyDB
    blockStore blkstorage.BlockStore //用于传递给historyScanner
}
//按key查historyDB,调用q.historyDB.db.GetIterator(compositeStartKey, compositeEndKey)
func (q *LevelHistoryDBQueryExecutor) GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error) 
//代码在core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb_query_executer.go

historyScanner结构体及方法:实现ledger.ResultsIterator接口。

type historyScanner struct {
    compositePartialKey []byte //ns 0x00 key 0x00
    namespace           string
    key                 string
    dbItr               iterator.Iterator //leveldb迭代器
    blockStore          blkstorage.BlockStore
}

//构造historyScanner
func newHistoryScanner(compositePartialKey []byte, namespace string, key string, dbItr iterator.Iterator, blockStore blkstorage.BlockStore) *historyScanner
//按迭代器中key取blockNum和tranNum,再按blockNum和tranNum从blockStore中取Envelope,然后从Envelope的txRWSet.NsRwSets中按key查找并构造queryresult.KeyModification
func (scanner *historyScanner) Next() (commonledger.QueryResult, error)
func (scanner *historyScanner) Close() //scanner.dbItr.Release()
从Envelope的txRWSet.NsRwSets中按key查找并构造queryresult.KeyModification
func getKeyModificationFromTran(tranEnvelope *common.Envelope, namespace string, key string) (commonledger.QueryResult, error)
//代码在core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb_query_executer.go

补充queryresult.KeyModification:

type KeyModification struct {
    TxId      string //交易ID,ChannelHeader.TxId
    Value     []byte //读写集中Value,KVWrite.Value
    Timestamp *google_protobuf.Timestamp //ChannelHeader.Timestamp
    IsDelete  bool //KVWrite.IsDelete
}
//代码在protos/ledger/queryresult/kv_query_result.pb.go

感谢关注兄弟连区块链教程分享!

相关文章
|
18天前
|
SQL 数据库连接 API
Perl 教程 之 Perl 数据库连接 7
Perl DBI教程讲解了如何连接数据库,它是与数据库交互的标准接口,提供平台无关的访问。支持事务处理,可通过设置`AutoCommit => 0`在连接时开始事务或使用`$dbh->begin_work()`。事务结束后,用`commit`提交或`rollback`回滚。完成工作后,用`$dbh->disconnect`断开连接。
18 1
|
19天前
|
SQL 数据库连接 API
Perl 教程 之 Perl 数据库连接 4
Perl的DBI模块提供数据库独立接口,用于连接和操作数据库。通过prepare()预处理SQL,execute()执行,finish()释放句柄,及commit()提交事务。
12 1
|
21天前
|
Cloud Native OLAP OLTP
在业务处理分析一体化的背景下,开发者如何平衡OLTP和OLAP数据库的技术需求与选型?
在业务处理分析一体化的背景下,开发者如何平衡OLTP和OLAP数据库的技术需求与选型?
122 4
|
21天前
|
SQL 关系型数据库 MySQL
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(8.0版本升级篇)
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(8.0版本升级篇)
94 0
|
27天前
|
SQL 关系型数据库 MySQL
阿里云MySQL数据库价格、购买、创建账号密码和连接数据库教程
阿里云数据库使用指南:购买MySQL、SQL Server等RDS实例,选择配置和地区,完成支付。创建数据库和账号,设置权限。通过DMS登录数据库,使用账号密码访问。同地域VPC内的ECS需将IP加入白名单以实现内网连接。参考链接提供详细步骤。
367 3
|
16天前
|
弹性计算 关系型数据库 MySQL
阿里云数据库服务器价格表,数据库创建、连接和使用教程
阿里云数据库使用流程包括购买和管理。选择所需数据库类型如MySQL,完成实名认证后购买,配置CPU、内存和存储。确保数据库地域与ECS相同以允许内网连接。创建数据库和账号,设置权限。通过DMS登录数据库,使用账号密码连接。同一VPC内的ECS需添加至白名单以进行内网通信。参考官方文档进行详细操作。
76 3
|
19天前
|
SQL 关系型数据库 数据库连接
Perl 教程 之 Perl 数据库连接 1
Perl教程:使用DBI模块实现数据库连接,DBI是数据库独立接口,适用于Oracle、MySQL等。它定义通用方法,通过API处理SQL,分配给驱动执行。常用变量如$dsn(数据库源),$dbh(数据库句柄),$sth(语句句柄),返回值用$rc和$rv,查询结果存入@ary或(rows)。文件操作用$fh,属性用%\attr。
143 2
|
21天前
|
SQL 关系型数据库 MySQL
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(数据恢复补充篇)(一)
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(数据恢复补充篇)
29 0
|
27天前
|
弹性计算 关系型数据库 MySQL
阿里云MySQL云数据库优惠价格、购买和使用教程分享!
阿里云数据库使用流程包括购买和管理。首先,选购支持MySQL、SQL Server、PostgreSQL等的RDS实例,如选择2核2GB的MySQL,设定地域和可用区。购买后,等待实例创建。接着,创建数据库和账号,设置DB名称、字符集及账号权限。最后,通过DMS登录数据库,填写账号和密码。若ECS在同一地域和VPC内,可内网连接,记得将ECS IP加入白名单。
428 2
|
28天前
|
SQL 关系型数据库 MySQL
阿里云mysql数据库价格购买和使用教程
阿里云数据库使用指南:购买MySQL、SQL Server等RDS实例,通过选择配置、地域和可用区完成购买。创建数据库和账号,分配权限。使用DMS登录数据库,进行管理操作。确保ECS与RDS在同一地域的VPC内,配置白名单实现内网连接。详细步骤见官方文档。
628 1