开发者社区> 沉默术士> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

JMS 概述

简介:
+关注继续查看

一.JMS使用场景:
    JMS是为"消息"而生,从使用的角度来说,任何可以与业务解耦的数据均可以作为"消息"存储;任何结构化(格式严格,适合解析)但未索引化(不能被随意检索)的数据均可以交付给JMS存储,但尽管JMS不是为存储而生.


    1) 异构系统(应用)中,如果数据的交互无法通过DB共享/调用接口等方式实现时,可以考虑使用JMS作为数据的中转中心,并使用JMS API来交付消息和消费消息.主要作用为"业务解耦"或"系统解耦".


    2) 数据交互过程,允许异步操作时,可以考虑使用JMS;比如数据的生产者交付数据后,即可立即返回而无需等待数据的执行结果,对于数据消费者可以在合适的时机(网络允许/选择条件允许时)接收数据并执行.这一点可能是我们使用JMS的主要原因,它较好的解决了RPC的同步调用所带来的弊端,很多时候具有"事件驱动"特征的调用过程,可以尝试使用JMS来解决.主要作用为"数据交互过程解耦".


    3) 如果数据交互的双端执行效率不对等时,可以考虑使用JMS;主要场景为,数据生产的速度非常高(比如队列),但是数据消费的速度相对较低,如果没有良好的架构设计,必将对数据生产者造成阻塞或者数据积压,从而对整体的性能带来挑战或者数据安全带来风险;JMS就是一个很好的"数据缓冲"中心,它将密集的数据提交操作以"均匀"的速度交付给数据消费者,从而实现系统整体的稳定性.这种策略,在日志统计/小数据分析系统中,经常使用.主要作用为"缓解系统瓶颈,提高生产效率".

    归纳为: "异构系统通讯"/"异步调用"/"应用解耦",同时,JMS也是"异步RPC调用"的解决方案之一,同时也是"actor"设计模型的特列.开发者可以选择合适的JMS实现,作为消息中间件来解决系统架构中的上述问题.
    
二.JMS消息传送模型
    JMS具备两种"消息传送模型":P2P和Pub/sub.

 

    1) P2P: 点对点消息传送模型,允许JMS客户端通过队列(queue)这个虚拟通道来同步或异步发送消息;消息的生产者为Sender,消费者为receiver;消息传送机制上,基于拉取(pull)或者轮询(polling)方式,即receiver主动到队列中请求消息,而不是JMS提供者将消息推送到客户端;主要原因是一个队列通道可能有多个receiver,每个receiver可能对消息的处理速率不同(因处理消息而造成的阻塞时间不同),对于JMS提供者而言,它无法意识到哪个receiver处于"空闲"状态,如果JMS提供者主动推送会造成通道的阻塞或者消息在客户端积压等问题;所以基于客户端pull的方式,当receiver空闲时向JMS提供者请求消息,很好的解决了这个问题,而且还能进行良好的"负载均衡".

    Queue中的消息如果被某个recervier成功接收(或者确认成功)后,消息就会被移除.

    P2P消息传送模式即支持异步"即发即失",也支持同步的"请求/应答";这两种实现手段,稍后会通过实例来展示.

 

    2) Pub/sub: 发布/订模型中,消息会发布到一个名为主题(Topic)的虚拟通道中,消息的生产者为Publisher,消费者为subscriber,发布到Topic中的消息,可以被多个客户端同时接收(区别于queue);pub/sub消息传送模型是基于推送(push),JMS提供者将消息主动推送给及客户端,类似于广播;之所以采取此方式,其实很好理解,既然每个客户端都应该收到消息,那么对于JMS提供者而言,只需要遍历所有的"活跃"的链接,依次将消息发送出去即可,而无需客户端"徒劳"的去轮询.

    在Pub/sub模型内部,有多种不同类型的订阅者;非持久订阅者是临时订阅者,它们只是在主动侦听主题式才能收到消息;持久订阅者将接收到发布的每一条消息,即使它的链接处于"离线".此外还有"动态持久订阅者"和"受托管的持久订阅者",稍后会通过实例展示如何使用它们.

 

    JMS提供者都支持"消息"的持久化,任何发送给JMS提供者的消息,都会首先被持久存储(对于非持久类型的数据,是基于cache),然后适时将消息交付给消费者;这一种有效的担保策略("保存并转发"),有效的确保了消息的安全性.

 

三.JMS API简析

顶级接口

P2P

Pub/sub

备注

ConnectionFactory

QueueConnectionFactory

TopicConnectionFactory

基于工厂模式,创建和JMS提供者之间的链接,需要制定链接的URL或者协议;任何JMS客户端和JMS提供者之间的交互,都必须基于制定的连接.

Destination

