ActiveMQ系列:ActiveMQ标准API结合JavaSE实战

简介: 订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统自动调用监听器MessageListener的onMessage(Message message)方法。

ActiveMQ系列:ActiveMQ标准API结合JavaSE实战


上篇初步认识了ActiveMQ,有兴趣的可以移步:https://blog.csdn.net/qq_26975307/article/details/98875098,此篇不多逼逼,上手原生撸码(类比JDBC连接数据库)


1、JMS开发的基本步骤


20190809171808230.png


2、两种消费方式


2.1、同步阻塞方式(receive())


订阅者或接收者调用MessageConsumer的receive()方法来接收消息,receive方法在能够接收到消息之前(或超时之、前)将一直阻塞。


2.2、异步非阻塞方式(监听器onMessage())


订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统自动调用监听器MessageListener的onMessage(Message message)方法。


3、点对点的消息传递域


3.1、特点


(1)每个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的。


(2)消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费都可

以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。


(3)消息被消费后队列中不会再存储,所以消费者不会消费到己经被消费掉的消息。


20190809172140262.png


JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活

状态时发送的消息。


20190809172232653.png


4、两大模式的比较


20190809172316319.png


5、ActiveMQ在JavaSE中最简单的实现 —— 队列


5.1、创建生产者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 *
 */
public class JmsProduce_Queue {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws Exception{
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5、创建消息的生产者
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //6、通过使用消息生产者生产消息并发送到MQ的队列中
        for(int i=1;i<=3;i++){
            //7、创建消息
            //可以理解为最简单的字符串
            TextMessage textMessage = session.createTextMessage("msg-" + i);
            /**
             * String消息设置属性
            textMessage.setStringProperty("c01", "vip");
             */
            /**
             * 在每一条消息发送之前,可以设置消息的请求头属性
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setString("map-k1", "map-v1");
            producer.send(mapMessage);
             */
            //8、通过消息生产者发送给MQ
            producer.send(textMessage);
        }
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到MQ完成********");
    }
}


5.2、创建消费者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.JmsException;
import javax.jms.*;
/**
 * @ClassName JmsConsumer_Queue
 * @Description TODO
 * @Author phubing
 * @Date 2019-07-24 21:42
 * @Version 1.0
 **/
public class JmsConsumer_Queue {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws Exception {
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5、创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        /**
         * 通过监听方式来消费消息
         *
         */
        //如果是接口,则可以直接new ,使用其匿名内部类
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("监听到的消息:"+textMessage.getText());
                        System.out.println("监听到的属性消息:"+textMessage.getStringProperty("c01"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }else{
                }
                /*
                if(null != message && message instanceof MapMessage){
                    MapMessage mapMessage = (MapMessage) message;
                    try {
                        System.out.println("监听到的Map消息:"+mapMessage.getString("map-k1"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }else{
                }
                */
            }
        });
        //press any key to exit(保证控制台不关闭,直到按下任一按键)
        System.in.read();
        /*
        * 同步阻塞方式
        * 订阅者或接受者调用MessageConsumer的receive方法来接收消息,receive方法能在接收到消息之前(或超时之前)将一直阻塞
        *
        //6、接收消息
        while(true){
            //若consumer.receive()不指定超时时间,则会一直等待直到有消息进来
            TextMessage message = (TextMessage) consumer.receive(3000L);
            if(null != message){
                System.out.println("消费者接收到消息:"+message.getText());
            }else {
                break;
            }
        }
        */
        consumer.close();
        session.close();
        connection.close();
    }
}


启动并在浏览器和IDEA控制台查看消息的发送与消费情况


20190809172851511.png


5.4、几个问题


先生产,启动1号消费者,再启动2号消费者,问题:2号消费者还能消费吗?

1号消费者不能,2号消费者不能,因为消息已被消费


先生产,只启动1号消费者,问题:1号消费者能消费吗?


先启动2个消费者,再生产,问题:消费情况如何?

两个消费者等候,生产者发出来的消息假设是同一队列,则平均分配


MQ挂了,那么消息的持久化和丢失情况分别如何?

看是否设置消息持久化(接下来会有写到)


