剖析nsq消息队列(二) 去中心化代码源码解析

简介: 在上一篇帖子剖析nsq消息队列(一) 简介及去中心化实现原理中,我介绍了nsq的两种使用方式,一种是直接连接,还有一种是通过nslookup来实现去中心化的方式使用,并大概说了一下实现原理,没有什么难理解的东西,这篇帖子我把nsq实现去中心化的源码和其中的业物逻辑展示给大家看一下。

在上一篇帖子剖析nsq消息队列(一) 简介及去中心化实现原理中,我介绍了nsq的两种使用方式,一种是直接连接,还有一种是通过nslookup来实现去中心化的方式使用,并大概说了一下实现原理,没有什么难理解的东西,这篇帖子我把nsq实现去中心化的源码和其中的业物逻辑展示给大家看一下。

nsqd和nsqlookupd的通信实现

上一篇中在启动nsqd时我用了以下命令,我指定了一个参数 --lookupd-tcp-address

./nsqd -tcp-address ":8000"  -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a

--lookupd-tcp-address 用于指定nsqlookupdtcp监听地址。

nsqdnsqlookupd的通信交流简单来说就是下图这样

nsqd启动后连接nsqlookupd,连接成功后,要发送一个魔法标识nsq.MagicV1,这个标识有啥魔法么,当然不是,他只是用于标明,客户端和服务端双方使用的信息通信版本,不能的版本有不同的处理方式,为了后期做新的消息处理版本方便吧。
nsqlookupd 的代码块

func (p *tcpServer) Handle(clientConn net.Conn) {    
    // ...
    buf := make([]byte, 4)
    _, err := io.ReadFull(clientConn, buf)
    // ...
    protocolMagic := string(buf)
    // ...
    var prot protocol.Protocol
    switch protocolMagic {
    case "  V1":
        prot = &LookupProtocolV1{ctx: p.ctx}
    default:
        // ...
        return
    }
    err = prot.IOLoop(clientConn)
    //...
}

这个时候的nsqd已经和nsqlookupd建立好了连接,但是这时,仅仅说明他俩连接成功。
nsqlookupd也并没有把这个连接加到可用的nsqd列表里。
建立连接完成后,nsqd会发送IDENTIFY命令,这个命令里包含了nsq的基本信息
nsqd的代码

        ci := make(map[string]interface{})
        ci["version"] = version.Binary
        ci["tcp_port"] = n.RealTCPAddr().Port
        ci["http_port"] = n.RealHTTPAddr().Port
        ci["hostname"] = hostname
        ci["broadcast_address"] = n.getOpts().BroadcastAddress

        cmd, err := nsq.Identify(ci)
        if err != nil {
            lp.Close()
            return
        }
        resp, err := lp.Command(cmd)

包含了nsqd 提供的tcphttp端口,主机名,版本等等,发送给nsqlookupd,nsqlookupd收到IDENTIFY命令后,解析信息然后加到nsqd的可用列表里
nsqlookupd 的代码块

func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
    var err error
    if client.peerInfo != nil {
        return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again")
    }
    var bodyLen int32
    err = binary.Read(reader, binary.BigEndian, &bodyLen)
    // ...
    body := make([]byte, bodyLen)
    _, err = io.ReadFull(reader, body)
    // ...    
    peerInfo := PeerInfo{id: client.RemoteAddr().String()}
    err = json.Unmarshal(body, &peerInfo)
    // ...
    client.peerInfo = &peerInfo
    // 把nsqd的连接加入到可用列表里    
    if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {
        p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "")
    }
    // ...
    return response, nil
}

然后每过15秒,会发送一个PING心跳命令给nsqlookupd,这样保持存活状态,nsqlookupd每次收到发过来的PING命令后,也会记下这个nsqd的最后更新时间,这样做为一个筛选条件,如果长时间没有更新,就认为这个节点有问题,不会把这个节点的信息加入到可用列表。
到此为止,一个nsqd就把自己的信息注册到nsqlookupd的可用列表了,我们可以启动多个nsqd和多个nsqlookupd,为nsqd
指定多个nsqlookupd,就如同我上一篇帖子写的那样

--lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200

nsqd和所有的nsqlookupd建立连接,注册服务信息,并保持心跳,保证可用列表的更新.

nsqlookupd 挂掉的处理方式

上面我们说了nsqd如果出现问题,nsqlookupdnsqd可用列表里就会处理掉这个连接信息。如nsqlookupd挂了怎么办呢

目前的处理方式是这样的,
无论是心跳,还是其他命令,nsqd会给所有的nsqlookup发送信息,当nsqd发现nsqlookupd出现问题时,在每次发送命令时,会不断的进行重新连接:

func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) {
    initialState := lp.state
    if lp.state != stateConnected {
        err := lp.Connect()
        if err != nil {
            return nil, err
        }
        lp.state = stateConnected
        _, err = lp.Write(nsq.MagicV1)
        if err != nil {
            lp.Close()
            return nil, err
        }
        if initialState == stateDisconnected {
            lp.connectCallback(lp)
        }
        if lp.state != stateConnected {
            return nil, fmt.Errorf("lookupPeer connectCallback() failed")
        }
    }
    // ...
}

如果连接成功,会再次调用connectCallback方法,进行IDENTIFY命令的调用等。

客户端和nsqlookupd、nsqd的通信实现

上一篇帖子里介绍了,客户端如何连接nsqlookupd来进行通信

    adds := []string{"127.0.0.1:7201", "127.0.0.1:8201"}
    config := nsq.NewConfig()
    config.MaxInFlight = 1000
    config.MaxBackoffDuration = 5 * time.Second
    config.DialTimeout = 10 * time.Second

    topicName := "testTopic1"
    c, _ := nsq.NewConsumer(topicName, "ch1", config)
    testHandler := &MyTestHandler{consumer: c}

    c.AddHandler(testHandler)
    if err := c.ConnectToNSQLookupds(adds); err != nil {
        panic(err)
    }

需要注意adds里地址的端口,是nsqlookupdhttp端口
这里我还使用上一篇帖子中的图,给大家详细分析

调用方法c.ConnectToNSQLookupds(adds),他的实现是访问nsqlookupd的http端口http://127.0.0.1:7201/lookup?topic=testTopic1得到提供consumer订阅的topic所有的producers节点信息, url返回的数据信息如下。

{
  "channels": [
    "nsq_to_file",
    "ch1"
  ],
  "producers": [
    {
      "remote_address": "127.0.0.1:58606",
      "hostname": "li-peng-mc-macbook.local",
      "broadcast_address": "li-peng-mc-macbook.local",
      "tcp_port": 8000,
      "http_port": 8001,
      "version": "1.1.1-alpha"
    },
    {
      "remote_address": "127.0.0.1:58627",
      "hostname": "li-peng-mc-macbook.local",
      "broadcast_address": "li-peng-mc-macbook.local",
      "tcp_port": 7000,
      "http_port": 7001,
      "version": "1.1.1-alpha"
    }
  ]
}


方法queryLookupd就是进行的上图的操作

  • 得到提供订阅的topicnsqd列表
  • 进行连接
func (r *Consumer) queryLookupd() {
    retries := 0
retry:
    endpoint := r.nextLookupdEndpoint()

    // ...    
    err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
    if err != nil {
        // ...
    }
    var nsqdAddrs []string
    for _, producer := range data.Producers {
        broadcastAddress := producer.BroadcastAddress
        port := producer.TCPPort
        joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
        nsqdAddrs = append(nsqdAddrs, joined)
    }
    // 进行连接
    for _, addr := range nsqdAddrs {
        err = r.ConnectToNSQD(addr)
        if err != nil && err != ErrAlreadyConnected {
            r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
            continue
        }
    }
}

