来mark一下我做的ActiveMQ

简介:

1.activemq.xml


 <!-- 引入activemq -->
    <import resource="activemq.xml" />
    
用上面的代码来导入。

    


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jms="http://www.springframework.org/schema/jms" xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">
	<bean id="topicSendConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
		<!-- <property name="brokerURL" value="udp://localhost:8123" /> -->
		<!-- UDP传输方式 -->
		<property name="brokerURL" value="tcp://10.0.1.222:61616" />
		<!-- TCP传输方式 -->
		<property name="useAsyncSend" value="true" />
	</bean>

	<bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
		<!-- <property name="brokerURL" value="udp://localhost:8123" /> -->
		<!-- UDP传输方式需要在activemq上面做配置 -->
		<property name="brokerURL" value="tcp://10.0.1.222:61616" />
		<!-- TCP传输方式 -->
	</bean>
	<!-- 定义主题 -->
	<bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="normandy.topic" />
	</bean>

	<bean id="messageConvertForSys" class="com.esteel.chat.mq.MessageConvertForSys" />
	
	<!-- TOPIC send jms模板 -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="topicSendConnectionFactory" />
		<property name="defaultDestination" ref="myTopic" />
		<property name="messageConverter" ref="messageConvertForSys" />
		<!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久 -->
		<property name="deliveryMode" value="1" />
		<property name="pubSubDomain" value="true" />
		<!-- 开启订阅模式 -->
	</bean>
	<!-- 消息发送方 -->
	<bean id="topicSender" class="com.esteel.chat.mq.MessageSender">
		<property name="jmsTemplate" ref="jmsTemplate" />
	</bean>
	
	<!-- <bean id="springContextUtil" class="com.esteel.common.SpringContextUtil" /> -->
    
	
	<!-- 消息接收方 -->
	<bean id="topicReceiver" class="com.esteel.chat.mq.MessageReceiver" />
	<!-- 主题消息监听容器 -->
	<bean id="listenerContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="topicListenConnectionFactory" />
		<property name="pubSubDomain" value="true" />
		<!-- true 订阅模式 -->
		<property name="destination" ref="myTopic" />
		<!-- 目的地 myTopic -->
		<property name="subscriptionDurable" value="true" />
		<!-- -这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,知道被这个ID的客户端消费掉 -->
		<property name="clientId" value="clientId_1" />
		<property name="messageListener" ref="topicReceiver" />
	</bean>
	<!-- Servlet -->
	<!-- <bean id="ControlServlet1" class="com.esteel.servlet.ControlServlet1"> 
		<property name="topicSender" ref="topicSender" /> </bean> -->
</beans>  

2、引入包,注意要引入5.1.0的。版本太高,会报jsp-api包冲突。咋整都不行,最后改成5.1.0行了。


<dependency>
				<groupId>org.apache.activemq</groupId>
				<artifactId>activemq-core</artifactId>
				<version>5.1.0</version>
				<exclusions>
					<exclusion>
						<artifactId>jsp-api</artifactId>
						<groupId>javax.servlet.jsp</groupId>
					</exclusion>
					<exclusion>
						<artifactId>javax.servlet-api</artifactId>
						<groupId>javax.servlet</groupId>
					</exclusion>
				</exclusions>
			</dependency>
			<dependency>
				<groupId>org.springframework</groupId>
				<artifactId>spring-jms</artifactId>
				<version>3.2.16.RELEASE</version>
			</dependency>
			<dependency>
				<groupId>net.sf.json-lib</groupId>
				<artifactId>json-lib</artifactId>
				<classifier>jdk15</classifier>
				<version>2.4</version>
			</dependency>
		</dependencies>

3. MessageConvertForSys.java


package com.esteel.chat.mq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.stereotype.Component;

public class MessageConvertForSys implements MessageConverter {

	public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
		System.out.println("sendMessage:" + object.toString());
		ObjectMessage objectMessage = session.createObjectMessage();
		objectMessage.setStringProperty("key_esteelChat", object.toString());
		return objectMessage;
	}

	public Object fromMessage(Message message) throws JMSException, MessageConversionException {
		ObjectMessage objectMessage = (ObjectMessage) message;
		return objectMessage.getObjectProperty("key_esteelChat");
	}

}

4. MessageSender.java



package com.esteel.chat.mq;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

public class MessageSender {

	private JmsTemplate jmsTemplate;

	public void sendMessage(String msg) {
		jmsTemplate.convertAndSend(msg);
	}

	public JmsTemplate getJmsTemplate() {
		return jmsTemplate;
	}

	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

}

5. 写入mq的代码,在前面已经引入了Json包


JSONObject object = JSONObject.fromObject(tbConOrdVo);
		String tempstr = object.toString();
		/*加入name信息*/
		tempstr = "{\"objectName\":\"TbConOrdVo\",\"ipAddress\":"+EsteelNetworkUtil.getIpAddress(request)+",\"object\":"+tempstr+"}";
		/*然后写入activemq*/
		topicSender.sendMessage(tempstr);

