【消息队列】消息中间件RocketMQ4.X急速入门2

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
简介: 【消息队列】消息中间件RocketMQ4.X急速入门

4.RocketMQ4.X生产者配置

4.1.生产者常见核心配置

**compressMsgBodyOverHowmuch:**消息超过默认字节4096后进行压缩

**retryTimesWhenSendFailed:**失败重新发送的次数

**maxMessageSize:**最大消息配置,默认128k

**topicQueueNums:**主题下队列的数量,默认是4个

**autoCreateTopicEnable:**是否自动创建topic,开发建议为true,生产建议为false

**defaultTopicQueueNums:**自动创建的Topic创建的默认队列数

**autoCreateSubscriptionGroup:**是否允许Broker自动创建订阅组,建议线下开启,生产关闭。

**brokerClusterName:**集群名称

**brokerId:**0表示主节点,大于0表示从节点

**brokerIP1:**broker服务器地址,一定要配公网ip,否则外机访问不到

**brokerRole:**broker角色 ASYNC_MASTER /SYNC_MASTER /SLAVE

**deleteWhen:**每天执行删除过期文件的时间,默认是每天凌晨4点

**flushDiskType:**刷盘策略,默认为ASYNC_FLUSH(异步刷盘),另外是SYNC_FLUSH(同步刷盘)

**listenPort:**Broker监听的端口号

**mapedFileSizeCommitLog:**单个conmmitlog文件大小,默认是1GB

**mapedFileSizeConsumeQueue:**ConsumerQueue每个文件默认30w条,可以根据项目进行调整

**storePathRootDir:**存储消息以及一些配置信息的根目录,默认是${home}/store

**storePathCommitLog:**commitlog存储目录默认为${storePathRootDir}/commitlog

**storePathIndex:**消息索引存储路径

**syncFlushTimeout:**同步刷盘超时时间

**diskMaxUsedSpaceRatio:**检测可用的磁盘空间大小,超过后写入会报错

4.2.RocketMQ消息发送状态

Broker消息投递状态讲解

  • FLUSH_DISK_TIMEOUT
  • 没有在规定的时间内完成刷盘(刷盘策略为SYNC_FLUSH才会出现这个错误)
  • FLUSH_SLAVE_TIMEOUT
  • 主从模式下,broker的brokerRole是SYNC_MASTER,没有在规定时间内同步到slave
  • SLAVE_NOT_AVAILABLE
  • 主从模式下,broker的brokerRole是SYNC_MASTER,没有找到对应匹配的slave
  • SEND_OK
  • 发送成功,一切正常

4.3.RocketMQ生产和消费消息重试处理

(1)生产者Producer重试(异步和SendOneWay下配置无效)

  • 消息重投(保证数据的可靠性),本身内部支持重试,默认是2次,可以修改配置
//设置生产者发送broker失败重复发送的次数
producer.setRetryTimesWhenSendFailed(5);

(2)消费者重试

  • 消息处理异常、broker端到consumer端各种问题,如网络原因闪断,消费处理失败,ACK返回失败等等。
  • 重试间隔时间配置如下,默认重试16次
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  • 超过重试次数人工补偿
  • 消费端去重
  • 一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。
  • 消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息.
//设置广播模式,默认集群模式,广播模式下失败不会重新消费,继续消费下一个
consumer.setMessageModel(MessageModel.BROADCASTING);

4.4.RocketMQ发送消息的多种场景

(1)SYNC

//默认是同步发送
SendResult sendResult = payProducer.getProducer().send(message);
  • 同步响应
  • 应用场景:重要通知邮件、报名短信通知、营销短信通知等等

(2)ASYNC

//
SendResult sendResult = payProducer.getProducer().send(message,new SendCallback(){
    //发送成功处理函数
    onSuccess(){}
    //异常处理函数
    onException(){}
});
  • 异步响应
  • 应用场景:对RT时间敏感,可以支持更高的并发,会i到成功触发向对应的业务,比如注册成功后通知信人优惠卷发放系统。

(3)ONEWAY

//单向发送
payProducer.getProducer().sendOneway(message);
  • 无需等待响应
  • 使用场景:主要是日志收集,使用与某ixe好事非常短,单对可靠性要寻求不高的场景,LogServer中常用,只负责发送消息,不关注结果。

汇总对比

发送方式 发送TPS 发送结果反馈 可靠性
同步发送(SYNC) 不丢失
异步发送(ASYNC) 不丢失
单向发送(ONEWAY) 最快 可能丢失

4.5.RocketMQ延迟消息介绍和应用

什么是延迟消息

