区块链教程Fabric1.0源代码分析Orderer BroadcastServer

简介:

  兄弟连区块链教程Fabric1.0源代码分析Orderer BroadcastServer,2018年下半年,区块链行业正逐渐褪去发展之初的浮躁、回归理性,表面上看相关人才需求与身价似乎正在回落。但事实上,正是初期泡沫的渐退,让人们更多的关注点放在了区块链真正的技术之上。

Fabric 1.0源代码笔记 之 Orderer #BroadcastServer(Broadcast服务端)

1、BroadcastServer概述

BroadcastServer相关代码在protos/orderer、orderer目录下。

protos/orderer/ab.pb.go,AtomicBroadcastServer接口定义。
orderer/server.go,go,AtomicBroadcastServer接口实现。
有个图

2、AtomicBroadcastServer接口定义

2.1、AtomicBroadcastServer接口定义

type AtomicBroadcastServer interface {
    Broadcast(AtomicBroadcast_BroadcastServer) error
    Deliver(AtomicBroadcast_DeliverServer) error
}
//代码在protos/orderer/ab.pb.go
···

### 2.2、gRPC相关实现

var _AtomicBroadcast_serviceDesc = grpc.ServiceDesc{

ServiceName: "orderer.AtomicBroadcast",
HandlerType: (*AtomicBroadcastServer)(nil),
Methods:     []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
    {
        StreamName:    "Broadcast",
        Handler:       _AtomicBroadcast_Broadcast_Handler,
        ServerStreams: true,
        ClientStreams: true,
    },
    {
        StreamName:    "Deliver",
        Handler:       _AtomicBroadcast_Deliver_Handler,
        ServerStreams: true,
        ClientStreams: true,
    },
},
Metadata: "orderer/ab.proto",

}

func RegisterAtomicBroadcastServer(s *grpc.Server, srv AtomicBroadcastServer) {

s.RegisterService(&_AtomicBroadcast_serviceDesc, srv)

}

func _AtomicBroadcast_Broadcast_Handler(srv interface{}, stream grpc.ServerStream) error {

return srv.(AtomicBroadcastServer).Broadcast(&atomicBroadcastBroadcastServer{stream})

}

func _AtomicBroadcast_Deliver_Handler(srv interface{}, stream grpc.ServerStream) error {

return srv.(AtomicBroadcastServer).Deliver(&atomicBroadcastDeliverServer{stream})

}
//代码在protos/orderer/ab.pb.go


## 3、AtomicBroadcastServer接口实现

### 3.1、server结构体

server结构体:

type server struct {

bh broadcast.Handler
dh deliver.Handler

}

type broadcastSupport struct {

multichain.Manager
broadcast.ConfigUpdateProcessor

}
//代码在orderer/server.go


broadcast.Handler:

type Handler interface {

Handle(srv ab.AtomicBroadcast_BroadcastServer) error

}

type handlerImpl struct {

sm SupportManager

}

func NewHandlerImpl(sm SupportManager) Handler {

return &handlerImpl{
    sm: sm,
}

}

type SupportManager interface {

ConfigUpdateProcessor
GetChain(chainID string) (Support, bool)

}

type ConfigUpdateProcessor interface { //处理通道配置更新

Process(envConfigUpdate *cb.Envelope) (*cb.Envelope, error)

}
//代码在orderer/common/broadcast/broadcast.go


deliver.Handler:

type Handler interface {

Handle(srv ab.AtomicBroadcast_DeliverServer) error

}

type deliverServer struct {

sm SupportManager

}

type SupportManager interface {

GetChain(chainID string) (Support, bool)

}
//代码在orderer/common/deliver/deliver.go


### 3.2、server结构体相关方法

//构建server结构体
func NewServer(ml multichain.Manager, signer crypto.LocalSigner) ab.AtomicBroadcastServer
//s.bh.Handle(srv)
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error
//s.dh.Handle(srv)
func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error
//代码在orderer/server.go


func NewServer(ml multichain.Manager, signer crypto.LocalSigner) ab.AtomicBroadcastServer代码如下:

