nsq 优秀的消息队列

简介: 简介NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。

简介

NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。

NSQ在国内公司用的很少,在使用当中愈发的觉得惊喜,比如他的简单易用、部署快捷,再比如之前比较困扰的 延时定时消息,发现nsq 也支持,官方文档比较全,咨询问题时回复也非常的耐心和即时,所以我觉得有必要发布一篇文章来介绍下nsq,惠及大众。

nsq 有三个必要的组建nsqd、nsqlookupd、nsqadmin 其中nsqd 和 nsqlookup是必须部署的 下面我们一一介绍。

nsqd :

负责接收消息,存储队列和将消息发送给客户端,nsqd 可以多机器部署,当你使用客户端向一个topic发送消息时,可以配置多个nsqd地址,消息会随机的分配到各个nsqd上,nsqd优先把消息存储到内存channel中,当内存channel满了之后,则把消息写到磁盘文件中。他监听了两个tcp端口,一个用来服务客户端,一个用来提供http的接口 ,nsqd 启动时置顶下nsqlookupd地址即可:

nsqd –lookupd-tcp-address=127.0.0.1:4160

也可以指定端口 与数据目录

nsqd –lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1 -tcp-address=127.0.0.1:4154 -http-address=”0.0.0.0:4155″ –data-path=/data/nsqdata

其他配置项可详见官网

nsqlookupd
主要负责服务发现 负责nsqd的心跳、状态监测,给客户端、nsqadmin提供nsqd地址与状态

nsqadmin:
nsqadmin是一个web管理界面 启动方式如下:

nsqadmin –lookupd-http-address=127.0.0.1:4161

nsq_admin_1_

channel详情页示例图如下 ,empty可以清空当前channel的信息,delete删除当前channel, pause是暂停消息消费。

图中也有几个比较重要的参数 depth当前的积压量,in-flight代表已经投递还未消费掉的消息,deferred是未消费的定时(延时)消息数,ready count比较重要,go的客户端是通过设置max-in-flight 除以客户端连接数得到的,代表一次推给客户端多少条消息,或者客户端准备一次性接受多少条消息,谨慎设置其值,因为可能造成服务器压力,如果消费能力比较弱,rdy建议设置的低一点比如3

_1

Topic 和 Channel

其实nsqd相当于kafka当中的分区,channel和consumers客户端的多个连接 相当于kafka的消费组,但nsq比kafka使用方式便捷概念上更容易理解
抛开与kafka的对比,nsq的topic 可以设置多个channel,因为有可能有多个业务方需要定值topic的消息,这样互不影响,
当然一个消息会发送topic下的所有channel,然后会分配到不同客户端的连接上,如下图。
d46fdce8_9032_4921_8b9f_c83b8fc87af8

这篇文章主要介绍nsq的使用,源码就不展开讲,如果有兴趣的同学多的话 过几天我会再开一篇专门叙述nsq的源码与分析。

这里提下延时消息:

nsq支持延时消息的投递,比如我想这条消息5分钟之后才被投递出去被客户端消费,较于普通的消息投递,多了个毫秒数,默认支持最大的毫秒数为3600000毫秒也就是60分钟,不过这个值可以在nsqd 启动的时候 用 -max-req-timeout参数修改最大值。

延时消息可用于以下场景,比如一个订单超过30分钟未付款,修改其状态 或者给客户发短信提醒,比如之前看到的滴滴打车订单完成后 一定时间内未评价的可以未其设置默认值,再比如用户的积分过期,等等场景避免了全表扫描,异步处理,kafka不支持延时消息的投递,目前知道支持的有rabbitmq rocketmq,但是rabbitmq 有坑,有可能会超时投递,而rocketmq只有阿里云付费版支持的比较好。

nsq延时消息的实现是用最小堆算法完成,作者继承实现heap的一系类接口,专门写了一个pqueque最小堆的优先队列,在internal/pequeque 目录可以看到相关实现,pub的时候如果chanMsg.deferred != 0则会调用channel.PutMessageDeferred方法,最终会调用继承了go heap接口的pqueque.push方法

延时消息的处理 和普通消息一样都是 nsqd/protocol_v2.go下messagePump 中把消息发送给客户端 然后在queueScanWorker中分别处理,pop是peekAndShift方法中,拿当前时间 和 deferred[0]对比如果大于 就弹出发送给客户端 如下代码:

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            if c.processInFlightQueue(now) {
                dirty = true
            }
            if c.processDeferredQueue(now) {
                dirty = true
            }
            responseCh <- dirty
        case <-closeCh:
            return
        }
    }
}

func (c *Channel) processDeferredQueue(t int64) bool {
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()

    if c.Exiting() {
        return false
    }

    dirty := false
    for {
        c.deferredMutex.Lock()
        item, _ := c.deferredPQ.PeekAndShift(t)
        c.deferredMutex.Unlock()

        if item == nil {
            goto exit
        }
        dirty = true

        msg := item.Value.(*Message)
        _, err := c.popDeferredMessage(msg.ID)
        if err != nil {
            goto exit
        }
        c.put(msg)
    }

exit:
    return dirty
}

func (pq *PriorityQueue) PeekAndShift(max int64) (*Item, int64) {
    if pq.Len() == 0 {
        return nil, 0
    }

    item := (*pq)[0]
    if item.Priority > max {
        return nil, item.Priority - max
    }
    heap.Remove(pq, 0)

    return item, 0
}