Queue

Topic

“目的地”,用于描述消息的通道类型,是JMS提供者用于标记消息归属类型的”标记”.

Connection

QueueConnection

TopicConnection

“链接”,用于描述一个实际的网络通讯链接,比如TCP/UDP等,任何交互数据,都必须通过”链接”进行传输,JMS实现者负责定义数据格式(协议);在物理层用于区分JMS客户端,一般而言,一个应用只会有一个”链接”.

Session

QueueSession

TopicSession

“会话”,在逻辑上用于区分JMS客户端,因为”链接”可被公用以提高网络利用率;每个session可以支持相互独立的”事务”和相关属性.每个session都有ID.

Message

--

--

“消息”,JMS API中提供了多种类型Message,它们有各自的”序列化/反序列化”机制;消息中可以包含多种JMS属性以及客户端自定义的消息属性和内容.

MessageProducer

QueueSender

TopicPublisher

“生产者”,一种可以向JMS提供者提交消息的客户端类型.

MessageConsumer

QueueReceiver

TopicSubscriber

“消费者”,一种可以向JMS提供获取消息的客户端类型.

 

四. JMS-API详解部分

 

1. ConnectionFactory接口:

    链接工厂,用于创建"链接",此处所指的连接为底层实际物理连接,即TCP连接(Socket通道).此接口有多个子接口:QueueConnectionFactory(用来创建Queue消息类型的链接),TopicConnectFactory,XAQueueConnectionFactory(基于XA分布式事务的Queue链接),XATopicConnectionFactory.

    ConnectionFactory接口中并没有约束建立链接所使用的协议、URL、安全策略等;这一切就交给JMS Provider去实现。通常情况下ConnectionFactory实例为单例,而且推荐以JNDI的方式获取.

  • Connection createConnection()
  • Connection createConnection(String username,String password):使用简单的密码校验方式来创建链接.如果JMS Server端对受托管的queue/topic配置了需要授权才能访问,那么在建立相应的链接时需要交付密码.

2. Connection接口:

    代表底层一个物理(或者逻辑上)的一个Socket链接通道,此链接将会保持活跃直到JMS Client关闭或者JMS提供者(即JMS server端)在socket上阻塞超时.通常情况下,Connection为一个TCP链接(长连接),一个JMS Client建议维持一个Connection即可,事实上Queue/Topic不同类型的Client,那么也将创建不同的Connection.对于Server而言,Socket的资源开支是昂贵的,尽量避免一个Client创建多个Connection的情况.

    消息将通过"数据格式协议"在socket通道中传输,同一个connection中消息的发送/接受是有顺序的,这受限于Socket本身对流数据操作的特性.

    JMS Provider会为每个Connection生成ID,用来监控链接、消息分组等;JMS server端将会维护当前所有存活的Connection列表。

  • void start(): "开启"消息接收,此后即可接收消息;不过对于Producer而言,无论链接处于何种状态,均可以发送消息;此方法主要对消费者有效.此操作对Connection上所有的session都有效.
  • void stop():"终止"消息接收,此后将不能接收到消息;当链接重新被start之后,将仍然可以继续接收.
  • void close():关闭链接,底层为直接关闭socket;与Connection有关的临时(temporary)目的地都将被删除(包括TemporaryQueue,TemporaryTopic),以及此Connection有关的Sessions/productor/consumer都将被关闭.JMS规范要求,如果close方法返回意味着connection中所有的send操作已经结束,消息接收的receive方法已经返回(或中断);close方法会导致事务中的session无法继续,可能会rollback.
  • Session createSession(boolean transacted,int acknowledgeMode):创建会话,并指定此Session的事务性和消息确认模式.
  • String getClientID():获得当前JMS Client的ID;每个Connection,对于JMS提供者而言,就被认为是一个Client,clientId用来表示全局中唯一的一个链接,有server端生成;不同的JMS提供者对此ID的生成策略有所不同.
  • void setClientID():设置ClientID,此操作需要在建立Connection之后,未使用Connection进行任何实际操作之前(包括创建session)进行;否则将会抛出异常.因为clientID是JMS server用来唯一标记链接的,因此在全局中不能重复,如果尝试设定一个已有的ClientId,将会抛出InvalidClientIDException.不过此方法可能在某些JMS Provider上不被支持.
  • void setExceptionListener(ExceptionListener listener):设定Connection失效异常监听器,当JMS Client检测到链接异常,比如链接异常断开,将会通知此listener,可以在listener中做一些日志记录/补救措施,比如重新建立链接/会话恢复等.
  • ConnectionMetaData getMetaData():获取JMS Provider中有关的元数据信息,比如版本号等.

