ActiveMQ系列:ActiveMQ标准API结合JavaSE实战

简介: 订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统自动调用监听器MessageListener的onMessage(Message message)方法。

ActiveMQ系列:ActiveMQ标准API结合JavaSE实战


上篇初步认识了ActiveMQ,有兴趣的可以移步:https://blog.csdn.net/qq_26975307/article/details/98875098,此篇不多逼逼,上手原生撸码(类比JDBC连接数据库)


1、JMS开发的基本步骤


20190809171808230.png


2、两种消费方式


2.1、同步阻塞方式(receive())


订阅者或接收者调用MessageConsumer的receive()方法来接收消息,receive方法在能够接收到消息之前(或超时之、前)将一直阻塞。


2.2、异步非阻塞方式(监听器onMessage())


订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统自动调用监听器MessageListener的onMessage(Message message)方法。


3、点对点的消息传递域


3.1、特点


(1)每个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的。


(2)消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费都可

以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。


(3)消息被消费后队列中不会再存储,所以消费者不会消费到己经被消费掉的消息。


20190809172140262.png


JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活

状态时发送的消息。


20190809172232653.png


4、两大模式的比较


20190809172316319.png


5、ActiveMQ在JavaSE中最简单的实现 —— 队列


5.1、创建生产者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 *
 */
public class JmsProduce_Queue {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws Exception{
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5、创建消息的生产者
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //6、通过使用消息生产者生产消息并发送到MQ的队列中
        for(int i=1;i<=3;i++){
            //7、创建消息
            //可以理解为最简单的字符串
            TextMessage textMessage = session.createTextMessage("msg-" + i);
            /**
             * String消息设置属性
            textMessage.setStringProperty("c01", "vip");
             */
            /**
             * 在每一条消息发送之前,可以设置消息的请求头属性
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setString("map-k1", "map-v1");
            producer.send(mapMessage);
             */
            //8、通过消息生产者发送给MQ
            producer.send(textMessage);
        }
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到MQ完成********");
    }
}


5.2、创建消费者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.JmsException;
import javax.jms.*;
/**
 * @ClassName JmsConsumer_Queue
 * @Description TODO
 * @Author phubing
 * @Date 2019-07-24 21:42
 * @Version 1.0
 **/
public class JmsConsumer_Queue {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws Exception {
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5、创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        /**
         * 通过监听方式来消费消息
         *
         */
        //如果是接口,则可以直接new ,使用其匿名内部类
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("监听到的消息:"+textMessage.getText());
                        System.out.println("监听到的属性消息:"+textMessage.getStringProperty("c01"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }else{
                }
                /*
                if(null != message && message instanceof MapMessage){
                    MapMessage mapMessage = (MapMessage) message;
                    try {
                        System.out.println("监听到的Map消息:"+mapMessage.getString("map-k1"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }else{
                }
                */
            }
        });
        //press any key to exit(保证控制台不关闭,直到按下任一按键)
        System.in.read();
        /*
        * 同步阻塞方式
        * 订阅者或接受者调用MessageConsumer的receive方法来接收消息,receive方法能在接收到消息之前(或超时之前)将一直阻塞
        *
        //6、接收消息
        while(true){
            //若consumer.receive()不指定超时时间,则会一直等待直到有消息进来
            TextMessage message = (TextMessage) consumer.receive(3000L);
            if(null != message){
                System.out.println("消费者接收到消息:"+message.getText());
            }else {
                break;
            }
        }
        */
        consumer.close();
        session.close();
        connection.close();
    }
}


启动并在浏览器和IDEA控制台查看消息的发送与消费情况


20190809172851511.png


5.4、几个问题


先生产,启动1号消费者,再启动2号消费者,问题:2号消费者还能消费吗?

1号消费者不能,2号消费者不能,因为消息已被消费


先生产,只启动1号消费者,问题:1号消费者能消费吗?


先启动2个消费者,再生产,问题:消费情况如何?

两个消费者等候,生产者发出来的消息假设是同一队列,则平均分配


MQ挂了,那么消息的持久化和丢失情况分别如何?

看是否设置消息持久化(接下来会有写到)


消息默认的持久化模式?

默认使用持久化


6、ActiveMQ在JavaSE中最简单的实现 —— 主题


6.1、创建生产者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ClassName JmsProduce_Topic
 * @Description TODO
 * @Author phubing
 * @Date 2019-07-31 20:56
 * @Version 1.0
 **/
public class JmsProduce_Topic {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String TOPIC_NAME = "topic-phubing";
    public static void main(String[] args) throws Exception{
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5、创建消息的生产者
        MessageProducer producer = session.createProducer(topic);
        //6、通过使用消息生产者生产消息并发送到MQ的队列中
        for(int i=1;i<3;i++){
            //7、创建消息
            //可以理解为最简单的字符串
            TextMessage textMessage = session.createTextMessage("topic-name-" + i);
            //8、通过消息生产者发送给MQ
            producer.send(textMessage);
        }
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("TOPIC消息发送到MQ完成********");
    }
}


6.2、创建消费者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ClassName JmsConsumer_Topic
 * @Description TODO
 * @Author phubing
 * @Date 2019-07-31 20:56
 * @Version 1.0
 **/