消息默认的持久化模式?

默认使用持久化


6、ActiveMQ在JavaSE中最简单的实现 —— 主题


6.1、创建生产者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ClassName JmsProduce_Topic
 * @Description TODO
 * @Author phubing
 * @Date 2019-07-31 20:56
 * @Version 1.0
 **/
public class JmsProduce_Topic {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String TOPIC_NAME = "topic-phubing";
    public static void main(String[] args) throws Exception{
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5、创建消息的生产者
        MessageProducer producer = session.createProducer(topic);
        //6、通过使用消息生产者生产消息并发送到MQ的队列中
        for(int i=1;i<3;i++){
            //7、创建消息
            //可以理解为最简单的字符串
            TextMessage textMessage = session.createTextMessage("topic-name-" + i);
            //8、通过消息生产者发送给MQ
            producer.send(textMessage);
        }
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("TOPIC消息发送到MQ完成********");
    }
}


6.2、创建消费者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ClassName JmsConsumer_Topic
 * @Description TODO
 * @Author phubing
 * @Date 2019-07-31 20:56
 * @Version 1.0
 **/
public class JmsConsumer_Topic {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String TOPIC_NAME = "topic-phubing";
    public static void main(String[] args) throws Exception{
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
        //3、创建session会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5、创建消费者
        MessageConsumer consumer = session.createConsumer(topic);
        //6、如果是接口,则可以直接new ,使用其匿名内部类
        consumer.setMessageListener((message) -> {
            if(null != message && message instanceof TextMessage){
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("监听到的Topic消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }else{}
        });
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    }
}


切记,先启动消费者,再启动生产者(非要先生产再消费,你也可以看看什么现象)


6.3、几个问题


3个订阅者同时开启,再开启生产者,问:3个订阅者的接收消息情况如何?

每个订阅者都有生产者生产的所有消息,进队列是生产者生产的数量,出队列的是消费者数量*生产者生产的消息数量

(谁订阅谁收到,不订阅不打扰)


先启动生产者,再启动订阅者,现象如何?

不会收到之前生产的消息,进队列的是生产者生产的数量,出队列数量无(原来的消息已成为废消息)


未完待续........(下篇详细讲解下JMS)

相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
0
0
0
1
分享
相关文章
商城API接口:上货安全与实战指南
商城上货API接口在电商运营中扮演着核心角色,本文从功能、类型、安全性及实战案例多角度剖析其应用价值。主要功能涵盖商品信息管理(如发布、图片上传、类目设置)、库存与价格管理、订单处理、物流信息管理、用户信息管理及销售数据分析等。API类型包括RESTful、SOAP、JSON和XML等,其中RESTful和JSON因高效简洁而广泛应用。安全性方面,通过身份验证、授权及数据加密等措施保障接口安全,确保数据传输的可靠性。这些内容为开发者优化商城运营提供了重要指导。
43 1
车辆车型大全 API 实战指南:推动交通行业智能化
车辆车型大全API由探数平台提供,旨在解决企业班车、物流运输及汽车销售等行业对标准化车型数据的需求。传统人工维护车型库效率低且易出错,而该API覆盖主流品牌与车系,包含品牌、车系、销售车型及配置参数等详细信息,适用于车队管理、电商平台及汽车资讯平台。API提供四个子接口:获取品牌、车系、销售车型与配置详情信息,支持高效查询。通过HTTP POST请求即可调用,返回结构化数据,助力企业实现智能化运营与科学决策,在绿色智能交通时代发挥重要作用。
99 4
1688商品详情API实战:Python调用全流程与数据解析技巧
本文介绍了1688电商平台的商品详情API接口,助力电商从业者高效获取商品信息。接口可返回商品基础属性、价格体系、库存状态、图片描述及商家详情等多维度数据,支持全球化语言设置。通过Python示例代码展示了如何调用该接口,帮助用户快速上手,适用于选品分析、市场研究等场景。
一文掌握 1688 商品详情 API 接口:从入门到实战
1688是国内领先的综合电商批发平台,提供海量商品资源。其商品详情API助力开发者与企业获取商品的详细信息(如属性、价格、库存等),广泛应用于电商数据分析、比价系统及采购场景。API支持GET/POST请求,需传入通用参数(app_key、timestamp等)与业务参数(如product_id)。返回JSON格式数据,包含商品标题、价格、图片链接等详情,提升业务效率与决策精准度。
RunnerGo API 性能测试实战:从问题到解决的全链路剖析
API性能测试是保障软件系统稳定性与用户体验的关键环节。本文详细探讨了使用RunnerGo全栈测试平台进行API性能测试的全流程,涵盖测试计划创建、场景设计、执行分析及优化改进。通过电商平台促销活动的实际案例,展示了如何设置测试目标、选择压测模式并分析结果。针对发现的性能瓶颈,提出了代码优化、数据库调优、服务器资源配置和缓存策略等解决方案。最终,系统性能显著提升,满足高并发需求。持续关注与优化API性能,对系统稳定运行至关重要。
基于通义大模型的智能客服系统构建实战:从模型微调到API部署
本文详细解析了基于通义大模型的智能客服系统构建全流程,涵盖数据准备、模型微调、性能优化及API部署等关键环节。通过实战案例与代码演示,展示了如何针对客服场景优化训练数据、高效微调大模型、解决部署中的延迟与并发问题,以及构建完整的API服务与监控体系。文章还探讨了性能优化进阶技术,如模型量化压缩和缓存策略,并提供了安全与合规实践建议。最终总结显示,微调后模型意图识别准确率提升14.3%,QPS从12.3提升至86.7,延迟降低74%。
200 14
微服务架构下的电商API接口设计:策略、方法与实战案例
本文探讨了微服务架构下的电商API接口设计,旨在打造高效、灵活与可扩展的电商系统。通过服务拆分(如商品、订单、支付等模块)和标准化设计(RESTful或GraphQL风格),确保接口一致性与易用性。同时,采用缓存策略、负载均衡及限流技术优化性能,并借助Prometheus等工具实现监控与日志管理。微服务架构的优势在于支持敏捷开发、高并发处理和独立部署,满足电商业务快速迭代需求。未来,电商API设计将向智能化与安全化方向发展。
淘宝商品详情API接口解析与 Python 实战指南
淘宝商品详情API接口是淘宝开放平台提供的编程工具,支持开发者获取商品详细信息,包括基础属性、价格、库存、销售策略及卖家信息等。适用于电商数据分析、竞品分析与价格策略优化等场景。接口功能涵盖商品基础信息、详情描述、图片视频资源、SKU属性及评价统计的查询。通过构造请求URL和签名,可便捷调用数据。典型应用场景包括电商比价工具、商品数据分析平台、供应链管理及营销活动监控等,助力高效运营与决策。
188 26
车辆限行查询API的实战指南:让限行管理从此 “有码可循”
随着全国机动车保有量突破4.53亿辆,交通拥堵与污染问题日益严峻,各城市陆续实施限行政策。探数API推出的车辆限行查询服务覆盖200+城市,提供实时限行数据,包括本地/外地燃油车及新能源车的限行规则、区域和时间等信息。其功能涵盖单个城市限行政策查询与支持城市的全面列表,助力用户精准规划出行。通过HTTP POST请求即可轻松接入,适用于导航平台和个人开发者。在“双碳”目标下,该API推动绿色出行与智能交通发展,为个人、企业和城市治理提供高效解决方案。
122 5
运营商三要素API的实战指南:实现 “人 - 证 - 号” 三位一体核验
在数字身份欺诈频发的背景下,传统单点验证已无法满足高安全需求。探数API推出的“运营商三要素核验API”,通过姓名、身份证号、手机号的三重交叉验证,构建起“铁三角”防线,广泛适用于金融、政务、电商等领域。该API支持一致性验证及基础信息返回(可选),具备高准确性与防伪性,远超单一或双因素验证方式。其调用流程简单,提供Python示例代码及异常处理建议,助力打造更安全的数字身份体系,成为连接多领域的关键桥梁。未来,多因子融合的身份认证将成为趋势,而三要素核验API正是当前可信数字身份的重要基石。
102 2

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问