区块链教程Fabric1.0源代码分析流言算法Gossip服务端二-兄弟连

简介:

  区块链教程Fabric1.0源代码分析流言算法Gossip服务端二

Fabric 1.0源代码笔记 之 gossip(流言算法) #GossipServer(Gossip服务端)

5.2、commImpl结构体方法

//conn.serviceConnection(),启动连接服务
func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error
//return &proto.Empty{}
func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error)

func (c *commImpl) GetPKIid() common.PKIidType
//向指定节点发送消息
func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer)
//探测远程节点是否有响应,_, err = cl.Ping(context.Background(), &proto.Empty{})
func (c *commImpl) Probe(remotePeer *RemotePeer) error
//握手验证远程节点,_, err = cl.Ping(context.Background(), &proto.Empty{})
func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, error)
func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan proto.ReceivedMessage
func (c *commImpl) PresumedDead() <-chan common.PKIidType
func (c *commImpl) CloseConn(peer *RemotePeer)
func (c *commImpl) Stop()

//创建并启动gRPC Server,以及注册GossipServer实例
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,
//将GossipServer实例注册至peerServer
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
func extractRemoteAddress(stream stream) string
func readWithTimeout(stream interface{}, timeout time.Duration, address string) (*proto.SignedGossipMessage, error) 
//创建gRPC Server,grpc.NewServer(serverOpts...)
func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOpts, []byte)

//创建与服务端连接
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error)
//向指定节点发送消息
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage)
//return atomic.LoadInt32(&c.stopping) == int32(1)
func (c *commImpl) isStopping() bool
func (c *commImpl) emptySubscriptions()
func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo, error)
func (c *commImpl) disconnect(pkiID common.PKIidType)
func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte, cert api.PeerIdentityType, signer proto.Signer) (*proto.SignedGossipMessage, error)
//代码在gossip/comm/comm_impl.go

5.2.1、func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error)

创建并启动gRPC Server,以及注册GossipServer实例

func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,
    secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error) {

    var ll net.Listener
    var s *grpc.Server
    var certHash []byte

    if len(dialOpts) == 0 {
        //peer.gossip.dialTimeout,gRPC连接拨号的超时
        dialOpts = []grpc.DialOption{grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout))}
    }

    if port > 0 {
        //创建gRPC Server,grpc.NewServer(serverOpts...)
        s, ll, secureDialOpts, certHash = createGRPCLayer(port)
    }

    commInst := &commImpl{
        selfCertHash:   certHash,
        PKIID:          idMapper.GetPKIidOfCert(peerIdentity),
        idMapper:       idMapper,
        logger:         util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),
        peerIdentity:   peerIdentity,
        opts:           dialOpts,
        secureDialOpts: secureDialOpts,
        port:           port,
        lsnr:           ll,
        gSrv:           s,
        msgPublisher:   NewChannelDemultiplexer(),
        lock:           &sync.RWMutex{},
        deadEndpoints:  make(chan common.PKIidType, 100),
        stopping:       int32(0),
        exitChan:       make(chan struct{}, 1),
        subscriptions:  make([]chan proto.ReceivedMessage, 0),
    }
    commInst.connStore = newConnStore(commInst, commInst.logger)

    if port > 0 {
        commInst.stopWG.Add(1)
        go func() {
            defer commInst.stopWG.Done()
            s.Serve(ll) //启动gRPC Server
        }()
        //commInst注册到gRPC Server
        proto.RegisterGossipServer(s, commInst)
    }

    return commInst, nil
}

//代码在gossip/comm/comm_impl.go

5.2.2、func NewCommInstance(s grpc.Server, cert tls.Certificate, idStore identity.Mapper,peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,dialOpts ...grpc.DialOption) (Comm, error)

将GossipServer实例注册至peerServer

