RabbitMq 集成 spring boot 消息队列 入门Demo

简介:

spring boot 集成 RabbitMq还是很方便的。现在来一个简单的例子来集成rabbitmq。入门demo。

主要概念:

其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。


虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。

交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。

绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。


首先是配制文件。

1
2
3
4
5
#spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host= 127.0 . 0.1
spring.rabbitmq.port= 5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest


发送者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package  com.basic.rabbitmq.send;
 
import  com.basic.rabbitmq.configuration.RabbitMqConfig2;
import  org.springframework.amqp.core.AmqpTemplate;
import  org.springframework.beans.factory.annotation.Autowired;
import  org.springframework.stereotype.Component;
import  org.springframework.stereotype.Service;
 
import  java.util.Date;
 
/**
  * Created by sdc on 2017/6/17.
  */
@Service ( "helloSender" )
public  class  HelloSender {
 
     @Autowired
     private  AmqpTemplate amqpTemplate;
 
//    private Rabbitt
 
     public  void  send() {
         String contenxt =  "order_queue_message" ;
         this .amqpTemplate.convertAndSend(RabbitMqConfig2.QUEUE_EXCHANGE_NAME, "order_queue_routing" ,contenxt);
//        this.amqpTemplate.conver
     }
 
}


配制信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package  com.basic.rabbitmq.configuration;
 
import  com.basic.rabbitmq.receiver.Receiver;
import  com.rabbitmq.client.Channel;
import  com.rabbitmq.client.ConfirmListener;
import  org.springframework.amqp.core.*;
import  org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import  org.springframework.amqp.rabbit.connection.ConnectionFactory;
import  org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import  org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import  org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import  org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import  org.springframework.context.annotation.Bean;
import  org.springframework.context.annotation.Configuration;
 
import  java.io.IOException;
 
/**
  * Created by sdc on 2017/6/17.
  */
@Configuration
public  class  RabbitMqConfig2 {
 
     public  static  final  String QUEUE_NAME =  "order_queue" ;
 
     public  static  final  String QUEUE_EXCHANGE_NAME =  "topic_exchange_new" ;
 
     public  static  final   String routing_key =  "order_queue_routing" ;
 
     @Bean
     public  Queue queue() {
         //是否持久化
         boolean  durable =  false ;
         //仅创建者可以使用该队列,断开后自动删除
         boolean  exclusive =  false ;
         //当所有消费者都断开连接后,是否删除队列
         boolean  autoDelete =  false ;
         return  new  Queue(QUEUE_NAME, durable, exclusive, autoDelete);
     }
 
     @Bean
     public  TopicExchange exchange() {
         //是否持久化
         boolean  durable =  false ;
         //当所有消费者都断开连接后,是否删除队列
         boolean  autoDelete =  false ;
         return   new  TopicExchange(QUEUE_EXCHANGE_NAME, durable, autoDelete);
     }
 
     @Bean
     public  Binding binding() {
         return  BindingBuilder.bind(queue()).to(exchange()).with(routing_key);
     }
 
     @Bean
     public  ConnectionFactory connectionFactory() {
         CachingConnectionFactory connectionFactory =  new  CachingConnectionFactory( "127.0.0.1" , 5672 );
 
         connectionFactory.setUsername( "admin" );
         connectionFactory.setPassword( "admin" );
         connectionFactory.setVirtualHost( "/" );
         /** 如果要进行消息回调,则这里必须要设置为true */
         connectionFactory.setPublisherConfirms( true );  // 必须要设置
//        connectionFactory.setPublisherReturns();
         return  connectionFactory;
     }
 
     @Bean
     SimpleMessageListenerContainer container() {
 
         SimpleMessageListenerContainer container =  new  SimpleMessageListenerContainer();
         container.setConnectionFactory(connectionFactory());
//        container.setQueueNames(QUEUE_NAME);
         container.setQueues(queue());
         container.setAcknowledgeMode(AcknowledgeMode.MANUAL);  //设置确认模式手工确认
         container.setMessageListener( new  ChannelAwareMessageListener() {
             @Override
             public  void  onMessage(Message message, Channel channel)  throws  Exception {
                 byte [] body = message.getBody();
                 System.out.println( "收到消息 : "  new  String(body));
                 channel.queueDeclare(QUEUE_NAME,  true false false null );
//                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
//                channel.basicAck(); //应答
//                channel.basicReject();//拒绝
//                channel.basicRecover(); //恢复
//                channel.basicQos();
//                channel.addConfirmListener(new ConfirmListener() {
//                    @Override
//                    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//                        //失败重发
//                    }
//
//                    @Override
//                    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//                        //确认ok
//                    }
//                });
             }
         });
         return   container;
     }
 
//    @Bean
//    MessageListenerAdapter listenerAdapter(Receiver receiver) {
//        return new MessageListenerAdapter(receiver, "receiveMessage");
//    }
 
}


测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package  com.rabbit.test;
 
import  com.basic.rabbitmq.send.HelloSender;
import  com.basic.system.Application;
import  org.junit.Test;
import  org.junit.runner.RunWith;
import  org.springframework.beans.factory.annotation.Autowired;
import  org.springframework.boot.test.context.SpringBootTest;
import  org.springframework.test.context.junit4.SpringRunner;
 
