- 模块解读
逻辑控制对象解读
控制流程梳理
7.2.1. 逻辑控制服务解读
7.2.1.1. 同步模块服务
type BlockChainSyncServer struct {
chainId string
net protocol.NetService // 网络服务,向其他节点获取/发送数据,由外部模块提供实现
msgBus msgbus.MessageBus // 内部消息服务,提供订阅指定 Topic 的功能,由外部模块提供实现
blockChainStore protocol.BlockchainStore // 存储服务,提供本地区块的读写,由外部模块提供实现
ledgerCache protocol.LedgerCache // 节点本地最高快的缓存
blockVerifier protocol.BlockVerifier // 区块验证服务,校验区块有效性
blockCommitter protocol.BlockCommitter // 区块提交服务,提交验证有效的区块
log *logger.CMLogger
conf *BlockSyncServerConf // Sync 模块基础配置
start int32 // 服务是否启动的原子数标记位
close chan bool // 关闭信号通道
scheduler *Routine // 调度服务通道,包含处理信号接收,处理入口等逻辑
processor *Routine // 处理服务通道,处理区块
}
主要的方法入口:
func (sync *BlockChainSyncServer) Start() error {
// 检查服务是否启动
check sync.start
// 1. 初始化配置
// 1)初始化任务调度服务入口
processor := newProcessor()
scheduler := newScheduler()
// 2)初始化消息队列
sync.scheduler = NewRoutine("scheduler", scheduler.handler, scheduler.getServiceState, sync.log)
sync.processor = NewRoutine("processor", processor.handler, processor.getServiceState, sync.log)
// 2. 注册监听消息
// 1)在 msgBus 中注册监听 "msgbus.BlockInfo" 的Topic
sync.msgBus.Register(msgbus.BlockInfo, sync)
// 2) 在 Net 服务中注册监听 “netPb.NetMsg_SYNC_BLOCK_MSG” 消息类型
sync.net.Subscribe(netPb.NetMsg_SYNC_BLOCK_MSG, sync.blockSyncMsgHandler)
sync.net.ReceiveMsg(netPb.NetMsg_SYNC_BLOCK_MSG, sync.blockSyncMsgHandler)
// 3. 启动消息队列
sync.scheduler.begin()
sync.processor.begin()
// 4. 启动 同步模块的定时任务
go sync.loop()
}
7.2.1.2. 调度服务
// scheduler Retrieves block data of specified height from different nodes
type scheduler struct {
peers map[string]uint64 // 网络中节点的高度 {"node1": 101, "node2": 102, "node3": 103}
blockStates map[uint64]blockState // 指定高度的处理状态 {101: Received, 102: Pending, 103: New}
pendingTime map[uint64]time.Time // 指定高度的处理发起时间 {101: time1, 102: time2, 103: time3}
pendingBlocks map[uint64]string // 指定高度的查询节点ID {101: "node1", 102: "node2", 103: "node3"}
receivedBlocks map[uint64]string // 指定高度已收到区块记录的缓存 {101: "node1", 102: "node2", 103: "node3"}
lastRequest time.Time // 批量请求发起时间
pendingRecvHeight uint64 // 本地当前最高块 + 1,下一个请求批次的起始高度
maxPendingBlocks uint64 // 最大区块处理数量,等同于 blockStates 的长度
BatchesizeInEachReq uint64 // 最大单次请求批次区块数量
peerReqTimeout time.Duration // 网络中区块高度查询超时时间
reqTimeThreshold time.Duration // 是否开始同步的检查项,time.Now - lastRequest > reqTimeThreshold == true 开始同步
log *logger.CMLogger
sender syncSender // 同步模块对象实例
ledger protocol.LedgerCache
}
7.2.1.3. 处理服务
type processor struct {
queue map[uint64]blockWithPeerInfo // 收到的区块消息缓存
hasCommitBlock uint64 // 本地已经提交的区块高度记录
log *logger.CMLogger
ledgerCache protocol.LedgerCache // 本地账本缓存
verifyAndAddBlock // 校验和存储模块接口实例
}
7.2.1.4. 消息队列
// Routine Provide hosting of the service in goroutine
type Routine struct {
name string
handle handleFunc // 调度逻辑入口
queryState getServiceState
log *logger.CMLogger
start int32 // 服务启动标记位
queue *queue.PriorityQueue // 消息队列
out chan queue.Item // 结果队列,转发消息类型,scheduler 的消息由 processor 的 handle 再次处理,processor 由 scheduler 处理
stop chan struct{}
}