func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
    peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,
    dialOpts ...grpc.DialOption) (Comm, error) {

    dialOpts = append(dialOpts, grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout)))
    //构造commImpl
    commInst, err := NewCommInstanceWithServer(-1, idStore, peerIdentity, secureDialOpts, dialOpts...)

    if cert != nil {
        inst := commInst.(*commImpl)
        inst.selfCertHash = certHashFromRawCert(cert.Certificate[0])
    }
    proto.RegisterGossipServer(s, commInst.(*commImpl))

    return commInst, nil
}

//代码在gossip/comm/comm_impl.go

//创建与服务端连接

5.2.3、func (c commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (connection, error)

func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
    var err error
    var cc *grpc.ClientConn
    var stream proto.Gossip_GossipStreamClient
    var pkiID common.PKIidType
    var connInfo *proto.ConnectionInfo
    var dialOpts []grpc.DialOption

    dialOpts = append(dialOpts, c.secureDialOpts()...)
    dialOpts = append(dialOpts, grpc.WithBlock())
    dialOpts = append(dialOpts, c.opts...)
    cc, err = grpc.Dial(endpoint, dialOpts...)

    cl := proto.NewGossipClient(cc)
    if _, err = cl.Ping(context.Background(), &proto.Empty{}); err != nil {
        cc.Close()
        return nil, err
    }

    ctx, cf := context.WithCancel(context.Background())
    stream, err = cl.GossipStream(ctx)
    connInfo, err = c.authenticateRemotePeer(stream)
    pkiID = connInfo.ID
    conn := newConnection(cl, cc, stream, nil)
    conn.pkiID = pkiID
    conn.info = connInfo
    conn.logger = c.logger
    conn.cancel = cf

    h := func(m *proto.SignedGossipMessage) {
        c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
            conn:                conn,
            lock:                conn,
            SignedGossipMessage: m,
            connInfo:            connInfo,
        })
    }
    conn.handler = h
    return conn, nil
}
//代码在gossip/comm/comm_impl.go

6、connectionStore和connection结构体及方法

6.1、connection结构体及方法

type connection struct {
    cancel       context.CancelFunc
    info         *proto.ConnectionInfo
    outBuff      chan *msgSending
    logger       *logging.Logger                 // logger
    pkiID        common.PKIidType                // pkiID of the remote endpoint
    handler      handler                         // function to invoke upon a message reception
    conn         *grpc.ClientConn                // gRPC connection to remote endpoint
    cl           proto.GossipClient              // gRPC stub of remote endpoint
    clientStream proto.Gossip_GossipStreamClient // client-side stream to remote endpoint
    serverStream proto.Gossip_GossipStreamServer // server-side stream to remote endpoint
    stopFlag     int32                           // indicates whether this connection is in process of stopping
    stopChan     chan struct{}                   // a method to stop the server-side gRPC call from a different go-routine
    sync.RWMutex                                 // synchronizes access to shared variables
}

//构造connection
func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient, ss proto.Gossip_GossipStreamServer) *connection
//关闭connection
func (conn *connection) close()
//atomic.LoadInt32(&(conn.stopFlag)) == int32(1)
func (conn *connection) toDie() bool
//conn.outBuff <- m,其中m为msgSending{envelope: msg.Envelope,onErr: onErr,}
func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error))
//go conn.readFromStream(errChan, msgChan)、go conn.writeToStream(),同时msg := <-msgChan,conn.handler(msg)
func (conn *connection) serviceConnection() error
//循环不间断从conn.outBuff取数据,然后stream.Send(m.envelope)
func (conn *connection) writeToStream()
//循环不间断envelope, err := stream.Recv()、msg, err := envelope.ToGossipMessage()、msgChan <- msg
func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.SignedGossipMessage)
//获取conn.serverStream
func (conn *connection) getStream() stream
//代码在gossip/comm/conn.go

6.2、connectionStore结构体及方法

