准备工作

  1. 从下面官网下载ActiveMQ,本文使用5.7版本,写博时Maven库能找到的最高版本为5.7

    http://activemq.apache.org/download-archives.html

  2. 解压缩下载的文件到你希望安装的目录

  3. 进入bin目录运行activemq.bat以启动ActiveMQ服务,启动后默认

    broker URL : tcp://localhost:61616

    Admin URL:http://localhost:8161/admin

  4. 打开Admin URL可以查看Queue、Topic等信息

  5. 关于ActiveMQ相关依赖包就交给Gradle工具,请参看demo配置


JMS框架基本角色和编程模块

  • JMS Message Producers : 消息生产者,向消息队列提供消息

  • JMS Provider(Broker) : JMS提供商,如ActiveMQ,提供JMS Queues/Topics服务

  • JMS Message listener/Consumer :接收并使用消息


JMS Provider有如数据库,Producers/Consumers发送/接收消息前需要先连接到Provider,与建立数据库连接相似,JMS ConnectionFactory负责创建JMS Connection,JMS Connection创建JMS Session

  • JMS ConnectionFactory : Producers/Consumers用于创建一个到Provider的连接

  • JMS Connection :封装一个到Provider的连接

  • JMS Session : 消息发送接收上下文

wKiom1LXvQrxVnOnAAD304pI3Qk941.jpg

在JMS Provider上可以定义多个Queue和Topic,Producers发送消息到哪个Queue/Topic,称具体的那个Queue/Topic为Destination.

  • JMS Destination : 一对一的Queue或者一对多的Topic

关于JMS编程模型可以参考以下文章:

http://docs.oracle.com/javaee/6/tutorial/doc/bnceh.html


Spring集成配置

  • Queue消息发送者(JMS Message Producers) :

ConnectionFactory : 用于连接Provider,支持连接池配置

JmsTemplate : Spring封装类,可用于创造消息、发送消息、接收消息等

1
2
3
4
5
6
7
< bean  id = "connectionFactory"  class = "org.apache.activemq.ActiveMQConnectionFactory"
     p:brokerURL = "tcp://localhost:61616"  />
< bean  id = "jmsTemplate"  class = "org.springframework.jms.core.JmsTemplate" >
     < constructor-arg  name = "connectionFactory"  ref = "connectionFactory"  />
     < property  name = "defaultDestinationName"  value = "TestQueue"  />
</ bean >
< context:component-scan  base-package = "com.stevex.demo"  />
  • 连接池配置示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!-- a pooling based JMS provider -->
   < bean  id = "jmsFactory"  class = "org.apache.activemq.pool.PooledConnectionFactory"  destroy-method = "stop" >
     < property  name = "connectionFactory" >
       < bean  class = "org.apache.activemq.ActiveMQConnectionFactory" >
         < property  name = "brokerURL" >
           < value >tcp://localhost:61616</ value >
         </ property >
       </ bean >
     </ property >
   </ bean >
                                                                                                  
   <!-- Spring JMS Template -->
   < bean  id = "jmsTemplate"  class = "org.springframework.jms.core.JmsTemplate" >
     < property  name = "connectionFactory" >
       < ref  local = "jmsFactory" />
     </ property >
   </ bean >
  • Queue消息接收者(JMS Message listener) :

ConnectionFactory : 用于连接Provider,支持连接池配置

Listener : 接收消息

jms:listener-container 简化了配置工作,destination只需要给queue/topic名称,不需要额外定义

1
2
3
4
5
6
7
8
< bean  id = "connectionFactory"  class = "org.apache.activemq.ActiveMQConnectionFactory"
     p:brokerURL = "tcp://localhost:61616"  />
< jms:listener-container  container-type = "default"
     connection-factory = "connectionFactory"  acknowledge = "auto" >
     < jms:listener  destination = "TestQueue"  ref = "messageListener"
         method = "onMessage"  />
</ jms:listener-container >
< bean  id = "messageListener"  class = "com.stevex.demo.SimpleMessageListener" />
  • Topic消息发送者(JMS Message Producers) :

ConnectionFactory : 用于连接Provider,支持连接池配置

JmsTemplate : Spring封装类,可用于创造消息、发送消息、接收消息等

JmsTemplate 的pubSubDomain属性值为true表示destination为Topic类型,默认false表示destination为Queue类型。

1
2
3
4
5
6
7
8
< bean  id = "connectionFactory"  class = "org.apache.activemq.ActiveMQConnectionFactory"
     p:brokerURL = "tcp://localhost:61616"  />
< bean  id = "jmsTemplate"  class = "org.springframework.jms.core.JmsTemplate" >
     < constructor-arg  name = "connectionFactory"  ref = "connectionFactory"  />
     < property  name = "defaultDestinationName"  value = "TestTopic"  />
     < property  name = "pubSubDomain"  value = "true"  />
</ bean >
< context:component-scan  base-package = "com.stevex.demo"  />
  • Topic消息接收者(JMS Message listener) :

ConnectionFactory : 用于连接Provider,支持连接池配置

Listener : 接收消息

Topic类型的消息,所有订阅者都可以接收到,本文Demo定义了两个接收者。

jms:listener-container 的destination-type属性默认值为queue,如果是Topic需要显示指定。

1
2
3
4
5
6
7
8
9
< bean  id = "connectionFactory"  class = "org.apache.activemq.ActiveMQConnectionFactory"
     p:brokerURL = "tcp://localhost:61616"  />
< bean  id = "subscriber1"  class = "com.stevex.demo.SimpleMessageListener"  />
< jms:listener-container  container-type = "default"
     destination-type = "topic"  connection-factory = "connectionFactory"
     acknowledge = "auto" >
     < jms:listener  destination = "TestTopic"  ref = "subscriber1"
         method = "onMessage"  />
</ jms:listener-container >
  • Producer类实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component ( "messageSender" )
public  class  SimpleMessageSender  implements  MessageSender {
     @Autowired
     private  JmsTemplate jmsTemplate;
     @Override
     public  void  sendMessage( final  String message) {
         jmsTemplate.send( new  MessageCreator() {
                                                                                                   
             public  Message createMessage(Session session)  throws  JMSException {
                 return  session.createTextMessage(message);
             }
                                                                                                   
         });
     }
}
  • Listener类实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component ( "messageListener" )
public  class  SimpleMessageListener  implements  MessageListener {
     private  static  final  Logger logger = LoggerFactory.getLogger(SimpleMessageListener. class );
     public  void  onMessage(Message message) {
         TextMessage textMessage = (TextMessage) message;
                                                                                             
         try  {
             logger.info( "Message received: "  + textMessage.getText());
         catch  (JMSException ex) {
             logger.error( "JMS error" , ex);
         }
     }
}

应用跑起来后,ActiveMQ接收到连接后,如果请求的Queue/Topic不存在它会自动创建,我们也可以通过ActiveMQ Admin界面给Listener发消息。