ActiveMQ(14):Destination(目的地)高级特性

简介:

一、Wildcards

Wildcards用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。ActiveMQ支持以下三种wildcards:

 1:“.” 用于作为路径上名字间的分隔符

 2:“*” 用于匹配路径上的任何名字

 3:">" 用于递归地匹配任何以这个名字开始的destination


示例,设想你有如下两个destinations

  PRICE.STOCK.NASDAQ.IBM (IBM在NASDAQ的股价)

  PRICE.STOCK.NYSE.SUNW (SUN在纽约证券交易所的股价)

那么:

 1:PRICE.> :匹配任何产品的价格变动

 2:PRICE.STOCK.> :匹配任何产品的股票价格变动

 3:PRICE.STOCK.NASDAQ.* :匹配任何在NASDAQ下面的产品的股票价格变动

 4:PRICE.STOCK.*.IBM:匹配任何IBM的产品的股票价格变动

二、组合列队Composite Destinations

组合队列允许用一个虚拟的destination代表多个destinations。这样就可以通过composite destinations在一个操作中同时向多个queue发送消息。

2.1 客户端实现的方式

在composite destinations中,多个destination之间采用“,”分割。例如:

1
2
3
     Queue queue =  new  ActiveMQQueue( "FOO.A,FOO.B,FOO.C" );
  
   Destination destination = session.createQueue( "my-queue,my-queue2" );

如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://,例如:

1
     Queue queue =  new  ActiveMQQueue( "FOO.A,topic://NOTIFY.FOO.A" );

2.2 在conf/activemq.xml中的broker下配置实现

1
2
3
4
5
6
7
8
9
10
11
12
< destinationInterceptors >
     < virtualDestinationInterceptor >
       < virtualDestinations >
         < compositeQueue  name = "MY.QUEUE" >
           < forwardTo >
             < queue  physicalName = "my-queue"  />
           < queue  physicalName = "my-queue2"  />
           </ forwardTo >
             </ compositeQueue >
         </ virtualDestinations >
     </ virtualDestinationInterceptor >
</ destinationInterceptors >

再java代码发送的时候,队列的的名字就用MY.QUEUQ

三、Configure Startup Destinations

如果需要在ActiveMQ启动的时候,创建Destination的话,可以如下配置conf/activemq.xml的broker下:

1
2
3
4
< destinations >
     < queue  physicalName = "FOO.BAR"  />
   < topic  physicalName = "SOME.TOPIC"  />
</ destinations >

四、Delete Inactive Destinations

一般情况下,ActiveMQ的queue在不使用之后,可以通过web控制台或是JMX方式来删除掉。当然,也可以通过配置,使得broker可以自动探测到无用

的队列(一定时间内为空的队列)并删除掉,回收响应资源。可以如下配置conf/activemq.xml:

1
2
3
4
5
6
7
8
9
< broker  schedulePeriodForDestinationPurge = "10000" >
     < destinationPolicy >
       < policyMap >
         < policyEntries >
           < policyEntry  queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/>
       </ policyEntries >
     </ policyMap >
     </ destinationPolicy >
</ broker >

说明:

  schedulePeriodForDestinationPurge:设置多长时间检查一次,这里是10秒,默认为0

  inactiveTimoutBeforeGC:设置当Destination为空后,多长时间被删除,这里是30秒,默认为60

  gcInactiveDestinations: 设置删除掉不活动队列,默认为false

五、Destination Options

队列选项是给consumer在JMS规范之外添加的功能特性,通过在队列名称后面使用类似URL的语法添加多个选项。包括:

 1:consumer.prefetchSize,consumer持有的未确认最大消息数量,默认值 variable

 2:consumer.maximumPendingMessageLimit:用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认0

 3:consumer.noLocal :默认false

 4:consumer.dispatchAsync :是否异步分发 ,默认true

 5:consumer.retroactive:是否为回溯消费者 ,默认false

 6:consumer.selector:Jms的Selector,默认null

 7:consumer.exclusive:是否为独占消费者 ,默认false

 8:consumer.priority:设置消费者的优先级,默认0


使用示例:

1
2
queue =  new  ActiveMQQueue( "TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10" );
consumer = session.createConsumer(queue);

六、虚拟Destinations(Visual Destinations)

6.1 简介

虚拟Destinations用来创建逻辑Destinations,客户端可以通过它来生产和消费消息,它会把消息映射到物理Destinations。

ActiveMQ支持两种方式:

 1:虚拟主题(Virtual Topics)

 2:组合Destinations(Composite Destinations)


6.2 为何使用虚拟主题

ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:

 1:同一应用内consumer端负载均衡的问题:也即是同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。

    queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能JMS规范本身是没有的。

  2:同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高


 为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能

6.3 如何使用虚拟主题

1:对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.Orders,代码示例如下:

1
Topic destination = session.createTopic("VirtualTopic.myTest");

2:对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。

  例如Consumer.A.VirtualTopic.Orders,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.Orders说明是一个名称为B的客户端。可以在同一个应用里使用

  多个consumer消费此queue,则可以实现上面两个功能。


  又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订阅者,而且这个客户端可以使用多

  个消费者共同来承担消费任务。代码示例如下:

1
     Destination destination = session.createQueue( "Consumer.A.VirtualTopic.myTest" );

注意:将destinationInterceptors注释掉


