ActiveMq使用笔记

简介:

java JMS技术

.1.   什么是JMS

         JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

         JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。

.2.   JMS规范

.2.1.    专业技术规范

JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。

.2.2.    体系架构

JMS由以下元素组成。

JMS提供者provider:连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。

JMS客户:生产或消费基于消息的Java的应用程序或对象。

JMS生产者:创建并发送消息的JMS客户。

JMS消费者:接收消息的JMS客户。

JMS消息:包括可以在JMS客户之间传递的数据的对象

JMS队列:一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。

JMS主题:一种支持发送消息给多个订阅者的机制。

.2.3.    Java消息服务应用程序结构支持两种模型

1、  点对点或队列模型

在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。

这种模式被概括为:

只有一个消费者将获得消息

生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。

每一个成功处理的消息都由接收者签收

2、发布者/订阅者模型

发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。

 

这种模式被概括为:

多个消费者可以获得消息

在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

 

1.下载ActiveMQ

去官方网站下载:http://activemq.apache.org/

2.运行ActiveMQ

解压缩apache-activemq-5.5.1-bin.zip,

修改配置文件activeMQ.xml,将0.0.0.0修改为localhost

默认的activeMQ.xml文件如下:

修改后:

复制代码
<transportConnectors>
       <transportConnector name="openwire" uri="tcp://localhost:61616"/>
       <transportConnector name="ssl"     uri="ssl://localhost:61617"/>
       <transportConnector name="stomp"   uri="stomp://localhost:61613"/>
      <transportConnector uri="http://localhost:8081"/>
       <transportConnector uri="udp://localhost:61618"/>
</transportConnectors>
复制代码

然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。

访问的时候如果需要用户名和密码 都是admin admin...

启动topic的相关的生产者和消费者:

生产者代码:

ProducerTest.java

复制代码
import java.util.Random;

import javax.jms.JMSException;      

public class ProducerTest {      
     
    /**    
     * @param args    
     */     
    public static void main(String[] args) throws JMSException, Exception {      
        ProducerTool producer = new ProducerTool(); 
        Random random = new Random();
        for(int i=0;i<20;i++){
            Thread.sleep(random.nextInt(10)*1000);
            producer.produceMessage("Hello, world!--"+i);      
            producer.close();
        }
    }      
}      
复制代码

 ProducerTool.java

复制代码
import javax.jms.Connection;      
import javax.jms.DeliveryMode;      
import javax.jms.Destination;      
import javax.jms.JMSException;      
import javax.jms.MessageProducer;      
import javax.jms.Session;      
import javax.jms.TextMessage;      
     
import org.apache.activemq.ActiveMQConnection;      
import org.apache.activemq.ActiveMQConnectionFactory;      
     
public class ProducerTool {        
    private String user = ActiveMQConnection.DEFAULT_USER;         
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;       
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;       
    private String subject = "mytopic";      
    private Destination destination = null;      
    private Connection connection = null;      
    private Session session = null;      
    private MessageProducer producer = null;
    // 初始化      
    private void initialize() throws JMSException, Exception {      
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(      
                user, password, url);      
        connection = connectionFactory.createConnection();      
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      
        destination = session.createTopic(subject);      
        producer = session.createProducer(destination);      
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      
    }
    // 发送消息      
    public void produceMessage(String message) throws JMSException, Exception {      
        initialize();      
        TextMessage msg = session.createTextMessage(message);      
        connection.start();      
        System.out.println("Producer:->Sending message: " + message);      
        producer.send(msg);      
        System.out.println("Producer:->Message sent complete!");      
    }
    // 关闭连接      
    public void close() throws JMSException {      
        System.out.println("Producer:->Closing connection");      
        if (producer != null){
            producer.close();      
        }      
        if (session != null){
            session.close();      
        }      
        if (connection != null){
            connection.close();      
        }      
    }      
} 
复制代码

消费者代码:

ConsumerTest.java

复制代码
import javax.jms.JMSException;

public class ConsumerTest implements Runnable {
    static Thread t1 = null;

    /**
     * @param args
     * @throws InterruptedException
     * @throws InterruptedException
     * @throws JMSException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {

        t1 = new Thread(new ConsumerTest());
        t1.setDaemon(false);
        t1.start();
        /**
         * 如果发生异常,则重启consumer
         */
        /*while (true) {
            System.out.println(t1.isAlive());
            if (!t1.isAlive()) {
                t1 = new Thread(new ConsumerTest());
                t1.start();
                System.out.println("重新启动");
            }
            Thread.sleep(5000);
        }*/
        // 延时500毫秒之后停止接受消息
        // Thread.sleep(500);
        // consumer.close();
    }

    public void run() {
        try {
            ConsumerTool consumer = new ConsumerTool();
            consumer.consumeMessage();
            while (ConsumerTool.isconnection) {    
            }
        } catch (Exception e) {
        }

    }
}
复制代码

