ActiveMQ系列:ActiveMQ标准API结合JavaSE实战

简介: 订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统自动调用监听器MessageListener的onMessage(Message message)方法。

ActiveMQ系列:ActiveMQ标准API结合JavaSE实战


上篇初步认识了ActiveMQ,有兴趣的可以移步:https://blog.csdn.net/qq_26975307/article/details/98875098,此篇不多逼逼,上手原生撸码(类比JDBC连接数据库)


1、JMS开发的基本步骤


20190809171808230.png


2、两种消费方式


2.1、同步阻塞方式(receive())


订阅者或接收者调用MessageConsumer的receive()方法来接收消息,receive方法在能够接收到消息之前(或超时之、前)将一直阻塞。


2.2、异步非阻塞方式(监听器onMessage())


订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统自动调用监听器MessageListener的onMessage(Message message)方法。


3、点对点的消息传递域


3.1、特点


(1)每个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的。


(2)消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费都可

以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。


(3)消息被消费后队列中不会再存储,所以消费者不会消费到己经被消费掉的消息。


20190809172140262.png


JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活

状态时发送的消息。


20190809172232653.png


4、两大模式的比较


20190809172316319.png


5、ActiveMQ在JavaSE中最简单的实现 —— 队列


5.1、创建生产者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 *
 */
public class JmsProduce_Queue {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws Exception{
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5、创建消息的生产者
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //6、通过使用消息生产者生产消息并发送到MQ的队列中
        for(int i=1;i<=3;i++){
            //7、创建消息
            //可以理解为最简单的字符串
            TextMessage textMessage = session.createTextMessage("msg-" + i);
            /**
             * String消息设置属性
            textMessage.setStringProperty("c01", "vip");
             */
            /**
             * 在每一条消息发送之前,可以设置消息的请求头属性
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setString("map-k1", "map-v1");
            producer.send(mapMessage);
             */
            //8、通过消息生产者发送给MQ
            producer.send(textMessage);
        }
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到MQ完成********");
    }
}


5.2、创建消费者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.JmsException;
import javax.jms.*;
/**
 * @ClassName JmsConsumer_Queue
 * @Description TODO
 * @Author phubing
 * @Date 2019-07-24 21:42
 * @Version 1.0
 **/
public class JmsConsumer_Queue {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws Exception {
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5、创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        /**
         * 通过监听方式来消费消息
         *
         */
        //如果是接口,则可以直接new ,使用其匿名内部类
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("监听到的消息:"+textMessage.getText());
                        System.out.println("监听到的属性消息:"+textMessage.getStringProperty("c01"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }else{
                }
                /*
                if(null != message && message instanceof MapMessage){
                    MapMessage mapMessage = (MapMessage) message;
                    try {
                        System.out.println("监听到的Map消息:"+mapMessage.getString("map-k1"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }else{
                }
                */
            }
        });
        //press any key to exit(保证控制台不关闭,直到按下任一按键)
        System.in.read();
        /*
        * 同步阻塞方式
        * 订阅者或接受者调用MessageConsumer的receive方法来接收消息,receive方法能在接收到消息之前(或超时之前)将一直阻塞
        *
        //6、接收消息
        while(true){
            //若consumer.receive()不指定超时时间,则会一直等待直到有消息进来
            TextMessage message = (TextMessage) consumer.receive(3000L);
            if(null != message){
                System.out.println("消费者接收到消息:"+message.getText());
            }else {
                break;
            }
        }
        */
        consumer.close();
        session.close();
        connection.close();
    }
}


启动并在浏览器和IDEA控制台查看消息的发送与消费情况


20190809172851511.png


5.4、几个问题


先生产,启动1号消费者,再启动2号消费者,问题:2号消费者还能消费吗?

1号消费者不能,2号消费者不能,因为消息已被消费


先生产,只启动1号消费者,问题:1号消费者能消费吗?


先启动2个消费者,再生产,问题:消费情况如何?

两个消费者等候,生产者发出来的消息假设是同一队列,则平均分配


MQ挂了,那么消息的持久化和丢失情况分别如何?

看是否设置消息持久化(接下来会有写到)


消息默认的持久化模式?

默认使用持久化


6、ActiveMQ在JavaSE中最简单的实现 —— 主题


6.1、创建生产者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ClassName JmsProduce_Topic
 * @Description TODO
 * @Author phubing
 * @Date 2019-07-31 20:56
 * @Version 1.0
 **/
public class JmsProduce_Topic {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String TOPIC_NAME = "topic-phubing";
    public static void main(String[] args) throws Exception{
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5、创建消息的生产者
        MessageProducer producer = session.createProducer(topic);
        //6、通过使用消息生产者生产消息并发送到MQ的队列中
        for(int i=1;i<3;i++){
            //7、创建消息
            //可以理解为最简单的字符串
            TextMessage textMessage = session.createTextMessage("topic-name-" + i);
            //8、通过消息生产者发送给MQ
            producer.send(textMessage);
        }
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("TOPIC消息发送到MQ完成********");
    }
}


6.2、创建消费者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ClassName JmsConsumer_Topic
 * @Description TODO
 * @Author phubing
 * @Date 2019-07-31 20:56
 * @Version 1.0
 **/
