1.activemq.xml
<!-- 引入activemq -->
<import resource="activemq.xml" />
在 Spring 主配置文件中用上面的代码导入 activemq.xml,其内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">
<!-- Connection factory for the sending side; referenced by the "jmsTemplate" bean. -->
<bean id="topicSendConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<!-- <property name="brokerURL" value="udp://localhost:8123" /> -->
<!-- UDP transport (the disabled alternative above) -->
<property name="brokerURL" value="tcp://10.0.1.222:61616" />
<!-- TCP transport -->
<!-- Turns on the ActiveMQ client's asynchronous send mode for this factory. -->
<property name="useAsyncSend" value="true" />
</bean>
<!-- Connection factory for the receiving side; referenced by the "listenerContainer" bean. -->
<bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<!-- <property name="brokerURL" value="udp://localhost:8123" /> -->
<!-- The UDP transport requires additional configuration on the ActiveMQ broker -->
<property name="brokerURL" value="tcp://10.0.1.222:61616" />
<!-- TCP transport -->
</bean>
<!-- Topic definition: the destination named "normandy.topic" shared by sender and listener -->
<bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="normandy.topic" />
</bean>
<!-- Message converter plugged into the "jmsTemplate" bean (its messageConverter property) -->
<bean id="messageConvertForSys" class="com.esteel.chat.mq.MessageConvertForSys" />
<!-- JMS template used to send to the topic -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="topicSendConnectionFactory" />
<property name="defaultDestination" ref="myTopic" />
<property name="messageConverter" ref="messageConvertForSys" />
<!-- Delivery mode: DeliveryMode.NON_PERSISTENT=1 (non-persistent); DeliveryMode.PERSISTENT=2 (persistent) -->
<!-- NOTE(review): Spring's JmsTemplate documentation says deliveryMode is only applied when
     explicitQosEnabled is true, and that property is not set here. Confirm whether non-persistent
     delivery is really intended (the listener uses a durable subscription) before enabling it. -->
<property name="deliveryMode" value="1" />
<property name="pubSubDomain" value="true" />
<!-- Enables publish/subscribe (topic) mode -->
</bean>
<!-- Message sender: application bean that publishes through the "jmsTemplate" bean -->
<bean id="topicSender" class="com.esteel.chat.mq.MessageSender">
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
<!-- <bean id="springContextUtil" class="com.esteel.common.SpringContextUtil" /> -->
<!-- Message receiver: the listener bean registered on the "listenerContainer" bean -->
<bean id="topicReceiver" class="com.esteel.chat.mq.MessageReceiver" />
<!-- Message listener container for the topic -->
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="topicListenConnectionFactory" />
<property name="pubSubDomain" value="true" />
<!-- true = publish/subscribe (topic) mode -->
<property name="destination" ref="myTopic" />
<!-- Destination: the myTopic bean -->
<property name="subscriptionDurable" value="true" />
<!-- Client ID of this receiver. According to the original author's note, with a durable
     subscription the messages published while this client is offline are kept by the broker
     (in its database) until the client with this ID consumes them. -->
<property name="clientId" value="clientId_1" />
<property name="messageListener" ref="topicReceiver" />
</bean>
<!-- Servlet -->
<!-- <bean id="ControlServlet1" class="com.esteel.servlet.ControlServlet1">
<property name="topicSender" ref="topicSender" /> </bean> -->
</beans>
2. 引入依赖包。注意 activemq-core 要使用 5.1.0 版本:版本太高会报 jsp-api 包冲突,尝试了各种办法都无法解决,最后改成 5.1.0 才正常。
<!-- ActiveMQ client/broker classes. Pinned to 5.1.0 because, per the accompanying notes,
     newer versions caused a jsp-api conflict in this project. -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.1.0</version>
<!-- Keep the servlet/JSP API jars from being pulled in transitively -->
<exclusions>
<exclusion>
<artifactId>jsp-api</artifactId>
<groupId>javax.servlet.jsp</groupId>
</exclusion>
<exclusion>
<artifactId>javax.servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- Spring JMS support (JmsTemplate, DefaultMessageListenerContainer, MessageConverter) -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>3.2.16.RELEASE</version>
</dependency>
<!-- json-lib: provides net.sf.json.JSONObject used to build and parse the message payload -->
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<classifier>jdk15</classifier>
<version>2.4</version>
</dependency>
</dependencies>
3. MessageConvertForSys.java
package com.esteel.chat.mq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.stereotype.Component;
public class MessageConvertForSys implements MessageConverter {
public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
System.out.println("sendMessage:" + object.toString());
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setStringProperty("key_esteelChat", object.toString());
return objectMessage;
}
public Object fromMessage(Message message) throws JMSException, MessageConversionException {
ObjectMessage objectMessage = (ObjectMessage) message;
return objectMessage.getObjectProperty("key_esteelChat");
}
}
4. MessageSender.java
package com.esteel.chat.mq;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
/**
 * Small sending facade around a {@link JmsTemplate}.
 *
 * <p>The template is supplied through {@link #setJmsTemplate(JmsTemplate)} (setter injection)
 * and every call to {@link #sendMessage(String)} is handed straight to it.
 */