public class JmsConsumer_Topic {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String TOPIC_NAME = "topic-phubing";
    public static void main(String[] args) throws Exception{
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
        //3、创建session会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5、创建消费者
        MessageConsumer consumer = session.createConsumer(topic);
        //6、如果是接口,则可以直接new ,使用其匿名内部类
        consumer.setMessageListener((message) -> {
            if(null != message && message instanceof TextMessage){
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("监听到的Topic消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }else{}
        });
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    }
}


切记,先启动消费者,再启动生产者(非要先生产再消费,你也可以看看什么现象)


6.3、几个问题


3个订阅者同时开启,再开启生产者,问:3个订阅者的接收消息情况如何?

每个订阅者都有生产者生产的所有消息,进队列是生产者生产的数量,出队列的是消费者数量*生产者生产的消息数量

(谁订阅谁收到,不订阅不打扰)


先启动生产者,再启动订阅者,现象如何?

不会收到之前生产的消息,进队列的是生产者生产的数量,出队列数量无(原来的消息已成为废消息)


未完待续........(下篇详细讲解下JMS)

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8天前
|
安全 API 数据安全/隐私保护
速卖通AliExpress商品详情API接口深度解析与实战应用
速卖通(AliExpress)作为全球化电商的重要平台,提供了丰富的商品资源和便捷的购物体验。为了提升用户体验和优化商品管理,速卖通开放了API接口,其中商品详情API尤为关键。本文介绍如何获取API密钥、调用商品详情API接口,并处理API响应数据,帮助开发者和商家高效利用这些工具。通过合理规划API调用策略和确保合法合规使用,开发者可以更好地获取商品信息,优化管理和营销策略。
|
5月前
|
编译器 API Android开发
Android经典实战之Kotlin Multiplatform 中,如何处理不同平台的 API 调用
本文介绍Kotlin Multiplatform (KMP) 中使用 `expect` 和 `actual` 关键字处理多平台API调用的方法。通过共通代码集定义预期API,各平台提供具体实现,编译器确保正确匹配,支持依赖注入、枚举类处理等,实现跨平台代码重用与原生性能。附带示例展示如何定义跨平台函数与类。
144 0
|
2月前
|
JSON BI API
商城上货API接口的实战案例
在商城上货过程中,API接口扮演着至关重要的角色。以下是对商城上货API接口的实战分析,涵盖其主要功能、类型、安全性以及实战案例等方面。
|
2月前
|
XML 数据可视化 API
商品详情数据实战案例,API接口系列
淘宝商品详情数据在电商领域具有广泛的应用价值,而淘宝商品详情API接口则为开发者提供了获取这些数据的重要途径。通过合理利用这些接口和数据,可以提升业务效率、优化用户体验,为电商行业的发展注入新的活力。
|
2月前
|
前端开发 API 开发者
Python Web开发者必看!AJAX、Fetch API实战技巧,让前后端交互如丝般顺滑!
在Web开发中,前后端的高效交互是提升用户体验的关键。本文通过一个基于Flask框架的博客系统实战案例,详细介绍了如何使用AJAX和Fetch API实现不刷新页面查看评论的功能。从后端路由设置到前端请求处理,全面展示了这两种技术的应用技巧,帮助Python Web开发者提升项目质量和开发效率。
58 1
|
2月前
|
存储 JSON API
淘宝API接口实战:高效获取商品标题、分类及店铺名称
在淘宝API接口实战中,通过以下步骤高效获取商品标题、分类及店铺名称:1. 准备工作:了解淘宝开放平台文档,注册开发者账号,选择开发语言和工具。2. 获取API访问权限:申请相应权限,提供应用场景说明。3. 调用API接口:构建HTTP请求,提供必要参数。4. 解析响应数据:提取JSON数据中的所需信息。5. 数据处理和存储:进一步处理并存储数据。6. 注意事项:遵守使用规范,注意调用频率和数据安全。示例代码使用Python调用淘宝API。
|
2月前
|
JSON API 开发者
微店(Weidian)商品详情API接口解析实战
微店(Weidian)是一个基于社交关系的电商平台,为商家提供了一整套的电商解决方案。微店API接口允许开发者通过编程方式访问和操作微店平台上的数据,从而可以创建自动化的工具、应用或集成服务。
|
3月前
|
前端开发 API
Context API 实战应用
【10月更文挑战第8天】在 React 应用开发中,状态管理至关重要。本文介绍了 `Context API` 的基础概念、基本用法,以及常见问题和易错点的解决方法。通过代码示例,详细讲解了如何在组件间高效共享状态,优化性能,处理嵌套 Context 和副作用。
31 1
|
4月前
|
Rust API Go
API 网关 OpenID Connect 实战:单点登录(SSO)如此简单
单点登录(SSO)可解决用户在多系统间频繁登录的问题,OIDC 因其标准化、简单易用及安全性等优势成为实现 SSO 的优选方案,本文通过具体步骤示例对 Higress 中开源的 OIDC Wasm 插件进行了介绍,帮助用户零代码实现 SSO 单点登录。
526 11