spring整合消息队列rabbitmq

简介: ps:本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈.. 1.首先是生产者配置?12345678910111213141516171819202122232425...

ps:本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈..


 

1.首先是生产者配置

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<? xml  version = "1.0"  encoding = "UTF-8" ?>
< beans  xmlns = "http://www.springframework.org/schema/beans"
        xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context = "http://www.springframework.org/schema/context"
        xmlns:rabbit = "http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="
             http://www.springframework.org/schema/beans
                 http://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/context
                 http://www.springframework.org/schema/context/spring-context.xsd
             http://www.springframework.org/schema/rabbit
                 http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
 
     
    <!-- 连接服务配置  -->
    < rabbit:connection-factory  id = "connectionFactory"  host = "localhost"  username = "guest"
         password = "guest"  port = "5672"   />
         
    < rabbit:admin  connection-factory = "connectionFactory" />
    
    <!-- queue 队列声明-->
    < rabbit:queue  id = "queue_one"  durable = "true"  auto-delete = "false"  exclusive = "false"  name = "queue_one" />
    
    
    <!-- exchange queue binging key 绑定 -->
     < rabbit:direct-exchange  name = "my-mq-exchange"  durable = "true"  auto-delete = "false"  id = "my-mq-exchange" >
         < rabbit:bindings >
             < rabbit:binding  queue = "queue_one"  key = "queue_one_key" />
         </ rabbit:bindings >
     </ rabbit:direct-exchange >
     
     <-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
     < bean  id = "jsonMessageConverter"   class = "mq.convert.FastJsonMessageConverter" ></ bean >
     
     <-- spring template声明-->
     < rabbit:template  exchange = "my-mq-exchange"  id = "amqpTemplate"   connection-factory = "connectionFactory"   message-converter = "jsonMessageConverter" />
</ beans >

2.fastjson messageconver插件实现

 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import  org.apache.commons.logging.Log;
import  org.apache.commons.logging.LogFactory;
import  org.springframework.amqp.core.Message;
import  org.springframework.amqp.core.MessageProperties;
import  org.springframework.amqp.support.converter.AbstractMessageConverter;
import  org.springframework.amqp.support.converter.MessageConversionException;
 
import  fe.json.FastJson;
 
public  class  FastJsonMessageConverter   extends  AbstractMessageConverter {
     private  static  Log log = LogFactory.getLog(FastJsonMessageConverter. class );
 
     public  static  final  String DEFAULT_CHARSET =  "UTF-8" ;
 
     private  volatile  String defaultCharset = DEFAULT_CHARSET;
     
     public  FastJsonMessageConverter() {
         super ();
         //init();
     }
     
     public  void  setDefaultCharset(String defaultCharset) {
         this .defaultCharset = (defaultCharset !=  null ) ? defaultCharset
                 : DEFAULT_CHARSET;
     }
     
     public  Object fromMessage(Message message)
             throws  MessageConversionException {
         return  null ;
     }
     