public class MessageSender {

    /** Template that performs the actual send; {@code null} until the setter has been called. */
    private JmsTemplate jmsTemplate;

    /**
     * @return the template currently used for sending, or {@code null} if none was set
     */
    public JmsTemplate getJmsTemplate() {
        return this.jmsTemplate;
    }

    /**
     * @param jmsTemplate the template to use for all subsequent sends
     */
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    /**
     * Passes the given text to the template's {@code convertAndSend}.
     *
     * @param msg text to send
     */
    public void sendMessage(String msg) {
        final JmsTemplate template = this.jmsTemplate;
        template.convertAndSend(msg);
    }
}
5. 写入 MQ 的代码(前面已经引入了 json-lib 包):
/* Serialize the submitted order form to JSON. */
JSONObject object = JSONObject.fromObject(tbConOrdVo);
/*
 * Wrap it in an envelope carrying the payload type name and the client IP address.
 * The envelope is built with JSONObject instead of string concatenation so that the IP
 * address is emitted as a properly quoted and escaped JSON string; the concatenated form
 * produced an unquoted value such as "ipAddress":10.0.1.5, which is not valid JSON.
 */
JSONObject envelope = new JSONObject();
envelope.put("objectName", "TbConOrdVo");
envelope.put("ipAddress", EsteelNetworkUtil.getIpAddress(request));
envelope.put("object", object);
String tempstr = envelope.toString();
/* Publish the envelope to ActiveMQ. */
topicSender.sendMessage(tempstr);
6. MessageReceiver.java:异步处理从 MQ 收到的数据。
package com.esteel.chat.mq;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.servlet.http.HttpServletRequest;
import org.springframework.beans.factory.annotation.Autowired;
import com.esteel.chat.bean.TbConOrd;
import com.esteel.chat.bean.TbConOrdPrice;
import com.esteel.chat.beanVo.TbConOrdVo;
import com.esteel.chat.service.TbConOrdPriceService;
import com.esteel.chat.service.TbConOrdService;
import com.esteel.chat.until.EsteelNetworkUtil;
import com.esteel.exception.EsteelException;
import net.sf.json.JSONObject;
public class MessageReceiver implements MessageListener {
@Autowired
TbConOrdService tbConOrdService;
@Autowired
TbConOrdPriceService tbConOrdPriceService;
public void onMessage(Message m) {
ObjectMessage om = (ObjectMessage) m;
try {
String key_esteelChat = om.getStringProperty("key_esteelChat");
JSONObject object1 = JSONObject.fromObject(key_esteelChat);
String objectName = (String)object1.get("objectName");
if(objectName.equals("TbConOrdVo")){
JSONObject object2 = (JSONObject) object1.get("object");
TbConOrdVo tbConOrdVo=(TbConOrdVo)JSONObject.toBean(object2, TbConOrdVo.class);
TbConOrd tbConOrd = new TbConOrd();
/* 从提交的表单中提取tbConOrd */
String ipAddress = (String)object1.get("ipAddress");
tbConOrd = copyTbConOrd(tbConOrdVo, tbConOrd, ipAddress);
/* 写入tbConOrd */
tbConOrd = tbConOrdService.insertTbConOrd(tbConOrd);
TbConOrdPrice tbConOrdPrice = new TbConOrdPrice();
tbConOrdPrice = copyTbConOrdPrice(tbConOrd, tbConOrdVo, tbConOrdPrice);
/* 写入聊天文字 */
String msgText = tbConOrdPrice.getMsgText();
if (msgText.equals("请录入您的议价留言,最大为300个字符!按Ctrl+Enter提交!")) {
tbConOrdPrice.setMsgText("");
}
tbConOrdPrice=tbConOrdPriceService.insertTbConOrdPrice(tbConOrdPrice);
}
System.out.println(" ");
} catch (JMSException e) {
e.printStackTrace();
} catch (EsteelException e) {
e.printStackTrace();
}
}
private TbConOrd copyTbConOrd(TbConOrdVo tbConOrdVo, TbConOrd tbConOrd, String ipAddress) {
tbConOrd.setConobjKey(tbConOrdVo.getConobjKey());
/*****.........*****/
return tbConOrd;
}
private TbConOrdPrice copyTbConOrdPrice(TbConOrd tbConOrd, TbConOrdVo tbConOrdVo, TbConOrdPrice tbConOrdPrice) throws EsteelException {
tbConOrdPrice.setOrdKey(tbConOrd.getOrdKey());
/*****.........*****/
tbConOrdPrice.setOrdpriceNo(String.valueOf(System.currentTimeMillis()));
return tbConOrdPrice;
}
}
Over。
本地测试通过。