ConsumerTool.java

复制代码
import javax.jms.Connection;      
import javax.jms.Destination;      
import javax.jms.ExceptionListener;
import javax.jms.JMSException;      
import javax.jms.MessageConsumer;      
import javax.jms.Session;      
import javax.jms.MessageListener;      
import javax.jms.Message;      
import javax.jms.TextMessage;      
     
import org.apache.activemq.ActiveMQConnection;      
import org.apache.activemq.ActiveMQConnectionFactory;      
/**
 * 消费者的模板     
 * @author ABC
 *
 */
public class ConsumerTool implements MessageListener,ExceptionListener {      
    private String user = ActiveMQConnection.DEFAULT_USER;      
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;      
    private String url =ActiveMQConnection.DEFAULT_BROKER_URL;      
    private String subject = "mytopic";      
    private Destination destination = null;      
    private Connection connection = null;      
    private Session session = null;      
    private MessageConsumer consumer = null;  
    public static Boolean isconnection=false;
    // 初始化      
    private void initialize() throws JMSException, Exception {      
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(      
                user, password, url);      
        connection = connectionFactory.createConnection();      
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      
        destination = session.createTopic(subject);      
        consumer = session.createConsumer(destination);     
    }      
     
    // 消费消息      
    public void consumeMessage() throws JMSException, Exception {      
        initialize();      
        connection.start();
        consumer.setMessageListener(this);    //注册一个消息监听器,有消息就执行onMessage()方法
        connection.setExceptionListener(this);//注册一个异常监听器,有异常就执行onException()方法
        isconnection=true;
        System.out.println("Consumer:->Begin listening...");      
        // 开始监听  
        // Message message = consumer.receive();      
    }
    // 关闭连接      
    public void close() throws JMSException {      
        System.out.println("Consumer:->Closing connection");      
        if (consumer != null)      
            consumer.close();      
        if (session != null)      
            session.close();      
        if (connection != null)      
            connection.close();      
    }
    // 消息处理函数      
    public void onMessage(Message message) {      
        try {      
            if (message instanceof TextMessage) {      
                TextMessage txtMsg = (TextMessage) message;      
                String msg = txtMsg.getText();      
                System.out.println("Consumer:->Received: " + msg);      
            } else {      
                System.out.println("Consumer:->Received: " + message);      
            }      
        } catch (JMSException e) {      
            // TODO Auto-generated catch block      
            e.printStackTrace();      
        }      
    }

    public void onException(JMSException arg0) {
        isconnection=false;//出现异常把isconnection设置成false
    }      
} 
复制代码

只启动ProducerTest.java

如果这个时候把ActiveMq 关闭再开启....重新访问

 

之前的主题 mytopic产生的数据就没有了.....

ActiveMq默认是没有做持久化的,如果是Kafka只要是发过去的消息,都会一直存在,也可以设置一个过期的时间.到了期限,那些消息也是可以清除掉.否则就会一直都在.

ActiveMq一般是用在JavaEE中的....Kafka是用在大数据领域的.

再运行生产者的模板代码: ConsumerTest.java

生产者生产的数据:

再运行生产者的模板代码: ConsumerTest.java

生产者生产的数据:

消费者消费到数据:

 

看WEBUI

 

其他常用的JMS实现

要使用Java消息服务,你必须要有一个JMS提供者,管理会话和队列。既有开源的提供者也有专有的提供者。

开源的提供者包括:

Apache ActiveMQ

JBoss 社区所研发的 HornetQ

Joram

Coridan的MantaRay

The OpenJMS Group的OpenJMS

专有的提供者包括:

BEA的BEA WebLogic Server JMS

TIBCO Software的EMS

GigaSpaces Technologies的GigaSpaces

Softwired 2006的iBus

IONA Technologies的IONA JMS

SeeBeyond的IQManager(2005年8月被Sun Microsystems并购)

webMethods的JMS+ -

my-channels的Nirvana

Sonic Software的SonicMQ

SwiftMQ的SwiftMQ

IBM的WebSphere MQ

 ========================================================

附关于ActiveMq处理queue的模板代码:

ProducerTest.java

复制代码
import java.util.Random;

import javax.jms.JMSException;      

public class ProducerTest {      
     