Producer将消息发送到RocketMQ服务端,单是并不希望消息马上被消费,而是推迟到某一个时间点之后在去消费,目前支持固定的精度消息

源码:rocketmq-store > MessageStoreConfig.java 属性messageDelayLevel

"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

使用message.setDelayTimeLevel(xxxx) //xxx是级别,1表示配置里面的第一个级别,2表示第二个级别

定时消息:目前rocketmq开源版本还不支持,商业版本有,两者是用场景类似

使用场景

  • 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息。
  • 消息生产和消费有时间窗口要求,淘宝中超时支付关单

4.6.RocketMQ生产者指定队列发送

  • RocketMQ生产者之MessageQueueSelector指定队列发送
  • 应用场景:顺序消息,分摊负载
  • 默认topic下的queue数量是4,可以配置
producer.getProducer().send(message,new MessageQueueSelector(){
    select(List<MessageQueue> mqs, Message msg, Object arg){
        Integer queueNum = (Integer)arg;
        return mqs.get(queueNum);
      }
},0);
  • 支持同步,异步发送指定的MessageQueue
  • 选择的queue数量必须小于配置的,否则会出错
RestController
@RequestMapping("api/v1/pay")
public class PayController {
    @Autowired
    private PayProducer payProducer;
    @RequestMapping("/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        Message message = new Message(JmsConfig.TOPIC,"taga","6688",text.getBytes());
        /**
         * 异步发送到指定队列
         */
        payProducer.getProducer().send(message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                int queueNum = Integer.parseInt(arg.toString());
                return mqs.get(queueNum);
            }
        }, 3, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });
        /**
         * 同步发送到指定队列
         */
       /* SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                //查找队列
                int queueNum = Integer.parseInt(arg.toString());
                return mqs.get(queueNum);
            }
        },0);
        System.out.println(sendResult);*/
        /**
         * 单向发送
         */
     /*   message.setDelayTimeLevel(3);
        SendResult sendResult = payProducer.getProducer().send(message);
        //payProducer.getProducer().sendOneway(message);
        System.out.println(sendResult);*/
        return new HashMap<>();
    }
}

4.7.RocketMQ顺序消息讲解

(1)什么是顺序消息:消息的生产和消费顺序一致

  • 全局顺序:topic下面全部消息都要有序

  • 性能要求不高,所有的消息严格按照FIFO原则进行发布和消费的场景,并行度成为消息的瓶颈,吞吐量不够
  • 局部顺序:只要保证一组消息被顺序消费即可(RocketMQ中使用)
  • 性能要求高,电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单物流消息、订单交易成功消息都会按照先后顺序来发布消费
  • 顺序发布:对于指定的一个Topic,客户端将按照一定的先后顺序发送消息
  • 顺序消费:对于指定的一个Topic,按照一定的先后顺序消费接受的消息,机先发送的消息一定会被客户端接收到
  • 注意:
  • 顺序消息不支持广播模式
  • 顺序消息不支持异步发送
  • (2)RocketMQ中顺序消息的使用
  • 生产端保证发送消息的有序,且发送同一个topic下的queue里面
  • 生产端根据MessageQueueSelector可以自定义策略,根据同个业务的订单id放置到同个queue里面,如订单号取模,同一条订单的消息就会被放在同一个队列中
public MessageQueue select(List<MessageQueue> mqs,Message msg,Object arg){
    //订单id进行取模
    Long orderId  = Integer.parseLong(arg.toString());
    long index = orderId%mqs.size();
    return mqs.get((int)index);
}

消费端要在保证消费同个topic里面的同个queue,不应该用MessageListenerConcurrently,应该使用MessageListenerOrderly,自带单线程消费消息,不能在Consumer端使用多线程去消费,消费端分配到queue数量是固定的

  • ,集群会锁住当前正在消费的队列集群的消息,所以会保证顺序消费。

(3)顺序消息实战

  • 生产者编码保证同步发送和发送指定队列
@RestController
@RequestMapping("api/v1/pay")
public class PayController {
    @Autowired
    private PayProducer payProducer;
    @RequestMapping("/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        List<ProducerOrder> producerList = ProducerOrder.getProducerList();
        for (ProducerOrder producerOrder : producerList) {
            Message message = new Message(JmsConfig.TOPIC,"",producerOrder.getOrderId()+"",producerOrder.toString().getBytes());
            SendResult send = payProducer.getProducer().send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long orderId = (Long)arg;
                    //订单号和队列集合长度取模
                    long index = orderId % mqs.size();
                    return mqs.get((int)index);
                }
            },producerOrder.getOrderId());
            MessageQueue messageQueue = send.getMessageQueue();
            int queueId = messageQueue.getQueueId();
            System.out.println("orderId:"+producerOrder.getOrderId()+"---orderType:"+producerOrder.getType()+"---queueId:"+queueId);
            System.out.println("---------------------------");
        }
        return new HashMap<>();
    }
}
  • 消费者编码用MessageListenerOrderly