3. DeliveryMode接口:

    "消息传输模式",此属性可以在session中指定,也可以在发送消息时指定;用来标记此消息是否需要被持久化,对于JMS而言,支持2种(作为DeliveryMode的静态属性):

    1) PERSISTENT: 表示消息需要被持久化,对于JMS Provider而言,这种类型的消息将会被存储在磁盘上;以确保在server故障恢复后,消息仍然保留.

    2) NON_PERSISTENT:表示消息不需要持久化,消息有可能被优先存储在内存中,或者存储在磁盘上某个临时文件上;当server故障失效后,消息将不能被恢复.

 

4.Destination接口:

    用来表示一个虚拟通道,或者说"目的地",消息的发送或者接收,都需要指定的destination.常见的2个子接口为:Queue和Topic;根据Destination的约束条件不同,可以分为"受托管Destination"、"动态Destination"、“临时Destination”。

    其中"受托管Destination"为JMS Provider中通过配置的方式声明的,无法通过外部API直接修改,此destination的使用需要受到JMS server管理员的授权等.

    "动态Destination"为通过JMS API方式创建的,比如session.createQueue(String queueName);这种类型的destination可以给JMS Client使用者提供了更多的自由空间,更加常用.

    "临时Destination"相对于"durable"(耐久的),这种类型的destination只能被当前Client感知到,它的生命周期和使用范围局限于Connection;即只有创建它的session所属的Connection中的其他消费者或者生产者才能使用它.这种destination在某些场景下很有用.

 

5. ExceptionListener接口:

    主要用来处理connection级别异常,比如链接异常断开;你可以在此listener中增加比如"链接重建"/"会话恢复"等措施;但是对于业务异常,此listener将不负责管理.

  • void onMessage(JMSException exception)

6. Session接口:

    最常用接口,考虑到Connection的资源开支较大,那么JMS提供了API级别的逻辑上的"链接",即Session;它可以更小粒度的控制消息属性(比如,确认模式,事务支持)以及消息的发送和接收.

    session通常在单线程中使用,无论是MessageProducer还是Consumer;而且通常情况下,一个Session只维护一个Producer实例或者Consumer实例;此外session的创建是非常便捷的.此外sesison接口本身实现了runnable接口,它通常被运行在一个"SessionThread"中.

    通过Session,可以创建多种类型的Producer和Consumer,而且事务的支持,也被控制在session级别.因此在支持事务的情况下,多个consumer或者Producer公用一个Session实例是不明智的.当然这也不是错误,不过前提需要注意:session中数据的操作并非在多线程下,能够得到预期的效果.

    如果你期望一个Producer持续的send消息,而另一个Consumer能够通过listener的方式接收消息,那么你应该将它们放在两个session中.此外如果你的消费者是基于listener异步接收消息,那么你应该为每个listener使用不同的session.

    在支持事务的Session中,多个消息的发送或者接收作为一个原子性单元,当事务提交后,消息的"确认"也是