    /**    
     * @param args    
     * @throws Exception 
     * @throws JMSException 
     */     
    public static void main(String[] args) throws JMSException, Exception{      
        ProducerTool producer = new ProducerTool();
        Random random = new Random();
        for(int i=0;i<20;i++){
            Thread.sleep(random.nextInt(10)*1000);
            producer.produceMessage("Hello, world!--"+i);      
            producer.close();
        }
    }      
}
复制代码

ProducerTool.java

复制代码
import javax.jms.Connection;      
import javax.jms.DeliveryMode;      
import javax.jms.Destination;      
import javax.jms.JMSException;      
import javax.jms.MessageProducer;      
import javax.jms.Session;      
import javax.jms.TextMessage;      
     
import org.apache.activemq.ActiveMQConnection;      
import org.apache.activemq.ActiveMQConnectionFactory;      
     
public class ProducerTool {        
    private String user = ActiveMQConnection.DEFAULT_USER;         
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;       
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;       
    private String subject = "myqueue";      
    private Destination destination = null;      
    private Connection connection = null;      
    private Session session = null;      
    private MessageProducer producer = null;
    // 初始化   
    private void initialize() throws JMSException, Exception {      
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(      
                user, password, url);      
        connection = connectionFactory.createConnection();      
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      
        destination = session.createQueue(subject);      
        producer = session.createProducer(destination);      
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      
    }
    // 发送消息 
    public void produceMessage(String message) throws JMSException, Exception {      
        initialize();      
        TextMessage msg = session.createTextMessage(message);      
        connection.start();      
        System.out.println("Producer:->Sending message: " + message);      
        producer.send(msg);      
        System.out.println("Producer:->Message sent complete!");      
    }
    // 关闭连接      
    public void close() throws JMSException {      
        System.out.println("Producer:->Closing connection");      
        if (producer != null){
            producer.close();      
        }      
        if (session != null){
            session.close();      
        }      
        if (connection != null){
            connection.close();      
        }      
    }      
}    
复制代码

CustomerTest.java

复制代码
public class ConsumerTest implements Runnable {
    static Thread t1 = null;
    public static void main(String[] args) throws InterruptedException {
        t1 = new Thread(new ConsumerTest());
        t1.start();
//        while (true) {
//            System.out.println(t1.isAlive());
//            if (!t1.isAlive()) {
//                t1 = new Thread(new ConsumerTest());
//                t1.start();
//                System.out.println("重新启动");
//            }
//            Thread.sleep(5000);
//        }
        // 延时500毫秒之后停止接受消息
        // Thread.sleep(500);
        // consumer.close();
    }

    public void run() {
        try {
            ConsumerTool consumer = new ConsumerTool();
            consumer.consumeMessage();
            while (ConsumerTool.isconnection) {    
                //System.out.println(123);
            }
        } catch (Exception e) {
        }
    }
}
复制代码

CustomerTool.java

复制代码
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerTool implements MessageListener,ExceptionListener {
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private String subject = "myqueue";
    private Destination destination = null;
    private Connection connection = null;
    private Session session = null;
    private MessageConsumer consumer = null;
    private ActiveMQConnectionFactory connectionFactory=null;
    public static Boolean isconnection=false;
    // 初始化
    private void initialize() throws JMSException {
            connectionFactory= new ActiveMQConnectionFactory(
                user, password, url);
            connection = connectionFactory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(subject);
            consumer = session.createConsumer(destination);
    }

    // 消费消息
    public void consumeMessage() throws JMSException {
            initialize();
            connection.start();
            
            consumer.setMessageListener(this);
            connection.setExceptionListener(this);
            System.out.println("Consumer:->Begin listening...");
            isconnection=true;
            // 开始监听
            Message message = consumer.receive();
            System.out.println(message.getJMSMessageID());
    }

    // 关闭连接
    public void close() throws JMSException {
            System.out.println("Consumer:->Closing connection");
            if (consumer != null){
                consumer.close();
            }
            if (session != null){
                session.close();
            }
            if (connection != null){
                connection.close();
            }
    }

    // 消息处理函数
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                System.out.println("Consumer:->Received: " + msg);
            } else {
                System.out.println("Consumer:->Received: " + message);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void onException(JMSException arg0){
        isconnection=false;
    }
}
复制代码

 


本文转自SummerChill博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/6920295.html,如需转载请自行联系原作者

