ActiveMQ –JMS开源框架入门介绍

简介: ActiveMQ –JMS开源框架入门介绍

介绍基本的JMS概念与开源的JMS框架ActiveMQ应用,内容涵盖一下几点:


1.      基本的JMS概念


2.       JMS的消息模式


3.       介绍ActiveMQ


4.      一个基于ActiveMQ的JMS例子程序


一:JMS基本概念


1.      JMS的目标


       为企业级的应用提供一种智能的消息系统,JMS定义了一整套的企业级的消息概念与工具,


       尽可能最小化的Java语言概念去构建最大化企业消息应用。统一已经存在的企业级消息系


       统功能。


2.      JMS提供者


       JMS提供者是指那些完全完成JMS功能与管理功能的JMS消息厂商,理论上JMS提供者完成


       JMS消息产品必须是100%的纯Java语言实现,可以运行在跨平台的架构与操作系统上,当前


       一些JMS厂商包括IBM,Oracle, JBoss社区 (JBoss Community), Apache 社区(ApacheCommunity)。


3.      JMS应用程序, 一个完整的JMS应用应该实现以下功能:


        JMS 客户端 – Java语言开发的接受与发送消息的程序


        非JMS客户端 – 基于消息系统的本地API实现而不是JMS


        消息 – 应用程序用来相互交流信息的载体


         被管理对象–预先配置的JMS对象,JMS管理员创建,被客户端运用。如链接工厂,主题等


        JMS提供者–完成JMS功能与管理功能的消息系统



二:JMS的消息模式


1.      点对点的消息模式(Point to Point Messaging)

1337915714_4642.png


下面的JMS对象在点对点消息模式中是必须的:


a.      队列(Queue) – 一个提供者命名的队列对象,客户端将会使用这个命名的队列对象


b.     队列链接工厂(QueueConnectionFactory) – 客户端使用队列链接工厂创建链接队列


       ConnectionQueue来取得与JMS点对点消息提供者的链接。


c.      链接队列(ConnectionQueue) – 一个活动的链接队列存在在客户端与点对点消息提供者之


       间,客户用它创建一个或者多个JMS队列会话(QueueSession)


d.     队列会话(QueueSession) – 用来创建队列消息的发送者与接受者(QueueSenderand


        QueueReceiver)


e.     消息发送者(QueueSender 或者MessageProducer)– 发送消息到已经声明的队列


f.       消息接受者(QueueReceiver或者MessageConsumer) – 接受已经被发送到指定队列的消息


2.      发布订阅模式(publish – subscribe Mode)

1337915755_8015.png


a.      主题Topic(Destination) – 一个提供者命名的主题对象,客户端将会使用这个命名的主题对象


b.     主题链接工厂(TopciConnectionFactory) – 客户端使用主题链接工厂创建链接主题


        ConnectionTopic来取得与JMS消息Pub/Sub提供者的链接。


c.      链接主题(ConnectionTopic) – 一个活动的链接主题存在发布者与订阅者之间


d.     会话(TopicSession) – 用来创建主题消息的发布者与订阅者 (TopicPublisher  and


        TopicSubscribers)


e.     消息发送者MessageProducer) – 发送消息到已经声明的主题


f.       消息接受者(MessageConsumer) – 接受已经被发送到指定主题的消息



三:介绍ActiveMQ


ActiveMQ是apache社区完成的JMS开源消息组件,客户端支持多种语言调用,包括Java,C++, C#,


Perl, Python等。支持Spring配置集成等。更多信息访问这里:


http://activemq.apache.org/index.html


四:基于ActiveMQ的Publish/subscribe模式Demo程序


消息Broker,JMSprovider

import java.net.URI;
import java.net.URISyntaxException;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
 
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
 
/**
 * refer to http://activemq.apache.org/jndi-support.html
 * http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html
 * @author gloomyfish
 *
 */
public class PureJMSProducer {
  
  private static final Log LOG = LogFactory.getLog(PureJMSProducer.class);
 
    private PureJMSProducer() {
    }
 
