三、入门使用
3.1、导入依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency> 复制代码
3.2、发送消息
发送消息的步骤1:
- 创建消息生产者, 指定生产者所属的组名。
- 指定Nameserver地址。
- 启动生产者。
- 创建消息对象,指定主题、标签和消息体。
- 发送消息。
- 关闭生产者。
public class Producer { public static void main(String[] args) throws Exception { // 创建一个生产者对象,并且指定一个生产者组 DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer"); // 指定名字服务器地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动生产者 producer.start(); // 创建一个消息,参数分别为:主题、标签、消息体 Message message = new Message("hello_demo1","tag1","你好,我是消息1".getBytes("utf-8")); // 发送消息 producer.send(message); // 关闭资源 producer.shutdown(); } } 复制代码
3.3、消费消息(接收消息)
消费消息的步骤:
1. 创建消息消费者, 指定消费者所属的组名 2. 指定Nameserver地址 3. 指定消费者订阅的主题和标签 4. 设置回调函数,编写处理消息的方法 5. 启动消息消费者
public class ConsumerDemo { public static void main(String[] args) throws MQClientException { // 创建一个拉取消息对象,并指定所属组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-demo"); // 指定名字服务器地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 指定消费者订阅的主题和标签,第二个参数用于过滤条件用于指定接收什么tag,什么都接收 consumer.subscribe("hello_demo1", "*"); // 注册一个消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { // 可能有多个消息 for (MessageExt msg : msgs) { // 打印消息 System.out.println(new String(msg.getBody())); } // 返回发送成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); } } 复制代码
四、消息的类型
4.1、普通消息
RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。
4.1.1、可靠同步发送
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。这种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
public class SyncProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer"); // 设置NameServer的地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 100; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("04-producer-type" /* Topic */, "TagA" /* Tag */, ("这是一条同步消息 " + i).getBytes("utf-8") /* Message body */); //发送同步消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 System.out.println(JSON.toJSONString(sendResult)); } // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } } 复制代码
4.1.2、异步消息
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
public class ASyncProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer"); // 设置NameServer的地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 100; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("04-producer-type" /* Topic */, "TagA" /* Tag */, ("我是异步消息" + i).getBytes("utf-8") /* Message body */ ); //发送同步消息到一个Broker producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("消息发送成功"); System.out.println(JSON.toJSONString(sendResult)); } @Override public void onException(Throwable e) { System.out.println("消息发送失败"+e.getMessage()); System.out.println("处理失败消息"); } }); } // 让线程不要终止,否则会报错 Thread.sleep(30000000); // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } } 复制代码
4.1.3、单向消息
单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
public class OneWayProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer"); // 设置NameServer的地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 100; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("04-producer-type" /* Topic */, "TagA" /* Tag */, ("我是单向消息" + i).getBytes("utf-8") /* Message body */ ); //发送单向消息到一个Broker producer.sendOneway(msg); } // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } } 复制代码
4.1.4、三种发送方式的对比
发送方式 | 发送的时间 | 发送反馈结果 | 是否丢失数据 |
同步发送 | 快 | 有 | 不丢失 |
异步发送 | 快 | 有 | 不丢失 |
单向消息 | 较快 | 无 | 可能丢失 |
4.2、顺序消息
虽然RocketMQ的数据结构是队列,看起来天生支持顺序消息,当只有一个队列的时候,他就天生支持顺序消息,但是Brocket内部有多个队列,发送多条消息的时候,Broker会按照轮询的方式将多个消息放在不同的队列,消费者采用多线程的方式去消费消息,所以无法保证消费消息的方式和发送消息的方式一样的。解决方式是将消息全部发送到一个队列里面。
比如一个订单的流程是:创建、付款、推送、完成。订单号相同的
顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。
/** * 订单构建者 */ public class OrderStep { private long orderId; private String desc; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}'; } public static List<OrderStep> buildOrders() { // 1039L : 创建 付款 推送 完成 // 1065L : 创建 付款 // 7235L :创建 付款 List<OrderStep> orderList = new ArrayList<OrderStep>(); OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("完成"); orderList.add(orderDemo); return orderList; } } 复制代码
public class Producer { public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动producer producer.start(); //构建消息集合 List<OrderStep> orderSteps = OrderStep.buildOrders(); //发送消息 for (int i = 0; i < orderSteps.size(); i++) { String body = orderSteps.get(i) + ""; Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes()); /** * 参数一:消息对象 * 参数二:消息队列的选择器 * 参数三:选择队列的业务标识(订单ID) */ SendResult sendResult = producer.send(message, new MessageQueueSelector() { /** * * @param mqs:队列集合 * @param msg:消息对象 * @param arg:业务标识的参数 * @return */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { long orderId = (long) arg; long index = orderId % mqs.size(); return mqs.get((int) index); } }, orderSteps.get(i).getOrderId()); System.out.println("发送结果:" + sendResult); } producer.shutdown(); } } 复制代码
public class Consumer { public static void main(String[] args) throws MQClientException { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); //3.订阅主题Topic和Tag consumer.subscribe("OrderTopic", "*"); //4.注册消息监听器 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { System.out.println("线程名称:【" + Thread.currentThread().getName() + "】:" + new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); //5.启动消费者 consumer.start(); System.out.println("消费者启动"); } } 复制代码
4.3、事务消息
RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。事务消息交互的过程如下:
事务消息的基本概念:
- 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
- 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。
事务消息的发送步骤:
- 发送方将半事务消息发送到RocketMQ服务端。
- RocketMQ服务端将消息持久化后,向发送方返回确认消息已经发送成功,此时消息为半事务消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向服务端提交二次确认(Commit或者是Rollback),服务端收到Commit状态则将事务消息标记为可投递,订阅方最终将收到该消息。服务端如果收到的是Rollback状态则删除半事务消息,订阅方将不会接收该消息。
事务消息回查步骤:
- 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
4.4、延时消息
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。我们就可以使用延时消息来完成这个功能。
延时消息的使用限制,现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; 复制代码
消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关。
4.4.1、生产者
public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { // 实例化一个生产者来产生延时消息 DefaultMQProducer producer = new DefaultMQProducer("wolfcode-producer"); producer.setNamesrvAddr("127.0.0.1:9876"); // 启动生产者 producer.start(); Message message = new Message("06-delay", ("delay message").getBytes()); // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel) message.setDelayTimeLevel(3); // 发送消息 producer.send(message); // 关闭生产者 producer.shutdown(); } } 复制代码
4.4.2、消费者
public class ScheduledMessageConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wolfcode_consumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); // 订阅Topics consumer.subscribe("06-delay", "*"); // 注册消息监听者 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { System.out.println("Receive message[msgId=" + message.getMsgId() + "] " ); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); } } 复制代码
4.5、消息的过滤
在消费消息的时候,我们可以指定消费哪些消息,这个时候就需要用到消息的过滤,他分为1两种过滤:
- 通过标签过滤。
- 通过SQL语句的方式过滤。
4.5.1、通过标签过滤
4.5.1.1、生产者
public class Producer { public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动producer producer.start(); for (int i = 0; i < 3; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ Message msg = new Message("FilterTagTopic", "Tag2", ("消息的过滤" + i).getBytes()); //5.发送消息 SendResult result = producer.send(msg); //发送状态 SendStatus status = result.getSendStatus(); System.out.println("发送结果:" + result); //线程睡1秒 TimeUnit.SECONDS.sleep(1); } //6.关闭生产者producer producer.shutdown(); } } 复制代码
4.5.1.2、消费者
public class Consumer { public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.订阅主题Topic和Tag consumer.subscribe("FilterTagTopic", "Tag1 || Tag2 "); //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者consumer consumer.start(); System.out.println("消费者启动"); } } 复制代码
4.5.2、通过SQL语句过滤
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如:>,>=,<,<=,BETWEEN,=
- 字符比较,比如:=,<>,IN
- IS NULL 或者 IS NOT NULL
- 逻辑符号 AND,OR,NOT
常量支持类型为:
- 数值,比如:123,3.1415
- 字符,比如:'abc',必须用单引号包裹起来
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
4.5.2.1、消息生产者
发送消息时,你能通过putUserProperty
来设置消息的属性。
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.start(); Message msg = new Message("TopicTest", tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); // 设置一些属性 msg.putUserProperty("a", String.valueOf(i)); SendResult sendResult = producer.send(msg); producer.shutdown(); 复制代码
4.5.2.2、消息消费者
用MessageSelector.bySql来使用sql筛选消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); // 只有订阅的消息有这个属性a, a >=0 and a <= 3 consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();