import  javax.annotation.Resource;
 
/**
  * Created by sdc on 2017/6/17.
  */
@RunWith(SpringRunner. class )
@SpringBootTest(classes = Application. class )
public  class  RabbitMqTest {
 
     @Autowired
     public  HelloSender helloSender;
 
     @Test
     public  void helloword() throws  Exception {
         helloSender.send();
     }
 
 
}


这只是一个demo,学习的时候会测试各种的事情,在这基础上更改就可以了,心中的疑虑测试没了就可以写一些项目了。


本文转自 豆芽菜橙 51CTO博客,原文链接:http://blog.51cto.com/shangdc/1944218


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
缓存 监控 安全
电商API集成入门:从零开始搭建高效接口
在数字化电商时代,API集成成为企业提升效率、实现系统互联的关键。本文从零开始,逐步讲解如何搭建高效、可靠的电商API接口,适合初学者学习。内容涵盖API基础、认证安全、请求处理、性能优化等核心步骤,并提供Python代码示例与数学公式辅助理解。通过实践,读者可掌握构建优质电商API的技巧,提升用户体验与系统性能。
159 0
|
12天前
|
XML Java 应用服务中间件
【SpringBoot(一)】Spring的认知、容器功能讲解与自动装配原理的入门,带你熟悉Springboot中基本的注解使用
SpringBoot专栏开篇第一章,讲述认识SpringBoot、Bean容器功能的讲解、自动装配原理的入门,还有其他常用的Springboot注解!如果想要了解SpringBoot,那么就进来看看吧!
142 2
|
5月前
|
人工智能 Java API
Spring AI 实战|Spring AI入门之DeepSeek调用
本文介绍了Spring AI框架如何帮助Java开发者轻松集成和使用大模型API。文章从Spring AI的初探开始,探讨了其核心能力及应用场景,包括手动与自动发起请求、流式响应实现打字机效果,以及兼容不同AI服务(如DeepSeek、通义千问)的方法。同时,还详细讲解了如何在生产环境中添加监控以优化性能和成本管理。通过Spring AI,开发者可以简化大模型调用流程,降低复杂度,为企业智能应用开发提供强大支持。最后,文章展望了Spring AI在未来AI时代的重要作用,鼓励开发者积极拥抱这一技术变革。
1941 71
Spring AI 实战|Spring AI入门之DeepSeek调用
|
6月前
|
安全 Java 数据库
Spring Security 实战指南:从入门到精通
本文详细介绍了Spring Security在Java Web项目中的应用,涵盖登录、权限控制与安全防护等功能。通过Filter Chain过滤器链实现请求拦截与认证授权,核心组件包括AuthenticationProvider和UserDetailsService,负责用户信息加载与密码验证。文章还解析了项目结构,如SecurityConfig配置类、User实体类及自定义登录逻辑,并探讨了Method-Level Security、CSRF防护、Remember-Me等进阶功能。最后总结了Spring Security的核心机制与常见配置,帮助开发者构建健壮的安全系统。
373 0
|
4月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
2958 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
3月前
|
前端开发 Java API
基于 Spring Boot 3 与 React 的 Java 学生信息管理系统从入门到精通实操指南
本项目基于Spring Boot 3与React 18构建学生信息管理系统,涵盖前后端开发、容器化部署及测试监控,提供完整实操指南与源码,助你掌握Java全栈开发技能。
174 0
|
4月前
|
Java 关系型数据库 MySQL
【Spring】【事务】初学者直呼学会了的Spring事务入门
本文深入解析了Spring事务的核心概念与使用方法。Spring事务是一种数据库事务管理机制,通过确保操作的原子性、一致性、隔离性和持久性(ACID),维护数据完整性。文章详细讲解了声明式事务(@Transactional注解)和编程式事务(TransactionTemplate、PlatformTransactionManager)的区别与用法,并探讨了事务传播行为(如REQUIRED、REQUIRES_NEW等)及隔离级别(如READ_COMMITTED、REPEATABLE_READ)。
349 1
|
4月前
|
Java API 微服务
Java 21 与 Spring Boot 3.2 微服务开发从入门到精通实操指南
《Java 21与Spring Boot 3.2微服务开发实践》摘要: 本文基于Java 21和Spring Boot 3.2最新特性,通过完整代码示例展示了微服务开发全流程。主要内容包括:1) 使用Spring Initializr初始化项目,集成Web、JPA、H2等组件;2) 配置虚拟线程支持高并发;3) 采用记录类优化DTO设计;4) 实现JPA Repository与Stream API数据访问;5) 服务层整合虚拟线程异步处理和结构化并发;6) 构建RESTful API并使用Springdoc生成文档。文中特别演示了虚拟线程配置(@Async)和StructuredTaskSco
482 0
|
6月前
|
存储 安全 Java
Spring Security 入门与详解
Spring Security 是 Spring 框架中的核心安全模块,提供认证、授权及防护功能。本文详解其核心概念,包括认证(Authentication)、授权(Authorization)和过滤器链(Security Filter Chain)。同时,通过代码示例介绍基本配置,如 PasswordEncoder、UserDetailsService 和自定义登录页面等。最后总结常见问题与解决方法,助你快速掌握 Spring Security 的使用与优化。
1462 0

相关产品

  • 云消息队列 MQ