    /**
     * @param args the destination name to send to and optionally, the number of
     *                messages to send
     */
    public static void main(String[] args) {
        Context jndiContext = null;
        ConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        Destination destination = null;
        MessageProducer producer = null;
        BrokerService broker = null;
        final int numMsgs = 10;
 
        /*
         * Create a JNDI API InitialContext object
         */
        try {
            jndiContext = new InitialContext();
        } catch (NamingException e) {
            LOG.info("Could not create JNDI API context: " + e.toString());
            System.exit(1);
        }
        
        // create external TCP broker
    try {
      broker = BrokerFactory.createBroker(new URI("broker:tcp://localhost:61616"));
      broker.start(); 
    } catch (URISyntaxException e) {
      LOG.info("Could not create broker: " + e.toString());
    } catch (Exception e) {
      LOG.info("Could not create broker: " + e.toString());
    }
//        try {
//          
//        }
 
        /*
         * Look up connection factory and destination.
         */
        try {
            connectionFactory = (ConnectionFactory)jndiContext.lookup("ConnectionFactory");
            destination = (Destination)jndiContext.lookup("MyTopic");
        } catch (NamingException e) {
            LOG.info("JNDI API lookup failed: " + e);
            System.exit(1);
        }
        
        /*
         * Create connection. Create session from connection; false means
         * session is not transacted. Create sender and text message. Send
         * messages, varying text slightly. Send end-of-messages message.
         * Finally, close connection.
         */
        try {
            connection = connectionFactory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(destination);
            TextMessage message = session.createTextMessage();
            Thread.sleep(3000);
            for (int i = 0; i < numMsgs; i++) {
                message.setText("This is message " + (i + 1));
                LOG.info("Sending message: " + message.getText());
                producer.send(message);
                Thread.sleep(3000);
            }
 
            /*
             * Send a non-text control message indicating end of messages.
             */
            producer.send(session.createMessage());
        } catch (JMSException e) {
            LOG.info("Exception occurred: " + e);
        } catch (InterruptedException e) {
          LOG.info("Exception occurred: " + e);
    } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                }
            }
        }
    
    // stop the TCP broker
    try {
      broker.stop();
    } catch (Exception e) {
      LOG.info("stop the broker failed: " + e);
    }
    }
 
 
}

客户端:

import java.io.IOException;
 
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
 
import org.apache.activemq.ActiveMQConnectionFactory;
 
 
public class ActiveMQClient {
  
  public static void main(String[] args) throws IOException {
    
    // -- http://dlc.sun.com/pdf//816-5904-10/816-5904-10.pdf
    try {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://locahost");
        Connection connection = factory.createConnection();
        connection.start();
        
        // create message topic
        //Topic topic= new ActiveMQTopic("MyTopic");
        InitialContext jndiContext=new InitialContext();
        Topic topic=(Topic)jndiContext.lookup("MyTopic"); 
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // register message consumer
        MessageConsumer comsumer1 = session.createConsumer(topic);
        comsumer1.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                  System.out.println("Consumer get " + ((TextMessage)m).getText());
                } catch (JMSException e) {
                  e.printStackTrace();
                } 
            }
        });
        Thread.sleep(30000);
        session.close();
        connection.stop();
        
    } catch(Exception e) {
      e.printStackTrace();
    }
    }
 
}

项目配置,Jar依赖:

1337915919_7952.png

依赖的三个Jar分别为:

  • activemq-all.jar
  • geronimo-jms_1.1_spec-1.1.1.jar
  • xbean-spring.jar


