RocketMQ使用教程相关系列 目录
目录
第一节:介绍
第二节:延时消息-生产者和消息者步骤说明
延时消息生产者代码实现步骤
延时消息消费者代码实现步骤
第三节:延时消息生产者
效果:
第四节:延时消息消费者
效果:
第一节:介绍
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
它的实现和普通消息的生产者,消费者基本一样,多了一个设置延迟级别。
message.setDelayTimeLevel()
现在RocketMq(开源的版本,白嫖的,知足吧)并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
注:阿里云收费版本支持任意时间延时
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
第二节:延时消息-生产者和消息者步骤说明
延时消息生产者代码实现步骤
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体,设置延时级别
5.发送消息
6.关闭生产者producer
延时消息消费者代码实现步骤
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
注意:消费者的 Topic 和 Tag 需要和生产者保持一致
第三节:延时消息生产者
public class Producer { public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { // 1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("demo_producer_delay_group"); // 2.指定Nameserver地址 producer.setNamesrvAddr("192.168.88.131:9876"); // 3.启动producer producer.start(); System.out.println("生产者启动"); for (int i = 0; i < 10; i++) { // 4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ Message msg = new Message("DelayTopic", "Tag1", ("Hello 虚竹" + i).getBytes()); // 设定延迟时间 10s msg.setDelayTimeLevel(3); // 5.发送消息 SendResult result = producer.send(msg); // 发送状态 SendStatus status = result.getSendStatus(); System.out.println("发送结果:" + result); // 线程睡1秒 TimeUnit.SECONDS.sleep(1); } // 6.关闭生产者producer producer.shutdown(); } }
public class Consumer { public static void main(String[] args) throws Exception { // 1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_producer_delay_group"); // 2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.88.131:9876"); // 3.订阅主题Topic和Tag consumer.subscribe("DelayTopic", "*"); // 4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { // 接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消息ID:【" + msg.getMsgId() + "】,消息内容:" + new String(msg.getBody()) + ",延迟时间:" + (System.currentTimeMillis() - msg.getBornTimestamp())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5.启动消费者consumer consumer.start(); System.out.println("消费者启动"); } }