ActiveMQ

简介:    MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。

ActiveMQ


   MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。


特点:


  1、支持多种语言编写客户端


  2、对spring的支持,很容易和spring整合


  3、支持多种传输协议:TCP,SSL,NIO,UDP等


  4、支持AJAX


  消息形式:


  1、点对点(queue)


  2、一对多(topic)


安装启动


   官网下载压缩包,解压到相应目录 , 这里解压到/opt


  启动mq , ./bin/activemq start


  关闭mq,   ./bin/activemq stop


  访问 :127.0.0.1:8161/admin/     账号admin   密码admin


  采用61616端口提供JMS服务,采用8161提供管理控制台服务


  通信协议:TCP


生产者生产消息


package com.steak.activemq.test;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class mqTest {
  private static final StringACTIVE_URL ="tcp://127.0.0.1:61616";
    private static final StringQUEUE ="queue";
    public static void main(String[] args)throws JMSException {
//创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(ACTIVE_URL);
        //通过连接工厂,获得连接
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //创建session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //创建目的地
        Queue queue = session.createQueue(QUEUE);
        //创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        //通过使用messageProducer生产消息发送到MQ队列里
        for (int i =0 ; i <3 ; i++){
//创建消息
            TextMessage textMessage = session.createTextMessage("消息 "+i);
            //通过messageProducer发送消息
            messageProducer.send(textMessage);
        }
//关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("消息发送完成");
    }
}


  启动消费者发送消息,此时等待消费的消息为3条,消费者为0个,进队的消息为3条,出队消息为0条



    Number Of Pending Messages:等待消费的消息


  Number of Conumers : 消费者数量


  Message Enqueued : 进队消息数


  Message Dequeued :    出队消息数


消费者消费消息


package com.steak.activemq.test;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
private static final StringACTIVE_URL ="tcp://127.0.0.1:61616";
    private static final StringQUEUE ="queue";
    public static void main(String[] args)throws JMSException {
//创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(ACTIVE_URL);
        //通过连接工厂,获得连接
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //创建session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //创建目的地
        Queue queue = session.createQueue(QUEUE);
        //创建消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        //通过同步阻塞接受消息
        while (true){
//接受消息的类型要和发送消息的一样 ,
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            if (null != textMessage){
System.out.println("消息 "+textMessage.getText());
            }else {
break;
            }
}
messageConsumer.close();
        session.close();
        connection.close();
    }
}


  此时消费者有一个,并且消费了3条消息



  消费者接收消息时可以一直等待(耗费系统资源),也可以设置时间,



  当为receive()(同步阻塞)时,消费者一直阻塞等待消费消息



    为recevice(long varl)(异步阻塞)时,当超时后,消费者就消失



    通过异步监听方式来接收消息


//通过异步监听方式来接收消息
    messageConsumer.setMessageListener(new MessageListener() {
@Override
        public void onMessage(Message message) {
if (null != message && messageinstanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
                try {
System.out.println("消息  "+((TextMessage) message).getText());
                }catch (JMSException e) {
e.printStackTrace();
                }
}
}
});
    System.in.read(); //保证控制台不关闭,作用是消费完消息以后才能关闭
    messageConsumer.close();
    session.close();
    connection.close();
}


消费者消费情况


        1,先启动生产者,再启动多个消费者,一定是第一个消费者消费完所有消息,后面的都消费不到


        2,先启动多个消费者,在启动生产者,消息基本是平均消费的,消息个数是基数的时候也是一个的差异


主体模式


  当为(主题模式)Topic时,所有消费者收到的消息都是一样的,前提是要先订阅,订阅后才能收到消息,消息是无状态的,发送消息后就什么都不管了



  主题模式



  如:两人订阅



  生产者发布消息,此时发布了三条消息,但是为主题模式,有两个消费者,所以一共消费6条




相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 存储 负载均衡
ActiveMQ高可用集群部署方案
ActiveMQ是分布式系统中重要的组件,在生产环境中如何保证让ActiveMQ能够持续工作,同时还要使消息中间件服务保持可靠性和高效的处理性能。
7227 0
ActiveMQ高可用集群部署方案
|
消息中间件 存储 监控
ActiveMQ系列: ActiveMQ 的死信队列与消费重试机制
maximumRedeliveryDelay:最大传送延迟,只在 useExponentialBackOff 为 true 时有效(V5.5),假设首次重连间隔为 10ms,倍数为 2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为 40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为 -1。
1486 0
ActiveMQ系列: ActiveMQ 的死信队列与消费重试机制
|
移动开发 Java API
大疆无人机对接
本文介绍了大疆无人机对接第三方云平台的方案,包括设备对接和CloudAPI对接两种方式,重点讨论了CloudAPI对接。CloudAPI对接方案通过DJI Pilot 2或大疆机场将无人机与第三方云平台连接,实现低门槛接入,无需重复开发APP。方案优势在于让开发者更专注于业务开发,而非无人机功能适配。文章详细阐述了对接流程,包括环境准备、申请APPKey、对接流程、直播功能及获取无人机实时数据等内容,并提供了丰富的接口说明和技术支持资源。
9030 4
大疆无人机对接
|
6月前
|
数据采集 物联网 芯片
RFID运动采集系统实现争分夺秒的精准计时
RFID技术在运动计时中实现精准“争分夺秒”,广泛应用于马拉松、自行车、游泳等赛事,具备高精度、高效、全自动数据采集等特点,极大提升赛事组织效率与成绩准确性。
|
存储 缓存 NoSQL
解决Redis缓存数据类型丢失问题
解决Redis缓存数据类型丢失问题
528 85
|
Java 数据库 Spring
Failed to configure a DataSource: ‘url‘ attribute is not specified and no embedded datasource could
Failed to configure a DataSource: ‘url‘ attribute is not specified and no embedded datasource could
426 0
|
存储 消息中间件 NoSQL
使用Java操作Redis数据类型的详解指南
通过使用Jedis库,可以在Java中方便地操作Redis的各种数据类型。本文详细介绍了字符串、哈希、列表、集合和有序集合的基本操作及其对应的Java实现。这些示例展示了如何使用Java与Redis进行交互,为开发高效的Redis客户端应用程序提供了基础。希望本文的指南能帮助您更好地理解和使用Redis,提升应用程序的性能和可靠性。
313 1
|
存储 JSON 安全
Flask四种配置方式
Flask是一个轻量级的Python Web框架,被广泛应用于Web开发中。Flask提供了多种配置方式,可以根据不同的需求和场景进行选择。本篇博客将介绍Flask的几种配置方式,并提供相关代码示例。
398 3
|
运维 Nacos 开发者
nacos常见问题之raft报错如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
nacos常见问题之raft报错如何解决
|
消息中间件 Java Apache
使用Spring Boot实现与ActiveMQ的消息队列集成
使用Spring Boot实现与ActiveMQ的消息队列集成