前言
基于上篇文章"基于Tomcat + JNDI + ActiveMQ实现JMS的点对点消息传送"很容易就可以编写一个发布/订阅消息传送例子,相关环境准备与该篇文章基本类似,主要的区别如下。
在Tomcat中配置JNDI
配置连接工厂和话题
<Resource name="topic/connectionFactory" auth="Container" type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5" brokerName="LocalActiveMQBroker" useEmbeddedBroker="false" /> <Resource name="topic/topic0" auth="Container" type="org.apache.activemq.command.ActiveMQTopic" description="My Topic" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="TestTopic" />
在Web工厂中编写代码
新建一个发布者Servlet
package pubSub; import java.io.IOException; import java.io.PrintWriter; import javax.naming.InitialContext; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.jms.Topic; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TopicPublisher; import javax.jms.DeliveryMode; import javax.jms.TopicSession; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; /** * Servlet implementation class JMSTest */ @WebServlet("/Publish") public class Publisher extends HttpServlet { private static final long serialVersionUID = 1L; /** * @see HttpServlet#HttpServlet() */ public Publisher() { super(); // TODO Auto-generated constructor stub } /** * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse * response) */ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { PrintWriter out = response.getWriter(); try { // get the initial context InitialContext ctx = new InitialContext(); // lookup the topic object Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0"); // lookup the topic connection factory TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx .lookup("java:comp/env/topic/connectionFactory"); // create a topic connection TopicConnection topicConn = connFactory.createTopicConnection(); // create a topic session TopicSession topicSession = topicConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // create a topic publisher TopicPublisher topicPublisher = topicSession.createPublisher(topic); topicPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // create the "Hello World" message TextMessage message = topicSession.createTextMessage(); message.setText("Hello World"); // publish the messages topicPublisher.publish(message); // print what we did out.write("Message published: " + message.getText()); // close the topic connection topicConn.close(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse * response) */ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // TODO Auto-generated method stub } }
新建一个订阅者Servlet
package pubSub; import java.io.IOException; import java.io.PrintWriter; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.naming.InitialContext; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * Servlet implementation class Receive */ @WebServlet("/Subscribe") public class Subscriber extends HttpServlet { private static final long serialVersionUID = 1L; /** * @see HttpServlet#HttpServlet() */ public Subscriber() { super(); // TODO Auto-generated constructor stub } /** * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse * response) */ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { PrintWriter out = response.getWriter(); try { // get the initial context InitialContext ctx = new InitialContext(); // lookup the topic object Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0"); // lookup the topic connection factory TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx .lookup("java:comp/env/topic/connectionFactory"); // create a topic connection TopicConnection topicConn = connFactory.createTopicConnection(); // create a topic session TopicSession topicSession = topicConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // create a topic subscriber TopicSubscriber topicSubscriber = topicSession .createSubscriber(topic); // start the connection topicConn.start(); // receive the message TextMessage message = (TextMessage) topicSubscriber.receive(); // print the message out.write("Message received: " + message.getText()); // close the topic connection topicConn.close(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse * response) */ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // TODO Auto-generated method stub } }
运行Web工程,分别打开多个标签访问订阅servlet,然后访问发布servlet,结果如下:
在订阅者订阅消息的时候,一开始没接收到消息,一旦发布者发布消息后,订阅者马上收到消息。
本文转自风一样的码农博客园博客,原文链接:http://www.cnblogs.com/chenpi/p/5566983.html,如需转载请自行联系原作者