Spring Boot异步消息之AMQP讲解及实战(附源码)

简介: Spring Boot异步消息之AMQP讲解及实战(附源码)

觉得有帮助请点赞关注收藏~~~

AMQP(高级消息队列协议)是一个提供统一消息服务的应用层标准高级消息队列协议。是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可 传递消息,并不受客户端/中间件的不同产品,不同开发语言等条件的限制。

下面实现主要用RabbitMQ讲解AMQP实例,因此需要事先安装RabbitMQ和erlang语言

erlang下载地址 https://www.erlang.org/downloads

RabbitMQ下载地址 https://www.rabbitmq.com/download.html 

使用RabbitMQ实现发布/订阅异步消息模式

1:创建发布者应用ch8_2Sender

2:在pom.xml文件中添加依赖

<?xml version="1.0" encoding="UTF-8"?>
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0">
<modelVersion>4.0.0</modelVersion>
-<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath/>
<!-- lookup parent from repository -->
</parent>
<groupId>com.ch</groupId>
<artifactId>ch8_2Sender</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>ch8_2Sender</name>
<description>Demo project for Spring Boot</description>
+<properties>
-<dependencies>
-<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
+<dependency>
-<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
-<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
-<build>
-<plugins>
-<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

3:创建Weather实体类

package com.ch.ch8_2Sender.entity;
import java.io.Serializable;
public class Weather implements Serializable{
  private static final long serialVersionUID = -8221467966772683998L;
  private String id;
  private String city;
  private String weatherDetail;
  public String getCity() {
    return city;
  }
  public void setCity(String city) {
    this.city = city;
  }
  public String getWeatherDetail() {
    return weatherDetail;
  }
  public void setWeatherDetail(String weatherDetail) {
    this.weatherDetail = weatherDetail;
  }
  public String getId() {
    return id;
  }
  public void setId(String id) {
    this.id = id;
  }
  @Override
  public String toString() {
    return "Weather [id=" + id + ", city=" + city + ", weatherDetail=" + weatherDetail + "]";
  }
}

4:重写Ch82SenderApplication主类

package com.ch.ch8_2Sender;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.ch.ch8_2Sender.entity.Weather;
import com.fasterxml.jackson.databind.ObjectMapper;
@SpringBootApplication
public class Ch82SenderApplication implements CommandLineRunner{ 
  @Autowired
    private ObjectMapper objectMapper;
  @Autowired 
  RabbitTemplate rabbitTemplate;
  public static void main(String[] args) {
    SpringApplication.run(Ch82SenderApplication.class, args);
  }
  /**
   * 定义发布者
   */
  @Override
  public void run(String... args) throws Exception {
    //定义消息对象
    Weather weather = new Weather();
    weather.setId("010");
    weather.setCity("北京");
    weather.setWeatherDetail("今天晴到多云,南风5-6级,温度19-26°C");
    //指定Json转换器,Jackson2JsonMessageConverter默认将消息转换成byte[]类型的消息
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    //objectMapper将weather对象转换为JSON字节数组
    Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(weather))
        .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                .build();
        // 消息唯一ID
        CorrelationData correlationData = new CorrelationData(weather.getId());
    //使用已封装好的convertAndSend(String exchange , String routingKey , Object message, CorrelationData correlationData)
    //将特定的路由key发送消息到指定的交换机
    rabbitTemplate.convertAndSend(
        "weather-exchange", //分发消息的交换机名称
        "weather.message", //用来匹配消息的路由Key
        msg, //消息体
        correlationData);
  }
}

5:创建订阅者应用ch8_2Receiver-1

package com.ch.ch8_2Receiver1;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * 定义订阅者Receiver1
 */
@Component
public class Receiver1 {
  @Autowired
    private ObjectMapper objectMapper;
  @RabbitListener(
      bindings = 
      @QueueBinding(
        //队列名weather-queue1保证和别的订阅者不一样
        value = @Queue(value = "weather-queue1",durable = "true"),
        //weather-exchange与发布者的交换机名相同
        exchange = @Exchange(value = "weather-exchange",durable = "true",type = "topic"),
        //weather.message与发布者的消息的路由Key相同
        key = "weather.message"
      ) 
    )
    @RabbitHandler
    public void receiveWeather(@Payload byte[] weatherMessage)throws Exception{
        System.out.println("-----------订阅者Receiver1接收到消息--------");
        //将JSON字节数组转换为Weather对象
        Weather w=objectMapper.readValue(weatherMessage, Weather.class);
        System.out.println("Receiver1收到的消息内容:"+w);
    }
}

