RabbitMq 集成 spring boot 消息队列 入门Demo

简介:

spring boot 集成 RabbitMq还是很方便的。现在来一个简单的例子来集成rabbitmq。入门demo。

主要概念:

其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。


虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。

交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。

绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。


首先是配制文件。

1
2
3
4
5
#spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host= 127.0 . 0.1
spring.rabbitmq.port= 5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest


发送者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package  com.basic.rabbitmq.send;
 
import  com.basic.rabbitmq.configuration.RabbitMqConfig2;
import  org.springframework.amqp.core.AmqpTemplate;
import  org.springframework.beans.factory.annotation.Autowired;
import  org.springframework.stereotype.Component;
import  org.springframework.stereotype.Service;
 
import  java.util.Date;
 
/**
  * Created by sdc on 2017/6/17.
  */
@Service ( "helloSender" )
public  class  HelloSender {
 
     @Autowired
     private  AmqpTemplate amqpTemplate;
 
//    private Rabbitt
 
     public  void  send() {
         String contenxt =  "order_queue_message" ;
         this .amqpTemplate.convertAndSend(RabbitMqConfig2.QUEUE_EXCHANGE_NAME, "order_queue_routing" ,contenxt);
//        this.amqpTemplate.conver
     }
 
}


配制信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package  com.basic.rabbitmq.configuration;
 
import  com.basic.rabbitmq.receiver.Receiver;
import  com.rabbitmq.client.Channel;
import  com.rabbitmq.client.ConfirmListener;
import  org.springframework.amqp.core.*;
import  org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import  org.springframework.amqp.rabbit.connection.ConnectionFactory;
import  org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import  org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import  org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import  org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import  org.springframework.context.annotation.Bean;
import  org.springframework.context.annotation.Configuration;
 
import  java.io.IOException;
 
/**
  * Created by sdc on 2017/6/17.
  */
@Configuration
public  class  RabbitMqConfig2 {
 
     public  static  final  String QUEUE_NAME =  "order_queue" ;
 
     public  static  final  String QUEUE_EXCHANGE_NAME =  "topic_exchange_new" ;
 
     public  static  final   String routing_key =  "order_queue_routing" ;
 
     @Bean
     public  Queue queue() {
         //是否持久化
         boolean  durable =  false ;
         //仅创建者可以使用该队列,断开后自动删除
         boolean  exclusive =  false ;
         //当所有消费者都断开连接后,是否删除队列
         boolean  autoDelete =  false ;
         return  new  Queue(QUEUE_NAME, durable, exclusive, autoDelete);
     }
 
     @Bean
     public  TopicExchange exchange() {
         //是否持久化
         boolean  durable =  false ;
         //当所有消费者都断开连接后,是否删除队列
         boolean  autoDelete =  false ;
         return   new  TopicExchange(QUEUE_EXCHANGE_NAME, durable, autoDelete);
     }
 
     @Bean
     public  Binding binding() {
         return  BindingBuilder.bind(queue()).to(exchange()).with(routing_key);
     }
 
     @Bean
     public  ConnectionFactory connectionFactory() {
         CachingConnectionFactory connectionFactory =  new  CachingConnectionFactory( "127.0.0.1" , 5672 );
 
         connectionFactory.setUsername( "admin" );
         connectionFactory.setPassword( "admin" );
         connectionFactory.setVirtualHost( "/" );
         /** 如果要进行消息回调,则这里必须要设置为true */
         connectionFactory.setPublisherConfirms( true );  // 必须要设置
//        connectionFactory.setPublisherReturns();
         return  connectionFactory;
     }
 
     @Bean
     SimpleMessageListenerContainer container() {
 
         SimpleMessageListenerContainer container =  new  SimpleMessageListenerContainer();
         container.setConnectionFactory(connectionFactory());
//        container.setQueueNames(QUEUE_NAME);
         container.setQueues(queue());
         container.setAcknowledgeMode(AcknowledgeMode.MANUAL);  //设置确认模式手工确认
         container.setMessageListener( new  ChannelAwareMessageListener() {
             @Override
             public  void  onMessage(Message message, Channel channel)  throws  Exception {
                 byte [] body = message.getBody();
                 System.out.println( "收到消息 : "  new  String(body));
                 channel.queueDeclare(QUEUE_NAME,  true false false null );
//                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
//                channel.basicAck(); //应答
//                channel.basicReject();//拒绝
//                channel.basicRecover(); //恢复
//                channel.basicQos();
//                channel.addConfirmListener(new ConfirmListener() {
//                    @Override
//                    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//                        //失败重发
//                    }
//
//                    @Override
//                    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//                        //确认ok
//                    }
//                });
             }
         });
         return   container;
     }
 
//    @Bean
//    MessageListenerAdapter listenerAdapter(Receiver receiver) {
//        return new MessageListenerAdapter(receiver, "receiveMessage");
//    }
 
}


测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package  com.rabbit.test;
 
import  com.basic.rabbitmq.send.HelloSender;
import  com.basic.system.Application;
import  org.junit.Test;
import  org.junit.runner.RunWith;
import  org.springframework.beans.factory.annotation.Autowired;
import  org.springframework.boot.test.context.SpringBootTest;
import  org.springframework.test.context.junit4.SpringRunner;
 
import  javax.annotation.Resource;
 
/**
  * Created by sdc on 2017/6/17.
  */
@RunWith(SpringRunner. class )
@SpringBootTest(classes = Application. class )
public  class  RabbitMqTest {
 
     @Autowired
     public  HelloSender helloSender;
 
