activeMq安装及原理分析

简介: 1.下载地址:activeMQ:window中的apache-activemq-5.15.8-bin.zip2.安装解压压缩包,运行bin中的activemq.bat,qizh,根据本地环境决定运行win64还是win32,结果:访问:http://127.

1.下载地址:
activeMQ:window中的apache-activemq-5.15.8-bin.zip
2.安装
解压压缩包,
image
运行bin中的activemq.bat,qizh,根据本地环境决定运行win64还是win32,结果:
image
访问:http://127.0.0.1:8161/,出现以下界面即为安装成功,账户密码默认都是admin,
image
image
Queues:是队列方式消息
Topics:是主题方式消息
Subscribers:消息订阅监控查询
Connections:可以查看链接数,分别可以查看xmpp、ssl、stomp、openwire、ws和网络链接
Network:是网络链接数监控
Send:可以发送消息数据。
3.消息中间件构建信息
Broker
消息服务器,为server提供消息核心服务
Producer
消息生产者,业务的发起方,负责生产消息传输给broker
Consumer
消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理
Topic
主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的广播
Queue
队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收
Message
消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输
4.消息中间件模式
4.1 点对点模式
消息生产者生产消息发送到queue中,消息消费者从queue中取出并且消费消息。Queue支持存在多个消费者,但是每一个消息,只会有一个消费者可以消费,此消息消费后,队里queue便不再存储。
image
Java消息服务(Java Message Service,JMS)应用程序是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间或分布式系统中发送消息,进行异步通信。
demo示例:
生成者生成消息

public class JMSProducer {


   /* private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址*/
    private static final int SENDNUM = 10;
    private static final String BROKEURL = "tcp://localhost:61616";// 默认连接地址
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码

    public static void main(String [] args){
        // 连接工厂,用来生产Connection
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
        Connection connection = null; // 连接
        Session session = null; // 会话,接收或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageProducer messageProducer; // 消息发送者
        String queenName = "FirstQueue.mq.queen";//消息队列名为FirstQueue.mq.queen

        System.out.println("producer"+USERNAME+PASSWORD+BROKEURL);

        try {
            connection = connectionFactory.createConnection(); // 通过连接工厂创建连接
            connection.start();  // 启动连接
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 获取Session
            destination = session.createQueue(queenName); // 创建消息队列,名为FirstQueue.mq.queen
            messageProducer = session.createProducer(destination); // 创建消息生产者
            // 设置持久化模式
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, messageProducer); // 发送消息
            session.commit(); // 因为上面加了事务Boolean.TRUE表示有事务,所以要commit
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if(session != null){
                    session.close();
                }
                if(connection != null){
                    connection.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }

    public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        for(int i = 0; i < JMSProducer.SENDNUM; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ 发送的消息" + i);
            System.out.println("ActiveMQ 发送的消息" + i);
            messageProducer.send(message);
        }
    }

}

消费者消费消息

public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
    private static final String BROKEURL = "tcp://localhost:61616"; // 默认连接地址


    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂,用来生产Connection

        Connection connection = null; // 连接
        Session session = null; // 会话,接收或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息消费者
        String queenName = "FirstQueue.mq.queen";//消息队列名为FirstQueue.mq.queen
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
        System.out.println(USERNAME+PASSWORD+ActiveMQConnection.DEFAULT_BROKER_URL);
        try {
            connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
            connection.start();  // 启动连接
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取Session,不需要加事务了
            destination = session.createQueue(queenName); // 创建消息队列,名为FirstQueue1
            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            //注册消息监听
            messageConsumer.setMessageListener(new Listener());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally {
          /*  try {
                if(session != null){
                    session.close();
                }
                if(connection != null){
                    connection.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }*/
        }
    }

}

在写一个消息的消费者2,和以上代码一样,
结果如下:
生产者生成10条消息:
image
消费者1:
image
消费者2:
image
队列queue的特点:queue实现了负载均衡,producer生产的消息发送到消息队列中可以由多个消费者消费,但每一条消息只能被一个消费者消费且只能消费一次。
4.2 发布/订阅模式
topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息。
image
demo示例:
topic的消息生产者

public class JMSTopicProducer {
   /* private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址*/
    private static final int SENDNUM = 10;
    private static final String BROKEURL = "tcp://localhost:61616";// 默认连接地址
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码

    public static void main(String [] args){
        // 连接工厂,用来生产Connection
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
        Connection connection = null; // 连接
        Session session = null; // 会话,接收或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageProducer messageProducer; // 消息发送者
        String queenName = "FirstTopic.mq";//消息队列名为FirstQueue.mq.queen

        System.out.println("producer"+USERNAME+PASSWORD+BROKEURL);

        try {
            connection = connectionFactory.createConnection(); // 通过连接工厂创建连接
            connection.start();  // 启动连接
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 获取Session
            destination = session.createTopic(queenName); // 创建topic,名为FirstTopic.mq
            messageProducer = session.createProducer(destination); // 创建消息生产者
            // 设置持久化模式
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, messageProducer); // 发送消息
            session.commit(); // 因为上面加了事务Boolean.TRUE表示有事务,所以要commit
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if(session != null){
                    session.close();
                }
                if(connection != null){
                    connection.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }

    public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        for(int i = 0; i < JMSTopicProducer.SENDNUM; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ 发送的消息" + i);
            System.out.println("ActiveMQ 发送的消息" + i);
            messageProducer.send(message);
        }
    }

}