作为原子性单元(此处消息的确认包括事务中多个发送的消息,或者消费者中连续消费的多个消息);如果事务回滚,将导致此事务中发送的消息被销毁(JMS server端做删除操作),对于消费者而言,事务中接收的消息将会被恢复(即认为消息未被收到,将会被重发).由此可见,在事务类型的session中,消息的确认时机将和事务提交的时机保持一致.

    在事务类型的Session中,如果事务没有提交,那么生产者send的消息对其他消费者不可见;对于消费者而言,如果事务没有提交,那么消息将不会从queue中删除,对于topic类型,消息将不会从自己的"消息副本"中删除.

    session并没有start()方法,默认每次commit之后就会开启一个新的事务.

  • static int AUTO_ACKNOWLEDGE: 消息自动确认,即消费者从receive()方法成功返回时或者当messageListener.onMessage(..)方法成功返回时,进行消息确认.如果receive()方法内部或者onMessage方法内部抛出异常(未捕获),将会导致此消息不能被"确认";那么对于JMS Provider而言,则认为此消息消费失败,将会重发.(对于某些Provider而言,将会在重发多次后仍然失败,将会考虑将消息转发给其他Connection).
  • static int CLIENT_ACKNOWLEDGE: 客户端确认,即消息的确认需要client端选择时机手动去触发。这个方式给消息的确认提供了更加自由的方式;此方式针对消息消费者,消费者可以在接收到消息后,在任意时间调用此message.acknowledge()方法来确认消息。
  • static int DUPS_OK_ACKNOWLEDGE: "可重复消息确认",此模式可以允许JMS提供者将一条消息向同一个目的地发送两次以上。
  • void close(): 关闭session;直接导致与此session有关的资源被释放,如果消费者的recevie()正在接收消息(而不是wait阻塞)或者messageListener.onMessage()方法正在执行,那么session关闭将会被阻塞。session关闭将会导致尚未提交的事务被回滚。session关闭后,此session创建的Producer或者Consumer将无法继续工作。
  • BytesMessage createByteMessage():
  • MapMessage createMapMessage():创建一条消息
  • void commit():提交事务。
  • void rollback():事务回滚。
  • void recover():此方法只会被JMS Provider调用,JMS提供者将会暂停消息的发送,将此前已经发送但没有“确认”消息标记为“redelivered”,然后按照消息的原始顺序,将消息发送给Client端(包括redelivered和新消息)。
  • MessageProducer createProducer(Destination des): 创建一个消息生产者。
  • MessageConsumer createConsumer(Destination des): 创建一个消息消费者。
  • MessageConsumer createConsumer(Destination des,String selector): 指定消息选择器。
  • MessageConsumer createConsumer(Destination des,String selector,boolean noLocal): 指明当前消费者是否可以接受"本地"消息.noLocal参数只对Topic类型的"目的地"有效,如果消息消费者和生产者有一个Connection创建(即它们具有同一个ClientID,或者说底层是一个TCP链接),即使它们是在不同的session中,它们均被认为是“本地”。
  • Queue createQueue(String queueName):创建一个“队列”类型的目的地(动态队列)。
  • Topic createTopic(String tpoicName):创建一个“主题”类型的目的地(动态主题)。
  • TopicSubscriber createDurableSubscriber(Topic topic,String name): 创建一个“耐久订阅者”,并指定订阅者的名称(name,需要全局唯一);如果一个Client需要接收Topic的全部信息,即使当Client的链接失效时也需要JMS 提供者保存它“错过”的消息。考虑到JMS 提供者内部的机制(消息副本),需要为此“耐久订阅者”指定一个名称,且名称不能改动,否则此后消息不能接收到。创建“耐久订阅者”有个必要的先决条件:ClientID必须一致,你在创建“durableSubscriber”时,需要显示的指定“connection.setClientID(xxx)”,且ClientID在每次启动时必须一样,否则将无法使用“耐久订阅者”。(You cannot create a durable subscriber without specifying a unique clientID on a Connection)。
  • QueueBrowser createBrowser(Queue queue,String selector): 类似于创建一个Consumer,不过此时创建的是一个“Browser”,只能查看消息队列,但不能消费(也不能干扰消费)。
  • TemporaryQueue createTemporaryQueue(): 创建一个“临时队列”,其生命周期为当前Connection;如果当前session或者Connection关闭,那么queue也将不可用;同时此Queue只能被当前Connection下的其他session使用,对于其他Connection,此Queue是不可见的。(不能跨Connection,同时此方法并没有指名参数;Don't understand null destinations)
  • TemporaryTopic createTemporaryTopic(): 同上
  • void unsubscribe(String name): 取消“耐久订阅者”,此后JMS Provider将不会对此订阅者保留消息副本。 

7. MessageConsumer接口:

    消息消费者顶级接口,其子接口有QueueReceiver和TopicSubscriber;可以在Session接口中通过各种方式创建MessageConsumer.消息接收可以是同步的,也可以是异步的.这取决于编码方式.

  • String getMessageSelector(): 获取当前消息的"消息选择器";Session接口中,已经提供了基于选择器创建Consumer的方法,注意:消息选择器必须在创建Consumer时指定,且整个session期间将无法再次改动,当然MessageConsumer接口中也没有提供设置选择器的方法;这个和JMS Provider对消息选择器的使用机制有关(后端过滤机制),稍后有专题专门介绍"消息选择器"的原理.
  • Message receive(): 以同步的方式接收消息,同步意味着"阻塞",当在connection活跃期间,当前"目的地"中没有消息push过来(对于Topic)或者没有侦听到消息(对于Queue,普遍采用polling策略),那么此方法将会阻塞,直到接收到消息,或者consumer被关闭(close方法),或者connection/session被关闭,此时将会返回null.前文已经提到,在事务类型的session中,此方法返回时表示此消息已经被确认.
  • Message receive(long timeout): 已同步的方式接收消息,不过指定了方法阻塞的时间;如果在超时时仍未收到消息,将会返回null.
  • Message receiveNoWait(): 以非阻塞的方式接收消息,不过此处的非阻塞,只是尝试去获取,如果此时有消息亟待接收,将会返回message.否则返回null..此方法在一定程度上要求consumer使用"轮询"的方式获取消息,比如在while循环中.
  • void close(): 关闭消费者,此方法会阻塞,直到正在接收消息的receive方法返回,或者基于messageListener.onMessage()方法执行结束.如果receive方法尚未收到消息,则返回null.如果session或者connection被关闭,也将连带此consumer被关闭.

8. MessageProducer接口:

    消息生产者的顶级接口,负责向指定的"目的地"发送消息.两个子接口:QueneSender和TopicPublisher

  • void setDisableMessageID(boolean value): 是否关闭"messageID"选项,不过首先声明并不是所有的JMS Provider都会此选项感兴趣,有些JMS Provider会忽略此选项,会对所有的消息都生成messageID.有些JMS Provider则反其道而行之,将会忽略所有的MessageID,以便减少此值带来的额外的开支(比如网络传输数据量).在很多场景下,我们需要跟踪消息(或者过滤消息,"请求应答"模式),那么MessageID对我们将会非常有效.JMS API规定,任何一个MessageID必须为全局唯一的,以便Client端可以使用MessageID作为某种检索/存储的主键.
  • setDisableMessageTimestamp(): 是否关闭"messageTimestamp"选项,它的机制和messageID一样;messageTimestamp用来表示此消息被发送的时间戳(send方法执行时);此选项通常可以用来跟踪消息被创建的时间.
  • void setDeliveryMode(int mode): 设置消息的"传输模式";此设置将直接影响此Producer下所有发送的消息.
  • void setPriority( int priority): 设置消息的"权重",共0~9个级别,默认为4,其中0~4表示普通优先级,5~9表示高优先级;优先级越高,将会导致此消息被优先发送给消费者.此后我们会详解"消息权重"和系统设计.
  • void setTimeToLive(long timeToLive): 设置消息的最大存活时间,毫秒数.默认为0表示永不过期;JMS Provider在发送消息时都会检测消息的"存活有效性",如果消息过期,将会直接删除,而不发送给消费者.其中mesageTimestamp + timeToLive最终表示为消息过期的时间.在JMS Provider将消息发送给Client时,将使用过期时间和server的本地时间比较,以决定其是否过期.
  • void close(): 关闭消息生产者.如果session或者connection被关闭,也将连带此consumer被关闭.
  • void send(Message message): 发送消息 ,不过此时将会使用MessageProducer指定的priority/deliveryMode等.
  • void send(Message message,int deliveryMode,int priority,long timeToLive); 发送消息,并为此消息指定"传输模式"和"消息权重".如果某条消息需要特定声明这些属性,那么你可以使用此方法.
  • void send(Destinantion des,Message message): 向指定的目的地,发送消息;这个方法并不常用,通常用在"请求-应答"模式中:即消息消费者接收到消息之后,需要临时创建一个Producer并将"应答消息"发送到指定的"replyTo"地址.

五.JMS消息类型:

    无论是消费者还是生产者,这一切都是在围绕:消息(Message).一条消息包括消息头/消息属性/消息体.

    1. 消息头:用于标识消息的核心属性,这些属性通常有JMS Provider设定.API格式为message.getJMS<header>.例如:

 

Java代码  收藏代码
  1. message.getJMSCorrelationID();//有消息生产者设定  
  2. message.getJMSReplyTo();//  
  3. message.getJMSDeliveryMode();  
  4. message.getJMSDestination();  
  5. message.getJMSExpiration();  
  6. message.getJMSMessageID();  
  7. message.getJMSRedelivered();  
  8. message.getJMSTimestamp()  

    JMSCorrlelationID通常有Producer在发送消息之前,由开发者指定:message.setJMSCorrelationID(..);用来表示此消息"关联ID",此ID可以为消息发送者指定的任意与业务有关的数据,通常用来表达当前message与某个ID具有一定的耦合关系:通常使用在"请求-应答"模式中,用来约束两条消息的关系.

    JMSReplyTo使用在"请求-应答"场景中,用来表明此消息接收之后需要向此目的地发送"应答"消息.此属性值需要在生产者发送消息之前指定:message.setJMSReplyTo(queue).

Java代码  收藏代码
  1. Message message = consumer.receive(10000);  
  2. if(message == null){  
  3.     continue;  
  4. }  
  5. Destination replyTo = message.getJMSReplyTo();  
  6. if(replyTo != null){  
  7.     Message replyMessage = session.createTextMessage("Replay message");  
  8.     String correlationID = message.getJMSMessageID();  
  9.     replyMessage.setJMSCorrelationID(correlationID);  
  10.     MessageProducer producer = session.createProducer(replyTo);  
  11.     producer.send(replyMessage);  
  12. }  

    JMSDeliveryMode:即消息的传输模型(持久性/非持久性),这个属性由JMS Provider自动分配,即当消息发送给消费者时,由JMS Provider自动填充其值.此值来自消息发送者的设定.无论是何种传输模式,JMS Provider都不会将一条消息成功发送一次以上.

Java代码  收藏代码
  1. MessageProducer producer = session.createProducer(replyTo);  
  2. replyMessage.setJMSDeliveryMode(DeliveryMode.PERSISTENT);//默认为持久性  

    JMSMessageID:消息的ID,此ID通常为全局唯一的,用来唯一的标识一条消息.MessageID是由JMS Provider生成,因此一个没有被JMS Provider转发的新消息(或者通过session.createMessage()创建的新消息),其MessageID为空.生产者无法直接对MessageID赋值,因为JMS Provider会最终忽略它.你可以通过producer.setDisableMessageID()方法来"取消"JMSMessageID消息头,此后尝试获取MessageID将得到null,但并不意味着JMS Provider不会为Message生成ID,事实上每个消息都有ID,只是此方法可以决定是否在传输给消费者时它是否会被包含在消息头上.

    JMSTimestamp:消息被发送的时间戳,即producer.send()的时间,此时间为Client端的本地时间.可以用来计算消息由生产者发送之后,到消息成功接收,之间的时间差.

    JMSExpiration: 消息的有效期,时间戳.常用来过滤"过期"的消息;最终Expiration = timeToLive + timestamp.在消息发送时此属性将会被设定.

Java代码  收藏代码
  1. producer.setTimeToLive(30000);//默认为0,表示永不过期.  

    JMSRedilvered: 如果消息是"重发"时,那么此属性将为true.表示此消息先前未能被正确接收.由JMS Provider填充此值.

    JMSPriority: 此消息的权重,此值可以通过producer.setPriority(int)来设定.通常情况下权重有10个可选值,0~4表示普通优先级,5~9表示高优先级;JMS Provider将会把优先级较高的消息优先发送给消费者,你可以认为JMS Server端的消息存储为"基于权重排序的队列".不过事实上JMS Provider并不会对消息基于权重进行排序,而是在发送时尽可能的选择"权重较高"的消息优先发送,这个是"尽力而为"的,请不要依赖"权重"来实现绝对意义的"排序".

 

    上述JMS消息头中,开发者可以分配的有:JMSReplyTo,JMSCorrelationID;由JMS Provider设定的有JMSMessageID,JMSRedelivered.其他属性均有JMS Client自动分配(即在producer.send()时设定).

 

    2. JMS消息类型:

    1)TextMessage: 消息体为文本字符串,最常用的消息类型.底层为new String(byte[],"utf-8"),非常简单.可以通过getText()/setText()方法操作文本内容.

    2)ObjectMessage: 消息体为java对象,此对象必须是Serializable接口的实例,使用java的序列化机制.消息体在Producer发送时通过ObjectOutputStream.writeObject()序列化,消费者通过ObjectInputStream.readObject()反序列化;可以通过ObjectMessage.getObject()获取对象实例.

    3)BytesMessage: 消息体为字节数组,类似于byteBuffer的结构,字节数组基于utf-8编码,可以像使用ByteBuffer一样来获取ByteMessage消息体中的数据.比如 readLong(),将会获取8个字节并转换成long.如果在对消息操作时,跑出异常,那么仍然可以从异常中回复:reset()方法,将指针返回到字节流的起始位置,可以继续重新read.

    ByteMessage是最基础/移植性最好的消息类型,它可以兼容各种语言下的Client,而且方便存储.

    底层使用DataInputStream/DataInputStream作为数据支撑.

    4)StreamMessage: 消息体为byteStream,可以方便对任意流数据进行传输,当然文本/字节数组/图片流等,也可以以StreamMessage的方式传输.

    毕竟stream并不是一个结构化的数据,因此操作起来比较复杂,这就要求Producer和Consumer需要约定流中每个元素的顺序以及类型.比如:

    在Producer中,执行message.writeLong(),message.writeByte()..

    那么在Consumter中,必须依此调用message.getLong().message.getByte()...否则将会出现异常错误,因此StreamMessage具有严格的字节顺序.

    不过,在不同的JMS provider中,对StreamMessage的实现有很大差异,在ActiveMQ中,StreamMessage内部使用一个list支撑,write操作就是向list中add数据,read操作就是根据根据index获取值(每read依此,导致position++);这个list最终将会在网络交互是被序列化和反序列化.有点像ObjectStream的特例.

    5)MapMessage: 消息体为map,其实消息内容可以通过类似于getString(String key)的方式获取.也是一种常用的消息类型.和StreamMessage类似,底层使用map作为数据支撑.

    6)Message: 默认的消息,此类型的消息不包含消息体,只能发送消息属性(properties)和消息头(header),通常用于表达"通知".

 

    JMS的消息本身是不可重用的,即消息发送之后,消费者得到的Message实例只能read.

    3. JMS消息属性

    JMS header是JMS内置的特殊"消息属性",这些header将有Producer/Consumer/Provider三方共同约定且协同维护一条消息,在消息的路由方式/操作时机上会依赖header..不过JMS只提供了有限的几个header条目,如果开发者或者Provider期望扩展它,可以通过消息属性来完成(Message Properties).如下为JMS中保留的属性:

  • JMSXAppId: application标识
  • JMSXConsumerTXID: 消息消费者的事务ID,provider自动填充
  • JMSXDeliveryCount: 消息已重发的次数.
  • JMSXGroupSeq/JMSXGroupID: 消息分组时使用.
  • JMSXProducerTXID: 消息生产者的事务ID
  • JMSXRcvTimestamp: 消息由Provider发送给consumer的时间戳.由Provider填充.
    其中JMSXGroupID和JMSXGroupSeq是JMS要求Provider必须支持的;对于其他属性,只是可选.此外Provider也可以提供自己的"属性",一般格式为:JMS_<vender-name>;可以参考各自的JMS Provider来获取相关的属性列表.