     public  <T> T fromMessage(Message message,T t) {
         String json =  "" ;
         try  {
             json =  new  String(message.getBody(),defaultCharset);
         catch  (UnsupportedEncodingException e) {
             e.printStackTrace();
         }
         return  (T) FastJson.fromJson(json, t.getClass());
     }  
     
 
     protected  Message createMessage(Object objectToConvert,
             MessageProperties messageProperties)
             throws  MessageConversionException {
         byte [] bytes =  null ;
         try  {
             String jsonString = FastJson.toJson(objectToConvert);
             bytes = jsonString.getBytes( this .defaultCharset);
         catch  (UnsupportedEncodingException e) {
             throw  new  MessageConversionException(
                     "Failed to convert Message content" , e);
        
         messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
         messageProperties.setContentEncoding( this .defaultCharset);
         if  (bytes !=  null ) {
             messageProperties.setContentLength(bytes.length);
         }
         return  new  Message(bytes, messageProperties);
 
     }
}

3.生产者端调用

 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import  java.util.List;
 
import  org.springframework.amqp.core.AmqpTemplate;
 
 
public  class  MyMqGatway {
     
     @Autowired
     private  AmqpTemplate amqpTemplate;
     
     public  void  sendDataToCrQueue(Object obj) {
         amqpTemplate.convertAndSend( "queue_one_key" , obj);
     }  
}

4.消费者端配置(与生产者端大同小异)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<? xml  version = "1.0"  encoding = "UTF-8" ?>
< beans  xmlns = "http://www.springframework.org/schema/beans"
        xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context = "http://www.springframework.org/schema/context"
        xmlns:rabbit = "http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="
             http://www.springframework.org/schema/beans
                 http://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/context
                 http://www.springframework.org/schema/context/spring-context.xsd
             http://www.springframework.org/schema/rabbit
                 http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
 
     
    <!-- 连接服务配置  -->
    < rabbit:connection-factory  id = "connectionFactory"  host = "localhost"  username = "guest"
         password = "guest"  port = "5672"   />
         
    < rabbit:admin  connection-factory = "connectionFactory" />
    
    <!-- queue 队列声明-->
    < rabbit:queue  id = "queue_one"  durable = "true"  auto-delete = "false"  exclusive = "false"  name = "queue_one" />
    
    
    <!-- exchange queue binging key 绑定 -->
     < rabbit:direct-exchange  name = "my-mq-exchange"  durable = "true"  auto-delete = "false"  id = "my-mq-exchange" >
         < rabbit:bindings >
             < rabbit:binding  queue = "queue_one"  key = "queue_one_key" />
         </ rabbit:bindings >
     </ rabbit:direct-exchange >
 
     
      
     <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
     < rabbit:listener-container  connection-factory = "connectionFactory"  acknowledge = "auto"  task-executor = "taskExecutor" >
         < rabbit:listener  queues = "queue_one"  ref = "queueOneLitener" />
     </ rabbit:listener-container >
</ beans >

 

5.消费者端调用

?
1
2
3
4
5
6
7
8
9
import  org.springframework.amqp.core.Message;
import  org.springframework.amqp.core.MessageListener;
 
public  class  QueueOneLitener  implements   MessageListener{
     @Override
     public  void  onMessage(Message message) {
         System.out.println( " data :"  + message.getBody());
     }
}

6.由于消费端当队列有数据到达时,对应监听的对象就会被通知到,无法做到批量获取,批量入库,因此可以在消费端缓存一个临时队列,将mq取出来的数据存入本地队列,后台线程定时批量处理即可

转自:http://blog.csdn.net/l192168134/article/details/51210188

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
10天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
37 3
|
9天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
18天前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
55 5
|
16天前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
23天前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
61 4
|
2月前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
48 1
|
30天前
|
消息中间件 监控 测试技术
云消息队列RabbitMQ实践 - 评测
根据反馈,对本解决方案的实践原理已有一定理解,描述整体清晰但需在消息队列配置与使用上增加更多示例和说明以助理解。部署体验中获得了一定的引导和文档支持,尽管文档仍有待完善;期间出现的配置文件错误及依赖库缺失等问题已通过查阅资料解决。设计验证展示了云消息队列RabbitMQ的核心优势,包括高可用性和灵活性,未来可通过增加自动化测试来提高系统稳定性。实践后,用户对方案解决问题的能力及适用场景有了明确认识,认为其具有实际生产价值,不过仍需在性能优化、安全性增强及监控功能上进行改进以适应高并发和大数据量环境。
38 0
|
2月前
|
SQL 监控 druid
springboot-druid数据源的配置方式及配置后台监控-自定义和导入stater(推荐-简单方便使用)两种方式配置druid数据源
这篇文章介绍了如何在Spring Boot项目中配置和监控Druid数据源,包括自定义配置和使用Spring Boot Starter两种方法。
|
23天前
|
人工智能 自然语言处理 前端开发
SpringBoot + 通义千问 + 自定义React组件:支持EventStream数据解析的技术实践
【10月更文挑战第7天】在现代Web开发中,集成多种技术栈以实现复杂的功能需求已成为常态。本文将详细介绍如何使用SpringBoot作为后端框架,结合阿里巴巴的通义千问(一个强大的自然语言处理服务),并通过自定义React组件来支持服务器发送事件(SSE, Server-Sent Events)的EventStream数据解析。这一组合不仅能够实现高效的实时通信,还能利用AI技术提升用户体验。
123 2

相关产品

  • 云消息队列 MQ