手把手教你 springBoot 整合 rabbitMQ,利用 MQ 实现事务补偿

简介: rabbitMQ 在互联网公司有着大规模应用,本篇将实战介绍 springboot 整合 rabbitMQ,同时也将在具体的业务场景中介绍利用 MQ 实现事务补偿操作。

rabbitMQ 在互联网公司有着大规模应用,本篇将实战介绍 springboot 整合 rabbitMQ,同时也将在具体的业务场景中介绍利用 MQ 实现事务补偿操作。

一、介绍

在上篇文章中,我们详细的介绍了 rabbitMQ 的内部架构以及使用操作,本篇我们一起来实操一下SpringBoot整合rabbitMQ,为后续业务处理做铺垫。

废话不多说,直奔主题!

二、整合实战

2.1、创建一个 maven 工程,引入 amqp 包

<!--amqp 支持-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2、在全局文件中配置 rabbitMQ 服务信息

spring.rabbitmq.addresses=197.168.24.206:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

其中,spring.rabbitmq.addresses参数值为 rabbitmq 服务器地址

2.3、编写 rabbitmq 配置类

@Slf4j
@Configuration
public class RabbitConfig {
    /**
     * 初始化连接工厂
     * @param addresses
     * @param userName
     * @param password
     * @param vhost
     * @return
     */
    @Bean
    ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.addresses}") String addresses,
                                        @Value("${spring.rabbitmq.username}") String userName,
                                        @Value("${spring.rabbitmq.password}") String password,
                                        @Value("${spring.rabbitmq.virtual-host}") String vhost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        return connectionFactory;
    }
    /**
     * 重新实例化 RabbitAdmin 操作类
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        return new RabbitAdmin(connectionFactory);
    }
    /**
     * 重新实例化 RabbitTemplate 操作类
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        //数据转换为json存入消息队列
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
    /**
     * 将 RabbitUtil 操作工具类加入IOC容器
     * @return
     */
    @Bean
    public RabbitUtil rabbitUtil(){
        return new RabbitUtil();
    }
}

2.4、编写 RabbitUtil 工具类