type connectionStore struct {
    logger           *logging.Logger          // logger
    isClosing        bool                     // whether this connection store is shutting down
    connFactory      connFactory              // creates a connection to remote peer
    sync.RWMutex                              // synchronize access to shared variables
    pki2Conn         map[string]*connection   //connection map, key为pkiID,value为connection
    destinationLocks map[string]*sync.RWMutex //mapping between pkiIDs and locks,
    // used to prevent concurrent connection establishment to the same remote endpoint
}

//构造connectionStore
func newConnStore(connFactory connFactory, logger *logging.Logger) *connectionStore
//从connection map中获取连接,如无则创建并启动连接,并写入connection map中
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error)
//连接数量
func (cs *connectionStore) connNum() int
//关闭指定连接
func (cs *connectionStore) closeConn(peer *RemotePeer)
//关闭所有连接
func (cs *connectionStore) shutdown()
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, connInfo *proto.ConnectionInfo) *connection
//注册连接
func (cs *connectionStore) registerConn(connInfo *proto.ConnectionInfo, serverStream proto.Gossip_GossipStreamServer) *connection
//关闭指定连接
func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType) 
//代码在gossip/comm/conn.go

6.2.1、func (cs connectionStore) getConnection(peer RemotePeer) (*connection, error)

func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {
    cs.RLock()
    isClosing := cs.isClosing
    cs.RUnlock()

    pkiID := peer.PKIID
    endpoint := peer.Endpoint

    cs.Lock()
    destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]
    if !hasConnected {
        destinationLock = &sync.RWMutex{}
        cs.destinationLocks[string(pkiID)] = destinationLock
    }
    cs.Unlock()

    destinationLock.Lock()
    cs.RLock()
    //从connection map中获取
    conn, exists := cs.pki2Conn[string(pkiID)]
    if exists {
        cs.RUnlock()
        destinationLock.Unlock()
        return conn, nil
    }
    cs.RUnlock()

    //创建连接
    createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)
    destinationLock.Unlock()


    conn = createdConnection
    cs.pki2Conn[string(createdConnection.pkiID)] = conn
    go conn.serviceConnection() //启动连接的消息接收处理、以及向对方节点发送消息

    return conn, nil
}
//代码在gossip/comm/conn.go

7、ChannelDeMultiplexer结构体及方法(多路复用器)

type ChannelDeMultiplexer struct {
    channels []*channel
    lock     *sync.RWMutex
    closed   int32
}