如何刷新nsqd的可用列表

有新的nsqd加入,是如何处理的呢?
在调用ConnectToNSQLookupd时会启动一个协程go r.lookupdLoop() 调用方法lookupdLoop的定时循环访问 queryLookupd 更新 nsqd的可用列表

// poll all known lookup servers every LookupdPollInterval
func (r *Consumer) lookupdLoop() {
    // ...
    var ticker *time.Ticker
    select {
    case <-time.After(jitter):
    case <-r.exitChan:
        goto exit
    }
    // 设置Interval 来循环访问 queryLookupd
    ticker = time.NewTicker(r.config.LookupdPollInterval)
    for {
        select {
        case <-ticker.C:
            r.queryLookupd()
        case <-r.lookupdRecheckChan:
            r.queryLookupd()
        case <-r.exitChan:
            goto exit
        }
    }

exit:
    // ...
}

处理 nsqd 的单点故障


当有nsqd出现故障时怎么办?当前的处理方式是

  • nsqdlookupd会把这个故障节点从可用列表中去除,客户端从接口得到的可用列表永远都是可用的。
  • 客户端会把这个故障节点从可用节点上移除,然后要去判断是否使用了nsqlookup进行了连接,如果是则case r.lookupdRecheckChan <- 1 去刷新可用列表queryLookupd,如果不是,然后启动一个协程去定时做重试连接,如果故障恢复,连接成功,会重新加入到可用列表.
    客户端实现的代码
