【EJB学习笔记】——JMS和消息驱动Bean

简介:  认识消息驱动Bean之前,先了解一下JMS。

JMS

 JMS(Java Message Service):java消息服务,客户端与服务端之间可以通过JSM服务进行消息的异步传输(消息的发送和消息的接收不是同时进行的,即发送了消息后,不需要等待消息的返回就可以继续执行),客户端只管发送,不需要考虑服务端什么时候处理。因此,如果客户端与服务端对消息发送和接收对时间相关不是很严格的话,用JMS可以很大程度上提高性能。


 JMS支持两种消息模型:Point-to-Point(P2P)和Publish/Subscribe(Pub/Sub)。


 点对点模型(P2P)


22.png


 生产者(发送者)异步把消息发送到队列,消费者(接受者)从队列中获取消息。消息在被消费或超时之前,始终保持在消息队列中。


 特点:

 1、生产者和消费者之间没有时间依赖性,无论消费者是否收到消息,都不影响生产者发送消息;

 2、消费者收到消息后需要向队列反馈;

 3、适用于每条消息都需要被消费者消费的场景。



 **发布/订阅模型(Pub/Sub)**


23.png


 与P2P不同的是,一个生产者把消息发布后,这些消息可以传送给多个消费者。


 特点:每条消息可以有多个消费者。



消息驱动Bean(以下简称MDB)


 在上面的JMS介绍中了解了异步消息,消息驱动Bean可以看做是异步消息的消费者。


 实现消息驱动Bean,需要在JBoss的安装目录(jboss-5.0.1.GA\server\default\deploy)下添加一个配置文件:


 xxx-service.xml


<?xml version="1.0" encoding="UTF-8" ?> 
<server> 
   <!-- Queue,name:Queue的名称 -->
   <mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.org.destination:server=Queue,name=myqueue" > 
     <!-- JNDI名称 -->
     <attribute name="JNDIName">queue/myqueue</attribute> 
     <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends> 
   </mbean> 
   <!-- Topic,name:Topic的名称-->
   <mbean code="org.jboss.mq.server.jmx.Topic" name="jboss.org.destination:server=Topic,name=mytopic" > 
     <!-- JNDI名称 -->
     <attribute name="JNDIName">topic/mytopic</attribute> 
     <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends> 
   </mbean> 
</server>


  **实现P2P模式的消息驱动Bean**

  服务端

  MyQueueMDBBean.java


import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
@MessageDriven(
  activationConfig={
      @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),
      @ActivationConfigProperty(propertyName="destination",propertyValue="queue/myqueue")
  }
)
public class MyQueueMDBBean implements MessageListener{
  static int i=0;
  @Override
  public void onMessage(Message msg) {
    TextMessage textMessage=(TextMessage)msg;
    try {
      System.out.println("【MyQueueMDBBean】消息"+(i++)+"被接收了。TextMessage:"+textMessage.getText());
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}


  对以上代码的说明:


  用@MessageDriven注解来定义消息驱动Bean,如果查看EJB的源码会发现,MessageDriven中有一个数组类型的变量activationConfig:


ActivationConfigProperty[] activationConfig() default {}; 


  所以这里需要为activationConfig赋值:


activationConfig={
      @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),
      @ActivationConfigProperty(propertyName="destination",propertyValue="topic/mytopic")
  }


 destinationType属性值为javax.jms.Topic说明此MDB实现的是P2P模式的消息服务;destination属性值为topic/mytopic表示此MDB的消息来源,也表示生产者的发送消息的目的地,jndi地址为topic/mytopic,这个可以在xxx-service.xml中自定义。


 客户端


import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
public class MyQueueMDBBeanClient {
   public static void main(String[] args) throws Exception {
      InitialContext context=new InitialContext();
      //获取QueueConnectionFactory
      QueueConnectionFactory factory=(QueueConnectionFactory)context.lookup("ConnectionFactory");
      //创建QueueConnection对象
      QueueConnection connection=factory.createQueueConnection();
      //创建QueueSession对象,第一个参数表示事务自动提交,第二个参数表示一旦消息被正确送达,将自动发回响应
      QueueSession session=connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
      //获取Destination对象
      Queue queue=(Queue)context.lookup("queue/myqueue");
      //创建文本消息
      TextMessage msg=session.createTextMessage("Hello world!");
      //创建发送者
      QueueSender sender=session.createSender(queue);
      //发送信息
      for(int i=0;i<10;i++){
          sender.send(msg);
          System.out.println("消息"+i+"已经发送");
      }
      //关闭会话
      session.close();
      connection.close();
  }
}

  客户端执行结果 ``` 消息0已经发送 消息1已经发送 消息2已经发送 消息3已经发送 消息4已经发送 消息5已经发送 消息6已经发送 消息7已经发送 消息8已经发送 消息9已经发送 ```


