【SpringCloud】SpringAMQP消息发送、消息接收(WorkQueue模型、FanoutExchange、DirectExchange、TopicExchange、消息转换器)

简介: 【SpringCloud】SpringAMQP消息发送、消息接收(WorkQueue模型、FanoutExchange、DirectExchange、TopicExchange、消息转换器)

一、SpringAMQP基本介绍

  • AMQP(Advanced Message Queuing Protocol),是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求。
  • Spring AMQP 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit;是底层的默认实现。
  • Spring AMQP官网:https://spring.io/projects/spring-amqp
  • 特征:
  • 用于异步处理入站消息的侦听器容器
  • 用于发送和接收消息的 RabbitTemplate
  • RabbitAdmin 用于自动声明队列、交换和绑定

二、利用SpringAMQP实现HelloWorld中的基础消息队列功能

2.1 在父工程中引入spring-amqp的依赖

因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列

  1. 在publisher服务中编写application.yml,添加mq连接信息
spring:
  rabbitmq:
    host: 192.168.88.129  # rabbitmq所在的虚拟主机ip地址
    port: 5672            # 端口
    username: yangjian    # 用户名
    password: 123321      # 密码
    virtual-host: /       # 虚拟主机
  1. 在publisher服务中新建一个测试类,编写测试方法
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
import java.util.Map;
/**
 * @Author YJ
 * @Date 2024/3/23 10:15
 * Description:测试 spring amqp
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendMessage2SimpleQueue() {
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName,message);
    }
}



2.3 在consumer服务中编写消费逻辑,绑定simple.queue这个队列

  1. 在consumer服务中编写application.yml,添加mq连接信息
spring:
  rabbitmq:
    host: 192.168.88.129  # rabbitmq所在的虚拟主机ip地址
    port: 5672            # 端口
    username: yangjian    # 用户名
    password: 123321      # 密码
    virtual-host: /       # 虚拟主机
  1. 在consumer服务中新建一个类,编写消费逻辑
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
import java.util.Map;
/**
 * @Author YJ
 * @Date 2023/3/23 10:35
 * Description:接收消息
 */
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
    }
}



三、WorkQueue工作队列(案例:模拟WorkQueue,实现一个队列绑定多个消费者)

工作队列 可以提高消息处理速度,避免队列消息堆积

3.1 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue

@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
    String queueName = "simple.queue";
    String message = "hello, message__";
    for (int i = 1; i <= 50; i++) {
        rabbitTemplate.convertAndSend(queueName,message + i);
        //每发送一条睡眠20ms
        Thread.sleep(20);
    }
}

3.2 在consumer服务中定义两个消息监听者,都监听simple.queue队列

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到simple.queue的消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到simple.queue的消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

3.3 消费者1每秒处理50条消息,消费者2每秒处理10条消息




这里发现发送、接收消息已经远远超出了1秒,而且消费者1处理的都是奇数消息,消费者2处理的都是偶数消息,这是因为这种分配方式没有考虑两个消费者的能力,它是将消息平均分配给两个消费者的,这是源于rabbitmq底层有一个消息预取机制,这个机制是当消息发送到队列中后,队列会对消息进行投递,消费者的channel通道会提前把消息获取到,当消费者处理消息的时间不同时,处理消息快的消费者会提前结束,处理消息慢的消费者所耗费的时间就更久一些,这样就导致总的处理时间超出了。

消费预取限制:修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限

spring:
  rabbitmq:
    host: 192.168.88.129  # rabbitmq的ip地址
    port: 5672            # 端口
    username: yangjian    # 用户名
    password: 123321      # 密码
    virtual-host: /       # 虚拟主机
    listener:
      simple:
        prefetch: 1       # 每次只能获取一条消息,处理完成才能获取下一个消息



四、发布(Publish)、订阅(Subscribe)

  • 发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。
  • 常见的exchange类型包括:
  • Fanout:广播
  • Direct:路由
  • Topic:话题

exchange负责消息路由,而不是存储,路由失败则消息丢失



4.1 Fanout Exchange

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue



案例:利用SpringAMQP演示FanoutExchangel的使用

4.1.1 在consumer服务中,利用代码声明队列、交换机,并将两者绑定

  • 在consumer服务常见一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下:
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Author YJ
 * @Date 2024/3/23 13:35
 * Description:队列绑定交换机
 */
@Configuration
public class FanoutConfig {
    //practice.fanout
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("practice.fanout");
    }
    //fanout.queue1
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }
    //绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }
    //fanout.queue2
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }
    //绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }
}

4.1.2 在consumer)服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) throws InterruptedException {
    System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
    Thread.sleep(200);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) throws InterruptedException {
    System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
    Thread.sleep(200);
}

4.1.3 在publisher中编写测试方法,向practice.fanout发送消息

@Test
public void testSendFanoutExchange() {
    //交换机名称
    String exchange = "practice.fanout";
    //消息
    String message = "hello everyone!";
    //发送消息
    rabbitTemplate.convertAndSend(exchange,"",message);
}


4.2 Direct Exchange

Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange:将消息路由到BindingKey.与消息RoutingKey一致的队列


案例:利用SpringAMQP演示DirectExchangel的使用

4.2.1 利用@RabbitListenerj声明Exchange、Queue、RoutingKey(同4.2.2)