六.消息发送与接收

    1. 消息发送:

Java代码  收藏代码
  1. Connection connection = connectionFactory.createConnection();  
  2. connection.setClientID("TEST+++++");//optional  
  3. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  4. Destination queue = session.createQueue(QUEUE);//better: From jndi  
  5. MessageProducer producer = session.createProducer(queue);  
  6. producer.setDeliveryMode(DeliveryMode.PERSISTENT);//持久存储  
  7. TextMessage message = session.createTextMessage("message for test");  
  8. producer.send(message);  

    通过代码我们能够直观的看到,一个connection可以创建多个session,其中每个session就是一个会话单元,事务操作依赖于session.可以通过session来创建producer或者consumer;当然session也可以创建"动态的queue".

    通常情况下,一个应用只会有一个Connection,这个Connection是物理层面的TCP连接.在一个应用中创建多个Connection是一种不良的设计,且存在性能隐患.JMS 提供了细颗粒度的控制API:Session;需要特别提醒,session非线程安全,主要是因为其事务的问题,如果一个支持事务的session在多线程执行,极有可能在事务提交和回滚时互相干扰.通常我们建议,支持事务类型的session只为一个producer或者consumer服务;对于非事务性的session,它们可以公用.事实上一个session可以创建多个producer或者consumer.

    producer.send()方法是阻塞方法,当消息完全传输给server且正确"确认"之后,才会返回.如果session为事务性,那么JMS Provider将会在内存中或者临时文件中保存此session(注意每个session会携带sessionId,和事务ID)中当前事务的所有未提交的消息,直到session.commit(),此时消息才会被认为消费者可见;如果session回滚,将导致JMS Provider删除此事务中所有的未提交的消息.每次事务提交或回滚,都会导致新的事务开启(新的事务ID).

    很多开发者将"消息确认"和事务混为一谈;"消息确认"即为JMS Provider向Producer发送类似于ACK信号,以表示当前正在发送的消息已经正确被JMS Provider接收且存储;或者Consumer向JMS Provider发送ACK信号表示当前正在接收的消息已经正确被接收且执行(比如receive方法正确返回,或者onMessage方法执行完毕且没有抛出异常).那么事务则表达了一个场景,就是多条消息的接收或者发送作为一个"原子性"单元,要么它们被全部发送成功或者要么全部接收成功,需要显示的执行session.commit或者rollback.

    此处还需要单独提示一下,每个Session实例并不是一个线程,它只不过是一个普通的对象.JMS规范规定,一个Session不应该在多个线程中执行,比如Producer和异步的consumer(基于messageListener),它们应该使用不同的session;且多个异步的consumer,也应该使用不同的session;如果你违背了这个设计原则,也不意味着错误,当然公用session也不会导致彼此线程之间的互相阻塞(session不会带来任何阻塞,包括同步的receive方法之间),不过session所持有的"确认模式"/"事务类型"等公用选项会在并发中导致混乱.

    消息的传输模型包括:持久化和非持久化;体现在API上就是DeliveryMode,其中持久化方式表示任何发送到JMS Provider的消息都将被"持久存储",不同的JMS Provider对持久化技术的使用各不相同,有的使用外部RDBMS,有的使用内置的K-V数据库,有的基于特定的文件存储;无论如何,最终这些消息都将被保存在"物理存储可靠"的地方;比如activeMQ可以基于RDBMS,也可以使用内置的kahaDB等;对于"持久化"模式,一旦生产者的消息被"确认"收到,将意味着消息已经被正确存储在磁盘上,当然有些JMS Provider为了提高消息分发的效率(较少额外的消息检索),可能会额外的将最新消息的副本在内存中保存一份;持久化模式能够确保JMS Server在故障重启后,消息不丢失,且可以正常恢复;但是如果JMS Server的物理机器故障,且消息没有采取额外备份的话,有可能丢失,这不是JMS所能顾及的. 非持久化模式表示JMS Provider不担保消息的绝对安全,在JMS Server故障重启后,非持久化的消息将会丢失;JMS Provider将会把非持久化的消息保存在内存中,有些JMS Provider可能为了避免内存的过度消耗,而将非持久化消息存储在DB的临时表中,以确保内存使用量处于可控状态,且消息的消费和生产不会受到太大的影响;但是JMS Provider在故障重启后,将会删除这些临时表或者文件.

 

    2 消息的接收:

