RabbitMQ with Spring

Demo download: http://download.csdn.net/download/knight_black_bob/9544857

 

 



 

 

 

 

applicationContext-rabbit-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

	<context:property-placeholder location="classpath:rabbitmq.properties" />

	<rabbit:connection-factory  id="connectionFactory"
		host="${rabbit.host}" 
		username="${rabbit.username}" 
		password="${rabbit.password}" />
		
	<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
		<constructor-arg ref="connectionFactory" />
	</bean>

	<rabbit:admin connection-factory="connectionFactory" />
	<rabbit:queue id="cqueue"  name="${rabbit.queue.name}"   durable="true" auto-delete="false" exclusive="false" />

	<rabbit:direct-exchange id="cmq-exchange" durable="true"
		auto-delete="false" name="${rabbit.exchange.name}">
		<rabbit:bindings>
			<rabbit:binding queue="cqueue" key="cqueue-key"></rabbit:binding>
		</rabbit:bindings>
	</rabbit:direct-exchange>

	<bean id="listener" class="com.curiousby.core.MessageProcessListener" />
	
	<rabbit:listener-container id="listenerContainer" connection-factory="connectionFactory" >
		<rabbit:listener ref="listener" queues="cqueue" />
	</rabbit:listener-container>
  
	
</beans>

 

applicationContext-rabbit-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

	<context:property-placeholder location="classpath:rabbitmq.properties" />

	<rabbit:connection-factory id="connectionFactory"
		host="${rabbit.host}"
		username="${rabbit.username}" 
		password="${rabbit.password}" />

	<rabbit:admin connection-factory="connectionFactory" />

	<rabbit:queue id="pqueue"  name="${rabbit.queue.name}" />
	 
	<rabbit:direct-exchange id="pmq-exchange" durable="true" auto-delete="false" name="pmq-exchange">
		<rabbit:bindings>
			<rabbit:binding queue="pqueue" key="pqueuekey" />
		</rabbit:bindings>
	</rabbit:direct-exchange>

	<bean id="jsonMessageConverter"
		class="com.curiousby.util.FastJsonMessageConverter"></bean>
	  
	  
	  <rabbit:template 
	        exchange="pmq-exchange" 
	  		id="amqpTemplate"  
	  		connection-factory="connectionFactory"  
	 		message-converter="jsonMessageConverter"/>

</beans>

 

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:cache="http://www.springframework.org/schema/cache"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:jee="http://www.springframework.org/schema/jee"
	xmlns:jms="http://www.springframework.org/schema/jms" xmlns:lang="http://www.springframework.org/schema/lang"
	xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:oxm="http://www.springframework.org/schema/oxm"
	xmlns:p="http://www.springframework.org/schema/p" xmlns:task="http://www.springframework.org/schema/task"
	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"
	xsi:schemaLocation="
	http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
    http://www.springframework.org/schema/tool http://www.springframework.org/schema/tool/spring-tool.xsd
    http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
    http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang.xsd
    http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
    http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"
	default-autowire="byName"> 
     
    <context:annotation-config />
	<aop:aspectj-autoproxy /> 
	
	<context:property-placeholder location="classpath:rabbitmq.properties" />

	<context:component-scan base-package="com.curiousby" /> 
	
</beans>
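
All three context files resolve `${rabbit.*}` placeholders from rabbitmq.properties, which is not listed here. As a minimal sketch, the following check lists the five keys the XML above refers to and reports any that a given properties file leaves out. The class name `RabbitPropertiesCheck` and the sample values (localhost, guest, demo.queue, demo.exchange) are assumptions for illustration, not values taken from the demo.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: verifies that a properties file defines every
// placeholder used by the consumer and producer context files.
class RabbitPropertiesCheck {

    // Keys referenced as ${...} in the XML configuration above.
    static final String[] REQUIRED_KEYS = {
        "rabbit.host",
        "rabbit.username",
        "rabbit.password",
        "rabbit.queue.name",
        "rabbit.exchange.name"
    };

    // Returns the required keys that are absent or blank in props.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Sample content only; replace the values with those of your broker.
        String sample =
              "rabbit.host=localhost\n"
            + "rabbit.username=guest\n"
            + "rabbit.password=guest\n"
            + "rabbit.queue.name=demo.queue\n"
            + "rabbit.exchange.name=demo.exchange\n";
        Properties props = new Properties();
        props.load(new StringReader(sample));
        System.out.println("missing keys: " + missingKeys(props));
    }
}
```

If a key is missing, Spring normally fails at startup with an unresolved placeholder error, so a check like this is mostly useful as a quick reminder of what the file has to contain.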

 

package com.curiousby.entity;

public class Message {

	// Fields are private; access goes through the getters and fluent setters below
	private String msgId;
	private String content;
	private String from;
	private String to;
	private String isValid;
	private String insertTime;
	private String lastUpdateTime;

	
	public Message (){}
	public Message(String msgId, String content, String from, String to, String isValid, String insertTime,
			String lastUpdateTime) { 
		this.msgId = msgId;
		this.content = content;
		this.from = from;
		this.to = to;
		this.isValid = isValid;
		this.insertTime = insertTime;
		this.lastUpdateTime = lastUpdateTime;
	}

	public String getMsgId() {
		return msgId;
	}

	public String getContent() {
		return content;
	}

	public String getFrom() {
		return from;
	}

	public String getTo() {
		return to;
	}

	public String getIsValid() {
		return isValid;
	}

	public String getInsertTime() {
		return insertTime;
	}

	public String getLastUpdateTime() {
		return lastUpdateTime;
	}

	public Message setMsgId(String msgId) {
		this.msgId = msgId;
		return this;
	}

	public Message setContent(String content) {
		this.content = content;
		return this;
	}

	public Message setFrom(String from) {
		this.from = from;
		return this;
	}

	public Message setTo(String to) {
		this.to = to;
		return this;
	}

	public Message setIsValid(String isValid) {
		this.isValid = isValid;
		return this;
	}

	public Message setInsertTime(String insertTime) {
		this.insertTime = insertTime;
		return this;
	}

	public Message setLastUpdateTime(String lastUpdateTime) {
		this.lastUpdateTime = lastUpdateTime;
		return this;
	}

	@Override
	public String toString() {
		return "Message [msgId=" + msgId + ", content=" + content + ", from=" + from + ", to=" + to + ", isValid="
				+ isValid + ", insertTime=" + insertTime + ", lastUpdateTime=" + lastUpdateTime + "]";
	}

	
	
}

 

package com.curiousby.core;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

//com.curiousby.core.MessageProcessListener
public class MessageProcessListener implements MessageListener{

	@Override
	public void onMessage(Message message) { 
		process(message);
	}

	private void  process(Message message){
		if (message != null) {
			// getBody() returns raw bytes; decode them as UTF-8 text
			// (calling toString() on the array would only print its type and hash code)
			String msg = new String(message.getBody(), StandardCharsets.UTF_8);
			System.out.println("===============" + msg);
		}
	}
	
}
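
A note on printing the message body: `getBody()` returns a `byte[]`, and calling `toString()` on a Java array yields its type and identity hash, not its content. The sketch below shows the difference using only the JDK; the class name `BodyDecodeDemo` and the sample JSON payload are assumptions for illustration, and UTF-8 is assumed because it is the converter's default charset.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: decodes a raw message body into text.
class BodyDecodeDemo {

    // Decodes the body as UTF-8; returns an empty string for a null body.
    static String decode(byte[] body) {
        return body == null ? "" : new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = "{\"content\":\"baoyou\"}".getBytes(StandardCharsets.UTF_8);

        // Prints the array's type and identity hash, not the payload.
        System.out.println(body.toString());

        // Prints {"content":"baoyou"}
        System.out.println(decode(body));
    }
}
```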

 

 

package com.curiousby.core;

import javax.annotation.Resource;

import org.springframework.amqp.core.AmqpTemplate; 
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Repository;

@Repository
public class MessagePush {

	@Resource
	private AmqpTemplate amqpTemplate;
	 
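	public void convertAndSend(Object obj) {
	     // "pqueuekey" is the routing key; it must match the binding key
	     // declared on pmq-exchange in applicationContext-rabbit-producer.xml
	     amqpTemplate.convertAndSend("pqueuekey", obj);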
	}	
	 
}
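
The routing key passed to `convertAndSend` only has an effect because pmq-exchange is a direct exchange with a binding whose key is exactly `pqueuekey`. The toy model below illustrates that exact-match rule in plain Java. It is not the RabbitMQ or Spring AMQP API: `DirectExchangeSketch`, `bind`, and `route` are hypothetical names, and the queue is referred to by its bean id `pqueue` rather than by its real name from rabbitmq.properties.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of direct-exchange routing (illustration only).
class DirectExchangeSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Binds a queue to this exchange under the given key.
    void bind(String queue, String key) {
        bindings.computeIfAbsent(key, k -> new ArrayList<>()).add(queue);
    }

    // A direct exchange delivers to queues whose binding key equals the routing key.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("pqueue", "pqueuekey");

        System.out.println(exchange.route("pqueuekey"));  // [pqueue]
        System.out.println(exchange.route("cqueue-key")); // []
    }
}
```

In this model, a message sent with a key that no binding matches is routed nowhere, which is why the key in `MessagePush` and the key in the producer XML have to stay in sync.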

 

 

package com.curiousby.util;

import java.io.UnsupportedEncodingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import com.alibaba.fastjson.JSON; 
//import fe.json.FastJson;



//com.curiousby.util.FastJsonMessageConverter
public class FastJsonMessageConverter  extends AbstractMessageConverter {
    private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

    public static final String DEFAULT_CHARSET = "UTF-8";

    private volatile String defaultCharset = DEFAULT_CHARSET;
    
    public FastJsonMessageConverter() {
        super(); 
    }
    
    public void setDefaultCharset(String defaultCharset) {
        this.defaultCharset = (defaultCharset != null) ? defaultCharset
                : DEFAULT_CHARSET;
    }
    
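    // Generic receive-side conversion is not implemented in this demo;
    // the converter is only wired into the sending template.
    public Object fromMessage(Message message)
            throws MessageConversionException {
        return null;
    }
    
    // Parses the JSON body into an instance of the same class as the sample object t.
    @SuppressWarnings("unchecked")
    public <T> T fromMessage(Message message,T t) {
        String json;
        try {
            json = new String(message.getBody(),defaultCharset);
        } catch (UnsupportedEncodingException e) {
            // Fail loudly instead of parsing an empty string
            throw new MessageConversionException(
                    "Failed to decode Message content", e);
        }
        return   (T) JSON.parseObject(json, t.getClass());
    }	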
    

    protected Message createMessage(Object objectToConvert,
            MessageProperties messageProperties)
            throws MessageConversionException {
        byte[] bytes = null;
        try {
            String jsonString = JSON.toJSONString(objectToConvert);
            		//FastJson.toJson(objectToConvert);
            bytes = jsonString.getBytes(this.defaultCharset);
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException(
                    "Failed to convert Message content", e);
        } 
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(this.defaultCharset);
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        return new Message(bytes, messageProperties);

    }
}

 

 

package com.curiousby;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.curiousby.core.MessagePush;

// Consumer side: loading the context starts the listener container
@RunWith(SpringJUnit4ClassRunner.class)   
@ContextConfiguration(locations = {"classpath*:applicationContext*.xml"}) 
public class MainStart {

 
	@Autowired
	MessagePush messagePush;
	
	@Test
	public  void testMain() throws InterruptedException{ 
			// Keep the JVM alive so the listener can keep consuming messages
			Thread.sleep(100000000);
	}
}

 

package com.curiousby;

import java.util.UUID;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.curiousby.core.MessagePush;
import com.curiousby.entity.Message;

// Producer side: builds one Message and publishes it through MessagePush
@RunWith(SpringJUnit4ClassRunner.class)   
@ContextConfiguration(locations = {"classpath*:applicationContext*.xml"}) 
public class TestProducter {

	@Autowired
	MessagePush messagePush;
	
	@Test
	public  void testMain() throws InterruptedException{ 
		// Wait 10 seconds before sending
		Thread.sleep(10000);
		Message m = new Message();
		m.setContent("baoyou")
		 .setMsgId(UUID.randomUUID().toString().replaceAll("-", "")) // UUID without dashes
		 .setFrom("1")
		 .setTo("2");
		messagePush.convertAndSend(m);
	}
}

 

 

 

 

 

 

 

 

 

 

 

 
