ActiveMQ(14):Destination(目的地)高级特性

简介:

一、Wildcards

Wildcards用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。ActiveMQ支持以下三种wildcards:

 1:“.” 用于作为路径上名字间的分隔符

 2:“*” 用于匹配路径上的任何名字

 3:">" 用于递归地匹配任何以这个名字开始的destination


示例,设想你有如下两个destinations

  PRICE.STOCK.NASDAQ.IBM (IBM在NASDAQ的股价)

  PRICE.STOCK.NYSE.SUNW (SUN在纽约证券交易所的股价)

那么:

 1:PRICE.> :匹配任何产品的价格变动

 2:PRICE.STOCK.> :匹配任何产品的股票价格变动

 3:PRICE.STOCK.NASDAQ.* :匹配任何在NASDAQ下面的产品的股票价格变动

 4:PRICE.STOCK.*.IBM:匹配任何IBM的产品的股票价格变动

二、组合列队Composite Destinations

组合队列允许用一个虚拟的destination代表多个destinations。这样就可以通过composite destinations在一个操作中同时向多个queue发送消息。

2.1 客户端实现的方式

在composite destinations中,多个destination之间采用“,”分割。例如:

1
2
3
     Queue queue =  new  ActiveMQQueue( "FOO.A,FOO.B,FOO.C" );
  
   Destination destination = session.createQueue( "my-queue,my-queue2" );

如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://,例如:

1
     Queue queue =  new  ActiveMQQueue( "FOO.A,topic://NOTIFY.FOO.A" );

2.2 在conf/activemq.xml中的broker下配置实现

1
2
3
4
5
6
7
8
9
10
11
12
< destinationInterceptors >
     < virtualDestinationInterceptor >
       < virtualDestinations >
         < compositeQueue  name = "MY.QUEUE" >
           < forwardTo >
             < queue  physicalName = "my-queue"  />
           < queue  physicalName = "my-queue2"  />
           </ forwardTo >
             </ compositeQueue >
         </ virtualDestinations >
     </ virtualDestinationInterceptor >
</ destinationInterceptors >

再java代码发送的时候,队列的的名字就用MY.QUEUQ

三、Configure Startup Destinations

如果需要在ActiveMQ启动的时候,创建Destination的话,可以如下配置conf/activemq.xml的broker下:

1
2
3
4
< destinations >
     < queue  physicalName = "FOO.BAR"  />
   < topic  physicalName = "SOME.TOPIC"  />
</ destinations >

四、Delete Inactive Destinations

一般情况下,ActiveMQ的queue在不使用之后,可以通过web控制台或是JMX方式来删除掉。当然,也可以通过配置,使得broker可以自动探测到无用

的队列(一定时间内为空的队列)并删除掉,回收响应资源。可以如下配置conf/activemq.xml:

1
2
3
4
5
6
7
8
9
< broker  schedulePeriodForDestinationPurge = "10000" >
     < destinationPolicy >
       < policyMap >
         < policyEntries >
           < policyEntry  queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/>
       </ policyEntries >
     </ policyMap >
     </ destinationPolicy >
</ broker >

说明:

  schedulePeriodForDestinationPurge:设置多长时间检查一次,这里是10秒,默认为0

  inactiveTimoutBeforeGC:设置当Destination为空后,多长时间被删除,这里是30秒,默认为60

  gcInactiveDestinations: 设置删除掉不活动队列,默认为false

五、Destination Options

队列选项是给consumer在JMS规范之外添加的功能特性,通过在队列名称后面使用类似URL的语法添加多个选项。包括:

 1:consumer.prefetchSize,consumer持有的未确认最大消息数量,默认值 variable

 2:consumer.maximumPendingMessageLimit:用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认0

 3:consumer.noLocal :默认false

 4:consumer.dispatchAsync :是否异步分发 ,默认true

 5:consumer.retroactive:是否为回溯消费者 ,默认false

 6:consumer.selector:Jms的Selector,默认null

 7:consumer.exclusive:是否为独占消费者 ,默认false

 8:consumer.priority:设置消费者的优先级,默认0


使用示例:

1
2
queue =  new  ActiveMQQueue( "TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10" );
consumer = session.createConsumer(queue);

六、虚拟Destinations(Visual Destinations)

6.1 简介

虚拟Destinations用来创建逻辑Destinations,客户端可以通过它来生产和消费消息,它会把消息映射到物理Destinations。

ActiveMQ支持两种方式:

 1:虚拟主题(Virtual Topics)

 2:组合Destinations(Composite Destinations)


6.2 为何使用虚拟主题

ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:

 1:同一应用内consumer端负载均衡的问题:也即是同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。

    queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能JMS规范本身是没有的。

  2:同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高


 为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能

6.3 如何使用虚拟主题

1:对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.Orders,代码示例如下:

1
Topic destination = session.createTopic("VirtualTopic.myTest");

2:对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。

  例如Consumer.A.VirtualTopic.Orders,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.Orders说明是一个名称为B的客户端。可以在同一个应用里使用

  多个consumer消费此queue,则可以实现上面两个功能。


  又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订阅者,而且这个客户端可以使用多

  个消费者共同来承担消费任务。代码示例如下:

1
     Destination destination = session.createQueue( "Consumer.A.VirtualTopic.myTest" );

注意:将destinationInterceptors注释掉


