一、Wildcards
Wildcards用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。ActiveMQ支持以下三种wildcards:
1:“.” 用于作为路径上名字间的分隔符
2:“*” 用于匹配路径上的任何名字
3:">" 用于递归地匹配任何以这个名字开始的destination
示例,设想你有如下两个destinations
PRICE.STOCK.NASDAQ.IBM (IBM在NASDAQ的股价)
PRICE.STOCK.NYSE.SUNW (SUN在纽约证券交易所的股价)
那么:
1:PRICE.> :匹配任何产品的价格变动
2:PRICE.STOCK.> :匹配任何产品的股票价格变动
3:PRICE.STOCK.NASDAQ.* :匹配任何在NASDAQ下面的产品的股票价格变动
4:PRICE.STOCK.*.IBM:匹配任何IBM的产品的股票价格变动
二、组合列队Composite Destinations
组合队列允许用一个虚拟的destination代表多个destinations。这样就可以通过composite destinations在一个操作中同时向多个queue发送消息。
2.1 客户端实现的方式
在composite destinations中,多个destination之间采用“,”分割。例如:
1
2
3
|
Queue queue =
new
ActiveMQQueue(
"FOO.A,FOO.B,FOO.C"
);
或
Destination destination = session.createQueue(
"my-queue,my-queue2"
);
|
如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://,例如:
1
|
Queue queue =
new
ActiveMQQueue(
"FOO.A,topic://NOTIFY.FOO.A"
);
|
2.2 在conf/activemq.xml中的broker下配置实现
1
2
3
4
5
6
7
8
9
10
11
12
|
<
destinationInterceptors
>
<
virtualDestinationInterceptor
>
<
virtualDestinations
>
<
compositeQueue
name
=
"MY.QUEUE"
>
<
forwardTo
>
<
queue
physicalName
=
"my-queue"
/>
<
queue
physicalName
=
"my-queue2"
/>
</
forwardTo
>
</
compositeQueue
>
</
virtualDestinations
>
</
virtualDestinationInterceptor
>
</
destinationInterceptors
>
|
再java代码发送的时候,队列的的名字就用MY.QUEUQ
三、Configure Startup Destinations
如果需要在ActiveMQ启动的时候,创建Destination的话,可以如下配置conf/activemq.xml的broker下:
1
2
3
4
|
<
destinations
>
<
queue
physicalName
=
"FOO.BAR"
/>
<
topic
physicalName
=
"SOME.TOPIC"
/>
</
destinations
>
|
四、Delete Inactive Destinations
一般情况下,ActiveMQ的queue在不使用之后,可以通过web控制台或是JMX方式来删除掉。当然,也可以通过配置,使得broker可以自动探测到无用
的队列(一定时间内为空的队列)并删除掉,回收响应资源。可以如下配置conf/activemq.xml:
1
2
3
4
5
6
7
8
9
|
<
broker
schedulePeriodForDestinationPurge
=
"10000"
>
<
destinationPolicy
>
<
policyMap
>
<
policyEntries
>
<
policyEntry
queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/>
</
policyEntries
>
</
policyMap
>
</
destinationPolicy
>
</
broker
>
|
说明:
schedulePeriodForDestinationPurge:设置多长时间检查一次,这里是10秒,默认为0
inactiveTimoutBeforeGC:设置当Destination为空后,多长时间被删除,这里是30秒,默认为60
gcInactiveDestinations: 设置删除掉不活动队列,默认为false
五、Destination Options
队列选项是给consumer在JMS规范之外添加的功能特性,通过在队列名称后面使用类似URL的语法添加多个选项。包括:
1:consumer.prefetchSize,consumer持有的未确认最大消息数量,默认值 variable
2:consumer.maximumPendingMessageLimit:用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认0
3:consumer.noLocal :默认false
4:consumer.dispatchAsync :是否异步分发 ,默认true
5:consumer.retroactive:是否为回溯消费者 ,默认false
6:consumer.selector:Jms的Selector,默认null
7:consumer.exclusive:是否为独占消费者 ,默认false
8:consumer.priority:设置消费者的优先级,默认0
使用示例:
1
2
|
queue =
new
ActiveMQQueue(
"TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10"
);
consumer = session.createConsumer(queue);
|
六、虚拟Destinations(Visual Destinations)
6.1 简介
虚拟Destinations用来创建逻辑Destinations,客户端可以通过它来生产和消费消息,它会把消息映射到物理Destinations。
ActiveMQ支持两种方式:
1:虚拟主题(Virtual Topics)
2:组合Destinations(Composite Destinations)
6.2 为何使用虚拟主题
ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:
1:同一应用内consumer端负载均衡的问题:也即是同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。
queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能JMS规范本身是没有的。
2:同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高
为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能
6.3 如何使用虚拟主题
1:对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.Orders,代码示例如下:
1
|
Topic destination = session.createTopic("VirtualTopic.myTest");
|
2:对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。
例如Consumer.A.VirtualTopic.Orders,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.Orders说明是一个名称为B的客户端。可以在同一个应用里使用
多个consumer消费此queue,则可以实现上面两个功能。
又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订阅者,而且这个客户端可以使用多
个消费者共同来承担消费任务。代码示例如下:
1
|
Destination destination = session.createQueue(
"Consumer.A.VirtualTopic.myTest"
);
|
注意:将destinationInterceptors注释掉
java代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public
void
topic()
throws
Exception {
ConnectionFactory connectionFactory =
new
ActiveMQConnectionFactory(
"liuy"
,
"123456"
,
"failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)"
);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(
"VirtualTopic.myTest"
);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 设置DeliveryMode.PERSISTENT模式
connection.start();
}
public
void
queue()
throws
Exception {
ConnectionFactory cf =
new
ActiveMQConnectionFactory(
"liuy"
,
"123456"
,
"failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)"
);
Connection connection = cf.createConnection();
connection.start();
final
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(
"Consumer.A.VirtualTopic.myTest"
);
MessageConsumer consumer = session.createConsumer(destination);
int
i=
0
;
while
(i<
3
) {
i++;
TextMessage message = (TextMessage) consumer.receive();
session.commit();
System.out.println(
"收到消 息:"
+ message.getText());
}
session.close();
connection.close();
}
|
3:默认虚拟主题的前缀是 :VirtualTopic.>
自定义消费虚拟地址默认格式:Consumer.*.VirtualTopic.>
自定义消费虚拟地址可以改,比如下面的配置就把它修改了。
xml配置示例如下:
1
2
3
4
5
6
7
8
9
|
<
broker
xmlns
=
"http://activemq.apache.org/schema/core"
>
<
destinationInterceptors
>
<
virtualDestinationInterceptor
>
<
virtualDestinations
>
<
virtualTopic
name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
</
virtualDestinations
>
</
virtualDestinationInterceptor
>
</
destinationInterceptors
>
</
broker
>
|
七、镜像Queues(Mirrored Queues)
7.1 简介
ActiveMQ中每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一
个virtual queue 来把消息转发到多个queues中。但是 为系统中每个queue都进行如此的配置可能会很麻烦。
7.2 使用
ActiveMQ支持Mirrored Queues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序只需要订阅这个mirrored queue topic。为了启用
Mirrored Queues,首先要将BrokerService的useMirroredQueues属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省
是“VirtualTopic.Mirror.”。
比如修改后缀的配置示例如下conf/activemq.xml:
1
2
3
4
5
6
7
8
9
|
<
broker
xmlns
=
"http://activemq.apache.org/schema/core"
brokerName
=
"localhost"
dataDirectory
=
"${activemq.data}"
useMirroredQueues
=
"true"
>
...
<
destinationInterceptors
>
<
mirroredQueue
copyMessage
=
"true"
postfix
=
".qmirror"
prefix
=
""
/>
</
destinationInterceptors
>
...
</
broker
>
|
发送列队的时候,就会自动在topic里也发布
八、Per Destination Policies
ActiveMQ支持多种不同的策略,来单独配置每一个Destination它的属性很多,可以参见官方文档:
http://activemq.apache.org/per-destination-policies.html
本文转自我爱大金子博客51CTO博客,原文链接http://blog.51cto.com/1754966750/1920339如需转载请自行联系原作者
我爱大金子