4.2.2 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "practice.direct",type = ExchangeTypes.DIRECT),
        key = {"red","blue"}
))
public void listenDirectQueue1(String msg) {
    System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "practice.direct",type = ExchangeTypes.DIRECT),
        key = {"red","yellow"}
))
public void listenDirectQueue2(String msg) {
    System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

4.2.3 在publishert中编写测试方法,向practice.direct发送消息

@Test
public void testSendDirectExchange() {
    //交换机名称
    String exchange = "practice.direct";
    //消息
    String message = "hello blue!";
    //发送消息
    rabbitTemplate.convertAndSend(exchange,"blue",message);
}


4.3 Topic Exchange

TopicExchange.与DirectExchange类似,区别在于routing Key必须是多个单词的列表,并且以 . 分割。

  • Queue.与Exchange:指定BindingKey时可以使用通配符
  • #:代指0个或多个单词
  • *:代指一个单词


案例:利用SpringAMQP演示TopicExchange的使用

4.3.1 并利用@RabbitListener声明Exchange、Queue、RoutingKey(同4.3.2)

4.3.2 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(value = "practice.topic",type = ExchangeTypes.TOPIC),
        key = "china.#"
))
public void listenTopicQueue1(String msg) {
    System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(value = "practice.topic",type = ExchangeTypes.TOPIC),
        key = "#.news"
))
public void listenTopicQueue2(String msg) {
    System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}

4.3.3 在publisher中编写测试方法,向practice.topic发送消息

@Test
public void testSendTopicExchange() {
    //交换机名称
    String exchange = "practice.topic";
    //消息
    String message = "天津,晴~";
    //发送消息
    rabbitTemplate.convertAndSend(exchange,"china.weathers",message);
}


五、SpringAMQP - 消息转换器

案例:测试发送Object类型消息

在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。

5.1 声明Bean

@Bean
public Queue objectQueue() {
    return new Queue("object.queue");
}

5.2 发送消息

@Test
public void testSendObjectQueue() {
    Map<String,Object> msg = new HashMap<>();
    msg.put("name","YJ");
    msg.put("age",22);
    rabbitTemplate.convertAndSend("object.queue",msg);
}


消息转换器

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

如果要修改只需要定义一个MessageConverter类型的Bean即可,推荐用JSON方式序列化,步骤如下

  • 在publisher服务中引入依赖
<!--jackson依赖(消息转换器)-->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
  • 在publisher服务声明MessageConverter
@Bean
public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}


5.3 接收消息

  • 在consumer服务中引入依赖
<!--jackson依赖(消息转换器)-->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
  • 在consumer服务声明MessageConverter
@Bean
public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}
  • 然后定义一个消费者,监听object.queue队列并消费消息
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String,Object> msg) {
    System.out.println("接收到object.queue的消息:" + msg);
}


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
存储 SpringCloudAlibaba Nacos
SpringCloud Alibaba核心组件Nacos【服务多级存储模型&配置集群】第2章(上)
SpringCloud Alibaba核心组件Nacos【服务多级存储模型&配置集群】第2章
SpringCloud Alibaba核心组件Nacos【服务多级存储模型&配置集群】第2章(上)
|
30天前
|
存储 Nacos 数据安全/隐私保护
【SpringCloud】Nacos的安装、Nacos注册、Nacos服务多级存储模型
【SpringCloud】Nacos的安装、Nacos注册、Nacos服务多级存储模型
46 1
|
8月前
|
存储 SpringCloudAlibaba JavaScript
SpringCloud Alibaba核心组件Nacos【服务多级存储模型&配置集群】第2章(下)
SpringCloud Alibaba核心组件Nacos【服务多级存储模型&配置集群】第2章
|
存储 SpringCloudAlibaba JavaScript
SpringCloud Alibaba核心组件Nacos【服务多级存储模型&配置集群】第2章
本章主要诠释:服务集群属性和配置,服务分级存储是什么,什么是集群,为什么要引物服务分级,解决什么问题
SpringCloud Alibaba核心组件Nacos【服务多级存储模型&配置集群】第2章
|
开发框架 数据安全/隐私保护 微服务
SpringCloud微服务实战——搭建企业级开发框架(二十一):基于RBAC模型的系统权限设计
RBAC(基于角色的权限控制)模型的核心是在用户和权限之间引入了角色的概念。取消了用户和权限的直接关联,改为通过用户关联角色、角色关联权限的方法来间接地赋予用户权限,从而达到用户和权限解耦的目的,RBAC介绍原文链接。 RABC的好处
326 0
SpringCloud微服务实战——搭建企业级开发框架(二十一):基于RBAC模型的系统权限设计
|
存储 安全 前端开发
SpringCloud 源码剖析(五)Eureka源码之服务端接收注册信息
SpringCloud 源码剖析(五)Eureka源码之服务端接收注册信息
145 0
SpringCloud 源码剖析(五)Eureka源码之服务端接收注册信息
|
22天前
|
人工智能 Java Spring
使用 Spring Cloud Alibaba AI 构建 RAG 应用
本文介绍了RAG(Retrieval Augmented Generation)技术,它结合了检索和生成模型以提供更准确的AI响应。示例中,数据集(包含啤酒信息)被加载到Redis矢量数据库,Spring Cloud Alibaba AI Starter用于构建一个Spring项目,演示如何在接收到用户查询时检索相关文档并生成回答。代码示例展示了数据加载到Redis以及RAG应用的工作流程,用户可以通过Web API接口进行交互。
52328 61
|
5天前
|
监控 Java 应用服务中间件
替代 Hystrix,Spring Cloud Alibaba Sentinel 快速入门
替代 Hystrix,Spring Cloud Alibaba Sentinel 快速入门
|
21天前
|
消息中间件 Java 持续交付
Spring Cloud Alibaba 项目搭建步骤和注意事项
Spring Cloud Alibaba 项目搭建步骤和注意事项
181 0
Spring Cloud Alibaba 项目搭建步骤和注意事项
|
19天前
|
存储 SpringCloudAlibaba 关系型数据库
springcloud alibaba(5)
springcloud alibaba
96 0