剖析nsq消息队列(四) 消息的负载处理

简介: 剖析nsq消息队列-目录 实际应用中,一部分服务集群可能会同时订阅同一个topic,并且处于同一个channel下。当nsqd有消息需要发送给订阅客户端去处理时,发给哪个客户端是需要考虑的,也就是我要说的消息的负载。

剖析nsq消息队列-目录
实际应用中,一部分服务集群可能会同时订阅同一个topic,并且处于同一个channel下。当nsqd有消息需要发送给订阅客户端去处理时,发给哪个客户端是需要考虑的,也就是我要说的消息的负载。

如果不考虑负载情况,把随机的把消息发送到某一个客服端去处理消息,如果机器的性能不同,可能发生的情况就是某一个或几个客户端处理速度慢,但还有大量新的消息需要处理,其他的客户端处于空闲状态。理想的状态是,找到当前相对空闲的客户端去处理消息。

nsq的处理方式是客户端主动向nsqd报告自已的可处理消息数量(也就是RDY命令)。nsqd根据每个连接的客户端的可处理消息的状态来随机把消息发送到可用的客户端,来进行消息处理

如下图所示:

客户端更新RDY

从第一篇帖子的例子中我们就有配置consumer的config

    config := nsq.NewConfig()
    config.MaxInFlight = 1000
    config.MaxBackoffDuration = 5 * time.Second
    config.DialTimeout = 10 * time.Second

MaxInFlight 来设置最大的处理中的消息数量,会根据这个变量计算在是否更新RDY
初始化的时候 客户端会向连接的nsqd服务端来发送updateRDY来设置最大处理数,

func (r *Consumer) maybeUpdateRDY(conn *Conn) {
    inBackoff := r.inBackoff()
    inBackoffTimeout := r.inBackoffTimeout()
    if inBackoff || inBackoffTimeout {
        r.log(LogLevelDebug, "(%s) skip sending RDY inBackoff:%v || inBackoffTimeout:%v",
            conn, inBackoff, inBackoffTimeout)
        return
    }

    remain := conn.RDY()
    lastRdyCount := conn.LastRDY()
    count := r.perConnMaxInFlight()

    // refill when at 1, or at 25%, or if connections have changed and we're imbalanced
    if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) {
        r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)",
            conn, count, remain, lastRdyCount)
        r.updateRDY(conn, count)
    } else {
        r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)",
            conn, count, remain, lastRdyCount)
    }
}

当剩余的可用处理数量remain 小于等于1,或者小于最后一次设置的可用数量lastRdyCount的1/4时,或者可用连接平均的maxInFlight大于0并且小于remain时,则更新RDY状态

当有多个nsqd时,会把最大的消息进行平均计算:

// perConnMaxInFlight calculates the per-connection max-in-flight count.
//
// This may change dynamically based on the number of connections to nsqd the Consumer
// is responsible for.
func (r *Consumer) perConnMaxInFlight() int64 {
    b := float64(r.getMaxInFlight())
    s := b / float64(len(r.conns()))
    return int64(math.Min(math.Max(1, s), b))
}

当有消息从nsqd发送过来后也会调用maybeUpdateRDY方法,计算是否需要发送RDY命令

func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
    atomic.AddInt64(&r.totalRdyCount, -1)
    atomic.AddUint64(&r.messagesReceived, 1)
    r.incomingMessages <- msg
    r.maybeUpdateRDY(c)
}

上面就是主要的处理逻辑,但还有一些逻辑,就是当消息处理发生错误时,nsq有自己的退避算法backoff也会更新RDY 简单来说就是当发现有处理错误时,来进行重试和指数退避,在退避期间RDY会为0,重试时会先放尝试RDY为1看有没有错误,如果没有错误则全部放开,这个算法这篇帖子我就不详细说了。

服务端nsqd选择客户端进行发送消息

同时订阅同一topic的客户端(comsumer)有很多个,每个客户端根据自己的配置或状态发送RDY命令到nsqd表明自己能处理多少消息量
nsqd服务端会检查每个客户端的的状态是否可以发送消息。也就是IsReadyForMessages方法,判断inFlightCount是否大于readyCount,如果大于或者等于就不再给客户端发送数据,等待Ready后才会再给客户端发送数据

func (c *clientV2) IsReadyForMessages() bool {
    if c.Channel.IsPaused() {
        return false
    }

    readyCount := atomic.LoadInt64(&c.ReadyCount)
    inFlightCount := atomic.LoadInt64(&c.InFlightCount)

    c.ctx.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCount, inFlightCount)

    if inFlightCount >= readyCount || readyCount <= 0 {
        return false
    }

    return true

每一次发送消息inFlightCount会+1并保存到发送中的队列中,当客户端发送FIN会-1在之前的帖子中有说过。

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    // ...
    for {
        // 检查订阅状态和消息是否可处理状态    
        if subChannel == nil || !client.IsReadyForMessages() {
            // the client is not ready to receive messages...
            memoryMsgChan = nil
            backendMsgChan = nil
            flusherChan = nil
            // ...
            flushed = true
        } else if flushed {
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = nil
        } else {
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = outputBufferTicker.C
        }

        select {
        case <-flusherChan:
            // ...
        // 消息处理            
        case b := <-backendMsgChan:
            client.SendingMessage()
            // ...
        case msg := <-memoryMsgChan:
            client.SendingMessage()        
            //...
        }
    }
// ...
}
目录
相关文章
|
7月前
|
消息中间件 自然语言处理 Go
Golang 语言编写的消息队列 NSQ 官方客户端 go-nsq 怎么使用?
Golang 语言编写的消息队列 NSQ 官方客户端 go-nsq 怎么使用?
57 0
|
6月前
|
消息中间件 存储 Go
Golang微服务框架Kratos应用NSQ消息队列
NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件。 NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。 NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。
49 1
|
消息中间件 存储 前端开发
同步异步调用,并谈谈消息队列mq;RocketMQ发送消息和消费消息测试类
同步调用优点: 时效性强,打电话、直播,很快可以得到结果 同步调用的问题:
498 1
|
消息中间件 关系型数据库 MySQL
5. 消息队列中,如何保证消息的顺序性?
5. 消息队列中,如何保证消息的顺序性?
457 0
5. 消息队列中,如何保证消息的顺序性?
|
消息中间件 物联网 Linux
消息队列中消息的格式|学习笔记
快速学习消息队列中消息的格式
316 0
|
消息中间件
|
消息中间件 存储 SQL
阿里云消息队列 Kafka-消息检索实践
本文章主要介绍消息队列使用过程中所遇到的消息丢失、重复消费等痛点问题的排查办法,以及消息队列 Kafka「检索组件」的场景实践,并对其关键技术进行解读。旨在帮助大家对消息队列 Kafka「检索组件」的特点和使用方式更加熟悉,以更有效地解决消息排查过程中所遇到的问题。
阿里云消息队列 Kafka-消息检索实践
|
消息中间件 NoSQL Kafka
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(下)
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(下)
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(下)
|
消息中间件 Kafka 数据库
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(上)
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(上)
消息队列之事务消息,RocketMQ 和 Kafka 是如何做的?(上)
|
消息中间件 存储 SQL
消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?(下)
消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?(下)
消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?(下)

热门文章

最新文章