RocketMq普通消息,死信队列,消息幂等性(redis)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

1 介绍

RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

 

1.1 RocketMQ 特点

  • 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
  • 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递 (RocketMQ可以保证严格的消息顺序,而ActiveMQ无法保证)
  • 支持拉(pull)和推(push)两种消息模式

pull其实就是消费者主动从MQ中去拉消息,而push则像rabbit MQ一样,是MQ给消费者推送消息。但是RocketMQ的push其实是基于pull来实现的。

它会先由一个业务代码从MQ中pull消息,然后再由业务代码push给特定的应用/消费者。其实底层就是一个pull模式

  • 单一队列百万消息的堆积能力 (RocketMQ提供亿级消息的堆积能力,这不是重点,重点是堆积了亿级的消息后,依然保持写入低延迟)
  • 支持多种消息协议,如 JMS、MQTT 等
  • 分布式高可用的部署架构,满足至少一次消息传递语义(RocketMQ原生就是支持分布式的,而ActiveMQ原生存在单点性)
  • 提供 docker 镜像用于隔离测试和云集群部署
  • 提供配置、指标和监控等功能丰富的 Dashboard

新建springboot工程并,导入依赖,本文选用rocketmq2.2.2版本

1. <dependency>
2. <groupId>org.apache.rocketmq</groupId>
3. <artifactId>rocketmq-spring-boot-starter</artifactId>
4. <version>2.2.2</version>
5. </dependency>

YML配置文件

1. rocketmq:
2.   name-server: 192.168.198.129:9876
3.   producer:
4.     group: group1

消息提供者

1. @GetMapping("tset")
2. public String send(){
3. //尝试发送消息
4.         rocketMQTemplate.convertAndSend("topic1","smoky哥哥呀");
5. //        log.info("尝试发送同步消息:{}",syncSend);
6. return "success";
7.     }

消息消费者

1. @Component
2. @RocketMQMessageListener(topic = "topic1",consumerGroup = "group1")
3. @Slf4j
4. public class ReceiveConsumer implements RocketMQListener<String>,RocketMQPushConsumerLifecycleListener{
5. 
6. @Autowired
7.     StringRedisTemplate stringRedisTemplate;
8. 
9. @Override
10. public void onMessage(String s) {
11.         log.info("接收到MQ消息:{}",s);
12.     }
13. 
14. @Override
15. public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
16.         defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
17. MessageExt messageExt = list.get(0);
18. //尝试获取消息体内容
19. String body = new String(messageExt.getBody());
20. //尝试获取消息id
21. String msgId = messageExt.getMsgId();
22.             log.info("获取到消息id为:{}",msgId);
23. //尝试获取重试次数
24. int reconsumeTimes = messageExt.getReconsumeTimes();
25.             log.info("获取到当前消息重试次数:{}",reconsumeTimes);
26. try {
27. int i = 10/0;
28. //接口幂等性
29. Boolean rocketmq = stringRedisTemplate.opsForHash().hasKey("ROCKETMQ", msgId);
30. if (!rocketmq) {
31.                     log.info("接收到ACK消息:{}", body);
32. //存入redis日志表,代表消费成功
33.                     stringRedisTemplate.opsForHash().put("ROCKETMQ",msgId,"1");
34.                 }
35.             } catch (Exception e) {
36.                 e.printStackTrace();
37.                 log.info("异常捕获,尝试ACK重试机制");
38. if (reconsumeTimes>=3){
39. //加入死信队列
40.                     log.info("业务逻辑:加入死信队列做,做事后补偿");
41. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
42.                 }
43. //发起重试
44.                 log.info("尝试发起ACK重试");
45. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
46.             }
47. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
48.         });
49.     }
50. }

 

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
编解码 NoSQL Java
使用Spring Boot + Redis 队列实现视频文件上传及FFmpeg转码的技术分享
【8月更文挑战第30天】在当前的互联网应用中,视频内容的处理与分发已成为不可或缺的一部分。对于视频平台而言,高效、稳定地处理用户上传的视频文件,并对其进行转码以适应不同设备的播放需求,是提升用户体验的关键。本文将围绕使用Spring Boot结合Redis队列技术来实现视频文件上传及FFmpeg转码的过程,分享一系列技术干货。
200 3
|
9天前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
28 6
|
1月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
121 6
|
1月前
|
消息中间件 存储 NoSQL
如何用Redis实现延迟队列?
综上所述,通过Redis的有序集合和一些基本命令,我们可以轻松地构建出功能完善的延迟队列系统。根据具体需求,可以进一步优化和扩展,以满足高性能和高可靠性的业务需求。
32 1
|
2月前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
2月前
|
消息中间件 JSON Java
|
1月前
|
存储 NoSQL Java
Spring Boot项目中使用Redis实现接口幂等性的方案
通过上述方法,可以有效地在Spring Boot项目中利用Redis实现接口幂等性,既保证了接口操作的安全性,又提高了系统的可靠性。
37 0
|
2月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
2月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
84 0
|
3月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
86 2