  EJB执行结果


13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息4被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息6被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息8被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息9被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息2被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息7被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息5被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息0被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息1被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT] 【MyQueueMDBBean】消息3被接收了。TextMessage:Hello world!


  从结果可以看出,发送消息的时候是有序的,但是MDB接收消息不一定是有序的。

  **实现Pub/Sub模式的消息驱动Bean**

  服务端

  MyTopicMDBBean1.java


import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
@MessageDriven(
  activationConfig={
      @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Topic"),
      @ActivationConfigProperty(propertyName="destination",propertyValue="topic/mytopic")
  }
)
public class MyTopicMDBBean1 implements MessageListener{
  @Override
  public void onMessage(Message msg) {  
    TextMessage textMessage=(TextMessage)msg;
    try {
      System.out.println("【MyTopicMDBBean1】消息被接收了。TextMessage:"+textMessage.getText());
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}

  MyTopicMDBBean2.java、MyTopicMDBBean3.java

  MyTopicMDBBean2.java、MyTopicMDBBean3.java的代码与MyTopicMDBBean1.java的实现一模一样。


  客户端


import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
public class MyTopicMDBBeanClient {
   public static void main(String[] args) throws Exception {
          InitialContext context=new InitialContext();
          //获取QueueConnectionFactory
          TopicConnectionFactory factory=(TopicConnectionFactory)context.lookup("ConnectionFactory");
          //创建QueueConnection对象
          TopicConnection connection=factory.createTopicConnection();
          //创建QueueSession对象,第一个参数表示事务自动提交,第二个参数表示一旦消息被正确送达,将自动发回响应
          TopicSession session=connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
          //获取Destination对象
          Topic topic=(Topic)context.lookup("topic/mytopic");
          //创建文本消息
          TextMessage msg=session.createTextMessage("Hello world!");
          //创建发布者
          TopicPublisher publisher=session.createPublisher(topic);
          //发布信息
          publisher.publish(msg);
          System.out.println("消息已经发布");       
          //关闭会话
          session.close();  
          connection.close();
      }
}


  客户端执行结果 ``` 消息已经发布 ```

  EJB执行结果


14:16:24,287 INFO  [STDOUT] 【MyTopicMDBBean01】消息被接收了。TextMessage:Hello world!
14:16:24,287 INFO  [STDOUT] 【MyTopicMDBBean03】消息被接收了。TextMessage:Hello world!
14:16:24,288 INFO  [STDOUT] 【MyTopicMDBBean02】消息被接收了。TextMessage:Hello world!


  这种场景类似于,我新发表了一篇博客,订阅我博客的人都会收到RSS推送。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 Java Kafka
Spring boot 自定义kafkaTemplate的bean实例进行生产消息和发送消息
Spring boot 自定义kafkaTemplate的bean实例进行生产消息和发送消息
190 5
|
6月前
|
消息中间件 Java Spring
Spring Boot中异步消息JMS的讲解与通信实例
Spring Boot中异步消息JMS的讲解与通信实例
88 1
|
消息中间件 搜索推荐 Java
消息中间件JMS介绍、入门demo与spring整合
消息中间件JMS介绍、入门demo与spring整合
370 7
消息中间件JMS介绍、入门demo与spring整合
|
前端开发 Java 数据库
JMS 作用| 学习笔记
快速学习 JMS 作用
JMS 作用| 学习笔记
|
前端开发 Java 开发者
JMS作用|学习笔记
快速学习JMS作用
JMS作用|学习笔记
|
消息中间件 Java 测试技术
Spring整合JMS(消息中间件)实例
本篇文章主要描述了如何配置Spring-JMS,至于为何这样配置及Spring-JMS相关介绍,请阅读这篇文章:Spring整合JMS(消息中间件)。我们这里的消息broker用的是ActiveMQ。 一、相关配置 本篇主要讲解如何在Spring中配置JMS,关于Spring本身的配置本文就不多做介绍了。 1.1 配置maven依赖 在使用Spring-JMS之前,先
2000 6
|
消息中间件 Java API
Spring整合JMS(消息中间件)
本篇主要介绍了异步消息机制及Spring对JMS封装,本篇文章讲解较为详细,如果想直接看如何配置,可以参考: Spring整合JMS(消息中间件)实例,但还是建议大家先看完本篇文章。 一、消息异步处理 类似于RMI、Hessian、Burlap等远程方法调用,它们都是同步的,所谓同步调用就是客户端必须等待操作完成,如果远程服务没有返回任何响应,客户端会一直等待直到服务完成
1598 5
|
消息中间件 Java Spring
Spring消息之JMS.
一、概念 异步消息简介     与远程调用机制以及REST接口类似,异步消息也是用于应用程序之间通信的。     RMI、Hessian、Burlap、HTTP invoker和Web服务在应用程序之间的通信机制是同步的,即客户端应用程序直接与远程服务相交互,并且一直等到远程过程完成后才继续执行。
1245 0
|
Spring Java 网络协议