public class RabbitUtil {
    private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class);
    @Autowired
    private RabbitAdmin rabbitAdmin;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 创建Exchange
     * @param exchangeName
     */
    public void addExchange(String exchangeType, String exchangeName){
        Exchange exchange = createExchange(exchangeType, exchangeName);
        rabbitAdmin.declareExchange(exchange);
    }
    /**
     * 删除一个Exchange
     * @param exchangeName
     */
    public boolean deleteExchange(String exchangeName){
        return rabbitAdmin.deleteExchange(exchangeName);
    }
    /**
     * 创建一个指定的Queue
     * @param queueName
     * @return queueName
     */
    public void addQueue(String queueName){
        Queue queue = createQueue(queueName);
        rabbitAdmin.declareQueue(queue);
    }
    /**
     * 删除一个queue
     * @return queueName
     * @param queueName
     */
    public boolean deleteQueue(String queueName){
        return rabbitAdmin.deleteQueue(queueName);
    }
    /**
     * 按照筛选条件,删除队列
     * @param queueName
     * @param unused 是否被使用
     * @param empty 内容是否为空
     */
    public void deleteQueue(String queueName, boolean unused, boolean empty){
        rabbitAdmin.deleteQueue(queueName,unused,empty);
    }
    /**
     * 清空某个队列中的消息,注意,清空的消息并没有被消费
     * @return queueName
     * @param queueName
     */
    public void purgeQueue(String queueName){
        rabbitAdmin.purgeQueue(queueName, false);
    }
    /**
     * 判断指定的队列是否存在
     * @param queueName
     * @return
     */
    public boolean existQueue(String queueName){
        return rabbitAdmin.getQueueProperties(queueName) == null ? false : true;
    }
    /**
     * 绑定一个队列到一个匹配型交换器使用一个routingKey
     * @param exchangeType
     * @param exchangeName
     * @param queueName
     * @param routingKey
     * @param isWhereAll
     * @param headers EADERS模式类型设置,其他模式类型传空
     */
    public void addBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){
        Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
        rabbitAdmin.declareBinding(binding);
    }
    /**
     * 声明绑定
     * @param binding
     */
    public void addBinding(Binding binding){
        rabbitAdmin.declareBinding(binding);
    }
    /**
     * 解除交换器与队列的绑定
     * @param exchangeType
     * @param exchangeName
     * @param queueName
     * @param routingKey
     * @param isWhereAll
     * @param headers
     */
    public void removeBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){
        Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
        removeBinding(binding);
    }
    /**
     * 解除交换器与队列的绑定
     * @param binding
     */
    public void removeBinding(Binding binding){
        rabbitAdmin.removeBinding(binding);
    }
    /**
     * 创建一个交换器、队列,并绑定队列
     * @param exchangeType
     * @param exchangeName
     * @param queueName
     * @param routingKey
     * @param isWhereAll
     * @param headers
     */
    public void andExchangeBindingQueue(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){
        //声明交换器
        addExchange(exchangeType, exchangeName);
        //声明队列
        addQueue(queueName);
        //声明绑定关系
        addBinding(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
    }
    /**
     * 发送消息
     * @param exchange
     * @param routingKey
     * @param object
     */
    public void convertAndSend(String exchange, String routingKey, final Object object){
        rabbitTemplate.convertAndSend(exchange, routingKey, object);
    }
    /**
     * 转换Message对象
     * @param messageType
     * @param msg
     * @return
     */
    public Message getMessage(String messageType, Object msg){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(messageType);
        Message message = new Message(msg.toString().getBytes(),messageProperties);
        return message;
    }
    /**
     * 声明交换机
     * @param exchangeType
     * @param exchangeName
     * @return
     */
    private Exchange createExchange(String exchangeType, String exchangeName){
        if(ExchangeType.DIRECT.equals(exchangeType)){
            return new DirectExchange(exchangeName);
        }
        if(ExchangeType.TOPIC.equals(exchangeType)){
            return new TopicExchange(exchangeName);
        }
        if(ExchangeType.HEADERS.equals(exchangeType)){
            return new HeadersExchange(exchangeName);
        }
        if(ExchangeType.FANOUT.equals(exchangeType)){
            return new FanoutExchange(exchangeName);
        }
        return null;
    }
    /**
     * 声明绑定关系
     * @param exchangeType
     * @param exchangeName
     * @param queueName
     * @param routingKey
     * @param isWhereAll
     * @param headers
     * @return
     */
    private Binding bindingBuilder(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){
        if(ExchangeType.DIRECT.equals(exchangeType)){
            return BindingBuilder.bind(new Queue(queueName)).to(new DirectExchange(exchangeName)).with(routingKey);
        }
        if(ExchangeType.TOPIC.equals(exchangeType)){
            return BindingBuilder.bind(new Queue(queueName)).to(new TopicExchange(exchangeName)).with(routingKey);
        }
        if(ExchangeType.HEADERS.equals(exchangeType)){
            if(isWhereAll){
                return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAll(headers).match();
            }else{
                return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAny(headers).match();
            }
        }
        if(ExchangeType.FANOUT.equals(exchangeType)){
            return BindingBuilder.bind(new Queue(queueName)).to(new FanoutExchange(exchangeName));
        }
        return null;
    }
    /**
     * 声明队列
     * @param queueName
     * @return
     */
    private Queue createQueue(String queueName){
        return new Queue(queueName);
    }
    /**
     * 交换器类型
     */
    public final static class ExchangeType {
        /**
         * 直连交换机(全文匹配)
         */
        public final static String DIRECT = "DIRECT";
        /**
         * 通配符交换机(两种通配符:*只能匹配一个单词,#可以匹配零个或多个)
         */
        public final static String TOPIC = "TOPIC";
        /**
         * 头交换机(自定义键值对匹配,根据发送消息内容中的headers属性进行匹配)
         */
        public final static String HEADERS = "HEADERS";
        /**
         * 扇形(广播)交换机 (将消息转发到所有与该交互机绑定的队列上)
         */
        public final static String FANOUT = "FANOUT";
    }
}

此致, rabbitMQ 核心操作功能操作已经开发完毕!

2.5、编写队列监听类(静态)