php和go的客户端的使用

官网客户端链接:Client Libraries php客户端之前官网有一个5年前比较老的客户端,已经没人维护 甚至无法运行,于是我贡献了一个php72扩展版本 php-nsq,速度块了近三倍,正在逐步完善,支持各种配置与特性,目前已被官网收纳,简单介绍下使用 顺便求下star

php-nsq pub :

$nsqd_addr = array(
    "127.0.0.1:4150",
    "127.0.0.1:4154"
);

$nsq = new Nsq();
$is_true = $nsq->connect_nsqd($nsqd_addr);
for($i = 0; $i < 20; $i++){
    $nsq->publish("test", "nihao");
}

php-nsq 延时pub :

参数 仅仅多一个毫秒参数,so easy!

$deferred = new Nsq();
$isTrue = $deferred->connectNsqd($nsqdAddr);
for($i = 0; $i < 20; $i++){
    $deferred->deferredPublish("test", "message daly", 3000); // 第三值默认范围 millisecond default : [0 < millisecond < 3600000] ,可以更改 上面已提到
}

php-nsq sub :

抛异常消息可以自动重试,重试时间可以有retry_delay_time设定,多少时间后再次接收被重试的消息

$nsq_lookupd = new NsqLookupd("127.0.0.1:4161"); //the nsqlookupd tcp addr
$nsq = new Nsq();
$config = array(
    "topic" => "test",
    "channel" => "struggle",
    "rdy" => 2,                //optional , default 1
    "connect_num" => 1,        //optional , default 1
    "retry_delay_time" => 5000,  //optional, default 0 , after 5000 msec, message will be retried
);

$nsq->subscribe($nsq_lookupd, $config, function($msg){

    echo $msg->payload;
    echo $msg->attempts;
    echo $msg->message_id;
    echo $msg->timestamp;

});

go client pub

package main

import (
        "github.com/nsqio/go-nsq"
       )

var producer *nsq.Producer

func main() {
    nsqd := "127.0.0.1:4150"
    producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
    producer.Publish("test", []byte("nihao"))
    if err != nil {
        panic(err)
    }
}

go client sub

package main

import (
 "fmt"
 "sync"
 "github.com/nsqio/go-nsq"
)
type NSQHandler struct {
}

func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
    return nil
}

func testNSQ() {
    waiter := sync.WaitGroup{}
    waiter.Add(1)

    go func() {
        defer waiter.Done()
        config:=nsq.NewConfig()
        config.MaxInFlight=9

    //建立多个连接
    for i := 0; i<10; i++ {
        consumer, err := nsq.NewConsumer("test", "struggle", config)
        if nil != err {
            fmt.Println("err", err)
            return
        }

        consumer.AddHandler(&NSQHandler{})
        err = consumer.ConnectToNSQD("127.0.0.1:4150")
        if nil != err {
            fmt.Println("err", err)
            return
        }
    }
        select{}

    }()

    waiter.Wait()
}
func main() {
        testNSQ();

}
目录
相关文章
|
8月前
|
消息中间件 自然语言处理 Go
Golang 语言编写的消息队列 NSQ 官方客户端 go-nsq 怎么使用?
Golang 语言编写的消息队列 NSQ 官方客户端 go-nsq 怎么使用?
74 0
|
7月前
|
消息中间件 存储 Go
Golang微服务框架Kratos应用NSQ消息队列
NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件。 NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。 NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。
55 1
|
9月前
|
消息中间件 负载均衡 NoSQL
消息队列 NSQ
消息队列 NSQ
102 0
|
消息中间件 中间件 Go
【消息队列】windows安装NSQ
【消息队列】windows安装NSQ
288 0
|
算法 NoSQL Redis
剖析nsq消息队列(三) 消息传输的可靠性和持久化[一]
上两篇帖子主要说了一下nsq的拓扑结构,如何进行故障处理和横向扩展,保证了客户端和服务端的长连接,连接保持了,就要传输数据了,nsq如何保证消息被订阅者消费,如何保证消息不丢失,就是今天要阐述的内容。
1349 0
|
消息中间件 算法
剖析nsq消息队列(四) 消息的负载处理
剖析nsq消息队列-目录 实际应用中,一部分服务集群可能会同时订阅同一个topic,并且处于同一个channel下。当nsqd有消息需要发送给订阅客户端去处理时,发给哪个客户端是需要考虑的,也就是我要说的消息的负载。
987 0
|
消息中间件 网络协议 Go
剖析nsq消息队列(二) 去中心化代码源码解析
在上一篇帖子剖析nsq消息队列(一) 简介及去中心化实现原理中,我介绍了nsq的两种使用方式,一种是直接连接,还有一种是通过nslookup来实现去中心化的方式使用,并大概说了一下实现原理,没有什么难理解的东西,这篇帖子我把nsq实现去中心化的源码和其中的业物逻辑展示给大家看一下。
3662 0
|
消息中间件 Go 网络协议
剖析nsq消息队列(一) 简介及去中心化实现原理
分布式消息队列nsq,简单易用,去中心化的设计使nsq更健壮,nsq充分利用了go语言的goroutine和channel来实现的消息处理,代码量也不大,读不了多久就没了。后期的文章我会把nsq的源码分析给大家看。
1641 0
|
1月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
1月前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
75 2

热门文章

最新文章