ActiveMQ系列:ActiveMQ标准API结合JavaSE实战

简介: 订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统自动调用监听器MessageListener的onMessage(Message message)方法。

ActiveMQ系列:ActiveMQ标准API结合JavaSE实战


上篇初步认识了ActiveMQ,有兴趣的可以移步:https://blog.csdn.net/qq_26975307/article/details/98875098,此篇不多逼逼,上手原生撸码(类比JDBC连接数据库)


1、JMS开发的基本步骤


20190809171808230.png


2、两种消费方式


2.1、同步阻塞方式(receive())


订阅者或接收者调用MessageConsumer的receive()方法来接收消息,receive方法在能够接收到消息之前(或超时之、前)将一直阻塞。


2.2、异步非阻塞方式(监听器onMessage())


订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统自动调用监听器MessageListener的onMessage(Message message)方法。


3、点对点的消息传递域


3.1、特点


(1)每个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的。


(2)消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费都可

以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。


(3)消息被消费后队列中不会再存储,所以消费者不会消费到己经被消费掉的消息。


20190809172140262.png


JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活

状态时发送的消息。


20190809172232653.png


4、两大模式的比较


20190809172316319.png


5、ActiveMQ在JavaSE中最简单的实现 —— 队列


5.1、创建生产者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 *
 */
public class JmsProduce_Queue {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws Exception{
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5、创建消息的生产者
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //6、通过使用消息生产者生产消息并发送到MQ的队列中
        for(int i=1;i<=3;i++){
            //7、创建消息
            //可以理解为最简单的字符串
            TextMessage textMessage = session.createTextMessage("msg-" + i);
            /**
             * String消息设置属性
            textMessage.setStringProperty("c01", "vip");
             */
            /**
             * 在每一条消息发送之前,可以设置消息的请求头属性
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setString("map-k1", "map-v1");
            producer.send(mapMessage);
             */
            //8、通过消息生产者发送给MQ
            producer.send(textMessage);
        }
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到MQ完成********");
    }
}


5.2、创建消费者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.JmsException;
import javax.jms.*;
/**
 * @ClassName JmsConsumer_Queue
 * @Description TODO
 * @Author phubing
 * @Date 2019-07-24 21:42
 * @Version 1.0
 **/
public class JmsConsumer_Queue {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws Exception {
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5、创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        /**
         * 通过监听方式来消费消息
         *
         */
        //如果是接口,则可以直接new ,使用其匿名内部类
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("监听到的消息:"+textMessage.getText());
                        System.out.println("监听到的属性消息:"+textMessage.getStringProperty("c01"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }else{
                }
                /*
                if(null != message && message instanceof MapMessage){
                    MapMessage mapMessage = (MapMessage) message;
                    try {
                        System.out.println("监听到的Map消息:"+mapMessage.getString("map-k1"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }else{
                }
                */
            }
        });
        //press any key to exit(保证控制台不关闭,直到按下任一按键)
        System.in.read();
        /*
        * 同步阻塞方式
        * 订阅者或接受者调用MessageConsumer的receive方法来接收消息,receive方法能在接收到消息之前(或超时之前)将一直阻塞
        *
        //6、接收消息
        while(true){
            //若consumer.receive()不指定超时时间,则会一直等待直到有消息进来
            TextMessage message = (TextMessage) consumer.receive(3000L);
            if(null != message){
                System.out.println("消费者接收到消息:"+message.getText());
            }else {
                break;
            }
        }
        */
        consumer.close();
        session.close();
        connection.close();
    }
}


启动并在浏览器和IDEA控制台查看消息的发送与消费情况


20190809172851511.png


5.4、几个问题


先生产,启动1号消费者,再启动2号消费者,问题:2号消费者还能消费吗?

1号消费者不能,2号消费者不能,因为消息已被消费


先生产,只启动1号消费者,问题:1号消费者能消费吗?


先启动2个消费者,再生产,问题:消费情况如何?

两个消费者等候,生产者发出来的消息假设是同一队列,则平均分配


MQ挂了,那么消息的持久化和丢失情况分别如何?

看是否设置消息持久化(接下来会有写到)


消息默认的持久化模式?

默认使用持久化


6、ActiveMQ在JavaSE中最简单的实现 —— 主题


6.1、创建生产者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ClassName JmsProduce_Topic
 * @Description TODO
 * @Author phubing
 * @Date 2019-07-31 20:56
 * @Version 1.0
 **/
