JMS发布/订阅消息传送例子

简介:

前言

基于上篇文章"基于Tomcat + JNDI + ActiveMQ实现JMS的点对点消息传送"很容易就可以编写一个发布/订阅消息传送例子,相关环境准备与该篇文章基本类似,主要的区别如下。

在Tomcat中配置JNDI

配置连接工厂和话题

复制代码
    <Resource name="topic/connectionFactory" auth="Container"
        type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory"
        factory="org.apache.activemq.jndi.JNDIReferenceFactory"
        brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&amp;maxReconnectAttempts=5"
        brokerName="LocalActiveMQBroker" useEmbeddedBroker="false" />
        
    <Resource name="topic/topic0" 
        auth="Container"
        type="org.apache.activemq.command.ActiveMQTopic" description="My Topic" factory="org.apache.activemq.jndi.JNDIReferenceFactory"
        physicalName="TestTopic" />
复制代码

在Web工厂中编写代码

新建一个发布者Servlet

复制代码
package pubSub;

import java.io.IOException;
import java.io.PrintWriter;

import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.jms.Topic;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicPublisher;
import javax.jms.DeliveryMode;
import javax.jms.TopicSession;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;

/**
 * Servlet implementation class JMSTest
 */
@WebServlet("/Publish")
public class Publisher extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public Publisher() {
        super();
        // TODO Auto-generated constructor stub
    }

    /**
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        PrintWriter out = response.getWriter();

        try {
            // get the initial context
            InitialContext ctx = new InitialContext();

            // lookup the topic object
            Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0");

            // lookup the topic connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx
                    .lookup("java:comp/env/topic/connectionFactory");

            // create a topic connection
            TopicConnection topicConn = connFactory.createTopicConnection();

            // create a topic session
            TopicSession topicSession = topicConn.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);

            // create a topic publisher
            TopicPublisher topicPublisher = topicSession.createPublisher(topic);
            topicPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // create the "Hello World" message
            TextMessage message = topicSession.createTextMessage();
            message.setText("Hello World");

            // publish the messages
            topicPublisher.publish(message);

            // print what we did
            out.write("Message published: " + message.getText());

            // close the topic connection
            topicConn.close();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // TODO Auto-generated method stub
    }

}
复制代码

新建一个订阅者Servlet

复制代码
package pubSub;

import java.io.IOException;
import java.io.PrintWriter;

import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Servlet implementation class Receive
 */
@WebServlet("/Subscribe")
public class Subscriber extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public Subscriber() {
        super();
        // TODO Auto-generated constructor stub
    }

    /**
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        PrintWriter out = response.getWriter();

        try {
            // get the initial context
            InitialContext ctx = new InitialContext();

            // lookup the topic object
            Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0");

            // lookup the topic connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx
                    .lookup("java:comp/env/topic/connectionFactory");

            // create a topic connection
            TopicConnection topicConn = connFactory.createTopicConnection();

            // create a topic session
            TopicSession topicSession = topicConn.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);

            // create a topic subscriber
            TopicSubscriber topicSubscriber = topicSession
                    .createSubscriber(topic);

            // start the connection
            topicConn.start();

            // receive the message
            TextMessage message = (TextMessage) topicSubscriber.receive();

            // print the message
            out.write("Message received: " + message.getText());

            // close the topic connection
            topicConn.close();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // TODO Auto-generated method stub
    }

}
复制代码

运行Web工程,分别打开多个标签访问订阅servlet,然后访问发布servlet,结果如下:

在订阅者订阅消息的时候,一开始没接收到消息,一旦发布者发布消息后,订阅者马上收到消息。


本文转自风一样的码农博客园博客,原文链接:http://www.cnblogs.com/chenpi/p/5566983.html,如需转载请自行联系原作者

相关文章
|
8月前
|
JavaScript 前端开发 API
第二十九章 使用消息订阅发布实现组件通信
第二十九章 使用消息订阅发布实现组件通信
|
6月前
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
102 1
|
8月前
|
消息中间件
【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
【1月更文挑战第27天】【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
|
消息中间件
RabbitMQ如何支持事务性消息的发送和接收
RabbitMQ消息的发送和接收
229 0
|
存储
06JMS发布/订阅模式
06JMS发布/订阅模式
49 0
|
编解码 Java 测试技术
消息类型-普通消息|学习笔记
快速学习消息类型-普通消息
186 0
消息类型-普通消息|学习笔记
|
消息中间件 物联网 Linux
Msgrcv 接收消息|学习笔记
快速学习 Msgrcv 接收消息
|
消息中间件 RocketMQ 开发者
测试发送消息和接受消息|学习笔记
快速学习测试发送消息和接受消息
145 0
测试发送消息和接受消息|学习笔记
|
消息中间件
译MassTransit 创建消息消费者
创建消息消费者一个消息消费者是一个 可以消费一个或多个消息类型的类,指定IConsumer接口,T为消息类型 public class UpdateCustomerConsumer : IConsumer { public async Task Consume(ConsumeContext context) { await Console.
1647 0
|
消息中间件 Kafka
kafka模拟客户端发送、接受消息
producer   消息的生成者,即发布消息 consumer   消息的消费者,即订阅消息 broker     Kafka以集群的方式运行,可以由一个或多个服务组成,服务即broker zookeeper  协调转发    一、创建topic .
2182 0