RabbitMQ特殊应用

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: RabbitMQ特殊应用

一、简介

按照现有rabbitMQ的相关知识,⽣产者会发送消息到达消息服务器。但是在实际⽣产环境下,消息⽣产者发送的消息很有可能当到达了消息服务器之后,由于消息服务器的问题导致消息丢失,如宕机。因为消息服务器默认会将消息存储在内存中。⼀旦消息服务器宕机,则消息会产⽣丢失。因此要保证⽣产者的消息不丢失,要开始持久化策略。

rabbitMQ持久化:
 1. 交换机持久化
 2. 队列持久化
 3. 消息持久化

RabbitMQ数据保护机制:

事务机制

事务机制采⽤类数据库的事务机制进⾏数据保护,当消息到达消息服务器,⾸先会开启⼀个事务,接着进⾏数据磁盘持久化,只有持久化成功才会进⾏事务提交,向消息⽣产者返回成功通知,消息⽣产者⼀旦接收成功通知则不会再发送此条消息。当出现异常,则返回失败通知.消息⽣产者⼀旦接收失败通知,则继续发送该条消息。 事务机制虽然能够保证数据安全,但是此机制采⽤的是同步机制,会产⽣系统间消息阻塞,影响整个系统的消息吞吐量。从⽽导致整个系统的性能下降,因此不建议使⽤。

confirm机制

confirm模式需要基于channel进⾏设置, ⼀旦某条消息被投递到队列之后,消息队列就会发送⼀个确认信息给⽣产者,如果队列与消息是可持久化的, 那么确认消息会等到消息成功写⼊到磁盘之后发出。 confirm的性能⾼,主要得益于它是异步的.⽣产者在将第⼀条消息发出之后等待确认消息的同时也可以继续发送后续的消息.当确认消息到达之后,就可以通过回调⽅法处理这条确认消息. 如果MQ服务宕机了,则会返回nack消息. ⽣产者同样在回调⽅法中进⾏后续处理。

二、必达消息(confirm)

1、原理

基于实现的ConfirmCallback接口,假如RabbitMQ收到消息后,会回调实现这个接口的类。

@FunctionalInterface
public interface ConfirmCallback {
    void confirm(@Nullable CorrelationData var1, boolean var2, @Nullable String var3);
}

2、pom.xml

# 开启confirm机制
spring.rabbitmq.publisher-returns=true

3、配置类

@Configuration
public class RabbitMQConfig {
    //声明队列,并开启持久化
    @Bean
    public Queue queue() {
        /**
         * 第⼀个参数:队列名称
         * 第⼆个参数:是否开启队列持久化
         */
        return new Queue("seckill_order", true);
    }
}

4、业务实现

@Override
public boolean add(Long id, String time, String username) {
    //发送消息(消息必达)
    customMessageSender.sendMessage("", "seckill_order", JSON.toJSONString(seckillOrder));
}

5、必达工具类

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * ⾃定义消息发送类
 * 增强RabbitTemplate
 */
@Component
public class CustomMessageSender implements RabbitTemplate.ConfirmCallback {
    static final Logger log = LoggerFactory.getLogger(CustomMessageSender.class);
    private static final String MESSAGE_CONFIRM_ = "message_confirm_";
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private RedisTemplate redisTemplate;
    /**
     * 构造⽅法
     *
     * @param rabbitTemplate
     */
    public CustomMessageSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }
    /**
     * ⽣产者通知回调⽅法
     *
     * @param correlationData 唯⼀标识
     * @param ack             成功/失败
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            //返回成功通知
            //删除redis中的相关数据
            redisTemplate.delete(correlationData.getId());
            redisTemplate.delete(MESSAGE_CONFIRM_ + correlationData.getId());
        } else {
            //返回失败通知
            Map<String, String> map = (Map<String, String>) redisTemplate.opsForHash().entries(MESSAGE_CONFIRM_ + correlationData.getId());
            String exchange = map.get("exchange");
            String routingKey = map.get("routingKey");
            String sendMessage = map.get("sendMessage");
            //重新发送
            rabbitTemplate.convertAndSend(exchange, routingKey, JSON.toJSONString(sendMessage));
        }
    }
    /**
     * ⾃定义发送⽅法
     *
     * @param exchange   交换器
     * @param routingKey 路由键
     * @param message    消息内容
     */
    public void sendMessage(String exchange, String routingKey, String message) {
        //设置消息唯⼀标识并存⼊缓存
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        redisTemplate.opsForValue().set(correlationData.getId(), message);
        //本次发送到相关元信息存⼊缓存
        Map<String, String> map = new HashMap<>();
        map.put("exchange", exchange);
        map.put("routingKey", routingKey);
        map.put("sendMessage", message);
        redisTemplate.opsForHash().putAll(MESSAGE_CONFIRM_ + correlationData.getId(),
                map);
        //携带唯⼀标识发送消息
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
    }
}