topic需要两个消费者进行订阅,这里 只写一个:

public class JMSTopicConsumer2 {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
    private static final String BROKEURL = "tcp://localhost:61616"; // 默认连接地址


    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂,用来生产Connection

        Connection connection = null; // 连接
        Session session = null; // 会话,接收或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息消费者
        String queenName = "FirstTopic.mq";//消息队列名为FirstQueue.mq.queen
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        try {
            connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
            connection.start();  // 启动连接
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 获取Session,不需要加事务了
            destination = session.createTopic(queenName); // 创建消息topic,名为FirstTopic.mq
            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            //注册消息监听
            messageConsumer.setMessageListener(new Listener());
            session.commit();
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally {
          /*  try {
                if(session != null){
                    session.close();
                }
                if(connection != null){
                    connection.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }*/
        }
    }

}

结果:每一个订阅了topic,都可以收到此消息,

image

参考:
消息中间件(一)MQ详解及四大MQ比较
ActiveMQ入门教程

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
14天前
|
人工智能 数据可视化 安全
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
本文详解如何用阿里云Lighthouse一键部署OpenClaw,结合飞书CLI等工具,让AI真正“动手”——自动群发、生成科研日报、整理知识库。核心理念:未来软件应为AI而生,CLI即AI的“手脚”,实现高效、安全、可控的智能自动化。
34788 40
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
|
9天前
|
人工智能 自然语言处理 安全
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
本文介绍了Claude Code终端AI助手的使用指南,主要内容包括:1)常用命令如版本查看、项目启动和更新;2)三种工作模式切换及界面说明;3)核心功能指令速查表,包含初始化、压缩对话、清除历史等操作;4)详细解析了/init、/help、/clear、/compact、/memory等关键命令的使用场景和语法。文章通过丰富的界面截图和场景示例,帮助开发者快速掌握如何通过命令行和交互界面高效使用Claude Code进行项目开发,特别强调了CLAUDE.md文件作为项目知识库的核心作用。
9451 29
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
|
4天前
|
人工智能 JavaScript Ubuntu
低成本搭建AIP自动化写作系统:Hermes保姆级使用教程,长文和逐步实操贴图
我带着怀疑的态度,深度使用了几天,聚焦微信公众号AIP自动化写作场景,写出来的几篇文章,几乎没有什么修改,至少合乎我本人的意愿,而且排版风格,也越来越完善,同样是起码过得了我自己这一关。 这个其实OpenClaw早可以实现了,但是目前我觉得最大的区别是,Hermes会自主总结提炼,并更新你的写作技能。 相信就冲这一点,就值得一试。 这篇帖子主要就Hermes部署使用,作一个非常详细的介绍,几乎一步一贴图。 关于Hermes,无论你赞成哪种声音,我希望都是你自己动手行动过,发自内心的选择!
1902 20
|
26天前
|
人工智能 JSON 机器人
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
本文带你零成本玩转OpenClaw:学生认证白嫖6个月阿里云服务器,手把手配置飞书机器人、接入免费/高性价比AI模型(NVIDIA/通义),并打造微信公众号“全自动分身”——实时抓热榜、AI选题拆解、一键发布草稿,5分钟完成热点→文章全流程!
45672 155
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
|
1天前
|
人工智能 自然语言处理 安全
|
8天前
|
机器学习/深度学习 存储 人工智能
还在手写Skill?hermes-agent 让 Agent 自己进化能力
Hermes-agent 是 GitHub 23k+ Star 的开源项目,突破传统 Agent 依赖人工编写Aegnt Skill 的瓶颈,首创“自我进化”机制:通过失败→反思→自动生成技能→持续优化的闭环,让 Agent 在实践中自主构建、更新技能库,持续自我改进。
1596 5
|
16天前
|
人工智能 JSON 监控
Claude Code 源码泄露:一份价值亿元的 AI 工程公开课
我以为顶级 AI 产品的护城河是模型。读完这 51.2 万行泄露的源码,我发现自己错了。
5731 26
|
6天前
|
IDE Java 编译器
【全网最详细】JDK17下载安装图文教程 | Java17编程环境搭建步骤详解
JDK 17是Java官方长期支持(LTS)版本,提供编译、调试、运行Java程序的完整工具链。具备高稳定性、强安全性及现代语言特性(如密封类、模式匹配),广泛用于企业开发、教学入门与生产环境,是学习和实践Java的首选基础工具。(239字)
1079 15
下一篇
开通oss服务