concurrently多线程消费
@Component
public class PayConsumer {
    private DefaultMQPushConsumer consumer;
    private String consumerGroup = "pay_consumer_group";
    public PayConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            consumer.subscribe("","*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    MessageExt message = msgs.get(0);
                    int times = message.getReconsumeTimes();
                    try{
//                        if(message.getKeys().equalsIgnoreCase("6688")){
//                            throw new Exception();
//                        }
                        String body = new String(msgs.get(0).getBody());
                        String tags = msgs.get(0).getTags();
                        String keys = msgs.get(0).getKeys();
                        System.out.println("message:"+message+"--body:"+body+"--tags:"+tags+"--keys:"+keys);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }catch (Exception e){
                        if(times>=3){
                            System.out.println("消息失败,转入人工处理");
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        System.out.println("消息异常,重新投递");
                        System.out.println("消息重投次数:"+times);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
            consumer.start();
            System.out.println("consumer.start()...");
    }
}
orderly单线程消费(保证顺序消费必须用单线程消费)
@Component
public class PayOrderlyConsumer {
    private DefaultMQPushConsumer consumer;
    private String consumerGroup = "pay_orderly_consumer_group";
    public PayOrderlyConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            consumer.subscribe(JmsConfig.TOPIC,"*");
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeConcurrentlyContext) {
                    MessageExt message = msgs.get(0);
                    int times = message.getReconsumeTimes();
                    try{
                        String body = new String(msgs.get(0).getBody());
                        String tags = msgs.get(0).getTags();
                        String keys = msgs.get(0).getKeys();
                        System.out.println("message:"+message+"--body:"+body+"--tags:"+tags+"--keys:"+keys);
                        return ConsumeOrderlyStatus.SUCCESS;
                    }catch (Exception e){
                        System.out.println("消息异常,重新投递");
                        System.out.println("消息重投次数:"+times);
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
            });
            consumer.start();
            System.out.println("consumer.start()...");
    }
}

Consumer会平衡分配queue的数量,并不是简单的禁止并发处理,而是为每个Consumer Queue加个锁,消费每个消息前,需要获取这个消息的所在锁,这样同个时间,同个queue不能并发处理,但是不同的queue的消息可以并发处理,采用分段锁Segment。



2922a485d3f947be8baf4181e7823386.jpg

5.RocketMQ4.X消费者配置

5.1.RocketMQ消费者核心配置

  • consumeFromWhere
  • CONSUME_FROM_FIRST_OFFSET:初次从消息队列的头部开始消费,即历史消息全部消费一遍,后续在消费就是接着上次消费的进度来消费

CONSUME_FROM_LAST_OFFSET:初次从消息队列的尾部开始消费,及历史消息全部消费一遍,后续在消费就是接着上次消费的进度来消费

  • CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,默认是半小时以前,后须也是跟着上次消费的进度开始消费
  • allocateMessageQueueStrategy
  • 负载均衡算法,即消费者分配到queue的算法,默认值是AllocateMessageQueueAveragely即取模平均分配
  • offsetStore
  • 消息消费进度存储器,offsetStore有两个策略
  • LocalFileOffsetStore本地存储消费进度的具体实现,给广播模式使用
  • RemoteBrokerOffsetStore远程存储消费进度的具体实现,给集群模式使用,将消费进度存储在broker中
  • consumeThreadMin
  • 最小消费线程池数量
  • consumeThreadMax
  • 最大消费线程池数量
  • pullBatchSize
  • 消费者去broker拉取消息时,一次拉去多少条,默认是32条
  • consumeMessageBatchMaxSize
  • 单次消费时一次性拉取多少条消息,批量消费接口才有用,默认是1
  • messageModel
  • 消费者消费模式,CLUSTERING默认是集群模式,BROADCASTING广播模式

5.2.消费者集群模式和广播模式

  • Topic下队列的奇偶数回影响Customer个数里面的消费数量
  • 如果是4个队列,8个消息,4个节点则会各消费2条,如果不等,则会负载均衡分配不均
  • 如果consumer实例的数量比message queue的总数量还多的话,那么多出来的consumer实例将无法分到queue,也就无法消费消息,无法起到负载均衡的作用,锁以要控制让queue的总数量大于等于consumer的数量。

