4.RocketMQ4.X生产者配置
4.1.生产者常见核心配置
**compressMsgBodyOverHowmuch:**消息超过默认字节4096后进行压缩
**retryTimesWhenSendFailed:**失败重新发送的次数
**maxMessageSize:**最大消息配置,默认128k
**topicQueueNums:**主题下队列的数量,默认是4个
**autoCreateTopicEnable:**是否自动创建topic,开发建议为true,生产建议为false
**defaultTopicQueueNums:**自动创建的Topic创建的默认队列数
**autoCreateSubscriptionGroup:**是否允许Broker自动创建订阅组,建议线下开启,生产关闭。
**brokerClusterName:**集群名称
**brokerId:**0表示主节点,大于0表示从节点
**brokerIP1:**broker服务器地址,一定要配公网ip,否则外机访问不到
**brokerRole:**broker角色 ASYNC_MASTER /SYNC_MASTER /SLAVE
**deleteWhen:**每天执行删除过期文件的时间,默认是每天凌晨4点
**flushDiskType:**刷盘策略,默认为ASYNC_FLUSH(异步刷盘),另外是SYNC_FLUSH(同步刷盘)
**listenPort:**Broker监听的端口号
**mapedFileSizeCommitLog:**单个conmmitlog文件大小,默认是1GB
**mapedFileSizeConsumeQueue:**ConsumerQueue每个文件默认30w条,可以根据项目进行调整
**storePathRootDir:**存储消息以及一些配置信息的根目录,默认是${home}/store
**storePathCommitLog:**commitlog存储目录默认为${storePathRootDir}/commitlog
**storePathIndex:**消息索引存储路径
**syncFlushTimeout:**同步刷盘超时时间
**diskMaxUsedSpaceRatio:**检测可用的磁盘空间大小,超过后写入会报错
4.2.RocketMQ消息发送状态
Broker消息投递状态讲解
- FLUSH_DISK_TIMEOUT
- 没有在规定的时间内完成刷盘(刷盘策略为SYNC_FLUSH才会出现这个错误)
- FLUSH_SLAVE_TIMEOUT
- 主从模式下,broker的brokerRole是SYNC_MASTER,没有在规定时间内同步到slave
- SLAVE_NOT_AVAILABLE
- 主从模式下,broker的brokerRole是SYNC_MASTER,没有找到对应匹配的slave
- SEND_OK
- 发送成功,一切正常
4.3.RocketMQ生产和消费消息重试处理
(1)生产者Producer重试(异步和SendOneWay下配置无效)
- 消息重投(保证数据的可靠性),本身内部支持重试,默认是2次,可以修改配置
//设置生产者发送broker失败重复发送的次数 producer.setRetryTimesWhenSendFailed(5);
(2)消费者重试
- 消息处理异常、broker端到consumer端各种问题,如网络原因闪断,消费处理失败,ACK返回失败等等。
- 重试间隔时间配置如下,默认重试16次
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- 超过重试次数人工补偿
- 消费端去重
- 一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。
- 消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息.
//设置广播模式,默认集群模式,广播模式下失败不会重新消费,继续消费下一个 consumer.setMessageModel(MessageModel.BROADCASTING);
4.4.RocketMQ发送消息的多种场景
(1)SYNC
//默认是同步发送 SendResult sendResult = payProducer.getProducer().send(message);
- 同步响应
- 应用场景:重要通知邮件、报名短信通知、营销短信通知等等
(2)ASYNC
// SendResult sendResult = payProducer.getProducer().send(message,new SendCallback(){ //发送成功处理函数 onSuccess(){} //异常处理函数 onException(){} });
- 异步响应
- 应用场景:对RT时间敏感,可以支持更高的并发,会i到成功触发向对应的业务,比如注册成功后通知信人优惠卷发放系统。
(3)ONEWAY
//单向发送 payProducer.getProducer().sendOneway(message);
- 无需等待响应
- 使用场景:主要是日志收集,使用与某ixe好事非常短,单对可靠性要寻求不高的场景,LogServer中常用,只负责发送消息,不关注结果。
汇总对比
发送方式 | 发送TPS | 发送结果反馈 | 可靠性 |
同步发送(SYNC) | 快 | 有 | 不丢失 |
异步发送(ASYNC) | 快 | 有 | 不丢失 |
单向发送(ONEWAY) | 最快 | 无 | 可能丢失 |
4.5.RocketMQ延迟消息介绍和应用
什么是延迟消息
Producer将消息发送到RocketMQ服务端,单是并不希望消息马上被消费,而是推迟到某一个时间点之后在去消费,目前支持固定的精度消息
源码:rocketmq-store > MessageStoreConfig.java 属性messageDelayLevel
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
使用message.setDelayTimeLevel(xxxx) //xxx是级别,1表示配置里面的第一个级别,2表示第二个级别
定时消息:目前rocketmq开源版本还不支持,商业版本有,两者是用场景类似
使用场景
- 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息。
- 消息生产和消费有时间窗口要求,淘宝中超时支付关单
4.6.RocketMQ生产者指定队列发送
- RocketMQ生产者之MessageQueueSelector指定队列发送
- 应用场景:顺序消息,分摊负载
- 默认topic下的queue数量是4,可以配置
producer.getProducer().send(message,new MessageQueueSelector(){ select(List<MessageQueue> mqs, Message msg, Object arg){ Integer queueNum = (Integer)arg; return mqs.get(queueNum); } },0);
- 支持同步,异步发送指定的MessageQueue
- 选择的queue数量必须小于配置的,否则会出错
RestController @RequestMapping("api/v1/pay") public class PayController { @Autowired private PayProducer payProducer; @RequestMapping("/pay_cb") public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new Message(JmsConfig.TOPIC,"taga","6688",text.getBytes()); /** * 异步发送到指定队列 */ payProducer.getProducer().send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int queueNum = Integer.parseInt(arg.toString()); return mqs.get(queueNum); } }, 3, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable e) { e.printStackTrace(); } }); /** * 同步发送到指定队列 */ /* SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //查找队列 int queueNum = Integer.parseInt(arg.toString()); return mqs.get(queueNum); } },0); System.out.println(sendResult);*/ /** * 单向发送 */ /* message.setDelayTimeLevel(3); SendResult sendResult = payProducer.getProducer().send(message); //payProducer.getProducer().sendOneway(message); System.out.println(sendResult);*/ return new HashMap<>(); } }
4.7.RocketMQ顺序消息讲解
(1)什么是顺序消息:消息的生产和消费顺序一致
- 全局顺序:topic下面全部消息都要有序
- 性能要求不高,所有的消息严格按照FIFO原则进行发布和消费的场景,并行度成为消息的瓶颈,吞吐量不够
- 局部顺序:只要保证一组消息被顺序消费即可(RocketMQ中使用)
- 性能要求高,电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单物流消息、订单交易成功消息都会按照先后顺序来发布消费
- 顺序发布:对于指定的一个Topic,客户端将按照一定的先后顺序发送消息
- 顺序消费:对于指定的一个Topic,按照一定的先后顺序消费接受的消息,机先发送的消息一定会被客户端接收到
- 注意:
- 顺序消息不支持广播模式
- 顺序消息不支持异步发送
- (2)RocketMQ中顺序消息的使用
- 生产端保证发送消息的有序,且发送同一个topic下的queue里面
- 生产端根据MessageQueueSelector可以自定义策略,根据同个业务的订单id放置到同个queue里面,如订单号取模,同一条订单的消息就会被放在同一个队列中
public MessageQueue select(List<MessageQueue> mqs,Message msg,Object arg){ //订单id进行取模 Long orderId = Integer.parseLong(arg.toString()); long index = orderId%mqs.size(); return mqs.get((int)index); }
消费端要在保证消费同个topic里面的同个queue,不应该用MessageListenerConcurrently,应该使用MessageListenerOrderly,自带单线程消费消息,不能在Consumer端使用多线程去消费,消费端分配到queue数量是固定的
- ,集群会锁住当前正在消费的队列集群的消息,所以会保证顺序消费。
(3)顺序消息实战
- 生产者编码保证同步发送和发送指定队列
@RestController @RequestMapping("api/v1/pay") public class PayController { @Autowired private PayProducer payProducer; @RequestMapping("/pay_cb") public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { List<ProducerOrder> producerList = ProducerOrder.getProducerList(); for (ProducerOrder producerOrder : producerList) { Message message = new Message(JmsConfig.TOPIC,"",producerOrder.getOrderId()+"",producerOrder.toString().getBytes()); SendResult send = payProducer.getProducer().send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long orderId = (Long)arg; //订单号和队列集合长度取模 long index = orderId % mqs.size(); return mqs.get((int)index); } },producerOrder.getOrderId()); MessageQueue messageQueue = send.getMessageQueue(); int queueId = messageQueue.getQueueId(); System.out.println("orderId:"+producerOrder.getOrderId()+"---orderType:"+producerOrder.getType()+"---queueId:"+queueId); System.out.println("---------------------------"); } return new HashMap<>(); } }
- 消费者编码用MessageListenerOrderly
concurrently多线程消费 @Component public class PayConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_consumer_group"; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt message = msgs.get(0); int times = message.getReconsumeTimes(); try{ // if(message.getKeys().equalsIgnoreCase("6688")){ // throw new Exception(); // } String body = new String(msgs.get(0).getBody()); String tags = msgs.get(0).getTags(); String keys = msgs.get(0).getKeys(); System.out.println("message:"+message+"--body:"+body+"--tags:"+tags+"--keys:"+keys); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }catch (Exception e){ if(times>=3){ System.out.println("消息失败,转入人工处理"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } System.out.println("消息异常,重新投递"); System.out.println("消息重投次数:"+times); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer.start()..."); } }
orderly单线程消费(保证顺序消费必须用单线程消费) @Component public class PayOrderlyConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_orderly_consumer_group"; public PayOrderlyConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe(JmsConfig.TOPIC,"*"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeConcurrentlyContext) { MessageExt message = msgs.get(0); int times = message.getReconsumeTimes(); try{ String body = new String(msgs.get(0).getBody()); String tags = msgs.get(0).getTags(); String keys = msgs.get(0).getKeys(); System.out.println("message:"+message+"--body:"+body+"--tags:"+tags+"--keys:"+keys); return ConsumeOrderlyStatus.SUCCESS; }catch (Exception e){ System.out.println("消息异常,重新投递"); System.out.println("消息重投次数:"+times); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }); consumer.start(); System.out.println("consumer.start()..."); } }
Consumer会平衡分配queue的数量,并不是简单的禁止并发处理,而是为每个Consumer Queue加个锁,消费每个消息前,需要获取这个消息的所在锁,这样同个时间,同个queue不能并发处理,但是不同的queue的消息可以并发处理,采用分段锁Segment。
5.RocketMQ4.X消费者配置
5.1.RocketMQ消费者核心配置
- consumeFromWhere
- CONSUME_FROM_FIRST_OFFSET:初次从消息队列的头部开始消费,即历史消息全部消费一遍,后续在消费就是接着上次消费的进度来消费
CONSUME_FROM_LAST_OFFSET:初次从消息队列的尾部开始消费,及历史消息全部消费一遍,后续在消费就是接着上次消费的进度来消费
- CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,默认是半小时以前,后须也是跟着上次消费的进度开始消费
- allocateMessageQueueStrategy
- 负载均衡算法,即消费者分配到queue的算法,默认值是AllocateMessageQueueAveragely即取模平均分配
- offsetStore
- 消息消费进度存储器,offsetStore有两个策略
- LocalFileOffsetStore本地存储消费进度的具体实现,给广播模式使用
- RemoteBrokerOffsetStore远程存储消费进度的具体实现,给集群模式使用,将消费进度存储在broker中
- consumeThreadMin
- 最小消费线程池数量
- consumeThreadMax
- 最大消费线程池数量
- pullBatchSize
- 消费者去broker拉取消息时,一次拉去多少条,默认是32条
- consumeMessageBatchMaxSize
- 单次消费时一次性拉取多少条消息,批量消费接口才有用,默认是1
- messageModel
- 消费者消费模式,CLUSTERING默认是集群模式,BROADCASTING广播模式
5.2.消费者集群模式和广播模式
- Topic下队列的奇偶数回影响Customer个数里面的消费数量
- 如果是4个队列,8个消息,4个节点则会各消费2条,如果不等,则会负载均衡分配不均
- 如果consumer实例的数量比message queue的总数量还多的话,那么多出来的consumer实例将无法分到queue,也就无法消费消息,无法起到负载均衡的作用,锁以要控制让queue的总数量大于等于consumer的数量。
集群模式(默认)CONUSMEING
- Consumer实例平均分摊消费生产发送的消息
- 例子:订单消息,一般只消费一次
广播模式 BROADCASTING
- 广播模式下消费消息:投递到broker的消息会被每个Consumer进行消费,一条消息会被多个Consumer进行消费,广播模式下ConsumerGroup暂时无用
- 例子:群公告、每个人都需要消费这个消息
通过setMessageModel()方法来进行切换消费者模式
MessageModel.CONSUMEING \ MessageModel.BROADCASTING
5.3.Tag实战和消息过滤
(1)讲解RocketMQ里面的Tag作用和消息过滤原理
- 一个Message只有一个Tag,tag是二级分类
- 举例:订单类----数码订单,服装订单
- 过滤分为Broker端和Consumer端过滤
- Broker端过滤,减少了无用的消息进行网络传输,但是增加了broker 的负担
- Consumer端过滤,完全可以根据业务需要进行筛选,但是增加了无用的消息的传输
- 一般监听是"*",或者指定Tag,||运算,SLQ92,FilterServer等
- tag性能高,逻辑简单
- SQL92性能较差,支持复杂的逻辑,MessageSelector.bySql
- 语法:>,<,=,IS NULL, OR, AND ,NOT等,sql where后面的基本都支持
- 注意:消费者订阅关系要一直,不然会消费混乱,甚至会消息丢失
- 订阅者关系一致,订阅关系由topic和tag组成,同一个group name,订阅的topic和tag必须是一样的
常见错误:
The broker does not support consumer to filter message by SQL92 broker.conf配置文件中配置 enablePropertyFilter=true 备注,修改之后要重启Broker
(2)消息过滤
RocketMQ的消息过滤方式有别于其他的消息中间件,是在订阅时,在做过滤。
Consume Queue的存储结构
在Broker端进行Message Tag比对,先遍历Conusme Queue,如果存储的Message Tag与订阅Message Tag不符合,则跳过,继续比对下一个,符合则传输给Consumer 。Consumer收到过滤的消息后,同样也是执行在broker端的操作,但是对别的真实的Message Tag字符串,而不是HashCode。
- 在Conusme Queue中存储HashCode,是为了在Consume Queue中定长的方式存储,节约空间。
- 过滤式不会访问Commit Log的数据,可以保证高效的过滤
- 即使存在Hash冲突,也可以在Conusmer端进行修正
5.4.PushConsumer和PullConsumer
(1)消费模式
- Push:实时性高,但增加服务端的负载,消费能力不同,如果推送的过快,消费端会出现很多问题
- pull:消费者从server拉去数据,主动权在消费者端,可控性好,但是间隔时间不好设置,间隔太短,则空请求,间隔太长则消息不能及时处理
- 长轮询:Client请求Server端也就是Broker,Broker会保持一段时间的长连接,默认是15s,超过15s则返回空,在进行重新请求。缺点:服务端需要保持Consumer的请求,会占用资源,需要客户端连接数可控否则会堆积一堆连接
- (2)PushConsumer本质就是长轮询
- 系统收到消息后自动处理消息和offset,如果有新的Consumer加入会自动做负载均衡
- 在broker端可通过longPollingEnable=true来开启长轮询
- 消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback
- 服务端代码:broker.longing
- 虽然是push,但是代码里大量用了pull,因为使用了长轮询方式达到了push的效果也有pull的效果
- (3)PullConsumer需要自己维护Offset
- 获取MessageQueue遍历
- 客户端维护Offset,需要用本地存储Offset,存储内存、磁盘、数据库等
- 处理不同状态的消息,FOUND(新消息)、NO_NEW_MSG(不是新消息)、OFFSET_ILLRGL(非法偏移量)、NO_MATCHED_MSG(筛选结果不匹配)四种状态
- 灵活性高可控性强,但是编码复杂度会高
- 优雅关闭:释放资源和保存Offset,需要程序自己保护好Offset,特别是异常处理的时候