func NewServer(ml multichain.Manager, signer crypto.LocalSigner) ab.AtomicBroadcastServer {

s := &server{
    dh: deliver.NewHandlerImpl(deliverSupport{Manager: ml}),
    bh: broadcast.NewHandlerImpl(broadcastSupport{
        Manager:               ml,
        ConfigUpdateProcessor: configupdate.New(ml.SystemChannelID(), configUpdateSupport{Manager: ml}, signer),
    }),
}
return s

}
//代码在orderer/server.go


### 3.3、Broadcast服务端Broadcast处理流程

Broadcast服务端Broadcast处理流程,即broadcast.handlerImpl.Handle方法。

#### 3.3.1、接收Envelope消息,并获取Payload和ChannelHeader

msg, err := srv.Recv() //接收Envelope消息
payload, err := utils.UnmarshalPayload(msg.Payload) //反序列化获取Payload
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) //反序列化获取ChannelHeader
//代码在orderer/common/broadcast/broadcast.go


#### 3.3.2、如果消息类型为channel配置或更新,则使用multichain.Manager处理消息

if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) { //如果是channel配置或更新

msg, err = bh.sm.Process(msg) //configupdate.Processor.Process方法

}
//代码在orderer/common/broadcast/broadcast.go


msg, err = bh.sm.Process(msg)代码如下:

func (p Processor) Process(envConfigUpdate cb.Envelope) (*cb.Envelope, error) {

channelID, err := channelID(envConfigUpdate) //获取ChannelHeader.ChannelId
//multichain.Manager.GetChain方法,获取chainSupport,以及chain是否存在
support, ok := p.manager.GetChain(channelID)
if ok {
    //已存在的channel配置,调取multichain.Manager.ProposeConfigUpdate方法
    return p.existingChannelConfig(envConfigUpdate, channelID, support)
}
//新channel配置,调取multichain.Manager.NewChannelConfig方法
return p.newChannelConfig(channelID, envConfigUpdate)

}
//代码在orderer/configupdate/configupdate.go


#### 3.3.3、其他消息类型或channel消息处理后,接受消息并加入排序

support, ok := bh.sm.GetChain(chdr.ChannelId) //获取chainSupport
_, filterErr := support.Filters().Apply(msg) //filter.RuleSet.Apply方法
//调取Chain.Enqueue方法,接受消息,加入排序
support.Enqueue(msg)
//代码在orderer/common/broadcast/broadcast.go


#### 3.3.4、向客户端发送响应信息

err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
//代码在orderer/common/broadcast/broadcast.go


### 3.4、Broadcast服务端Deliver处理流程

Broadcast服务端Deliver处理流程,即deliver.deliverServer.Handle方法。

func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {

for {
    //接收客户端查询请求
    envelope, err := srv.Recv()
    payload, err := utils.UnmarshalPayload(envelope.Payload)
    chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
    chain, ok := ds.sm.GetChain(chdr.ChannelId)

    erroredChan := chain.Errored()
    select {
    case <-erroredChan:
        return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
    default:

    }

    lastConfigSequence := chain.Sequence()

    sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
    result, _ := sf.Apply(envelope)

    seekInfo := &ab.SeekInfo{}
    err = proto.Unmarshal(payload.Data, seekInfo)

    cursor, number := chain.Reader().Iterator(seekInfo.Start)
    var stopNum uint64
    switch stop := seekInfo.Stop.Type.(type) {
    case *ab.SeekPosition_Oldest:
        stopNum = number
    case *ab.SeekPosition_Newest:
        stopNum = chain.Reader().Height() - 1
    case *ab.SeekPosition_Specified:
        stopNum = stop.Specified.Number
        if stopNum < number {
            return sendStatusReply(srv, cb.Status_BAD_REQUEST)
        }
    }

    for {
        if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY {
            select {
            case <-erroredChan:
                return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
            case <-cursor.ReadyChan():
            }
        } else {
            select {
            case <-cursor.ReadyChan():
            default:
                return sendStatusReply(srv, cb.Status_NOT_FOUND)
            }
        }

        currentConfigSequence := chain.Sequence()
        if currentConfigSequence > lastConfigSequence {
            lastConfigSequence = currentConfigSequence
            sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
            result, _ := sf.Apply(envelope)

        }

        block, status := cursor.Next()
        err := sendBlockReply(srv, block)
        if stopNum == block.Header.Number {
            break
        }
    }

    err := sendStatusReply(srv, cb.Status_SUCCESS)
}

}