@Slf4j
@Configuration
public class DirectConsumeListener {
    /**
     * 监听指定队列,名称:mq.direct.1
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "mq.direct.1")
    public void consume(Message message, Channel channel) throws IOException {
        log.info("DirectConsumeListener,收到消息: {}", message.toString());
    }
}

如果你需要监听指定的队列,只需要方法上加上@RabbitListener(queues = "")即可,同时填写对应的队列名称。

但是,如果你想动态监听队列,而不是通过写死在方法上呢?

请看下面介绍!

2.6、编写队列监听类(动态)

重新实例化一个SimpleMessageListenerContainer对象,这个对象就是监听容器。

@Slf4j
@Configuration
public class DynamicConsumeListener {
    /**
     * 使用SimpleMessageListenerContainer实现动态监听
     * @param connectionFactory
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setMessageListener((MessageListener) message -> {
            log.info("ConsumerMessageListen,收到消息: {}", message.toString());
        });
        return container;
    }
}

如果想向SimpleMessageListenerContainer添加监听队列或者移除队列,只需通过如下方式即可操作。

@Slf4j
@RestController
@RequestMapping("/consumer")
public class ConsumerController {
    @Autowired
    private SimpleMessageListenerContainer container;
    @Autowired
    private RabbitUtil rabbitUtil;
    /**
     * 添加队列到监听器
     * @param consumerInfo
     */
    @PostMapping("addQueue")
    public void addQueue(@RequestBody ConsumerInfo consumerInfo) {
        boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName());
        if(!existQueue){
            throw new CommonExecption("当前队列不存在");
        }
        //消费mq消息的类
        container.addQueueNames(consumerInfo.getQueueName());
        //打印监听容器中正在监听到队列
        log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));
    }
    /**
     * 移除正在监听的队列
     * @param consumerInfo
     */
    @PostMapping("removeQueue")
    public void removeQueue(@RequestBody ConsumerInfo consumerInfo) {
        //消费mq消息的类
        container.removeQueueNames(consumerInfo.getQueueName());
        //打印监听容器中正在监听到队列
        log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));
    }
    /**
     * 查询监听容器中正在监听到队列
     */
    @PostMapping("queryListenerQueue")
    public void queryListenerQueue() {
        log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));
    }
}

2.7、发送消息到交换器

发送消息到交换器,非常简单,只需要通过如下方式即可!

  • 先编写一个请求参数实体类
@Data
public class ProduceInfo implements Serializable {
    private static final long serialVersionUID = 1l;
    /**
     * 交换器名称
     */
    private String exchangeName;
    /**
     * 路由键key
     */
    private String routingKey;
    /**
     * 消息内容
     */
    public String msg;
}
  • 编写接口api
@RestController
@RequestMapping("/produce")
public class ProduceController {
    @Autowired
    private RabbitUtil rabbitUtil;
    /**
     * 发送消息到交换器
     * @param produceInfo
     */
    @PostMapping("sendMessage")
    public void sendMessage(@RequestBody ProduceInfo produceInfo) {
        rabbitUtil.convertAndSend(produceInfo.getExchangeName(), produceInfo.getRoutingKey(), produceInfo);
    }
}

当然,你也可以直接使用rabbitTemplate操作类,来实现发送消息。

rabbitTemplate.convertAndSend(exchange, routingKey, message);

参数内容解释

  • exchange:表示交换器名称
  • routingKey:表示路由键key
  • message:表示消息

2.8、交换器、队列维护操作

如果想通过接口对 rabbitMQ 中的交换器、队列以及绑定关系进行维护,通过如下方式接口操作,即可实现!

  • 先编写一个请求参数实体类
@Data
public class QueueConfig implements Serializable{
    private static final long serialVersionUID = 1l;
    /**
     * 交换器类型
     */
    private String exchangeType;
    /**
     * 交换器名称
     */
    private String exchangeName;
    /**
     * 队列名称
     */
    private String queueName;
    /**
     * 路由键key
     */
    private String routingKey;
}
  • 编写接口api
/**
 * rabbitMQ管理操作控制层
 */
