定义
Java 消息服务(Java Message Service)是 java 平台中关于面向消息中间件的 API,用于在两个应用程序之间,或者分布式系统中使用消息进行异步通信。
JMS 是一个与具体平台无关的 API,绝大多数 MOM(面向消息中间件)提供商都对 JMS 提供了支持。
MOM
面向消息的中间件,使用消息传送提供者来协调消息传送操作。MOM 需要提供 API 和管理工具。客户端使用 api 调用,把消息发送到由提供者管理的目的地。接收方收到这个消息确认之前,提供者一直保留该消息。
JMS 规范
规范最初的开发目的是为了使 Java 应用程序能够访问现有 MOM 系统,并形成一套统一的标准,解决不同消息中间件之间的协作问题。其内容包括:
1. 不同的消息传送模式或域,例如点对点消息传送和发布/订阅消息传送
2. 提供于接收同步和异步消息的工具
3. 对可靠消息传送的支持
4. 常见消息格式,例如流、文本和字节
JMS体系结构:
消息传递域
JMS 规范中定义了两种消息传递域:点对点(point-to-point )消息传递域 和发布 / 订阅消息传递域
(publish/subscribe)。
点对点消息传递域:
1. 每个消息只能有一个消费者
2. 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,都可以提取消息
发布订阅消息传递域:
1. 每个消息可以有多个消费者
2. 生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。
JMS 规范允许客户创建持久订阅,这在一定程度上降低了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。
持久订阅时,客户端向 JMS 服务器注册一个自己身份的 ID(connection的 ClientID),当这个客户端处于离线时,JMS Provider 会为这个 ID 保存所有发送到主题的消息,当客户再次连接到 JMS Provider 时,会根据自己的 ID 得到所有当自己处于离线时发送到主题的消息。
当然这种方式也有一定的影响:当持久订阅者处于 未激活状态时,Broker 需要为持久订阅者保存消息;如果持久订阅者订阅的消息太多则会溢出。
消息结构组成
JMS 消息由及部分组成:消息头、属性、消息体:
1. 消息头(Header) - 消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性如:
2. JMSDestination 消息发送的目的地,例如 queue 或者 topic;
3. JMSDeliveryMode 传送模式,持久模式和非持久模式
4. JMSPriority 消息优先级(优先级分为 10 个级别,从 0(最低)到 9(最高). 如果不设定优先级,默认 4,需要注意的是,JMS provider 并不一定保证按照优先级的顺序提交消息)
5. JMSMessageID 唯一识别每个消息的标识
属性
按类型可以分为应用设置的属性,标准属性和消息中间件定义的属性。
1. 应用程序设置和添加的属性:
Message.setStringProperty(“key”,”value”);
在发送端,定义消息属性:
message.setStringProperty("abc","hello");
在接收端接收数据:
Enumeration enumeration=message.getPropertyNames();
while(enumeration.hasMoreElements()) {
String key = enumeration.nextElement().toString();
System.out.println(message.getStringProperty(name));
}
2. JMS 定义的属性
返回所有连接支持的 JMSX 属性的名字:
connection.getMetaData().getJMSXPropertyNames()
3. JMS provider 属性
消息体,即实际传递的消息内容,JMS API 定义的 5 种消息体格式:
TextMessage java.lang.String 对象
MapMessage key为 String,value 可以是 Java 任何基本类型
BytesMessage 字节流
StreamMessage Java 中的输入输出流
ObjectMessage Java 中的可序列化对象
一般情况下,只有JMS provider 属性会被用到。
JMS 消息的可靠性机制
消息的消费通常包含 3 个阶段:客户接收消息、客户处理消息、消息被确认。
事务型会话
JMS Session 接口提供了 commit 和 rollback 方法。
在发送端,消息 send 之后且 commit 之前,消息只会在本地,当 commit 后才会实际发送到broker,当 rollback 后,消息在本地被销毁;在接收端,消息commit之后意味着当前消息被确认。
commit 或 rollback 方法一旦被调用,一个事务就结束了。
通过在创建 session 的时候使用 true or false 来决定当前的会话是事务性还是非事务性。
connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
在事务性会话中,消费端 commit 之后,即
session.commit()
以后,当前消息会自动被确认掉。
非事务型会话
消息何时被确认取决于创建会话时的应答模式:
acknowledgement mode
取值如下:
1. Session.AUTO_ACKNOWLEDGE
当客户成功的从 receive 方法返回的时候,或者从 MessageListenner.onMessage 方法成功返回的时候,会话自动确认客户收到消息。
2. Session.CLIENT_ACKNOWLEDGE
客户通过调用消息的 acknowledge 方法确认消息。
3. CLIENT_ACKNOWLEDGE 特性
在这种模式中,确认一个被消费的消息将自动确认所有之前已被会话消费的消息。列如,如果当前消费了 10 个消息,确认了第 5 个消息,那么 0~5 的消息都会被确认。
4. Session.DUPS_ACKNOWLEDGE
消息延迟确认。指定消息提供者在消息接收者没有确认发送时重新发送消息,这种模式不在乎接受者收到重复的消息。
消息的持久化存储
消息发送到 Broker 上以后,存储在 broker 上的消息不应该丢失。可以通过下面的代码来设置消息发送端的持久化和非持久化特性:
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
对于持久消息,broker 使用存储-转发机制,先将消息存储到稳定介质中,如果 broker 挂掉了,这些消息不会丢失;broker 恢复正常后,会重新传送给对应的消费者。
非持久的消息,JMS provider 会将它存到内存中,如果 jms provider 宕机,那么内存中的非持久消息会丢失。
消息同步发送和异步发送
mq消息发送一般支持同步、异步两种模式将消息发送到 broker 上。
同步发送过程中,发送者发送一条消息会阻塞直到broker反馈一个确认消息,表示消息已经被broker处理。这个机制提供了消息发送的安全性保障,但是会影响客户端消息发送性能。
异步发送的过程中,发送者不需要等待broker提供反馈,性能相对较高。但可能出现消息丢失的情况。所以使用异步发送的前提是在某些情况下允许出现数据丢失的情况。