ActiveMQ发布订阅模式

简介: 生产者代码

生产者代码

1.package com.vhukze.Producer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ProducerTopic {
    //mq通讯地址
    private static String url = "tcp://127.0.0.1:61616";
    //队列名称
    private static String topicName = "my_topic";
    public static void main(String[] args) throws JMSException {
        //创建连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        //创建连接
        Connection connection = factory.createConnection();
        //启动连接
        connection.start();
        //创建会话,参数1,设置是否需要事务方式提交,参数2,消息方式  默认采用自动接收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目标(队列)
        Topic topic = session.createTopic(topicName);
        //创建生产者
        MessageProducer producer = session.createProducer(topic);
        for (int i = 0; i <= 10 ; i++) {
            //创建消息
            TextMessage textMessage = session.createTextMessage("消息内容" + i);
            //发送消息
            producer.send(textMessage);
        }
        connection.close();
    }
}

消费者代码

package com.vhukze.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ConsumerTopic {
    //mq通讯地址
    private static String url = "tcp://127.0.0.1:61616";
    //队列名称
    private static String topicName = "my_topic";
    public static void main(String[] args) throws JMSException {
        System.out.println("消费者1");
        //创建连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        //创建连接
        Connection connection = factory.createConnection();
        //启动连接
        connection.start();
        //创建会话,参数1,设置是否需要事务方式提交,参数2,消息方式  默认采用自动接收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目标(队列)
        Topic topic = session.createTopic(topicName);
        //创建消费者
        MessageConsumer consumer = session.createConsumer(topic);
        //启动监听消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("消费消息:"+textMessage.getText());
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        });
    }
}
相关文章
|
12月前
|
消息中间件 存储 网络协议
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
|
消息中间件 Java Maven
消息中间件系列教程(03) -ActiveMQ -点对点&发布订阅模式
消息中间件系列教程(03) -ActiveMQ -点对点&发布订阅模式
95 0
|
消息中间件 存储 Java
消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)
消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)
62 0
|
6月前
|
消息中间件 存储 Java
RabbitMQ是如何实现消息传递的?
RabbitMQ是如何实现消息传递的?
110 0
|
6月前
|
消息中间件 Java
RabbitMQ中的消息发布-订阅模式是什么?如何实现?
RabbitMQ中的消息发布-订阅模式是什么?如何实现?
158 0
4 # 发布订阅模式
4 # 发布订阅模式
52 0
|
消息中间件 负载均衡 Kafka
Kafka如何实现点对点消息和发布订阅消息?
Kafka 可以同时支持点对点消息和发布订阅消息模型
950 0
|
消息中间件 数据库
【消息中间件】RabbitMQ的工作模式
【消息中间件】RabbitMQ的工作模式
|
消息中间件 设计模式 网络协议
RabbitMQ初识以及简单模式初步
MQ是Message Queue的缩写,也就是消息队列。 MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。 消息队列这种技术主要用在分布式设计当中,其实可以说是一种设计模式。是相对同步系统而言的。同步系统是什么呢? 同步,是当所有的操作都完毕,才会返回结果,比如用户支付,如果是同步的话就是当用户所发起的支付操作,然后只有等支付的这个业务成功然后才给用户返回结果说支付
|
消息中间件 API
10、RabbitMQ教程-消息的两种消费模式
10、RabbitMQ教程-消息的两种消费模式
519 0