@RestController
@RequestMapping("/config")
public class RabbitController {
    @Autowired
    private RabbitUtil rabbitUtil;
    /**
     * 创建交换器
     * @param config
     */
    @PostMapping("addExchange")
    public void addExchange(@RequestBody QueueConfig config) {
        rabbitUtil.addExchange(config.getExchangeType(), config.getExchangeName());
    }
    /**
     * 删除交换器
     * @param config
     */
    @PostMapping("deleteExchange")
    public void deleteExchange(@RequestBody QueueConfig config) {
        rabbitUtil.deleteExchange(config.getExchangeName());
    }
    /**
     * 添加队列
     * @param config
     */
    @PostMapping("addQueue")
    public void addQueue(@RequestBody QueueConfig config) {
        rabbitUtil.addQueue(config.getQueueName());
    }
    /**
     * 删除队列
     * @param config
     */
    @PostMapping("deleteQueue")
    public void deleteQueue(@RequestBody QueueConfig config) {
        rabbitUtil.deleteQueue(config.getQueueName());
    }
    /**
     * 清空队列数据
     * @param config
     */
    @PostMapping("purgeQueue")
    public void purgeQueue(@RequestBody QueueConfig config) {
        rabbitUtil.purgeQueue(config.getQueueName());
    }
    /**
     * 添加绑定
     * @param config
     */
    @PostMapping("addBinding")
    public void addBinding(@RequestBody QueueConfig config) {
        rabbitUtil.addBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);
    }
    /**
     * 解除绑定
     * @param config
     */
    @PostMapping("removeBinding")
    public void removeBinding(@RequestBody QueueConfig config) {
        rabbitUtil.removeBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);
    }
    /**
     * 创建头部类型的交换器
     * 判断条件是所有的键值对都匹配成功才发送到队列
     * @param config
     */
    @PostMapping("andExchangeBindingQueueOfHeaderAll")
    public void andExchangeBindingQueueOfHeaderAll(@RequestBody QueueConfig config) {
        HashMap<String, Object> header = new HashMap<>();
        header.put("queue", "queue");
        header.put("bindType", "whereAll");
        rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), null, true, header);
    }
    /**
     * 创建头部类型的交换器
     * 判断条件是只要有一个键值对匹配成功就发送到队列
     * @param config
     */
    @PostMapping("andExchangeBindingQueueOfHeaderAny")
    public void andExchangeBindingQueueOfHeaderAny(@RequestBody QueueConfig config) {
        HashMap<String, Object> header = new HashMap<>();
        header.put("queue", "queue");
        header.put("bindType", "whereAny");
        rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), null, false, header);
    }
}

至此,rabbitMQ 管理器基本的 crud 全部开发完成!

三、利用 MQ 实现事务补偿

当然,我们花了这么大的力气,绝不仅仅是为了将 rabbitMQ 通过 web 项目将其管理起来,最重要的是能投入业务使用中去!

上面的操作只是告诉我们怎么使用 rabbitMQ

当你仔细回想整个过程的时候,其实还是回到最初那个问题,什么时候使用 MQ ?

以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:支付订单、扣减库存、生成相应单据、发红包、发短信通知等等

在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取 MQ 的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。

这种是利用 MQ 实现业务解耦,其它的场景包括最终一致性、广播、错峰流控等等。

利用 MQ 实现业务解耦的过程其实也很简单。

  • 当主流程结束之后,将消息推送到发红包、发短信交换器中即可
@Service
public class OrderService {
    @Autowired
    private RabbitUtil rabbitUtil;
    /**
     * 创建订单
     * @param order
     */
    @Transactional
    public void createOrder(Order order){
        //1、创建订单
        //2、调用库存接口,减库存
        //3、向客户发放红包
        rabbitUtil.convertAndSend("exchange.send.bonus", null, order);
        //4、发短信通知
        rabbitUtil.convertAndSend("exchange.sms.message", null, order);
    }
}
  • 监听发红包操作
/**
 * 监听发红包
 * @param message
 * @param channel
 * @throws IOException
 */
@RabbitListener(queues = "exchange.send.bonus")
public void consume(Message message, Channel channel) throws IOException {
    String msgJson = new String(message.getBody(),"UTF-8");
    log.info("收到消息: {}", message.toString());
    //调用发红包接口
}
  • 监听发短信操作
/**
 * 监听发短信
 * @param message
 * @param channel
 * @throws IOException
 */
@RabbitListener(queues = "exchange.sms.message")
public void consume(Message message, Channel channel) throws IOException {
    String msgJson = new String(message.getBody(),"UTF-8");
    log.info("收到消息: {}", message.toString());
    //调用发短信接口
}

既然 MQ 这么好用,那是不是完全可以将以前的业务也按照整个模型进行拆分呢?

答案显然不是!

当引入 MQ 之后业务的确是解耦了,但是当 MQ 一旦挂了,所有的服务基本都挂了,是不是很可怕!

但是没关系,俗话说,兵来将挡、水来土掩,这句话同样适用于 IT 开发者,有坑填坑!

在下篇文章中,我们会详细介绍 rabbitMQ 的集群搭建和部署,保证消息几乎 100% 的投递和消费。

四、总结

本篇主要围绕SpringBoot整合rabbitMQ做内容介绍,可能也有理解不到位的地方,欢迎网友批评指出!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 Java 网络架构
|
11天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
27 6
|
2月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
6月前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
127 2
|
4月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
7月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
4月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
5月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
359 1
|
5月前
|
消息中间件 调度 RocketMQ
【RocketMQ系列六】RocketMQ事务消息
【RocketMQ系列六】RocketMQ事务消息
977 1