相关实践学习
通过轻量消息队列(原MNS)主题HTTP订阅+ARMS实现自定义数据多渠道告警
本场景将自定义告警信息同时分发至多个通知渠道的需求,例如短信、电子邮件及钉钉群组等。通过采用轻量消息队列(原 MNS)的主题模型的HTTP订阅方式,并结合应用实时监控服务提供的自定义集成能力,使得您能够以简便的配置方式实现上述多渠道同步通知的功能。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
人工智能 编解码 芯片
告别低效沟通|让技术提问不再头疼-这套高效AI提问模板来帮你
不会向ai提问,不知道怎么提问的 可以看看
173 1
告别低效沟通|让技术提问不再头疼-这套高效AI提问模板来帮你
|
10月前
|
机器学习/深度学习 边缘计算 人工智能
第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025) 2025 2nd international Conference on Edge Computing, Parallel and Distributed Computing
第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025) 2025 2nd international Conference on Edge Computing, Parallel and Distributed Computing 机器学习 计算学习理论 数据挖掘 科学计算 计算应用 数字图像处理 人工智能
220 6
|
11月前
|
人工智能 物联网 UED
自修复材料:未来材料的自我修复能力
【10月更文挑战第14天】自修复材料作为未来材料的重要发展方向之一,以其独特的自我修复能力,正逐步改变着我们的生活和工作方式。通过深入了解其原理、分类、创新性研究及应用前景,我们可以更加清晰地看到自修复材料在推动社会进步和科技创新中的重要作用。让我们共同期待自修复材料在未来带来的更多惊喜和变革!
|
6月前
|
编解码 人工智能 安全
快来试试这个开箱即用的万相2.1服务!!!
阿里云万相2.1系列提供高效灵活的视频生成解决方案,支持文生视频、图生视频等多模态任务。通过阿里云计算巢与ComfyUI推出的快速视频生成服务,用户可轻松制作微电影或短视频。针对长视频生成,提供了三种方法:使用高性能显卡、首尾帧拼接和补帧模型。此外,还内置工作流实现文生图和图生图功能,简单易用。部署说明及使用流程详细列出,方便开发者快速上手。访问计算巢了解更多有趣服务。
|
7月前
|
人工智能 API
DeepSeek-5min部署体验
本文介绍如何通过阿里云平台免费部署属于自己的DeepSeek AI助理,实现0成本打造满血DeepSeek。文中详细描述了创建API-KEY、下载Chatbox及配置自定义提供方的步骤,并展示了AI助理在不同场景下的表现。总结中提到,尽管部分复杂问题处理稍显卡顿,但整体准确性较高,基本满足日常需求。对于专业需求,建议探索使用自有数据集进行微调以提升性能。
175 2
|
12月前
|
存储 JavaScript 前端开发
敲黑板!vue3重点!一文了解Composition API新特性:ref、toRef、toRefs
该文章深入探讨了Vue3中Composition API的关键特性,包括`ref`、`toRef`、`toRefs`的使用方法与场景,以及它们如何帮助开发者更好地管理组件状态和促进逻辑复用。
敲黑板!vue3重点!一文了解Composition API新特性:ref、toRef、toRefs
|
10月前
|
存储 安全 算法
什么是秒合约?竞猜游戏交易所app系统开发规则介绍
秒合约是一种基于区块链技术的超短期衍生品合约,交易周期以秒为单位。它通过智能合约实现交易的自动化和去信任化,优化执行流程,提高交易速度和效率。秒合约适合高风险投机者,收益和风险固定,不使用杠杆。此外,竞猜游戏交易所app系统也涉及快速交易和投机,需确保安全、稳定及合规运营。
|
10月前
|
机器学习/深度学习 传感器 边缘计算
深度强化学习在自动驾驶汽车中的应用与挑战###
本文探讨了深度强化学习(Deep Reinforcement Learning, DRL)技术在自动驾驶汽车领域的应用现状、关键技术路径及面临的主要挑战。通过分析当前自动驾驶系统的局限性,阐述了引入DRL的必要性与优势,特别是在环境感知、决策制定和控制优化等方面的潜力。文章还概述了几种主流的DRL算法在自动驾驶模拟环境中的成功案例,并讨论了实现大规模部署前需解决的关键问题,如数据效率、安全性验证及伦理考量。最后,展望了DRL与其他先进技术融合的未来趋势,为推动自动驾驶技术的成熟与发展提供了新的视角。 ###
|
11月前
|
网络协议 安全 数据安全/隐私保护
网络协议:互联网通信的基石
【10月更文挑战第12天】
298 1
|
存储
ARM 堆栈寻址类型区分
该文介绍了堆栈的两种指向分类:向上生成型(递增堆栈)和向下生成型(递减堆栈),以及堆栈的两种数据状态:满堆栈(指针指向最后数据)和空堆栈(指针指向存放数据的位置)。满递增和满递减是在完整数据单元上操作,而空递增和空递减则允许自定义步长。文中通过图示说明了不同情况下的堆栈存储方式。
276 3

热门文章

最新文章