开发者社区> 范大脚脚> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

关于ActiveMQ的几种集群配置

简介:
+关注继续查看

ActiveMQ的几种集群配置。

Queue consumer clusters

此集群让多个消费者同时消费一个队列,若某个消费者出问题无法消费信息,则未消费掉的消息将被发给其他正常的消费者,结构图如下:

Broker clusters

此种配置是一个消费者连接到多个broker集群的中的一个broker,当该broker出问题时,消费者自动连接到其他一个正常的broker。消费者使用 failover:// 协议来连接broker。

failover:(tcp://localhost:61616,tcp://localhost:61617)

failover官网介绍 http://activemq.apache.org/failover-transport-reference.html

broker之间的通过静态发现(static discovery)和动态发现(dynamic discovery)来维持彼此发现,下面来介绍静态发现和动态发现的机制:

静态发现:

静态发现通过配置固定的broker uri来发现彼此,配置语法如下:

static:(uri1,uri2,uri3,...)?options

例如:

1
static:(tcp://localhost:61616,tcp://remotehost:61617?trace=false,vm://localbroker)?initialReconnectDelay=100

  

更多静态发现介绍,见ActiveMQ官网 http://activemq.apache.org/static-transport-reference.html

动态发现:

动态发现机制是在各个broker启动时通过Fanout transport来发现彼此,配置举例如下:

1
2
3
4
5
6
<broker name="foo">
  <transportConnectors>
    <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
  </transportConnectors>
  ...
</broker>

 更多动态发现机制介绍,见官网 http://activemq.apache.org/discovery-transport-reference.html

 

Networks of brokers

多个broker组成集群,当其中一个broker的消费者出问题导致消息堆积无法消费掉时,通过ActiveMQ支持的Network of Broker方案可将该broker堆积的消息转发到其他有消费者的broker。

该方案主要有以下两种配置方式:

1、为broker配置文件配置networkConnector元素

2、使用发现机制互相探测broker

Here is an example of using the fixed list of URIs:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?xml version="1.0" encoding="UTF-8"?>
  
<beans xmlns="http://activemq.org/config/1.0">
  
  <broker brokerName="receiver" persistent="false" useJmx="false">
    <networkConnectors>
      <!-- Static discovery -->
      <networkConnector uri="static:(tcp://localhost:62001)"/>
      <!-- MasterSlave Discovery -->
      <!--<networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> -->
    </networkConnectors>
  
    <persistenceAdapter>
      <memoryPersistenceAdapter/>
    </persistenceAdapter>
  
   <transportConnectors>
      <transportConnector uri="tcp://localhost:62002"/>
    </transportConnectors>
  </broker>
  
</beans>

  

This example uses multicast discovery:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?xml version="1.0" encoding="UTF-8"?>
  
<beans xmlns="http://activemq.org/config/1.0">
  
  <broker name="sender" persistent="false" useJmx="false">
    <networkConnectors>
      <networkConnector uri="multicast://default"/>
    </networkConnectors>
  
    <persistenceAdapter>
      <memoryPersistenceAdapter/>
    </persistenceAdapter>
  
  <transportConnectors>
      <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
    </transportConnectors>
  </broker>
  
</beans>

  

Master Slave

通过部署多个broker实例,一个master和多个slave关系的broker来达到高可用性,有三种方案:

1、Master-Slave
2、SharedFile System Master Slave
3、JDBCMaster Slave

第一种方案由于只可以由两个AMQ实例组件,实际应用场景并不广泛;
第三种方案支持N个AMQ实例组网,但他的性能会受限于数据库;
第二种方案同样支持N个AMQ实例组网,基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。

Master Slave方案当其中一个broker启动并拿到独占锁时自动成为master,其他后续的broker则一直等待锁,当master宕机释放锁时其他slave拿到独占锁则自动成为master,部署结构如下:

第二种方案的配置只需修改config文件夹下activemq.xml文件,修改消息持久化使用的方案:

1
2
3
4
5
6
7
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="D:/Platform/mq_share_file">
  ...
    <persistenceAdapter>
            <kahaDB directory="D:/Platform/mq_share_file/kahadb" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/>
    </persistenceAdapter>
    ...
</broker>

 

消息生产者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class P2PSender {
    private static final String QUEUE = "client1-to-client2";
 
    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS客户端到JMS Provider的连接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // MessageProducer:消息发送者
        MessageProducer producer;
        // TextMessage message;
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现
        connectionFactory = new ActiveMQConnectionFactory(
                "failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(QUEUE);
            // 获取session,FirstQueue是一个服务器的queue destination = session.createQueue("FirstQueue");
            // 得到消息生成者【发送者】
            producer = session.createProducer(destination);
            // 设置不持久化
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 构造消息
            sendMessage(session, producer);
            // session.commit();
            connection.close();
        catch (Exception e) {
            e.printStackTrace();
        finally {
            if (null != connection) {
                try {
                    connection.close();
                catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
 
    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 1; i <= 1; i++) {
            Date d = new Date();
            TextMessage message = session.createTextMessage("ActiveMQ发送消息" + i + "  " new Date());
            System.out.println("发送消息:ActiveMQ发送的消息" + i + "  " new Date());
            producer.send(message);
        }
    }
}

消息消费者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class P2PReceiver {
    private static final String QUEUE = "client1-to-client2";
     
    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS客户端到JMS Provider的连接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // 消费者,消息接收者
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)");
        try {
            // 得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建Queue
            destination = session.createQueue(QUEUE);
            consumer = session.createConsumer(destination);
            while (true) {
                TextMessage message = (TextMessage) consumer.receive();
                if (null != message) {
                    System.out.println("收到消息" + message.getText());
                }
            }
        catch (Exception e) {
            e.printStackTrace();
        finally {
            try {
                if (null != connection)
                    connection.close();
            catch (Throwable ignore) {
            }
        }
    }
}

  



本文转自邴越博客园博客,原文链接:http://www.cnblogs.com/binyue/p/5325945.html,如需转载请自行联系原作者

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Docker安装rocketMQ集群之挂载方式安装
Docker安装rocketMQ集群之挂载方式安装
81 0
activeMQ入门安装
activeMQ入门安装
30 0
工作流Activiti框架的事务和并发!流程引擎中异步和排他操作详细解析
本篇文章介绍了在工作流的场景下项目中的事务操作和并发操作。主要内容包括异步操作和排他任务。通过这篇文章,可以了解到在工作流Activiti框架中处理事务操作的具体方式以及排他任务机制保证的在同一个流程实例中的Job不会并发执行。在阅读完这篇文章,基本上就能清楚地了解了工作流Activiti框架中的并发处理了。
500 0
ActiveMQ高可用集群部署方案
ActiveMQ是分布式系统中重要的组件,在生产环境中如何保证让ActiveMQ能够持续工作,同时还要使消息中间件服务保持可靠性和高效的处理性能。
2877 0
ActiveMQ笔记(4):搭建Broker集群(cluster)
上一篇介绍了基于Networks of Borkers的2节点HA方案,这一篇继续来折腾Networks of Brokers,当应用规模日渐增长时,2节点的broker可能仍然抗不住访问压力,这时候就需要多加一些broker,弄一个更大规模的Broker集群,但是怎么合理设置broker之间的网络...
1355 0
一:MetaMq集群中单个节点的安装配置示意图
MetaMQ集群一个节点的安装和配置示意图【1】:下载metaMQ的安装包【2】:进行metaMq的解压,重命名为  并进入真正的主目录,该mq由于是阿里开源产品,所以目录很深【3】:进行metaMq的环境变量配置【4】:进入metaMq的配置文件存放目录【5】:进入conf目录进行该文件的配置 server.
968 0
Blend 3.0入门之SketchFlow详解(下)
接上一篇Blend 3.0入门之SketchFlow详解(上),因发布限制,所以分为两篇,不好意思啊:-) 你可以通过三种方式浏览各个页面a. 通过Navigate导航,下面的滑块控制页面大小。这个里面包含了很好的层次结构,当然,也包括并不直观的控件页嵌套b.通过Map导航右上角的是放大在右侧View中。
717 0
+关注
3656
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载