ActiveMQ

简介:    MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。

ActiveMQ


   MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。


特点:


  1、支持多种语言编写客户端


  2、对spring的支持,很容易和spring整合


  3、支持多种传输协议:TCP,SSL,NIO,UDP等


  4、支持AJAX


  消息形式:


  1、点对点(queue)


  2、一对多(topic)


安装启动


   官网下载压缩包,解压到相应目录 , 这里解压到/opt


  启动mq , ./bin/activemq start


  关闭mq,   ./bin/activemq stop


  访问 :127.0.0.1:8161/admin/     账号admin   密码admin


  采用61616端口提供JMS服务,采用8161提供管理控制台服务


  通信协议:TCP


生产者生产消息


package com.steak.activemq.test;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class mqTest {
  private static final StringACTIVE_URL ="tcp://127.0.0.1:61616";
    private static final StringQUEUE ="queue";
    public static void main(String[] args)throws JMSException {
//创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(ACTIVE_URL);
        //通过连接工厂,获得连接
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //创建session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //创建目的地
        Queue queue = session.createQueue(QUEUE);
        //创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        //通过使用messageProducer生产消息发送到MQ队列里
        for (int i =0 ; i <3 ; i++){
//创建消息
            TextMessage textMessage = session.createTextMessage("消息 "+i);
            //通过messageProducer发送消息
            messageProducer.send(textMessage);
        }
//关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("消息发送完成");
    }
}


  启动消费者发送消息,此时等待消费的消息为3条,消费者为0个,进队的消息为3条,出队消息为0条



    Number Of Pending Messages:等待消费的消息


  Number of Conumers : 消费者数量


  Message Enqueued : 进队消息数


  Message Dequeued :    出队消息数


消费者消费消息


package com.steak.activemq.test;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
private static final StringACTIVE_URL ="tcp://127.0.0.1:61616";
    private static final StringQUEUE ="queue";
    public static void main(String[] args)throws JMSException {
//创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(ACTIVE_URL);
        //通过连接工厂,获得连接
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //创建session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //创建目的地
        Queue queue = session.createQueue(QUEUE);
        //创建消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        //通过同步阻塞接受消息
        while (true){
//接受消息的类型要和发送消息的一样 ,
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            if (null != textMessage){
System.out.println("消息 "+textMessage.getText());
            }else {
break;
            }
}
messageConsumer.close();
        session.close();
        connection.close();
    }
}


  此时消费者有一个,并且消费了3条消息



  消费者接收消息时可以一直等待(耗费系统资源),也可以设置时间,



  当为receive()(同步阻塞)时,消费者一直阻塞等待消费消息



    为recevice(long varl)(异步阻塞)时,当超时后,消费者就消失



    通过异步监听方式来接收消息


//通过异步监听方式来接收消息
    messageConsumer.setMessageListener(new MessageListener() {
@Override
        public void onMessage(Message message) {
if (null != message && messageinstanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
                try {
System.out.println("消息  "+((TextMessage) message).getText());
                }catch (JMSException e) {
e.printStackTrace();
                }
}
}
});
    System.in.read(); //保证控制台不关闭,作用是消费完消息以后才能关闭
    messageConsumer.close();
    session.close();
    connection.close();
}


消费者消费情况


        1,先启动生产者,再启动多个消费者,一定是第一个消费者消费完所有消息,后面的都消费不到


        2,先启动多个消费者,在启动生产者,消息基本是平均消费的,消息个数是基数的时候也是一个的差异


主体模式


  当为(主题模式)Topic时,所有消费者收到的消息都是一样的,前提是要先订阅,订阅后才能收到消息,消息是无状态的,发送消息后就什么都不管了



  主题模式



  如:两人订阅



  生产者发布消息,此时发布了三条消息,但是为主题模式,有两个消费者,所以一共消费6条




相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 网络协议 Java
消息中间件-ActiveMQ
消息中间件-ActiveMQ
|
消息中间件 存储 网络协议
深入了解ActiveMQ!
深入了解ActiveMQ!
400 0
|
消息中间件 Java Linux
|
消息中间件 Java Apache
消息中间件学习笔记--ActiveMQ(一、安装)
消息中间件学习笔记--ActiveMQ(一、安装)
134 4
消息中间件学习笔记--ActiveMQ(一、安装)
|
消息中间件
ActiveMQ - SpringJMS 之 ActiveMQ
ActiveMQ - SpringJMS 之 ActiveMQ
56 0
ActiveMQ - SpringJMS 之 ActiveMQ
|
消息中间件 中间件
ActiveMQ - 基础篇(上)
ActiveMQ - 基础篇(上)
140 0
ActiveMQ - 基础篇(上)
|
消息中间件
ActiveMQ - 基础篇(下)
ActiveMQ - 基础篇(下)
137 0
ActiveMQ - 基础篇(下)
|
消息中间件 存储 缓存
ActiveMQ介绍与安装
ActiveMQ介绍与安装
259 0
ActiveMQ介绍与安装
|
消息中间件 JSON 缓存
浅谈ActiveMQ
浅谈ActiveMQ
|
消息中间件 网络协议 Java
ActiveMQ
ActiveMQ
143 0
ActiveMQ