func (r *Consumer) onConnClose(c *Conn) {
    // ...
    // remove this connections RDY count from the consumer's total
    delete(r.connections, c.String())
    left := len(r.connections)
    // ...
    r.mtx.RLock()
    numLookupd := len(r.lookupdHTTPAddrs)
    reconnect := indexOf(c.String(), r.nsqdTCPAddrs) >= 0
    // 如果使用的是nslookup则去刷新可用列表
    if numLookupd > 0 {
        // trigger a poll of the lookupd
        select {
        case r.lookupdRecheckChan <- 1:
        default:
        }
    } else if reconnect {
        // ... 
        }(c.String())
    }
}
目录
相关文章
|
2月前
|
算法 PyTorch 算法框架/工具
昇腾 msmodelslim w8a8量化代码解析
msmodelslim w8a8量化算法原理和代码解析
130 5
|
4月前
|
搜索推荐 UED Python
实现一个带有昼夜背景切换的动态时钟:从代码到功能解析
本文介绍了一个使用Python和Tkinter库实现的动态时钟程序,具有昼夜背景切换、指针颜色随机变化及整点和半点报时功能。通过设置不同的背景颜色和随机变换指针颜色,增强视觉吸引力;利用多线程技术确保音频播放不影响主程序运行。该程序结合了Tkinter、Pygame、Pytz等库,提供了一个美观且实用的时间显示工具。欢迎点赞、关注、转发、收藏!
184 94
|
2月前
|
传感器 监控 Java
Java代码结构解析:类、方法、主函数(1分钟解剖室)
### Java代码结构简介 掌握Java代码结构如同拥有程序世界的建筑蓝图,类、方法和主函数构成“黄金三角”。类是独立的容器,承载成员变量和方法;方法实现特定功能,参数控制输入环境;主函数是程序入口。常见错误包括类名与文件名不匹配、忘记static修饰符和花括号未闭合。通过实战案例学习电商系统、游戏角色控制和物联网设备监控,理解类的作用、方法类型和主函数任务,避免典型错误,逐步提升编程能力。 **脑图速记法**:类如太空站,方法即舱段;main是发射台,static不能换;文件名对仗,括号要成双;参数是坐标,void不返航。
106 5
|
3月前
|
人工智能 文字识别 自然语言处理
保单AI识别技术及代码示例解析
车险保单包含基础信息、车辆信息、人员信息、保险条款及特别约定等关键内容。AI识别技术通过OCR、文档结构化解析和数据校验,实现对保单信息的精准提取。然而,版式多样性、信息复杂性、图像质量和法律术语解析是主要挑战。Python代码示例展示了如何使用PaddleOCR进行保单信息抽取,并提出了定制化训练、版式分析等优化方向。典型应用场景包括智能录入、快速核保、理赔自动化等。未来将向多模态融合、自适应学习和跨区域兼容性发展。
|
5月前
|
自然语言处理 搜索推荐 数据安全/隐私保护
鸿蒙登录页面好看的样式设计-HarmonyOS应用开发实战与ArkTS代码解析【HarmonyOS 5.0(Next)】
鸿蒙登录页面设计展示了 HarmonyOS 5.0(Next)的未来美学理念,结合科技与艺术,为用户带来视觉盛宴。该页面使用 ArkTS 开发,支持个性化定制和无缝智能设备连接。代码解析涵盖了声明式 UI、状态管理、事件处理及路由导航等关键概念,帮助开发者快速上手 HarmonyOS 应用开发。通过这段代码,开发者可以了解如何构建交互式界面并实现跨设备协同工作,推动智能生态的发展。
333 10
鸿蒙登录页面好看的样式设计-HarmonyOS应用开发实战与ArkTS代码解析【HarmonyOS 5.0(Next)】
|
4月前
|
SQL Java 数据库连接
如何在 Java 代码中使用 JSqlParser 解析复杂的 SQL 语句?
大家好,我是 V 哥。JSqlParser 是一个用于解析 SQL 语句的 Java 库,可将 SQL 解析为 Java 对象树,支持多种 SQL 类型(如 `SELECT`、`INSERT` 等)。它适用于 SQL 分析、修改、生成和验证等场景。通过 Maven 或 Gradle 安装后,可以方便地在 Java 代码中使用。
1077 11
|
5月前
|
PHP 开发者 容器
PHP命名空间深度解析:避免命名冲突与提升代码组织####
本文深入探讨了PHP中命名空间的概念、用途及最佳实践,揭示其在解决全局命名冲突、提高代码可维护性方面的重要性。通过生动实例和详尽分析,本文将帮助开发者有效利用命名空间来优化大型项目结构,确保代码的清晰与高效。 ####
98 20
|
6月前
|
机器学习/深度学习 存储 人工智能
强化学习与深度强化学习:深入解析与代码实现
本书《强化学习与深度强化学习:深入解析与代码实现》系统地介绍了强化学习的基本概念、经典算法及其在深度学习框架下的应用。从强化学习的基础理论出发,逐步深入到Q学习、SARSA等经典算法,再到DQN、Actor-Critic等深度强化学习方法,结合Python代码示例,帮助读者理解并实践这些先进的算法。书中还探讨了强化学习在无人驾驶、游戏AI等领域的应用及面临的挑战,为读者提供了丰富的理论知识和实战经验。
256 5
|
6月前
|
存储 安全 Java
系统安全架构的深度解析与实践:Java代码实现
【11月更文挑战第1天】系统安全架构是保护信息系统免受各种威胁和攻击的关键。作为系统架构师,设计一套完善的系统安全架构不仅需要对各种安全威胁有深入理解,还需要熟练掌握各种安全技术和工具。
374 10
|
6月前
|
前端开发 JavaScript 开发者
揭秘前端高手的秘密武器:深度解析递归组件与动态组件的奥妙,让你代码效率翻倍!
【10月更文挑战第23天】在Web开发中,组件化已成为主流。本文深入探讨了递归组件与动态组件的概念、应用及实现方式。递归组件通过在组件内部调用自身,适用于处理层级结构数据,如菜单和树形控件。动态组件则根据数据变化动态切换组件显示,适用于不同业务逻辑下的组件展示。通过示例,展示了这两种组件的实现方法及其在实际开发中的应用价值。
98 1

推荐镜像

更多