准备工作
从下面官网下载ActiveMQ,本文使用5.7版本,写博时Maven库能找到的最高版本为5.7
解压缩下载的文件到你希望安装的目录
进入bin目录运行activemq.bat以启动ActiveMQ服务,启动后默认
broker URL : tcp://localhost:61616
Admin URL:http://localhost:8161/admin
打开Admin URL可以查看Queue、Topic等信息
关于ActiveMQ相关依赖包就交给Gradle工具,请参看demo配置
JMS框架基本角色和编程模块
JMS Message Producers : 消息生产者,向消息队列提供消息
JMS Provider(Broker) : JMS提供商,如ActiveMQ,提供JMS Queues/Topics服务
JMS Message listener/Consumer :接收并使用消息
JMS Provider有如数据库,Producers/Consumers发送/接收消息前需要先连接到Provider,与建立数据库连接相似,JMS ConnectionFactory负责创建JMS Connection,JMS Connection创建JMS Session
JMS ConnectionFactory : Producers/Consumers用于创建一个到Provider的连接
JMS Connection :封装一个到Provider的连接
JMS Session : 消息发送接收上下文
在JMS Provider上可以定义多个Queue和Topic,Producers发送消息到哪个Queue/Topic,称具体的那个Queue/Topic为Destination.
JMS Destination : 一对一的Queue或者一对多的Topic
关于JMS编程模型可以参考以下文章:
http://docs.oracle.com/javaee/6/tutorial/doc/bnceh.html
Spring集成配置
Queue消息发送者(JMS Message Producers) :
ConnectionFactory : 用于连接Provider,支持连接池配置
JmsTemplate : Spring封装类,可用于创造消息、发送消息、接收消息等
1
2
3
4
5
6
7
|
<
bean
id
=
"connectionFactory"
class
=
"org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL
=
"tcp://localhost:61616"
/>
<
bean
id
=
"jmsTemplate"
class
=
"org.springframework.jms.core.JmsTemplate"
>
<
constructor-arg
name
=
"connectionFactory"
ref
=
"connectionFactory"
/>
<
property
name
=
"defaultDestinationName"
value
=
"TestQueue"
/>
</
bean
>
<
context:component-scan
base-package
=
"com.stevex.demo"
/>
|
连接池配置示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
<!-- a pooling based JMS provider -->
<
bean
id
=
"jmsFactory"
class
=
"org.apache.activemq.pool.PooledConnectionFactory"
destroy-method
=
"stop"
>
<
property
name
=
"connectionFactory"
>
<
bean
class
=
"org.apache.activemq.ActiveMQConnectionFactory"
>
<
property
name
=
"brokerURL"
>
<
value
>tcp://localhost:61616</
value
>
</
property
>
</
bean
>
</
property
>
</
bean
>
<!-- Spring JMS Template -->
<
bean
id
=
"jmsTemplate"
class
=
"org.springframework.jms.core.JmsTemplate"
>
<
property
name
=
"connectionFactory"
>
<
ref
local
=
"jmsFactory"
/>
</
property
>
</
bean
>
|
Queue消息接收者(JMS Message listener) :
ConnectionFactory : 用于连接Provider,支持连接池配置
Listener : 接收消息
jms:listener-container 简化了配置工作,destination只需要给queue/topic名称,不需要额外定义
1
2
3
4
5
6
7
8
|
<
bean
id
=
"connectionFactory"
class
=
"org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL
=
"tcp://localhost:61616"
/>
<
jms:listener-container
container-type
=
"default"
connection-factory
=
"connectionFactory"
acknowledge
=
"auto"
>
<
jms:listener
destination
=
"TestQueue"
ref
=
"messageListener"
method
=
"onMessage"
/>
</
jms:listener-container
>
<
bean
id
=
"messageListener"
class
=
"com.stevex.demo.SimpleMessageListener"
/>
|
Topic消息发送者(JMS Message Producers) :
ConnectionFactory : 用于连接Provider,支持连接池配置
JmsTemplate : Spring封装类,可用于创造消息、发送消息、接收消息等
JmsTemplate 的pubSubDomain属性值为true表示destination为Topic类型,默认false表示destination为Queue类型。
1
2
3
4
5
6
7
8
|
<
bean
id
=
"connectionFactory"
class
=
"org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL
=
"tcp://localhost:61616"
/>
<
bean
id
=
"jmsTemplate"
class
=
"org.springframework.jms.core.JmsTemplate"
>
<
constructor-arg
name
=
"connectionFactory"
ref
=
"connectionFactory"
/>
<
property
name
=
"defaultDestinationName"
value
=
"TestTopic"
/>
<
property
name
=
"pubSubDomain"
value
=
"true"
/>
</
bean
>
<
context:component-scan
base-package
=
"com.stevex.demo"
/>
|
Topic消息接收者(JMS Message listener) :
ConnectionFactory : 用于连接Provider,支持连接池配置
Listener : 接收消息
Topic类型的消息,所有订阅者都可以接收到,本文Demo定义了两个接收者。
jms:listener-container 的destination-type属性默认值为queue,如果是Topic需要显示指定。
1
2
3
4
5
6
7
8
9
|
<
bean
id
=
"connectionFactory"
class
=
"org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL
=
"tcp://localhost:61616"
/>
<
bean
id
=
"subscriber1"
class
=
"com.stevex.demo.SimpleMessageListener"
/>
<
jms:listener-container
container-type
=
"default"
destination-type
=
"topic"
connection-factory
=
"connectionFactory"
acknowledge
=
"auto"
>
<
jms:listener
destination
=
"TestTopic"
ref
=
"subscriber1"
method
=
"onMessage"
/>
</
jms:listener-container
>
|
Producer类实现代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
@Component
(
"messageSender"
)
public
class
SimpleMessageSender
implements
MessageSender {
@Autowired
private
JmsTemplate jmsTemplate;
@Override
public
void
sendMessage(
final
String message) {
jmsTemplate.send(
new
MessageCreator() {
public
Message createMessage(Session session)
throws
JMSException {
return
session.createTextMessage(message);
}
});
}
}
|
Listener类实现代码
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@Component
(
"messageListener"
)
public
class
SimpleMessageListener
implements
MessageListener {
private
static
final
Logger logger = LoggerFactory.getLogger(SimpleMessageListener.
class
);
public
void
onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try
{
logger.info(
"Message received: "
+ textMessage.getText());
}
catch
(JMSException ex) {
logger.error(
"JMS error"
, ex);
}
}
}
|
应用跑起来后,ActiveMQ接收到连接后,如果请求的Queue/Topic不存在它会自动创建,我们也可以通过ActiveMQ Admin界面给Listener发消息。
附件:http://down.51cto.com/data/2364002
本文转自sarchitect 51CTO博客,原文链接:http://blog.51cto.com/stevex/1352334,如需转载请自行联系原作者