//构造ChannelDeMultiplexer
func NewChannelDemultiplexer() *ChannelDeMultiplexer
//atomic.LoadInt32(&m.closed) == int32(1)
func (m *ChannelDeMultiplexer) isClosed() bool
//关闭
func (m *ChannelDeMultiplexer) Close() 
//添加通道
func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{} 
//挨个通道发送消息
func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{}) 
相关文章
|
2月前
|
机器学习/深度学习 算法 搜索推荐
从理论到实践,Python算法复杂度分析一站式教程,助你轻松驾驭大数据挑战!
【10月更文挑战第4天】在大数据时代,算法效率至关重要。本文从理论入手,介绍时间复杂度和空间复杂度两个核心概念,并通过冒泡排序和快速排序的Python实现详细分析其复杂度。冒泡排序的时间复杂度为O(n^2),空间复杂度为O(1);快速排序平均时间复杂度为O(n log n),空间复杂度为O(log n)。文章还介绍了算法选择、分而治之及空间换时间等优化策略,帮助你在大数据挑战中游刃有余。
63 4
|
2月前
|
并行计算 算法 IDE
【灵码助力Cuda算法分析】分析共享内存的矩阵乘法优化
本文介绍了如何利用通义灵码在Visual Studio 2022中对基于CUDA的共享内存矩阵乘法优化代码进行深入分析。文章从整体程序结构入手,逐步深入到线程调度、矩阵分块、循环展开等关键细节,最后通过带入具体值的方式进一步解析复杂循环逻辑,展示了通义灵码在辅助理解和优化CUDA编程中的强大功能。
|
2月前
|
算法
PID算法原理分析
【10月更文挑战第12天】PID控制方法从提出至今已有百余年历史,其由于结构简单、易于实现、鲁棒性好、可靠性高等特点,在机电、冶金、机械、化工等行业中应用广泛。
|
3月前
|
算法 搜索推荐 开发者
别再让复杂度拖你后腿!Python 算法设计与分析实战,教你如何精准评估与优化!
在 Python 编程中,算法的性能至关重要。本文将带您深入了解算法复杂度的概念,包括时间复杂度和空间复杂度。通过具体的例子,如冒泡排序算法 (`O(n^2)` 时间复杂度,`O(1)` 空间复杂度),我们将展示如何评估算法的性能。同时,我们还会介绍如何优化算法,例如使用 Python 的内置函数 `max` 来提高查找最大值的效率,或利用哈希表将查找时间从 `O(n)` 降至 `O(1)`。此外,还将介绍使用 `timeit` 模块等工具来评估算法性能的方法。通过不断实践,您将能更高效地优化 Python 程序。
63 4
|
3月前
|
算法 程序员 Python
程序员必看!Python复杂度分析全攻略,让你的算法设计既快又省内存!
在编程领域,Python以简洁的语法和强大的库支持成为众多程序员的首选语言。然而,性能优化仍是挑战。本文将带你深入了解Python算法的复杂度分析,从时间与空间复杂度入手,分享四大最佳实践:选择合适算法、优化实现、利用Python特性减少空间消耗及定期评估调整,助你写出高效且节省内存的代码,轻松应对各种编程挑战。
45 1
|
2月前
|
算法
PID算法原理分析及优化
【10月更文挑战第6天】PID控制方法从提出至今已有百余年历史,其由于结构简单、易于实现、鲁棒性好、可靠性高等特点,在机电、冶金、机械、化工等行业中应用广泛。
|
3月前
|
算法 数据可视化
基于SSA奇异谱分析算法的时间序列趋势线提取matlab仿真
奇异谱分析(SSA)是一种基于奇异值分解(SVD)和轨迹矩阵的非线性、非参数时间序列分析方法,适用于提取趋势、周期性和噪声成分。本项目使用MATLAB 2022a版本实现从强干扰序列中提取趋势线,并通过可视化展示了原时间序列与提取的趋势分量。代码实现了滑动窗口下的奇异值分解和分组重构,适用于非线性和非平稳时间序列分析。此方法在气候变化、金融市场和生物医学信号处理等领域有广泛应用。
140 19
|
3月前
|
机器学习/深度学习 存储 人工智能
文本情感识别分析系统Python+SVM分类算法+机器学习人工智能+计算机毕业设计
使用Python作为开发语言,基于文本数据集(一个积极的xls文本格式和一个消极的xls文本格式文件),使用Word2vec对文本进行处理。通过支持向量机SVM算法训练情绪分类模型。实现对文本消极情感和文本积极情感的识别。并基于Django框架开发网页平台实现对用户的可视化操作和数据存储。
50 0
文本情感识别分析系统Python+SVM分类算法+机器学习人工智能+计算机毕业设计
|
2月前
|
算法 安全 Go
Python与Go语言中的哈希算法实现及对比分析
Python与Go语言中的哈希算法实现及对比分析
41 0
|
3月前
|
编解码 算法 图形学
同一路RTSP|RTMP流如何同时回调YUV和RGB数据实现渲染和算法分析
我们播放RTSP|RTMP流,如果需要同时做渲染和算法分析的话,特别是渲染在上层实现(比如Unity),算法是python这种情况,拉两路流,更耗费带宽和性能,拉一路流,同时回调YUV和RGB数据也可以,但是更灵活的是本文提到的按需转算法期望的RGB数据,然后做算法处理