6. 异步处理数据。


package com.esteel.chat.mq;

import java.text.SimpleDateFormat;
import java.util.Calendar;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.servlet.http.HttpServletRequest;

import org.springframework.beans.factory.annotation.Autowired;

import com.esteel.chat.bean.TbConOrd;
import com.esteel.chat.bean.TbConOrdPrice;
import com.esteel.chat.beanVo.TbConOrdVo;
import com.esteel.chat.service.TbConOrdPriceService;
import com.esteel.chat.service.TbConOrdService;
import com.esteel.chat.until.EsteelNetworkUtil;
import com.esteel.exception.EsteelException;

import net.sf.json.JSONObject;

public class MessageReceiver implements MessageListener {

	@Autowired
	TbConOrdService tbConOrdService;
	@Autowired
	TbConOrdPriceService tbConOrdPriceService;
	
	public void onMessage(Message m) {
		ObjectMessage om = (ObjectMessage) m;
		try {
			String key_esteelChat = om.getStringProperty("key_esteelChat");
			JSONObject object1 = JSONObject.fromObject(key_esteelChat);
			String objectName = (String)object1.get("objectName");
			if(objectName.equals("TbConOrdVo")){
				JSONObject object2 = (JSONObject) object1.get("object");
				TbConOrdVo tbConOrdVo=(TbConOrdVo)JSONObject.toBean(object2, TbConOrdVo.class);
				TbConOrd tbConOrd = new TbConOrd();
				/* 从提交的表单中提取tbConOrd */
				String ipAddress = (String)object1.get("ipAddress");
				tbConOrd = copyTbConOrd(tbConOrdVo, tbConOrd, ipAddress);
				/* 写入tbConOrd */
				tbConOrd = tbConOrdService.insertTbConOrd(tbConOrd);
				TbConOrdPrice tbConOrdPrice = new TbConOrdPrice();
				tbConOrdPrice = copyTbConOrdPrice(tbConOrd, tbConOrdVo, tbConOrdPrice);
				/* 写入聊天文字 */
				String msgText = tbConOrdPrice.getMsgText();
				if (msgText.equals("请录入您的议价留言,最大为300个字符!按Ctrl+Enter提交!")) {
					tbConOrdPrice.setMsgText("");
				}
				tbConOrdPrice=tbConOrdPriceService.insertTbConOrdPrice(tbConOrdPrice);
			}
			System.out.println(" ");
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (EsteelException e) {
			e.printStackTrace();
		}
	}
	

	private TbConOrd copyTbConOrd(TbConOrdVo tbConOrdVo, TbConOrd tbConOrd, String ipAddress) {
		tbConOrd.setConobjKey(tbConOrdVo.getConobjKey());
		/*****.........*****/
		return tbConOrd;
	}

	private TbConOrdPrice copyTbConOrdPrice(TbConOrd tbConOrd, TbConOrdVo tbConOrdVo, TbConOrdPrice tbConOrdPrice) throws EsteelException {
		tbConOrdPrice.setOrdKey(tbConOrd.getOrdKey());
		/*****.........*****/
		tbConOrdPrice.setOrdpriceNo(String.valueOf(System.currentTimeMillis()));
		return tbConOrdPrice;
	}

}


Over。

本地测试通过。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
118 0
|
8月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(2)
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
94 0
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(2)
|
消息中间件 前端开发 Java
记录CDH5.10一个clients.NetworkClient: Bootstrap broker ip:9092 disconnected问题
1.当前环境使用的稳定版本组合a.本套环境CDH经过四次升级,当然版本为CDH-5.10.0-1.cdh5.10.0.p0.41b.KAFKA版本为KAFKA-2.
5446 0
|
消息中间件
RabbitMQ延时队列插件rabbitmq_delayed_message_exchange-3.8.0.ez,有需要的朋友可自取
RabbitMQ延时队列插件rabbitmq_delayed_message_exchange-3.8.0.ez,有需要的朋友可自取
211 0
|
8月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(1)
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
62 0
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(1)
|
消息中间件 缓存 Java
聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 GROUP、OFFSET、HEARTBEAT 相关命令
聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 GROUP、OFFSET、HEARTBEAT 相关命令
213 0
|
消息中间件 存储 缓存
聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR
聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR
191 0
|
消息中间件 Java
SpringBoot整合ActiveMq实现Queue和Topic两种模式
SpringBoot整合ActiveMq实现Queue和Topic两种模式
149 4
SpringBoot整合ActiveMq实现Queue和Topic两种模式
|
消息中间件 Java Spring
Spring5整合ActiveMQ实现queue和topic消息模式
Spring5整合ActiveMQ实现queue和topic消息模式
97 0
|
消息中间件 Java Spring
ActiveMQ --- 整合篇
之前说到了activeMQ的一些基本用法,本文将介绍activeMQ如何与spring以及spring boot整合。