Java代码  收藏代码
  1. Connection connection = connectionFactory.createConnection();  
  2. connection.setClientID("test101010101");  
  3. Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  
  4. Destination queue = session.createQueue(QUEUE);  
  5. MessageConsumer consumer = session.createConsumer(queue);  
  6. //consumer.setMessageListener(new QueueMessageListener());  
  7. connection.start();  
  8. while(true){  
  9.     System.out.println("+++++");  
  10.     try{  
  11.         Message message = consumer.receive(10000);  
  12.         if(message == null){  
  13.             continue;  
  14.         }  
  15.         if(message instanceof TextMessage){  
  16.             String text = ((TextMessage)message).getText();  
  17.             System.out.println("...." + text);  
  18.         }  
  19.         session.commit();  
  20.     }catch(Exception e){  
  21.         session.rollback();  
  22.     }  
  23. }  

    代码结构基本上和消息生产者一样,不过需要注意connection.start();此方法主要用来控制此链接上开启消息接收,如果不执行start,事实上connection上所有的consumer都无法正常接收到消息.消息的消费可以是同步的:recevie();也可以是异步的:使用consumer.setMessageListener(..);在异步消息接收中,最终将会在一个外部线程中调用onMessage方法.

    无论是同步,还是异步,都需要注意session的事务性;如果是事务类型的,那么在接收到消息之后,需要显示的执行commit方法,commit方法将会提交事务并同时进行消息确认操作.如果不执行commit,对于JMS Provider而言,意味着此消息未能确认,将会导致此消息"重发".因此有必要对"消息接收"/"消息处理"代码段进行try-catch操作.

    

    客户端spring配置例子:

Xml代码  收藏代码
  1. <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
  2.         <property name="brokerURL" value="${bjxizhan.activemq.broker.url}"></property>  
  3.     </bean>  
  4.   
  5.     <!-- 提现-->  
  6.     <bean id="withdrawQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">  
  7.         <constructor-arg index="0" value="${bjxizhan.activemq.queue.withdraw}"></constructor-arg>  
  8.     </bean>  
  9.   
  10.     <bean id="withdrawQueueListener" class="com.itlong.bjxizhan.support.web.listener.WithdrawQueueListener"/>  
  11.     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  12.         <property name="connectionFactory" ref="connectionFactory"></property>  
  13.         <property name="destination" ref="withdrawQueueDestination"></property>  
  14.         <property name="messageListener" ref="withdrawQueueListener"></property>  
  15.         <property name="concurrentConsumers" value="1"></property>  
  16.         <property name="autoStartup" value="true"/>  
  17.         <property name="sessionAcknowledgeMode" value="2"/>  
  18.     </bean>  
  19.   
  20.   
  21.   
  22.     <!-- 提现消息发送端 -->  
  23.     <bean id="withdrawQueueSender" class="org.springframework.jms.core.JmsTemplate">  
  24.         <property name="connectionFactory" ref="connectionFactory"></property>  
  25.         <property name="defaultDestination" ref="withdrawQueueDestination"></property>  
  26.         <!-- 是否在message中开启timestamp属性 -->  
  27.         <property name="messageTimestampEnabled" value="true"></property>  
  28.         <property name="deliveryPersistent" value="true"/>  
  29.     </bean>  

 


原文链接:[http://wely.iteye.com/blog/2228615]

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
深入浅出JMS(一)--JMS基本概念
摘要:The Java Message Service (JMS) API is a messaging standard that allows application component...
888 0
JMS&AMQP 简介|学习笔记
快速学习 JMS&AMQP 简介
0 0
JMS(1)——基本实例
Java Message Service是java ee的规范之一,可以用来发送异步消息,在某些场景下,可以作为不同系统,或者不同模块之间的集成方式。可以类比为通过数据库来集成的方式,模块A完成逻辑以后,往数据库插入一条记录,模块B定时轮询数据库,如果查到相应的记录,就进行处理。JMS集成实际上思路是差不多的功能更强,并且提供了标准的API支持,而且也可以避免反复轮询数据库或者读取文件的I
1055 0
+关注
文章
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载