6:创建订阅者应用ch8_2Receiver-2

package com.ch.ch8_2Receiver1;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * 定义订阅者Receiver2
 */
@Component
public class Receiver2 {
  @Autowired
    private ObjectMapper objectMapper;
  @RabbitListener(
      bindings = 
      @QueueBinding(
        //队列名weather-queue2保证和别的订阅者不一样
        value = @Queue(value = "weather-queue2",durable = "true"),
        //weather-exchange与发布者的交换机名相同
        exchange = @Exchange(value = "weather-exchange",durable = "true",type = "topic"),
        //weather.message与发布者的消息的路由Key相同
        key = "weather.message"
      ) 
    )
    @RabbitHandler
    public void receiveWeather(@Payload byte[] weatherMessage)throws Exception{
        System.out.println("-----------订阅者Receiver2接收到消息--------");
        Weather w=objectMapper.readValue(weatherMessage, Weather.class);
        //将JSON字节数组转换为Weather对象
        System.out.println("Receiver2收到的消息内容:"+w);
    }
}

接下来分别运行发布者和订阅者的主类即可,发现一个发布者发布的消息可以被多个订阅者订阅。

相关文章
|
11天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
39 2
|
27天前
|
数据采集 监控 前端开发
二级公立医院绩效考核系统源码,B/S架构,前后端分别基于Spring Boot和Avue框架
医院绩效管理系统通过与HIS系统的无缝对接,实现数据网络化采集、评价结果透明化管理及奖金分配自动化生成。系统涵盖科室和个人绩效考核、医疗质量考核、数据采集、绩效工资核算、收支核算、工作量统计、单项奖惩等功能,提升绩效评估的全面性、准确性和公正性。技术栈采用B/S架构,前后端分别基于Spring Boot和Avue框架。
|
17天前
|
前端开发 Java 开发者
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
41 9
|
26天前
|
JavaScript Java 项目管理
Java毕设学习 基于SpringBoot + Vue 的医院管理系统 持续给大家寻找Java毕设学习项目(附源码)
基于SpringBoot + Vue的医院管理系统,涵盖医院、患者、挂号、药物、检查、病床、排班管理和数据分析等功能。开发工具为IDEA和HBuilder X,环境需配置jdk8、Node.js14、MySQL8。文末提供源码下载链接。
|
1月前
|
自然语言处理 Java API
Spring Boot 接入大模型实战:通义千问赋能智能应用快速构建
【10月更文挑战第23天】在人工智能(AI)技术飞速发展的今天,大模型如通义千问(阿里云推出的生成式对话引擎)等已成为推动智能应用创新的重要力量。然而,对于许多开发者而言,如何高效、便捷地接入这些大模型并构建出功能丰富的智能应用仍是一个挑战。
131 6
|
1月前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
85 2
|
1月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
115 5
|
1月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
1月前
|
XML Java 数据格式
Spring IOC容器的深度解析及实战应用
【10月更文挑战第14天】在软件工程中,随着系统规模的扩大,对象间的依赖关系变得越来越复杂,这导致了系统的高耦合度,增加了开发和维护的难度。为解决这一问题,Michael Mattson在1996年提出了IOC(Inversion of Control,控制反转)理论,旨在降低对象间的耦合度,提高系统的灵活性和可维护性。Spring框架正是基于这一理论,通过IOC容器实现了对象间的依赖注入和生命周期管理。
71 0
|
6月前
|
存储 XML 缓存
【深入浅出Spring原理及实战】「缓存Cache开发系列」带你深入分析Spring所提供的缓存Cache功能的开发实战指南(一)
【深入浅出Spring原理及实战】「缓存Cache开发系列」带你深入分析Spring所提供的缓存Cache功能的开发实战指南
464 0
下一篇
无影云桌面