public class JmsConsumer_Topic {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String TOPIC_NAME = "topic-phubing";
    public static void main(String[] args) throws Exception{
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
        //3、创建session会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5、创建消费者
        MessageConsumer consumer = session.createConsumer(topic);
        //6、如果是接口,则可以直接new ,使用其匿名内部类
        consumer.setMessageListener((message) -> {
            if(null != message && message instanceof TextMessage){
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("监听到的Topic消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }else{}
        });
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    }
}


切记,先启动消费者,再启动生产者(非要先生产再消费,你也可以看看什么现象)


6.3、几个问题


3个订阅者同时开启,再开启生产者,问:3个订阅者的接收消息情况如何?

每个订阅者都有生产者生产的所有消息,进队列是生产者生产的数量,出队列的是消费者数量*生产者生产的消息数量

(谁订阅谁收到,不订阅不打扰)


先启动生产者,再启动订阅者,现象如何?

不会收到之前生产的消息,进队列的是生产者生产的数量,出队列数量无(原来的消息已成为废消息)


未完待续........(下篇详细讲解下JMS)

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
编译器 API Android开发
Android经典实战之Kotlin Multiplatform 中,如何处理不同平台的 API 调用
本文介绍Kotlin Multiplatform (KMP) 中使用 `expect` 和 `actual` 关键字处理多平台API调用的方法。通过共通代码集定义预期API,各平台提供具体实现,编译器确保正确匹配,支持依赖注入、枚举类处理等,实现跨平台代码重用与原生性能。附带示例展示如何定义跨平台函数与类。
70 0
|
11天前
|
存储 前端开发 API
告别繁琐,拥抱简洁!Python RESTful API 设计实战,让 API 调用如丝般顺滑!
在 Web 开发的旅程中,设计一个高效、简洁且易于使用的 RESTful API 是至关重要的。今天,我想和大家分享一次我在 Python 中进行 RESTful API 设计的实战经历,希望能给大家带来一些启发。
26 3
|
29天前
|
Rust API Go
API 网关 OpenID Connect 实战:单点登录(SSO)如此简单
单点登录(SSO)可解决用户在多系统间频繁登录的问题,OIDC 因其标准化、简单易用及安全性等优势成为实现 SSO 的优选方案,本文通过具体步骤示例对 Higress 中开源的 OIDC Wasm 插件进行了介绍,帮助用户零代码实现 SSO 单点登录。
|
2月前
|
JSON 数据管理 关系型数据库
【Dataphin V3.9】颠覆你的数据管理体验!API数据源接入与集成优化,如何让企业轻松驾驭海量异构数据,实现数据价值最大化?全面解析、实战案例、专业指导,带你解锁数据整合新技能!
【8月更文挑战第15天】随着大数据技术的发展,企业对数据处理的需求不断增长。Dataphin V3.9 版本提供更灵活的数据源接入和高效 API 集成能力,支持 MySQL、Oracle、Hive 等多种数据源,增强 RESTful 和 SOAP API 支持,简化外部数据服务集成。例如,可轻松从 RESTful API 获取销售数据并存储分析。此外,Dataphin V3.9 还提供数据同步工具和丰富的数据治理功能,确保数据质量和一致性,助力企业最大化数据价值。
115 1
|
2月前
|
Java 缓存 数据库连接
揭秘!Struts 2性能翻倍的秘诀:不可思议的优化技巧大公开
【8月更文挑战第31天】《Struts 2性能优化技巧》介绍了提升Struts 2 Web应用响应速度的关键策略,包括减少配置开销、优化Action处理、合理使用拦截器、精简标签库使用、改进数据访问方式、利用缓存机制以及浏览器与网络层面的优化。通过实施这些技巧,如懒加载配置、异步请求处理、高效数据库连接管理和启用GZIP压缩等,可显著提高应用性能,为用户提供更快的体验。性能优化需根据实际场景持续调整。
49 0
|
2月前
|
开发者
告别繁琐代码,JSF标签库带你走进高效开发的新时代!
【8月更文挑战第31天】JSF(JavaServer Faces)标准标签库为页面开发提供了大量组件标签,如`&lt;h:inputText&gt;`、`&lt;h:dataTable&gt;`等,简化代码、提升效率并确保稳定性。本文通过示例展示如何使用这些标签实现常见功能,如创建登录表单和展示数据列表,帮助开发者更高效地进行Web应用开发。
32 0
|
2月前
|
前端开发 API 开发者
【React状态管理新思路】Context API入门:从零开始摆脱props钻孔的优雅之道,全面解析与实战案例分享!
【8月更文挑战第31天】React 的 Context API 有效解决了多级组件间状态传递的 &quot;props 钻孔&quot; 问题,使代码更简洁、易维护。本文通过电子商务网站登录状态管理案例,详细介绍了 Context API 的使用方法,包括创建、提供及消费 Context,以及处理多个 Context 的场景,适合各水平开发者学习与应用,提高开发效率和代码质量。
25 0
|
2月前
|
JSON API 数据库
探索FastAPI:不仅仅是一个Python Web框架,更是助力开发者高效构建现代化RESTful API服务的神器——从环境搭建到CRUD应用实战全面解析
【8月更文挑战第31天】FastAPI 是一个基于 Python 3.6+ 类型提示标准的现代 Web 框架,以其高性能、易用性和现代化设计而备受青睐。本文通过示例介绍了 FastAPI 的优势及其在构建高效 Web 应用中的强大功能。首先,通过安装 FastAPI 和 Uvicorn 并创建简单的“Hello, World!”应用入门;接着展示了如何处理路径参数和查询参数,并利用类型提示进行数据验证和转换。
42 0
|
2月前
|
缓存 API 数据库
打造高性能后端API:从设计到部署的实战之旅
【8月更文挑战第31天】在数字化时代的浪潮中,后端API成为了连接用户、数据与服务的桥梁。本文将带领读者踏上一段从API设计、开发到部署的旅程,通过实际案例分析,揭示如何构建一个高性能的后端系统。我们将探讨现代后端架构的关键要素,包括RESTful API设计原则、数据库优化技巧、缓存策略、以及容器化部署的实践。文章旨在为开发者提供一套实用的方法论,帮助他们在面对复杂业务需求时,能够设计出既高效又可扩展的后端服务。
下一篇
无影云桌面