集群模式(默认)CONUSMEING

  • Consumer实例平均分摊消费生产发送的消息
  • 例子:订单消息,一般只消费一次

广播模式 BROADCASTING

  • 广播模式下消费消息:投递到broker的消息会被每个Consumer进行消费,一条消息会被多个Consumer进行消费,广播模式下ConsumerGroup暂时无用
  • 例子:群公告、每个人都需要消费这个消息

通过setMessageModel()方法来进行切换消费者模式

MessageModel.CONSUMEING \ MessageModel.BROADCASTING

5.3.Tag实战和消息过滤

(1)讲解RocketMQ里面的Tag作用和消息过滤原理

  • 一个Message只有一个Tag,tag是二级分类
  • 举例:订单类----数码订单,服装订单
  • 过滤分为Broker端和Consumer端过滤
  • Broker端过滤,减少了无用的消息进行网络传输,但是增加了broker 的负担
  • Consumer端过滤,完全可以根据业务需要进行筛选,但是增加了无用的消息的传输
  • 一般监听是"*",或者指定Tag,||运算,SLQ92,FilterServer等
  • tag性能高,逻辑简单
  • SQL92性能较差,支持复杂的逻辑,MessageSelector.bySql
  • 语法:>,<,=,IS NULL, OR, AND ,NOT等,sql where后面的基本都支持
  • 注意:消费者订阅关系要一直,不然会消费混乱,甚至会消息丢失
  • 订阅者关系一致,订阅关系由topic和tag组成,同一个group name,订阅的topic和tag必须是一样的

常见错误:

The broker does not support consumer to filter message by SQL92
broker.conf配置文件中配置
enablePropertyFilter=true
备注,修改之后要重启Broker

(2)消息过滤

RocketMQ的消息过滤方式有别于其他的消息中间件,是在订阅时,在做过滤。

Consume Queue的存储结构

d0d95dfc4d044567abef4de58b34c498.jpg

在Broker端进行Message Tag比对,先遍历Conusme Queue,如果存储的Message Tag与订阅Message Tag不符合,则跳过,继续比对下一个,符合则传输给Consumer 。Consumer收到过滤的消息后,同样也是执行在broker端的操作,但是对别的真实的Message Tag字符串,而不是HashCode。

  • 在Conusme Queue中存储HashCode,是为了在Consume Queue中定长的方式存储,节约空间。
  • 过滤式不会访问Commit Log的数据,可以保证高效的过滤
  • 即使存在Hash冲突,也可以在Conusmer端进行修正

5.4.PushConsumer和PullConsumer

(1)消费模式

  • Push:实时性高,但增加服务端的负载,消费能力不同,如果推送的过快,消费端会出现很多问题
  • pull:消费者从server拉去数据,主动权在消费者端,可控性好,但是间隔时间不好设置,间隔太短,则空请求,间隔太长则消息不能及时处理
  • 长轮询:Client请求Server端也就是Broker,Broker会保持一段时间的长连接,默认是15s,超过15s则返回空,在进行重新请求。缺点:服务端需要保持Consumer的请求,会占用资源,需要客户端连接数可控否则会堆积一堆连接
  • (2)PushConsumer本质就是长轮询
  • 系统收到消息后自动处理消息和offset,如果有新的Consumer加入会自动做负载均衡
  • 在broker端可通过longPollingEnable=true来开启长轮询
  • 消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback
  • 服务端代码:broker.longing
  • 虽然是push,但是代码里大量用了pull,因为使用了长轮询方式达到了push的效果也有pull的效果
  • (3)PullConsumer需要自己维护Offset
  • 获取MessageQueue遍历
  • 客户端维护Offset,需要用本地存储Offset,存储内存、磁盘、数据库等
  • 处理不同状态的消息,FOUND(新消息)、NO_NEW_MSG(不是新消息)、OFFSET_ILLRGL(非法偏移量)、NO_MATCHED_MSG(筛选结果不匹配)四种状态
  • 灵活性高可控性强,但是编码复杂度会高
  • 优雅关闭:释放资源和保存Offset,需要程序自己保护好Offset,特别是异常处理的时候


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
27天前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
1月前
|
消息中间件 监控 中间件
常用的消息队列中间件都有什么?优缺点是什么?如何选择?
常用的消息队列中间件都有什么?优缺点是什么?如何选择?
88 5
|
1月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
85 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
26天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
66 5
|
20天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
20天前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
24天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
67 6
|
1月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
1月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
64 4