JMS
JMS(Java Message Service):java消息服务,客户端与服务端之间可以通过JSM服务进行消息的异步传输(消息的发送和消息的接收不是同时进行的,即发送了消息后,不需要等待消息的返回就可以继续执行),客户端只管发送,不需要考虑服务端什么时候处理。因此,如果客户端与服务端对消息发送和接收对时间相关不是很严格的话,用JMS可以很大程度上提高性能。
JMS支持两种消息模型:Point-to-Point(P2P)和Publish/Subscribe(Pub/Sub)。
点对点模型(P2P)
生产者(发送者)异步把消息发送到队列,消费者(接受者)从队列中获取消息。消息在被消费或超时之前,始终保持在消息队列中。
特点:
1、生产者和消费者之间没有时间依赖性,无论消费者是否收到消息,都不影响生产者发送消息;
2、消费者收到消息后需要向队列反馈;
3、适用于每条消息都需要被消费者消费的场景。
**发布/订阅模型(Pub/Sub)**
与P2P不同的是,一个生产者把消息发布后,这些消息可以传送给多个消费者。
特点:每条消息可以有多个消费者。
消息驱动Bean(以下简称MDB)
在上面的JMS介绍中了解了异步消息,消息驱动Bean可以看做是异步消息的消费者。
实现消息驱动Bean,需要在JBoss的安装目录(jboss-5.0.1.GA\server\default\deploy)下添加一个配置文件:
xxx-service.xml
<?xml version="1.0" encoding="UTF-8" ?> <server> <!-- Queue,name:Queue的名称 --> <mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.org.destination:server=Queue,name=myqueue" > <!-- JNDI名称 --> <attribute name="JNDIName">queue/myqueue</attribute> <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends> </mbean> <!-- Topic,name:Topic的名称--> <mbean code="org.jboss.mq.server.jmx.Topic" name="jboss.org.destination:server=Topic,name=mytopic" > <!-- JNDI名称 --> <attribute name="JNDIName">topic/mytopic</attribute> <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends> </mbean> </server>
**实现P2P模式的消息驱动Bean**
服务端
MyQueueMDBBean.java
import javax.ejb.ActivationConfigProperty; import javax.ejb.MessageDriven; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; @MessageDriven( activationConfig={ @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"), @ActivationConfigProperty(propertyName="destination",propertyValue="queue/myqueue") } ) public class MyQueueMDBBean implements MessageListener{ static int i=0; @Override public void onMessage(Message msg) { TextMessage textMessage=(TextMessage)msg; try { System.out.println("【MyQueueMDBBean】消息"+(i++)+"被接收了。TextMessage:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
对以上代码的说明:
用@MessageDriven注解来定义消息驱动Bean,如果查看EJB的源码会发现,MessageDriven中有一个数组类型的变量activationConfig:
ActivationConfigProperty[] activationConfig() default {};
所以这里需要为activationConfig赋值:
activationConfig={ @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"), @ActivationConfigProperty(propertyName="destination",propertyValue="topic/mytopic") }
destinationType属性值为javax.jms.Topic说明此MDB实现的是P2P模式的消息服务;destination属性值为topic/mytopic表示此MDB的消息来源,也表示生产者的发送消息的目的地,jndi地址为topic/mytopic,这个可以在xxx-service.xml中自定义。
客户端
import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.naming.InitialContext; public class MyQueueMDBBeanClient { public static void main(String[] args) throws Exception { InitialContext context=new InitialContext(); //获取QueueConnectionFactory QueueConnectionFactory factory=(QueueConnectionFactory)context.lookup("ConnectionFactory"); //创建QueueConnection对象 QueueConnection connection=factory.createQueueConnection(); //创建QueueSession对象,第一个参数表示事务自动提交,第二个参数表示一旦消息被正确送达,将自动发回响应 QueueSession session=connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); //获取Destination对象 Queue queue=(Queue)context.lookup("queue/myqueue"); //创建文本消息 TextMessage msg=session.createTextMessage("Hello world!"); //创建发送者 QueueSender sender=session.createSender(queue); //发送信息 for(int i=0;i<10;i++){ sender.send(msg); System.out.println("消息"+i+"已经发送"); } //关闭会话 session.close(); connection.close(); } }
客户端执行结果 ``` 消息0已经发送 消息1已经发送 消息2已经发送 消息3已经发送 消息4已经发送 消息5已经发送 消息6已经发送 消息7已经发送 消息8已经发送 消息9已经发送 ```
EJB执行结果
13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】消息4被接收了。TextMessage:Hello world! 13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】消息6被接收了。TextMessage:Hello world! 13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】消息8被接收了。TextMessage:Hello world! 13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】消息9被接收了。TextMessage:Hello world! 13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】消息2被接收了。TextMessage:Hello world! 13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】消息7被接收了。TextMessage:Hello world! 13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】消息5被接收了。TextMessage:Hello world! 13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】消息0被接收了。TextMessage:Hello world! 13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】消息1被接收了。TextMessage:Hello world! 13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】消息3被接收了。TextMessage:Hello world!
从结果可以看出,发送消息的时候是有序的,但是MDB接收消息不一定是有序的。
**实现Pub/Sub模式的消息驱动Bean**
服务端
MyTopicMDBBean1.java
import javax.ejb.ActivationConfigProperty; import javax.ejb.MessageDriven; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; @MessageDriven( activationConfig={ @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Topic"), @ActivationConfigProperty(propertyName="destination",propertyValue="topic/mytopic") } ) public class MyTopicMDBBean1 implements MessageListener{ @Override public void onMessage(Message msg) { TextMessage textMessage=(TextMessage)msg; try { System.out.println("【MyTopicMDBBean1】消息被接收了。TextMessage:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
MyTopicMDBBean2.java、MyTopicMDBBean3.java
MyTopicMDBBean2.java、MyTopicMDBBean3.java的代码与MyTopicMDBBean1.java的实现一模一样。
客户端
import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.naming.InitialContext; public class MyTopicMDBBeanClient { public static void main(String[] args) throws Exception { InitialContext context=new InitialContext(); //获取QueueConnectionFactory TopicConnectionFactory factory=(TopicConnectionFactory)context.lookup("ConnectionFactory"); //创建QueueConnection对象 TopicConnection connection=factory.createTopicConnection(); //创建QueueSession对象,第一个参数表示事务自动提交,第二个参数表示一旦消息被正确送达,将自动发回响应 TopicSession session=connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); //获取Destination对象 Topic topic=(Topic)context.lookup("topic/mytopic"); //创建文本消息 TextMessage msg=session.createTextMessage("Hello world!"); //创建发布者 TopicPublisher publisher=session.createPublisher(topic); //发布信息 publisher.publish(msg); System.out.println("消息已经发布"); //关闭会话 session.close(); connection.close(); } }
客户端执行结果 ``` 消息已经发布 ```
EJB执行结果
14:16:24,287 INFO [STDOUT] 【MyTopicMDBBean01】消息被接收了。TextMessage:Hello world! 14:16:24,287 INFO [STDOUT] 【MyTopicMDBBean03】消息被接收了。TextMessage:Hello world! 14:16:24,288 INFO [STDOUT] 【MyTopicMDBBean02】消息被接收了。TextMessage:Hello world!
这种场景类似于,我新发表了一篇博客,订阅我博客的人都会收到RSS推送。