wKioL1kBwtaCeyk-AADNu9WLdSw854.jpg


java代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public  void  topic()  throws  Exception {
     ConnectionFactory connectionFactory =  new  ActiveMQConnectionFactory( "liuy" , "123456" , "failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)" );
         
   Connection connection = connectionFactory.createConnection();
         
   Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
          
   Destination destination = session.createTopic( "VirtualTopic.myTest" );
         
   MessageProducer producer = session.createProducer(destination);
   producer.setDeliveryMode(DeliveryMode.PERSISTENT);  // 设置DeliveryMode.PERSISTENT模式
         
   connection.start();
}
 
public  void  queue()  throws  Exception {
     ConnectionFactory cf =  new  ActiveMQConnectionFactory( "liuy" , "123456" , "failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)" );
     Connection connection = cf.createConnection();
     connection.start();
         
     final  Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
     Destination destination = session.createQueue( "Consumer.A.VirtualTopic.myTest" );
     MessageConsumer consumer = session.createConsumer(destination);
     int  i= 0 ;
     while (i< 3 ) {
         i++;
     TextMessage message = (TextMessage) consumer.receive();
     session.commit();
     System.out.println( "收到消 息:"  + message.getText());
     }
     session.close();
     connection.close();
}

3:默认虚拟主题的前缀是 :VirtualTopic.>

 自定义消费虚拟地址默认格式:Consumer.*.VirtualTopic.>

 自定义消费虚拟地址可以改,比如下面的配置就把它修改了。

 xml配置示例如下:

1
2
3
4
5
6
7
8
9
< broker  xmlns = "http://activemq.apache.org/schema/core" >
     < destinationInterceptors >
         < virtualDestinationInterceptor >
             < virtualDestinations >
                 < virtualTopic  name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
             </ virtualDestinations >
         </ virtualDestinationInterceptor >
     </ destinationInterceptors >
</ broker >

七、镜像Queues(Mirrored Queues)

7.1 简介

ActiveMQ中每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一

个virtual queue 来把消息转发到多个queues中。但是 为系统中每个queue都进行如此的配置可能会很麻烦。

7.2 使用

ActiveMQ支持Mirrored Queues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序只需要订阅这个mirrored queue topic。为了启用

Mirrored Queues,首先要将BrokerService的useMirroredQueues属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省

是“VirtualTopic.Mirror.”。

比如修改后缀的配置示例如下conf/activemq.xml:

1
2
3
4
5
6
7
8
9
< broker  xmlns = "http://activemq.apache.org/schema/core"  brokerName = "localhost"  dataDirectory = "${activemq.data}"
         useMirroredQueues = "true"
     >
     ...
< destinationInterceptors >
     < mirroredQueue  copyMessage = "true"  postfix = ".qmirror"  prefix = "" />
</ destinationInterceptors >
     ...
</ broker >

发送列队的时候,就会自动在topic里也发布

  wKiom1kCpTLjkggDAABDf2CuOKk596.jpg


八、Per Destination Policies

ActiveMQ支持多种不同的策略,来单独配置每一个Destination它的属性很多,可以参见官方文档:

http://activemq.apache.org/per-destination-policies.html 

本文转自我爱大金子博客51CTO博客,原文链接http://blog.51cto.com/1754966750/1920339如需转载请自行联系原作者


我爱大金子

相关文章
|
8月前
|
消息中间件 安全 Kafka
一文搞懂Kafka中的listeners配置策略
1. listeners中的plaintext controller external是什么意思? 2. Kraft模式下controller和broker有何区别? 3. 集群节点之间同步什么数据,通过哪个端口,是否可以自定义端口? 4. 客户端通过哪个端口连接到kafka,通过9092连接的是什么,broker还是controller? 5. 为controller配置了单独的端口有什么用? 6. control.plane.listener.name与controller.listener.names有何区别?
1681 2
|
5月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
162 4
|
消息中间件 网络协议 安全
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
|
消息中间件 JSON 运维
Rocket MQ报错No route info of this topic的问题探究
Rocket MQ报错No route info of this topic的问题探究
926 0
|
消息中间件 缓存 网络协议
【Kafka从入门到成神系列 四】Kafka 消息丢失及 TCP 管理
【Kafka从入门到成神系列 四】Kafka 消息丢失及 TCP 管理
【Kafka从入门到成神系列 四】Kafka 消息丢失及 TCP 管理
|
消息中间件 分布式计算 网络协议
Kafka:第一章:基本概念以及安装Kafka,单播模式和多播模式
Kafka:第一章:基本概念以及安装Kafka,单播模式和多播模式
265 0
|
消息中间件 存储 安全
Broker消息设计--Kafka从入门到精通(十三)
Broker消息设计--Kafka从入门到精通(十三)
RabbitMQ02_简单模式、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics通配符模式、Work模式-轮询、公平(十一)
RabbitMQ02_简单模式、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics通配符模式、Work模式-轮询、公平(十一)
165 0
RabbitMQ02_简单模式、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics通配符模式、Work模式-轮询、公平(十一)
RabbitMQ02_简单模式、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics通配符模式、Work模式-轮询、公平(七)
RabbitMQ02_简单模式、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics通配符模式、Work模式-轮询、公平(七)
166 0
RabbitMQ02_简单模式、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics通配符模式、Work模式-轮询、公平(七)