三、成功后回执

1、原理

⾃动应答机制: 消息消费者成功接收到消息后,会进⾏消费并⾃动通知消息服务器将该条消息删除。

手动应答机制: 只有在消息消费者将消息处理完,才会通知消息服务器将该条消息删除

消费者发起成功通知

  • DeliveryTag: 消息的唯⼀标识 channel+消息编号
  • 第⼆个参数:是否开启批量处理。false:不开启批量
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

返回失败通知

  • 第⼀个booleantrue 所有消费者都会拒绝这个消息。false代表只有当前消费者拒绝。
  • 第⼆个booleantrue当前消息会进⼊到死信队列。false重新回到原有队列中,默认回到头部。
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

2、pom.xml

# 关闭自动提交
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3、成功与失败处理机制

package com.lydms.demorabbitmq.client;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class SecKillOrderListener {
    @RabbitListener(queues = "cancel_order_queue")
    public void receiveSecKillOrderMessage(Channel channel, Message message) {
        boolean result = true;
        if (result) {
            /**
             * 更新数据库操作成功
             * 消费者发起成功通知
             * DeliveryTag: 消息的唯⼀标识 channel+消息编号
             * 第⼆个参数:是否开启批量处理 false:不开启批量
             */
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            /**
             * 返回失败通知
             * 第⼀个boolean true所有消费者都会拒绝这个消息,false代表只有当前消费者拒绝
             * 第⼆个boolean true当前消息会进⼊到死信队列,false重新回到原有队列中,默认回到头部
             */
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

四、流量削峰

在秒杀这种⾼并发的场景下,每秒都有可能产⽣⼏万甚⾄⼗⼏万条消息,如果没有对消息处理量进⾏任何限制的话,很有可能因为过多的消息堆积从⽽导致消费者宕机的情况。因此官⽹建议对每⼀个消息消费者都设置处理消息总数(消息抓取总数)。

消息抓取总数的值,设置过⼤或者过⼩都不好,过⼩的话,会导致整个系统消息吞吐能⼒下降,造成性能浪费。过⼤的话,则很有可能导致消息过多,导致整个系统OOM(out of memory)内存溢出。因此官⽹建议每⼀个消费者将该值设置在100-300之间。

@RabbitListener(queues = "cancel_order_queue")
public void receiveSecKillOrderMessage(Channel channel, Message message) {
    // 预抓取总数
    try {
        channel.basicQos(300);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
传感器 监控 物联网
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
201 3
|
3月前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
34 1
|
6月前
|
消息中间件 存储 网络协议
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
|
3月前
|
网络协议 Go 数据安全/隐私保护
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
253 2
|
8月前
|
消息中间件 运维 Java
RocketMQ的常规运维实践应用
RocketMQ的常规运维实践应用
483 1
|
4月前
|
消息中间件 监控 负载均衡
Kafka高级应用:如何配置处理MQ百万级消息队列?
在大数据时代,Apache Kafka作为一款高性能的分布式消息队列系统,广泛应用于处理大规模数据流。本文将深入探讨在Kafka环境中处理百万级消息队列的高级应用技巧。
178 0
|
5月前
|
消息中间件 缓存 NoSQL
[中间件] 秒杀系统秒杀率提高300%?教你如何利用redis和rabbitmq 优化应用!
[中间件] 秒杀系统秒杀率提高300%?教你如何利用redis和rabbitmq 优化应用!
171 0
|
5月前
|
消息中间件 Java
RabbitMQ【应用 01】SpringBoot集成RabbitMQ及设置RabbitMQ启动总开关
RabbitMQ【应用 01】SpringBoot集成RabbitMQ及设置RabbitMQ启动总开关
94 0
|
6月前
|
消息中间件 网络协议 物联网
Golang微服务框架Kratos应用MQTT消息队列
MQTT 协议 是由`IBM`的`Andy Stanford-Clark博士`和`Arcom`(已更名为Eurotech)的`Arlen Nipper博士`于 1999 年发明,用于石油和天然气行业。工程师需要一种协议来实现最小带宽和最小电池损耗,以通过卫星监控石油管道。最初,该协议被称为消息队列遥测传输,得名于首先支持其初始阶段的 IBM 产品 MQ 系列。2010 年,IBM 发布了 MQTT 3.1 作为任何人都可以实施的免费开放协议,然后于 2013 年将其提交给结构化信息标准促进组织 (OASIS) 规范机构进行维护。2019 年,OASIS 发布了升级的 MQTT 版本 5。
45 0
|
6月前
|
消息中间件 存储 中间件
Golang微服务框架Kratos应用RabbitMQ消息队列
RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。
85 1