     @Test
     public  void helloword() throws  Exception {
         helloSender.send();
     }
 
 
}


这只是一个demo,学习的时候会测试各种的事情,在这基础上更改就可以了,心中的疑虑测试没了就可以写一些项目了。


本文转自 豆芽菜橙 51CTO博客,原文链接:http://blog.51cto.com/shangdc/1944218


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
30天前
|
XML Java 测试技术
Spring5入门到实战------17、Spring5新功能 --Nullable注解和函数式注册对象。整合JUnit5单元测试框架
这篇文章介绍了Spring5框架的三个新特性:支持@Nullable注解以明确方法返回、参数和属性值可以为空;引入函数式风格的GenericApplicationContext进行对象注册和管理;以及如何整合JUnit5进行单元测试,同时讨论了JUnit4与JUnit5的整合方法,并提出了关于配置文件加载的疑问。
Spring5入门到实战------17、Spring5新功能 --Nullable注解和函数式注册对象。整合JUnit5单元测试框架
|
18天前
|
消息中间件 存储 Java
【干货】看看我司消息队列用啥,全网最接地气pulsar教程(含业务解耦demo源码)
本文介绍了Apache Pulsar消息队列系统的核心特性及其与其它消息队列的区别,通过Docker安装Pulsar及Pulsar Manager,并结合电商业务场景,对比了串行执行与使用Pulsar实现异步解耦的优势,最后通过Java代码示例展示了如何利用Pulsar解决实际业务问题。
42 2
【干货】看看我司消息队列用啥,全网最接地气pulsar教程(含业务解耦demo源码)
|
25天前
|
Java 数据库连接 Spring
后端框架入门超详细 三部曲 Spring 、SpringMVC、Mybatis、SSM框架整合案例 【爆肝整理五万字】
文章是关于Spring、SpringMVC、Mybatis三个后端框架的超详细入门教程,包括基础知识讲解、代码案例及SSM框架整合的实战应用,旨在帮助读者全面理解并掌握这些框架的使用。
后端框架入门超详细 三部曲 Spring 、SpringMVC、Mybatis、SSM框架整合案例 【爆肝整理五万字】
|
27天前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
27天前
|
NoSQL Java Redis
Redis6入门到实战------ 八、Redis与Spring Boot整合
这篇文章详细介绍了如何在Spring Boot项目中整合Redis,包括在`pom.xml`中添加依赖、配置`application.properties`文件、创建配置类以及编写测试类来验证Redis的连接和基本操作。
Redis6入门到实战------ 八、Redis与Spring Boot整合
|
30天前
|
SQL 数据库
Spring5入门到实战------13、使用JdbcTemplate操作数据库(批量增删改)。具体代码+讲解 【下篇】
这篇文章是Spring5框架的实战教程,深入讲解了如何使用JdbcTemplate进行数据库的批量操作,包括批量添加、批量修改和批量删除的具体代码实现和测试过程,并通过完整的项目案例展示了如何在实际开发中应用这些技术。
Spring5入门到实战------13、使用JdbcTemplate操作数据库(批量增删改)。具体代码+讲解 【下篇】
|
30天前
|
XML Java 数据格式
Spring5入门到实战------11、使用XML方式实现AOP切面编程。具体代码+讲解
这篇文章是Spring5框架的AOP切面编程教程,通过XML配置方式,详细讲解了如何创建被增强类和增强类,如何在Spring配置文件中定义切入点和切面,以及如何将增强逻辑应用到具体方法上。文章通过具体的代码示例和测试结果,展示了使用XML配置实现AOP的过程,并强调了虽然注解开发更为便捷,但掌握XML配置也是非常重要的。
Spring5入门到实战------11、使用XML方式实现AOP切面编程。具体代码+讲解
|
30天前
|
XML Java Maven
Spring5入门到实战------16、Spring5新功能 --整合日志框架(Log4j2)
这篇文章是Spring5框架的入门到实战教程,介绍了Spring5的新功能——整合日志框架Log4j2,包括Spring5对日志框架的通用封装、如何在项目中引入Log4j2、编写Log4j2的XML配置文件,并通过测试类展示了如何使用Log4j2进行日志记录。
Spring5入门到实战------16、Spring5新功能 --整合日志框架(Log4j2)
|
30天前
|
XML Java 数据库
Spring5入门到实战------15、事务操作---概念--场景---声明式事务管理---事务参数--注解方式---xml方式
这篇文章是Spring5框架的实战教程,详细介绍了事务的概念、ACID特性、事务操作的场景,并通过实际的银行转账示例,演示了Spring框架中声明式事务管理的实现,包括使用注解和XML配置两种方式,以及如何配置事务参数来控制事务的行为。
Spring5入门到实战------15、事务操作---概念--场景---声明式事务管理---事务参数--注解方式---xml方式
|
30天前
|
XML 数据库 数据格式
Spring5入门到实战------14、完全注解开发形式 ----JdbcTemplate操作数据库(增删改查、批量增删改)。具体代码+讲解 【终结篇】
这篇文章是Spring5框架的实战教程的终结篇,介绍了如何使用注解而非XML配置文件来实现JdbcTemplate的数据库操作,包括增删改查和批量操作,通过创建配置类来注入数据库连接池和JdbcTemplate对象,并展示了完全注解开发形式的项目结构和代码实现。
Spring5入门到实战------14、完全注解开发形式 ----JdbcTemplate操作数据库(增删改查、批量增删改)。具体代码+讲解 【终结篇】

相关产品

  • 云消息队列 MQ