补充JNDI内容:

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
connectionFactory=topicConnectionFactry
topic.MyTopic = example.MyTopic
相关文章
|
Python
Python计算误码率,输入是0-1比特流矩阵和小数矩阵
本文提供了一个Python函数calculate_ber,用于计算两个NumPy矩阵表示的二进制信号和接收信号之间的误码率(BER),其中包括信号与接收信号的比较、误差计数以及BER的计算过程,并给出了具体的使用示例。
256 2
|
Ubuntu Shell Python
Ubuntu学习笔记(一):pycharm设置快捷启动图标详解
这篇博客详细讲解了如何在Ubuntu 20.04系统中为PyCharm设置快捷启动图标,包括创建.desktop文件、编辑文件内容以及添加到收藏夹的步骤。
991 0
Ubuntu学习笔记(一):pycharm设置快捷启动图标详解
|
8月前
|
存储 人工智能 固态存储
DeepSeek开源周第五弹之一!3FS:支撑V3/R1模型数据访问的高性能分布式文件系统
3FS是DeepSeek开源的高性能分布式文件系统,专为AI训练和推理任务设计,提供高达6.6 TiB/s的读取吞吐量,支持强一致性保障和通用文件接口,优化AI工作负载。
1223 2
DeepSeek开源周第五弹之一!3FS:支撑V3/R1模型数据访问的高性能分布式文件系统
|
10月前
|
算法 数据安全/隐私保护
室内障碍物射线追踪算法matlab模拟仿真
### 简介 本项目展示了室内障碍物射线追踪算法在无线通信中的应用。通过Matlab 2022a实现,包含完整程序运行效果(无水印),支持增加发射点和室内墙壁设置。核心代码配有详细中文注释及操作视频。该算法基于几何光学原理,模拟信号在复杂室内环境中的传播路径与强度,涵盖场景建模、射线发射、传播及接收点场强计算等步骤,为无线网络规划提供重要依据。
|
8月前
|
监控 JavaScript 数据库
Umami:自建网站访问统计服务,突破浏览器广告拦截
本文介绍了开源网站访问统计系统 Umami,一款可替代 Google Analytics 的工具。Umami 支持私有化部署,确保数据完全可控,保护用户隐私。文章详细讲解了 Umami 的部署方式(如 Vercel 云函数、Docker Compose 和 1 Panel)及基本使用方法,包括添加网站和集成跟踪代码。此外,还分享了突破浏览器广告拦截的技巧,例如修改 JS 脚本文件名和服务端接口名称。通过这些优化,可有效避免统计代码被拦截,帮助站长获取更准确的访问数据。
418 0
|
消息中间件 监控 持续交付
《云消息队列RabbitMQ实践》解决方案测评报告
《云消息队列RabbitMQ实践》解决方案通过RabbitMQ实现业务解耦、异步处理和高可用性。其核心优势包括消息持久化、灵活路由及高可靠性。文档详细介绍了部署步骤、配置方法及监控手段,帮助用户快速搭建消息队列系统。方案适用于电商、金融和实时数据处理等高并发场景,通过异步处理提升系统性能。建议增加自动化部署、复杂场景示例及更详尽的日志解析,进一步提升用户体验。
|
XML JSON 定位技术
Unity & 微信公众号 - 实现扫码关注登录
Unity & 微信公众号 - 实现扫码关注登录
700 1
Unity & 微信公众号 - 实现扫码关注登录
|
人工智能 数据可视化 数据处理
推荐2款免费开源的标注工具,支持大模型对话标注
【LabelLLM】一款开源免费的大模型对话标注平台,专为优化大型语言模型的数据标注过程设计。支持灵活配置与多模态数据(音频、图像、视频),具备全面任务管理和AI辅助标注功能,大幅提升标注效率与准确性。了解更多请前往https://github.com/opendatalab/LabelLLM 【LabelU】一款轻量级开源标注工具,支持图像、视频、音频的高效标注。特色功能包括多功能图像处理、视频和音频分析等,简易灵活,支持多种数据格式输出。了解更多请前往https://github.com/opendatalab/labelU
3419 11
|
测试技术
详解单元测试问题之@InjectMocks注入mock对象如何解决
详解单元测试问题之@InjectMocks注入mock对象如何解决
1069 1
|
编解码 定位技术
Google Earth Engine——全球土壤含数量数据:6个标准深度(0、10、30、60、100和200厘米)以250米的分辨率预测的33kPa和1500kPa吸力的土壤含水量(体积百分比)。
Google Earth Engine——全球土壤含数量数据:6个标准深度(0、10、30、60、100和200厘米)以250米的分辨率预测的33kPa和1500kPa吸力的土壤含水量(体积百分比)。
1280 0
Google Earth Engine——全球土壤含数量数据:6个标准深度(0、10、30、60、100和200厘米)以250米的分辨率预测的33kPa和1500kPa吸力的土壤含水量(体积百分比)。