4 安装启动
4.1 下载
https://rocketmq.apache.org/dowloading/releases/
4.2 安装
先决条件:
- 64bit OS, Linux/Unix/Mac is recommended;(Windows user see guide below)
- 64bit JDK 1.8+;
- Maven 3.2.x;
- Git;
- 4g+ free disk for Broker server
4.3 启动
(1)配置环境变量
(2)启动NameServer和Broker
(3)测试生产者和消费者
发送信息:tools.cmd org.apache.rocketmq.example.quickstart.Producer
接收消息:tools.cmd org.apache.rocketmq.example.quickstart.Consumer
5 Java整合RocketMQ案例
5.1 入门案例
5.1.1 新建Maven项目
5.1.2 引入依赖
<!-- RocketMQ --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
5.1.3 生产者案例
(1)同步发送
/** * @desc: Producer端发送同步消息 * @author: YanMingXin * @create: 2021/9/14-14:09 * @info: 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。 **/ public class SyncProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("my_mq_one"); // 设置NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 100; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest" , "TagA" , ("Hello RocketMQ " + i).getBytes() ); // 发送消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 System.out.println(sendResult); } // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
(2)单向发送
/** * @desc: 单向发送消息 * @author: YanMingXin * @create: 2021/9/14-14:10 * @info: 这种方式主要用在不特别关心发送结果的场景,例如日志发送。 **/ public class OnewayProducer { public static void main(String[] args) throws Exception{ // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("my_mq_one"); // 设置NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 100; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest" , "TagA", ("Hello RocketMQ " + i).getBytes() ); // 发送单向消息,没有任何返回结果 producer.sendOneway(msg); } // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
(3)异步发送
/** * @desc: 发送异步消息 * @author: YanMingXin * @create: 2021/9/14-14:09 * @info: 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。 **/ public class AsyncProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("my_mq_one"); // 设置NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 100; // 根据消息数量实例化倒计时计算器 final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount); for (int i = 0; i < messageCount; i++) { final int index = i; // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // SendCallback接收异步返回结果的回调 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } // 等待5s countDownLatch.await(5, TimeUnit.SECONDS); // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
5.1.4 消费者案例
/** * @desc: 消费消息 * @author: YanMingXin * @create: 2021/9/14-14:12 **/ public class Consumer { public static void main(String[] args) throws MQClientException { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_mq_one_consumer"); // 设置NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 consumer.subscribe("TopicTest", "*"); // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start(); System.out.printf("Consumer Started......"); } }
5.1.5 研究下Message类
(1)理解:
Message是一个RocketMQ中的类也可以说是一个普通的对象,是消息发送和消费的最小实体。
(2)构造方法:
(3)几个重要的成员变量:
private String topic; private int flag; private Map<String, String> properties; private byte[] body; private String transactionId;
- topic:用于区分消息的Topic
- flag:
- properties:用于获取某些外部属性和属性值
- body:消息体
- transactionId:在开启事务的情况下保存事务的ID
5.2 顺序消息案例
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
5.2.1 顺序消息生产
/** * @desc: Producer,发送顺序消息 * @author: YanMingXin * @create: 2021/9/14-15:15 **/ public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); String[] tags = new String[]{"TagA", "TagC", "TagD"}; // 订单列表 List<OrderStep> orderList = new Producer().buildOrders(); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateStr = sdf.format(date); for (int i = 0; i < 10; i++) { // 加个时间前缀 String body = dateStr + " Hello RocketMQ " + orderList.get(i); Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; //根据订单id选择发送queue long index = id % mqs.size(); return mqs.get((int) index); } }, orderList.get(i).getOrderId());//订单id System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s", sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body)); } producer.shutdown(); } /** * 订单的步骤 */ @Data @ToString @Accessors(chain = true) private static class OrderStep { private long orderId; private String desc; } /** * 生成模拟订单数据 */ private List<OrderStep> buildOrders() { final long order1 = 15103111039L; final long order2 = 15103111065L; final long order3 = 15103117235L; List<OrderStep> orderList = new ArrayList<>(); OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(order1).setDesc("order1创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(order2).setDesc("order2创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(order1).setDesc("order1付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(order3).setDesc("order3创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(order2).setDesc("order2付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(order3).setDesc("order3付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(order2).setDesc("order2完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(order1).setDesc("order1推送"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(order3).setDesc("order3完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(order1).setDesc("order1完成"); orderList.add(orderDemo); return orderList; } }
5.2.2 顺序消息消费
/** * @desc: 顺序消息消费,带事务方式(应用可控制Offset什么时候提交) * @author: YanMingXin * @create: 2021/9/14-15:27 **/ public class ConsumerInOrder { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setNamesrvAddr("127.0.0.1:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for (MessageExt msg : msgs) { // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序 System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody())); } try { //模拟业务逻辑处理中... TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
5.3 延时消息案例
5.3.1 延时消息生产者
/** * @desc: 发送延时消息 * @author: YanMingXin * @create: 2021/9/14-15:43 **/ public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { // 实例化一个生产者来产生延时消息 DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); producer.setNamesrvAddr("localhost:9876"); // 启动生产者 producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TopicTest", ("Hello scheduled message " + i).getBytes()); // 设置延时等级4,这个消息将在30s之后发送(现在只支持固定的几个时间,详看delayTimeLevel) message.setDelayTimeLevel(4); // 发送消息 producer.send(message); } // 关闭生产者 producer.shutdown(); } }
5.3.2 演示消息消费者
/** * @desc: 启动消费者等待传入订阅消息 * @author: YanMingXin * @create: 2021/9/14-15:43 **/ public class ScheduledMessageConsumer { public static void main(String[] args) throws Exception { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer"); consumer.setNamesrvAddr("localhost:9876"); // 订阅Topics consumer.subscribe("TopicTest", "*"); // 注册消息监听者 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); } }
5.3.3 使用场景和限制
(1)使用场景
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
(2)限制
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; //等价于 1->1s 2->5s 3->10s ......
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java
5.4 批量消息案例
5.4.1 批量消息生产者
/** * @desc: 批量生产 * @author: YanMingXin * @create: 2021/9/14-16:24 **/ public class BatchMessageProducer { public static void main(String[] args) throws Exception { // 实例化一个生产者来产生延时消息 DefaultMQProducer producer = new DefaultMQProducer("BatchMessageProducerGroup"); producer.setNamesrvAddr("127.0.0.1:9876"); // 启动生产者 producer.start(); String topic = "TopicTest"; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes())); try { producer.send(messages); } catch (Exception e) { e.printStackTrace(); //处理error } // 关闭生产者 producer.shutdown(); } }
5.4.2 批量消息消费者
/** * @desc: 启动消费者等待传入订阅消息 * @author: YanMingXin * @create: 2021/9/14-15:43 **/ public class BatchMessageConsumer { public static void main(String[] args) throws Exception { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchMessageConsumer"); consumer.setNamesrvAddr("localhost:9876"); // 订阅Topics consumer.subscribe("TopicTest", "*"); // 注册消息监听者 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); } }
5.4.3 消息列表分割
复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:
public class ListSplitter implements Iterator<List<Message>> { private final int SIZE_LIMIT = 1024 * 1024 * 4; private final List<Message> messages; private int currIndex; public ListSplitter(List<Message> messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List<Message> next() { int startIndex = getStartIndex(); int nextIndex = startIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = calcMessageSize(message); if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } List<Message> subList = messages.subList(startIndex, nextIndex); currIndex = nextIndex; return subList; } private int getStartIndex() { Message currMessage = messages.get(currIndex); int tmpSize = calcMessageSize(currMessage); while(tmpSize > SIZE_LIMIT) { currIndex += 1; Message message = messages.get(curIndex); tmpSize = calcMessageSize(message); } return currIndex; } private int calcMessageSize(Message message) { int tmpSize = message.getTopic().length() + message.getBody().length(); Map<String, String> properties = message.getProperties(); for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节 return tmpSize; } } //把大的消息分裂成若干个小的消息 ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) { try { List<Message> listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); //处理error } }