RabbitMQ消息队列

简介: RabbitMQ消息队列

RabbitMQ简介

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现

11a67eee6064ccee9c15cdb2a41cbaba_202110161038857.png

核心概念

Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。

Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange类型

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

Exchange有4种类型:direct(默认),fanout,topic,和headers,不同类型的Exchange转发消息的策略有所区别

Direct Exchange

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routingkey 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard” 等等。它是完全匹配、单播的模式。

43a2ccc847a65610e866196606de7c49_202110171149184.png

Fanout Exchange

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

4ad8f596894161c53f3bb25f2a49532a_202110171150590.png

Topic Exchange

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。 它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“”和符号 。匹配0个或多个单词,匹配一个单词。

f735e59095701c5420ccc9c4a733c4e3_202110171154960.png

Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直 在队列里面,等待消费者连接到这个队列将其取走。

Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。

Exchange和Queue的绑定可以是多对多的关系。

Connection

网络连接,比如一个TCP连接。

Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都 是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个mini版的RabbitMQ 服务器,拥 有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时 指定,RabbitMQ 默认的vhost是/。

Broker

表示消息队列服务器实体

Docker安装RabbitMQ

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p  25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

访问15672端口

83d5cb64b563983b603d5aca1af0cc16_202110161037384.png

https://www.rabbitmq.com/networking.html

SpringCloud整合RabbitMQ

引入RabbitMQ包

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

引入RabbitMQ,RabbitAutoConfiguration就会自动生效

给容器中自动配置了RabbitTemplate、AmqpAdmin等等

配置文件

spring.rabbitmq.host=192.168.195.100
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

测试类

@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {
    @Autowired
    AmqpAdmin amqpAdmin;
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    void sendMessageTest() {
        //因为存到rabbit中是经过序列化的,所以加上配置转成json发出去
        OrderReturnReasonEntity orderReturnReasonEntity=new OrderReturnReasonEntity();
        orderReturnReasonEntity.setId(1L);
        orderReturnReasonEntity.setCreateTime(new Date());
        rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderReturnReasonEntity);
        log.info("消息发送成功");
    }
    @Test
    void createExchange() {
        //创建了一个Direct类型的交换机  是否持久化 是否自动删除
        DirectExchange directExchange=new DirectExchange("hello-java-exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
        log.info("Exchange创建成功");
    }
    @Test
    void createQueue() {
        Queue queue=new Queue("hello-java-Queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        log.info("Queue创建成功");
    }
    @Test
    void createBinding() {
        //将exchange指定的交换机和Directnation目的地进行绑定,使用routingkey作为路由键
        Binding binding=new Binding("hello-java-Queue",
                Binding.DestinationType.QUEUE,
                "hello-java-exchange",
                "hello-java",null);
        amqpAdmin.declareBinding(binding);
        log.info("Binding绑定成功");
    }
}

先创建交换机,然后创建对队列,绑定路由键,利用rabbitTemplate发送消息

b36aea539772377fd45d9de88ebe7cc5_202110171023126.png

@RabbitListenter&@RabbitHandler接收消息

@RabbitListenter监听消息

@RabbitListener(queues = {"hello-java-Queue"})
    public  void  recieveMessage(Message message, OrderReturnReasonEntity content, Channel channel)
    {
        System.out.println("接收到消息内容:"+message+"内容==》"+content);
    }

c3aa6527223279a381a1ca08b4d2b021_202110171106166.png

如果有多个客户端,只有一个会收到消息,并且只有当一个消息处理完才会收到下一个消息

如果需要监听一个队列里的多个消息,消息的类型都不一样利用@RabbitHandler

监听hello-java-Queue队列里不同的消息

@RabbitListener(queues = {"hello-java-Queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
    @RabbitHandler
    public  void  recieveMessage(Message message, OrderReturnReasonEntity content)
    {
        System.out.println("接收到消息内容:"+message+"内容==》"+content);
    }
    @RabbitHandler
    public  void  recieveMessage2(OrderEntity orderEntity)
    {
        System.out.println("接收到消息内容:"+orderEntity);
    }
}

控制器

@Slf4j
@Controller
public class RabbitController {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @GetMapping("/sendMq")
    public String sendMessageTest() {
        for (int i = 0; i < 10; i++) {
            if(i%2==0) {
                //因为存到rabbit中是经过序列化的,所以加上配置转成json发出去
                OrderReturnReasonEntity orderReturnReasonEntity=new OrderReturnReasonEntity();
                orderReturnReasonEntity.setId(1L);
                orderReturnReasonEntity.setCreateTime(new Date());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderReturnReasonEntity);
                log.info("消息发送成功");
            }
            else {
                OrderEntity orderEntity=new OrderEntity();
                orderEntity.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderEntity);
                log.info("消息发送成功");
            }
        }
        return  "";
    }
}

