ActiveMQ发布订阅模式

简介: 生产者代码

生产者代码

1.package com.vhukze.Producer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ProducerTopic {
    //mq通讯地址
    private static String url = "tcp://127.0.0.1:61616";
    //队列名称
    private static String topicName = "my_topic";
    public static void main(String[] args) throws JMSException {
        //创建连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        //创建连接
        Connection connection = factory.createConnection();
        //启动连接
        connection.start();
        //创建会话,参数1,设置是否需要事务方式提交,参数2,消息方式  默认采用自动接收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目标(队列)
        Topic topic = session.createTopic(topicName);
        //创建生产者
        MessageProducer producer = session.createProducer(topic);
        for (int i = 0; i <= 10 ; i++) {
            //创建消息
            TextMessage textMessage = session.createTextMessage("消息内容" + i);
            //发送消息
            producer.send(textMessage);
        }
        connection.close();
    }
}

消费者代码

package com.vhukze.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ConsumerTopic {
    //mq通讯地址
    private static String url = "tcp://127.0.0.1:61616";
    //队列名称
    private static String topicName = "my_topic";
    public static void main(String[] args) throws JMSException {
        System.out.println("消费者1");
        //创建连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        //创建连接
        Connection connection = factory.createConnection();
        //启动连接
        connection.start();
        //创建会话,参数1,设置是否需要事务方式提交,参数2,消息方式  默认采用自动接收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目标(队列)
        Topic topic = session.createTopic(topicName);
        //创建消费者
        MessageConsumer consumer = session.createConsumer(topic);
        //启动监听消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("消费消息:"+textMessage.getText());
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        });
    }
}
相关文章
|
8月前
|
消息中间件 存储 网络协议
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
|
2月前
|
消息中间件 存储 数据安全/隐私保护
深入学习RabbitMQ五种模式(一)
深入学习RabbitMQ五种模式(一)
48 0
|
8月前
|
消息中间件 Java Maven
消息中间件系列教程(03) -ActiveMQ -点对点&发布订阅模式
消息中间件系列教程(03) -ActiveMQ -点对点&发布订阅模式
60 0
|
8月前
|
消息中间件 存储 Java
消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)
消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)
40 0
|
2月前
|
消息中间件 存储 Java
RabbitMQ是如何实现消息传递的?
RabbitMQ是如何实现消息传递的?
62 0
|
2月前
|
消息中间件 存储
深入学习RabbitMQ五种模式(二)
深入学习RabbitMQ五种模式(二)
33 0
|
9月前
4 # 发布订阅模式
4 # 发布订阅模式
29 0
|
12月前
|
消息中间件 负载均衡 Kafka
Kafka如何实现点对点消息和发布订阅消息?
Kafka 可以同时支持点对点消息和发布订阅消息模型
631 0
|
消息中间件 数据库
【消息中间件】RabbitMQ的工作模式
【消息中间件】RabbitMQ的工作模式
|
消息中间件
RabbitMQ简单生产者消费者模式
RabbitMQ简单生产者消费者模式