wKioL1kBwtaCeyk-AADNu9WLdSw854.jpg


java代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public  void  topic()  throws  Exception {
     ConnectionFactory connectionFactory =  new  ActiveMQConnectionFactory( "liuy" , "123456" , "failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)" );
         
   Connection connection = connectionFactory.createConnection();
         
   Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
          
   Destination destination = session.createTopic( "VirtualTopic.myTest" );
         
   MessageProducer producer = session.createProducer(destination);
   producer.setDeliveryMode(DeliveryMode.PERSISTENT);  // 设置DeliveryMode.PERSISTENT模式
         
   connection.start();
}
 
public  void  queue()  throws  Exception {
     ConnectionFactory cf =  new  ActiveMQConnectionFactory( "liuy" , "123456" , "failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)" );
     Connection connection = cf.createConnection();
     connection.start();
         
     final  Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
     Destination destination = session.createQueue( "Consumer.A.VirtualTopic.myTest" );
     MessageConsumer consumer = session.createConsumer(destination);
     int  i= 0 ;
     while (i< 3 ) {
         i++;
     TextMessage message = (TextMessage) consumer.receive();
     session.commit();
     System.out.println( "收到消 息:"  + message.getText());
     }
     session.close();
     connection.close();
}

3:默认虚拟主题的前缀是 :VirtualTopic.>

 自定义消费虚拟地址默认格式:Consumer.*.VirtualTopic.>

 自定义消费虚拟地址可以改,比如下面的配置就把它修改了。

 xml配置示例如下:

1
2
3
4
5
6
7
8
9
< broker  xmlns = "http://activemq.apache.org/schema/core" >
     < destinationInterceptors >
         < virtualDestinationInterceptor >
             < virtualDestinations >
                 < virtualTopic  name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
             </ virtualDestinations >
         </ virtualDestinationInterceptor >
     </ destinationInterceptors >
</ broker >

七、镜像Queues(Mirrored Queues)

7.1 简介

ActiveMQ中每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一

个virtual queue 来把消息转发到多个queues中。但是 为系统中每个queue都进行如此的配置可能会很麻烦。

7.2 使用

ActiveMQ支持Mirrored Queues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序只需要订阅这个mirrored queue topic。为了启用

Mirrored Queues,首先要将BrokerService的useMirroredQueues属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省

是“VirtualTopic.Mirror.”。

比如修改后缀的配置示例如下conf/activemq.xml:

1
2
3
4
5
6
7
8
9
< broker  xmlns = "http://activemq.apache.org/schema/core"  brokerName = "localhost"  dataDirectory = "${activemq.data}"
         useMirroredQueues = "true"
     >
     ...
< destinationInterceptors >
     < mirroredQueue  copyMessage = "true"  postfix = ".qmirror"  prefix = "" />
</ destinationInterceptors >
     ...
</ broker >

发送列队的时候,就会自动在topic里也发布

  wKiom1kCpTLjkggDAABDf2CuOKk596.jpg


八、Per Destination Policies

ActiveMQ支持多种不同的策略,来单独配置每一个Destination它的属性很多,可以参见官方文档:

http://activemq.apache.org/per-destination-policies.html 

本文转自我爱大金子博客51CTO博客,原文链接http://blog.51cto.com/1754966750/1920339如需转载请自行联系原作者


我爱大金子

相关文章
|
7月前
|
消息中间件
RabbitMQ消息模型之Routing-Direct
RabbitMQ消息模型之Routing-Direct
100 1
|
7月前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的广播交换机(SpringAMQP 实现Fanout广播交换机)
SpringBoot实现RabbitMQ的广播交换机(SpringAMQP 实现Fanout广播交换机)
79 2
|
4月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
76 2
|
4月前
|
消息中间件 SQL 监控
RocketMQ 5.3.0 版本中 Broker IP 配置为 IPv6 的情况
【8月更文第28天】RocketMQ 是一款分布式消息中间件,支持多种消息发布和订阅模式。在 RocketMQ 5.3.0 版本中,Broker 的配置文件 `broker.conf` 允许配置 IPv6 地址。当 Broker 的 `brokerIP1` 配置为 IPv6 地址时,会对 Broker 的启动、消息推送和状态监控等方面产生影响。本文将探讨如何在 RocketMQ 中配置 IPv6 地址,并检查 Broker 的状态。
228 0
|
5月前
|
消息中间件 Kafka
面试题Kafka问题之RabbitMQ的路由配置工作如何解决
面试题Kafka问题之RabbitMQ的路由配置工作如何解决
67 1
|
6月前
|
消息中间件 Java API
消息队列 MQ产品使用合集之遇到"No topic route info in name server for the topic"错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的定向交换机(SpringAMQP 实现Direct定向交换机)
SpringBoot实现RabbitMQ的定向交换机(SpringAMQP 实现Direct定向交换机)
78 1
|
7月前
|
消息中间件 Java
SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)
SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)
65 1
|
消息中间件 Java Maven
消息中间件系列教程(10) -RabbitMQ -案例代码(路由模式)
消息中间件系列教程(10) -RabbitMQ -案例代码(路由模式)
82 0
|
7月前
|
消息中间件 微服务
RabbitMQ入门指南(十):延迟消息-死信交换机
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了死信交换机、死信交换机实现延迟消息等内容。
187 0