了解JMS

简介: 当下消息中间件盛行,主要提供了异步、解耦、削峰平谷的作用,其实早期Java本身也提供了类似的消息服务,在研习各个消息中间件之前应该先了解下这个历史。

1 前言

当下消息中间件盛行,主要提供了异步、解耦、削峰平谷的作用,其实早期Java本身也提供了类似的消息服务,在研习各个消息中间件之前应该先了解下这个历史。

2 JMS简介(取自百度百科

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持(百度百科给出的概述)。我们可以简单的理解:两个应用程序之间需要进行通信,我们使用一个JMS服务,进行中间的转发,通过JMS 的使用,我们可以解除两个程序之间的耦合。

Java 消息服务(Java Message Service,JMS)是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC (Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。

JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本 (TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。

通信传递的消息交换了计算机之间至关重要的数据——而非用户之间——并且包含了例如事件通知和服务请求之类的信息。通信通常用来协调在不同的系统中或是用不同的编程语言所写的程序。

使用JMS接口,程序员可以调用IBM的MQSeries,Progress Software的SonicMQ和其他流行通信产品商家的消息服务。另外,JMS支持包含串行Java对象的消息和包含可扩展标记语言(XML)页面的消息。

总之,JMS自身并不是一种消息传送系统;它是消息传送客户端与消息传送系统通信时所需接口和类的一个抽象。与JDBC访问数据库,JNDI抽象访问命令目录接口服务一样,JMS抽象可以访问消息服务提供者,使用JMS,应用程序的消息传送客户端可以实现跨消息服务器产品的移植。JMS规范是一个具有单向优势的,健壮的规范,包括了一组丰富的消息传送语义,并和简单而灵活的API结合,用于将消息传送合并到应用程序中。

3 JMS的消息模型

   JMS具有两种通信模式:

      1、Point-to-Point Messaging Domain (点对点)

      2、Publish/Subscribe Messaging Domain (发布/订阅模式)

   任何JMS的提供者可以实现其中的一种或两种模型,这是它们自己的选择。JMS规范提供了通用接口保证我们基于JMS API编写的程序适用于任何一种模型。

  (1)、Point-to-Point Messaging Domain(点对点通信模型)

      a、模式图:

      jms3.png

      b、涉及到的概念:

        在点对点通信模式中,应用程序由消息队列,发送方,接收方组成。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

      c、特点:

  • •  每个消息只要一个消费者

  • 发送者和接收者在时间上是没有时间的约束,也就是说发送者在发送完消息之后,不管接收者有没有接受消息,都不会影响发送方发送消息到消息队列中。

  • 发送方不管是否在发送消息,接收方都可以从消息队列中去到消息(The receiver can fetch message whether it is running or not when the sender sends the message)

  • 接收方在接收完消息之后,需要向消息队列应答成功

  (2)、Publish/Subscribe Messaging Domain(发布/订阅通信模型)

      a、模式图:

        jms2.png


      b、涉及到的概念:

        在发布/订阅消息模型中,发布者发布一个消息,该消息通过topic传递给所有的客户端。该模式下,发布者与订阅者都是匿名的,即发布者与订阅者都不知道对方是谁。并且可以动态的发布与订阅Topic。Topic主要用于保存和传递消息,且会一直保存消息直到消息被传递给客户端。

      c、特点:

  • •  一个消息可以传递个多个订阅者(即:一个消息可以有多个接受方)

  • 发布者与订阅者具有时间约束,针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。

  • 为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

4 JMS编程模型 

  1. 1.管理对象(Administered objects)-连接工厂(Connection Factories)和目的地(Destination)
  2. 2.连接对象(Connections)
  3. 3.会话(Sessions)
  4. 4.消息生产者(Message Producers)
  5. 5.消息消费者(Message Consumers)
  6. 6.消息监听者(Message Listeners)

    jms1.png

 

    (1)、Connection Factories

        创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。客户端使用一个连接工厂对象连接到JMS服务提供者,它创建了JMS服务提供者和客户端之间的连接。JMS客户端(如发送者或接受者)会在JNDI名字空间中搜索并获取该连接。使用该连接,客户端能够与目的地通讯,往队列或话题发送/接收消息。


QueueConnectionFactory queueConnFactory = (QueueConnectionFactory) initialCtx.lookup ("primaryQCF");
Queue purchaseQueue = (Queue) initialCtx.lookup ("Purchase_Queue");
Queue returnQueue = (Queue) initialCtx.lookup ("Return_Queue");

    (2)、Destination

        目的地指明消息被发送的目的地以及客户端接收消息的来源。JMS使用两种目的地,队列和话题。如下代码指定了一个队列和话题:

    创建一个队列Session:

QueueSession ses = con.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);  //get the Queue object  
Queue t = (Queue) ctx.lookup ("myQueue");  //create QueueReceiver  
QueueReceiver receiver = ses.createReceiver(t);

 

    (3)、Connection

      Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

      连接对象封装了与JMS提供者之间的虚拟连接,如果我们有一个ConnectionFactory对象,可以使用它来创建一个连接。

Connection connection = connectionFactory.createConnection();

 

    (4)、Session

      Session 是我们对消息进行操作的接口,可以通过session创建生产者、消费者、消息等。Session 提供了事务的功能,如果需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。

      我们可以在连接创建完成之后创建session:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      这里面提供了参数两个参数,第一个参数是是否支持事务,第二个是事务的类型

 

    (5)、Producter

      消息生产者由Session创建,用于往目的地发送消息。生产者实现MessageProducer接口,我们可以为目的地、队列或话题创建生产者;

MessageProducer producer = session.createProducer(dest);
MessageProducer producer = session.createProducer(queue);
MessageProducer producer = session.createProducer(topic);


    (6)、Consumer

      消息消费者由Session创建,用于接收被发送到Destination的消息。

MessageConsumer consumer = session.createConsumer(dest);
MessageConsumer consumer = session.createConsumer(queue);
MessageConsumer consumer = session.createConsumer(topic);


    (7)、MessageListener

      消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

5 JMS实现-ActiveMQ

5.1 介绍

Apache ActiveMQApache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。

gitee地址:https://gitee.com/apache/activemq#https://gitee.com/link?target=https%3A%2F%2Factivemq.apache.org%2Fversion-5-getting-started.html

官网地址:https://activemq.apache.org/

因为该mq历史久远,当前鲜有人用,不详细介绍,我们了解下怎么用有一个体感就行。万一真用到,只能说你接到了一个锅,慢慢爬坑吧。

5.1.1 特色

5.2 安装

安装ActiveMQ开源消息总线:http://activemq.apache.org/activemq-5111-release.html

ActiveMQ 服务启动地址:http://127.0.0.1:8161/admin/ 用户名/密码 admin/admin

5.3 实践

5.3.1 消息生产者Receive消息确认的三种方式:


  • Session.AUTO_ACKNOWLEDGE。当客户成功的从receive 方法返回的时候,或者从MessageListener.onMessage
    方法成功返回的时候,会话自动确认客户收到的消息。

  • Session.CLIENT_ACKNOWLEDGE。 客户通过消息的 acknowledge 方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消 费的消息。例如,如果一个消息消费者消费了 10 个消息,然后确认第 5 个消息,那么所有 10 个消息都被确认。

  • Session.DUPS_ACKNOWLEDGE。 该选择只是会话迟钝第确认消息的提交。如果 JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么 JMS provider 必须把消息头的 JMSRedelivered 字段设置为 true。

5.3.2 springboot中使用activeMQ

pom

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.40</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency></dependencies>

配置

jms4.png

packagecom.jf.config;
importorg.apache.activemq.command.ActiveMQQueue;
importorg.springframework.beans.factory.annotation.Value;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importjavax.jms.Queue;
@ConfigurationpublicclassQueueConfig {
@Value("${queue}")//拿到配置文件中的队列名privateStringqueue;
@BeanpublicQueuequeue(){
returnnewActiveMQQueue(queue);
    }
}

发送消息类(消息生产者)

packagecom.jf.config;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.jms.core.JmsMessagingTemplate;
importorg.springframework.stereotype.Component;
importjavax.jms.Queue;
@ComponentpublicclassSendServer {
@AutowiredprivateJmsMessagingTemplatejmsMessagingTemplate;
@AutowiredprivateQueuequeue;
publicvoidsend(Stringmsg){
jmsMessagingTemplate.convertAndSend(queue,msg);
}
}

消息消费者

packagecom.jf.listen;
importcom.alibaba.fastjson.JSON;
importcom.alibaba.fastjson.JSONObject;
importorg.springframework.jms.annotation.JmsListener;
importorg.springframework.stereotype.Component;
@ComponentpublicclassConsumer {
@JmsListener(destination="${queue}")
publicvoidreceive(Stringmsg){
//解析jsonJSONObjectjsonObject=JSON.parseObject(msg);
if ("email".equals(jsonObject.get("type"))){
System.out.println("监听器收到消息:发送email给"+jsonObject.get("to")+",内容为:"+jsonObject.get("content"));
    }
}
}
相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 Java 数据库连接
03JMS简介
03JMS简介
59 0
|
消息中间件 Java 中间件
JMS 和 AMQP
JMS 和 AMQP简介
176 0
|
消息中间件 存储 网络协议
Spring Boot与消息(JMS、AMQP、RabbitMQ)
1.概述。 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力。 消息服务中两个重要概念: 消息代理(message broker)和目的地(destination)。 当消息发送者发送
305 0
|
消息中间件 Java 数据库
ActiveMQ系列:JMS的可靠性
持久化消息这是队列的的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
146 0
ActiveMQ系列:JMS的可靠性
|
消息中间件 Java 大数据
JMS VS AMQP
《高性能》系列
161 0
JMS VS AMQP
|
消息中间件 Java 测试技术
Spring整合JMS(消息中间件)实例
本篇文章主要描述了如何配置Spring-JMS,至于为何这样配置及Spring-JMS相关介绍,请阅读这篇文章:Spring整合JMS(消息中间件)。我们这里的消息broker用的是ActiveMQ。 一、相关配置 本篇主要讲解如何在Spring中配置JMS,关于Spring本身的配置本文就不多做介绍了。 1.1 配置maven依赖 在使用Spring-JMS之前,先
2020 7
|
消息中间件 Java API
Spring整合JMS(消息中间件)
本篇主要介绍了异步消息机制及Spring对JMS封装,本篇文章讲解较为详细,如果想直接看如何配置,可以参考: Spring整合JMS(消息中间件)实例,但还是建议大家先看完本篇文章。 一、消息异步处理 类似于RMI、Hessian、Burlap等远程方法调用,它们都是同步的,所谓同步调用就是客户端必须等待操作完成,如果远程服务没有返回任何响应,客户端会一直等待直到服务完成
1605 5
|
消息中间件 存储 Java
JMS
定义 Java 消息服务(Java Message Service)是 java 平台中关于面向消息中间件的 API,用于在两个应用程序之间,或者分布式系统中使用消息进行异步通信。 JMS 是一个与具体平台无关的 API,绝大多数 MOM(面向消息中间件)提供商都对 JMS 提供了支持。
1785 0
|
消息中间件 Java Spring
Spring消息之JMS.
一、概念 异步消息简介     与远程调用机制以及REST接口类似,异步消息也是用于应用程序之间通信的。     RMI、Hessian、Burlap、HTTP invoker和Web服务在应用程序之间的通信机制是同步的,即客户端应用程序直接与远程服务相交互,并且一直等到远程过程完成后才继续执行。
1247 0