52e8ab956fab2f70b09635d147bf96f8_202110171125005.png

RabbitMQ消息确认机制-可靠抵达

保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制

publisher confirmCallback 确认模式

publisher returnCallback 未投递到 queue 退回模式

consumer ack机制

0865930060566b3f4b7e2ed9954f6e2b_202110171213896.png

可靠抵达-ConfirmCallback

如果要使用confirmCallback ,需要配置

#开启发送端确认
spring.rabbitmq.publisher-confirm-type=correlated
  1. 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback 。
  2. CorrelationData:用来表示当前消息唯一性。
  3. 生产者只要把消息发送给Broker,消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback。
  4. 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。
@PostConstruct
    public  void initRabbitTemplate()
    {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /*
            * @param correlationData 当前消息的唯一关联数据(消息的唯一id)
            * @param ack 消息是否成功收到
            * @param cuase 失败的原因
            * */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cuase) {
                System.out.println("confirm... cottrlationData"+correlationData+",[ack]"+ack+",cuase"+cuase);
            }
        });
    }

可靠抵达-ReturnCallback

开启发送消息抵达队列的确认

spring.rabbitmq.publisher-returns=true
#只要抵达队列,以异步发动有限回调我们这个returnconfig
spring.rabbitmq.template.mandatory=true

只有当消息没有抵达队列才会触发方法

@PostConstruct
    public  void initRabbitTemplate()
    {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /*
            * @param correlationData 当前消息的唯一关联数据(消息的唯一id)
            * @param ack 消息是否成功收到
            * @param cuase 失败的原因
            * */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cuase) {
                System.out.println("confirm... cottrlationData"+correlationData+",[ack]"+ack+",cuase"+cuase);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            //投递失败的详细信息 回复的状态码 回复的文本内容 当时这个消息给给哪个交换机 当时消息的路由键
            @Override
            public void returnedMessage(Message message, int replaycode, String replytext, String exchange, String routekey) {
                System.out.println("Fail...message"+message+",[replaycode]"+replaycode+",[replytext]"+replytext+",[exchange]"+exchange+",[routekey]"+routekey);
            }
        });
    }

可靠抵达-Ack消息确认机制

在不开启手动确认的时候,发送消息突然服务器关机会导致消息丢失,因此需要开启手动模式保证消息的可达性

#手动确认消息达到
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消费者手动确认模式下 只要没有明确确认消息,就一直是unached状态,即使关机 消息也不会丢失,会重新变为Ready

@RabbitHandler
    public  void  recieveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws IOException {
        System.out.println("接收到消息内容:"+message+"内容==》"+content);
        long deliveryTag=message.getMessageProperties().getDeliveryTag();
        //签收消息
        channel.basicAck(deliveryTag,false);//true就是重新发回服务器
        System.out.println("消息签收"+deliveryTag);
    }
    @RabbitHandler
    public  void  recieveMessage2(Message message,OrderEntity orderEntity,Channel channel) throws IOException {
        System.out.println("接收到消息内容:"+orderEntity);
        long deliveryTag=message.getMessageProperties().getDeliveryTag();
        //签收消息
        channel.basicNack(deliveryTag,false,true);// 退货  true就是重新发回服务器
        System.out.println("没有签收"+deliveryTag);
    }

消息处理成功,ack(),接受下一个消息,此消息broker就会移除

消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack

消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人

如何签收

channel.basicAck(deliveryTag,false) 签收
channel.basicNack(deliveryTag,false,true); 拒签


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
25天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
66 6
|
20天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
24天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
67 7
|
1月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
1月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
64 4
|
2月前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
52 1
|
1月前
|
消息中间件 监控 测试技术
云消息队列RabbitMQ实践 - 评测
根据反馈,对本解决方案的实践原理已有一定理解,描述整体清晰但需在消息队列配置与使用上增加更多示例和说明以助理解。部署体验中获得了一定的引导和文档支持,尽管文档仍有待完善;期间出现的配置文件错误及依赖库缺失等问题已通过查阅资料解决。设计验证展示了云消息队列RabbitMQ的核心优势,包括高可用性和灵活性,未来可通过增加自动化测试来提高系统稳定性。实践后,用户对方案解决问题的能力及适用场景有了明确认识,认为其具有实际生产价值,不过仍需在性能优化、安全性增强及监控功能上进行改进以适应高并发和大数据量环境。
42 0
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

相关产品

  • 云消息队列 MQ