前言
大家好,我是小郭,前面的文章介绍了,RocketMQ的搭建,以及RocketMQ的NameServer,接下来我们配合着官方提供的demo,进行实际的消息发送学习,主要学习发送方式、发送参数的含义,以及发送中的一些问题。
消息发送的方式
- 可靠同步发送,需要等待服务器响应结果。
- 可靠异步发送,不等待服务器响应结果直接返回,当收到Broker的响应结果后调用SendCallback回调函数。
- 单向发送,不等待响应结果,不调用回调函数。
实战使用
在实际使用中,我们通常会选择可靠同步发送,因为我们快速的得到成功和失败的反馈。
业务场景:用户A下单创建订单,付款、完成订单
在这过程中,可能会产生三条消息,那在发送消息可能会因为负载均衡的策略,被分配到不同的消息队列中去
RocketMQ常用的两种平均分配算法
- AllocateMessageQueueAveragely
平均分配,按照总数除以消费者个数进行,对每个消费者进行分配 - AllocateMessageQueueAveragelyByCircle 轮流平均分配,按照消费者个数,进行轮询分配
所以为了保证局部顺序消息,只要保证每一组消息被顺序消费即可,我们需要考虑修改MessageQueueSelector方法,保证消息投递到同一个队列中。
// 实例化消息生产者Producer org.apache.rocketmq.client.producer.DefaultMQProducer producer = new org.apache.rocketmq.client.producer.DefaultMQProducer("please_rename_unique_group_name"); // 设置NameServer的地址 producer.setNamesrvAddr("xxx:9876"); //设置重试次数 producer.setRetryTimesWhenSendFailed(3); producer.setSendMsgTimeout(5000); private static SendResult sendMsg(DefaultMQProducer producer, Order order) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { Message msg = null; try { msg = new Message("orderTest","TagA", order.getOrderNo(), JSON.toJSONString(order).getBytes(RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } // 发送消息到一个Broker SendResult sendResult = producer.send(msg, (list, message, arg) -> { if (message == null || list.isEmpty()) { return null; } // 取模 int index = Math.abs(arg.hashCode()) % list.size(); //模拟超时 //if (order.getId() == 1L){ try { Thread.sleep(6000); } catch (Exception e) { } }// return list.get(Math.max(index, 0)); }, order.getId()); String result = JSONObject.toJSONString(sendResult); System.out.println(result); return sendResult; }
- List mqs:消息要发送的Topic下所有的分区
- Message msg:消息对象
- 额外的参数:用户可以传递自己的参数
发送结果
数据按照我们修改的MessageQueueSelector,进行了队列的选择,正式我们想要的结果,这样我们就可以按照队列的顺序消费。
发送结果参数
messageQueue对象
String topic = msg.getTopic(); if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) { topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace()); } MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId());
主要有brokerName,topic和QueueId组成,RocketMQ支持顺序投递,利用MessageQueueSelector,将相同的Key投递到同一个队列中,保证局部顺序。
全局msgId生成规则
规则:IP+进程PID+类加载器HashCode+自增
初始化值
static { byte[] ip; try { ip = UtilAll.getIP(); } catch (Exception e) { ip = createFakeIP(); } LEN = ip.length + 2 + 4 + 4 + 2; // 分配大小 ip长度+2+4 ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4); tempBuffer.put(ip); // 进程PID tempBuffer.putShort((short) UtilAll.getPid()); // 类加载器hashCode tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); FIX_STRING = UtilAll.bytes2string(tempBuffer.array()).toCharArray(); setStartTime(System.currentTimeMillis()); COUNTER = new AtomicInteger(0); }
构建UniqID
public static String createUniqID() { char[] sb = new char[LEN * 2]; System.arraycopy(FIX_STRING, 0, sb, 0, FIX_STRING.length); long current = System.currentTimeMillis(); if (current >= nextStartTime) { setStartTime(current); } int diff = (int)(current - startTime); if (diff < 0 && diff > -1000_000) { // may cause by NTP diff = 0; } int pos = FIX_STRING.length; UtilAll.writeInt(sb, pos, diff); pos += 8; UtilAll.writeShort(sb, pos, COUNTER.getAndIncrement()); return new String(sb); }
获取MsgId
String uniqMsgId = MessageClientIDSetter.getUniqID(msg); if (msg instanceof MessageBatch) { StringBuilder sb = new StringBuilder(); for (Message message : (MessageBatch) msg) { sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message)); } uniqMsgId = sb.toString(); }
queueOffset Key中为剩余的消费偏移量
responseHeader.getQueueOffset();
消息发送参数的作用
Keys
Tags标签
标签的作用 :对Topic中的消息进行过滤,选择处理
下面我们做一个Test消费的测试
public static void main(String[] args) throws Exception { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // 设置NameServer的地址 consumer.setNamesrvAddr("xxx:9876"); // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 consumer.subscribe("TopicTest", ""); //设置 负载均衡 | 广播模式 默认是负载均衡 consumer.setMessageModel(MessageModel.CLUSTERING); /** * 消息顺序: * 全局消息顺序 * 局部消息顺序 */ //设置回调函数处理消息 consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list); list.forEach(msg ->{ System.out.println("Receive message[msgId=" + msg.getMsgId() + "] " + (System.currentTimeMillis() - msg.getStoreTimestamp()) + "ms later"); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); }
在上面发送消息的时候Tags参数设定为TagA,在消费端将tags参数设置为空,则拉取不到消息
使用MessageQueueSelector,报错却没有重发消息,怎么办?
if (order.getId() == 1L){ try { Thread.sleep(6000); } catch (Exception e) { } } //设置重试次数 producer.setRetryTimesWhenSendFailed(3); producer.setSendMsgTimeout(5000);
上面的代码中,进行了重试的设置,同时也在代码中设置了超时的场景
代码很快就抛出了异常信息,发现我们设置的重试设置没有作用,没有进行消息重发,也没有对队列阻塞
缺陷:
- 设置的重试没有生效,发送顺序消息无法利用集群的Failover特性,因为不能更换MessageQueue进行重试
- 因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大
- 没有消息重发,也没有队列阻塞
解决方案
典型的就是消息发送失败后存在数据库中,然后定时调度,最终将消息发送到 MQ
但是我觉得这个方案还是存在缺陷的,重新发送消息后,消息的顺序性就发生变动了,这个问题需要思考
总结
今天这篇文章,主要介绍了同步发送顺序消息,以及发送消息中的主要参数的作用和生成规则,
最后的问题,在后面的学习中在进行思考,来完善使用MessageQueueSelector,报错却没有重发消息的问题。
下一篇,我们主要来看RocketMQ消息发送的源码,进行更深入的学习。