相关文章
|
8月前
|
存储 供应链 监控
区块链技术在供应链管理中的应用与前景分析
随着信息化时代的到来,供应链管理面临着越来越多的挑战和机遇。本文主要探讨了区块链技术在供应链管理中的应用,以及未来的发展前景。通过对区块链技术的特点和优势进行分析,结合实际案例和趋势展望,展示了区块链技术在提升供应链透明度、效率和安全性方面的潜力,以及未来发展的可能方向。
|
8月前
|
安全 区块链
区块链积分商城系统开发详细指南//需求功能/指南教程/源码流程
Developing a blockchain points mall system involves multiple aspects such as blockchain technology, smart contracts, front-end development, and business logic design. The following is the general process for developing a blockchain points mall system
|
8月前
|
存储 算法 API
面向企业的区块链教程(一)(2)
面向企业的区块链教程(一)
118 6
|
5月前
|
安全 区块链
Massa Layer 1区块链 POS 安全性分析
Massa Labs 回应 Certik 的挑战,通过严格的数学分析证明了其权益证明系统的安全性,抵抗了潜在攻击者试图操纵随机抽签的企图。
67 0
Massa Layer 1区块链 POS 安全性分析
|
8月前
|
存储 供应链 安全
基于区块链技术的智能合约安全性分析
【5月更文挑战第31天】本文深入探讨了区块链技术中智能合约的安全性问题,通过分析现有智能合约的安全漏洞和攻击手段,提出了一系列增强智能合约安全性的策略。文章首先介绍了区块链和智能合约的基本概念,随后详细讨论了智能合约面临的安全挑战,包括代码漏洞、重入攻击等问题,并对比分析了不同平台下智能合约的安全性差异。最后,文章提出了一系列提高智能合约安全性的建议,旨在为区块链应用的健康发展提供参考。
|
7月前
|
存储 安全 区块链
元宇宙与区块链技术的关系可以从多个角度进行阐述。以下是对这两者之间关系的详细分析
**元宇宙:虚拟世界融合现实元素,强调交互与沉浸;区块链:去中心化、安全的分布式账本。两者结合,区块链确保元宇宙中虚拟资产安全、支付高效、身份验证私密、治理透明,支撑其经济体系与用户信任,驱动未来发展。**
|
8月前
|
存储 算法 安全
区块链系统开发技术规则分析
区块链核心技术包括:1) 哈希算法,利用单向函数将任意数据转化为固定长度代码,确保安全验证;2) 非对称加密,使用公钥和私钥一对进行加密解密,保证信息安全;3) 共识机制,如PoW、PoS、DPoS等,实现快速交易验证和确认;4) 智能合约,自动执行的可信代码,一旦编写即不可更改,用于自动化交易;5) 分布式存储,将数据分散存储在网络各处,涵盖结构化、非结构化和半结构化数据。
|
7月前
|
区块链
近期区块链市场趋势分析
**区块链市场趋势摘要:** - 跨链技术成熟,提升互操作性,助力区块链网络融合。 - DeFi持续繁荣,智能合约与AMM创新活跃,市场竞争驱动市场壮大。 - NFT市场多样化,拓展至游戏、音乐等领域,实用性增强。 - 区块链寻求绿色转型,通过PoS共识与绿色能源减少能耗。 - 技术模块化、可组合性提升,降低成本,增强系统灵活性。 这些趋势展现区块链潜力,带来机遇与挑战,促进行业创新。
|
8月前
|
供应链 区块链 数据安全/隐私保护
探索区块链技术在金融领域的应用与前景分析
本文将深入探讨区块链技术在金融领域的具体应用场景,分析其优势与挑战,并展望未来发展趋势。通过案例分析和技术解析,揭示区块链技术在金融行业中的革新意义及前景。
1571 15
|
8月前
|
安全 区块链
区块链游戏系统开发步骤需求丨功能逻辑丨规则玩法丨指南教程丨源码详细
Developing blockchain game systems has been a highly anticipated field in recent years. By combining blockchain technology and game mechanics, players can enjoy a brand new gaming experience and higher game credibility.

热门文章

最新文章