public class JmsProduce_Topic {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String TOPIC_NAME = "topic-phubing";
    public static void main(String[] args) throws Exception{
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3、创建session会话
        /**
         * 两个参数: 1、事务;2、签收
         *
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5、创建消息的生产者
        MessageProducer producer = session.createProducer(topic);
        //6、通过使用消息生产者生产消息并发送到MQ的队列中
        for(int i=1;i<3;i++){
            //7、创建消息
            //可以理解为最简单的字符串
            TextMessage textMessage = session.createTextMessage("topic-name-" + i);
            //8、通过消息生产者发送给MQ
            producer.send(textMessage);
        }
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("TOPIC消息发送到MQ完成********");
    }
}


6.2、创建消费者


package com.phubing;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ClassName JmsConsumer_Topic
 * @Description TODO
 * @Author phubing
 * @Date 2019-07-31 20:56
 * @Version 1.0
 **/
public class JmsConsumer_Topic {
    public static final String ACTIVEMQ_URL =  "tcp://192.168.177.130:61616";
    public static final String TOPIC_NAME = "topic-phubing";
    public static void main(String[] args) throws Exception{
        //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2、通过链接工厂,获得链接connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
        //3、创建session会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列,还是Topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5、创建消费者
        MessageConsumer consumer = session.createConsumer(topic);
        //6、如果是接口,则可以直接new ,使用其匿名内部类
        consumer.setMessageListener((message) -> {
            if(null != message && message instanceof TextMessage){
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("监听到的Topic消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }else{}
        });
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    }
}


切记,先启动消费者,再启动生产者(非要先生产再消费,你也可以看看什么现象)


6.3、几个问题


3个订阅者同时开启,再开启生产者,问:3个订阅者的接收消息情况如何?

每个订阅者都有生产者生产的所有消息,进队列是生产者生产的数量,出队列的是消费者数量*生产者生产的消息数量

(谁订阅谁收到,不订阅不打扰)


先启动生产者,再启动订阅者,现象如何?

不会收到之前生产的消息,进队列的是生产者生产的数量,出队列数量无(原来的消息已成为废消息)


未完待续........(下篇详细讲解下JMS)

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
缓存 监控 前端开发
顺企网 API 开发实战:搜索 / 详情接口从 0 到 1 落地(附 Elasticsearch 优化 + 错误速查)
企业API开发常陷参数、缓存、错误处理三大坑?本指南拆解顺企网双接口全流程,涵盖搜索优化、签名验证、限流应对,附可复用代码与错误速查表,助你2小时高效搞定开发,提升响应速度与稳定性。
|
1月前
|
缓存 自然语言处理 API
阿里巴巴国际站关键字搜索 API 实战:3 步搞定多语言适配 + 限流破局,询盘量提升 40%
跨境电商API开发常陷合规、多语言、限流等坑。本文详解从国际合规(GDPR/CCPA)到参数优化、数据结构化及区域化搜索的全链路方案,附Python代码模板与缓存重试架构,助力提升调用成功率至99%+,精准询盘增长42%。
|
1月前
|
开发者 API 机器学习/深度学习
淘宝 / 1688 / 义乌购图搜 API 实战指南:接口调用与商业场景应用
本文详解淘宝、1688、义乌购三大平台图片搜索接口的核心特点、调用流程与实战代码。涵盖跨平台对比、参数配置、响应解析及避坑指南,支持URL/Base64上传,返回商品ID、价格、销量等关键信息,助力开发者快速实现商品识别与比价功能。
淘宝 / 1688 / 义乌购图搜 API 实战指南:接口调用与商业场景应用
|
1月前
|
Cloud Native 算法 API
Python API接口实战指南:从入门到精通
🌟蒋星熠Jaxonic,技术宇宙的星际旅人。深耕API开发,以Python为舟,探索RESTful、GraphQL等接口奥秘。擅长requests、aiohttp实战,专注性能优化与架构设计,用代码连接万物,谱写极客诗篇。
Python API接口实战指南:从入门到精通
|
2月前
|
人工智能 运维 监控
阿里云 API 聚合实战:破解接口碎片化难题,3 类场景方案让业务响应提速 60%
API聚合破解接口碎片化困局,助力开发者降本增效。通过统一中间层整合微服务、第三方接口与AI模型,实现调用次数减少60%、响应提速70%。阿里云实测:APISIX+函数计算+ARMS监控组合,支撑百万级并发,故障定位效率提升90%。
271 0
|
2月前
|
JSON API 调度
Midjourney 技术拆解与阿里云开发者实战指南:从扩散模型到 API 批量生成
Midjourney深度解析:基于优化Stable Diffusion,实现文本到图像高效生成。涵盖技术架构、扩散模型原理、API调用、批量生成系统及阿里云生态协同,助力开发者快速落地AIGC图像创作。
503 0
|
2月前
|
数据采集 缓存 API
小红书笔记详情 API 实战指南:从开发对接、场景落地到收益挖掘(附避坑技巧)
本文详解小红书笔记详情API的开发对接、实战场景与收益模式,涵盖注册避坑、签名生成、数据解析全流程,并分享品牌营销、内容创作、SAAS工具等落地应用,助力开发者高效掘金“种草经济”。
小红书笔记详情 API 实战指南:从开发对接、场景落地到收益挖掘(附避坑技巧)
|
2月前
|
供应链 监控 安全
1688商品详情API接口实战指南:合规获取数据,驱动B2B业务增长
1688商品详情API(alibaba.product.get)是合规获取B2B商品数据的核心工具,支持全维度信息调用,助力企业实现智能选品、供应链优化与市场洞察,推动数字化转型。
|
2月前
|
缓存 监控 供应链
亚马逊 MWS API 实战:商品详情精准获取与跨境电商数据整合方案
本文详细解析亚马逊MWS API接口的技术实现,重点解决跨境商品数据获取中的核心问题。文章首先介绍MWS接口体系的特点,包括多站点数据获取、AWS签名认证等关键环节,并对比普通电商接口的差异。随后深入拆解API调用全流程,提供签名工具类、多站点客户端等可复用代码。针对跨境业务场景,文章还给出数据整合工具实现方案,支持缓存、批量处理等功能。最后通过实战示例展示多站点商品对比和批量选品分析的应用,并附常见问题解决方案。该技术方案可直接应用于跨境选品、价格监控等业务场景,帮助开发者高效获取亚马逊商品数据。
|
2月前
|
数据采集 JSON API
微店商品列表API接口开发指南:从零到实战
微店商品列表API(vdian.shop.item.list.get)用于获取店铺商品数据,支持分页、签名认证,返回JSON格式。适用于商品同步、竞品分析、多平台展示及数据清洗。提供Python请求示例,便于快速接入。