ActiveMQ高可用+负载均衡集群之功能测试

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。

1.基础功能测试

ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。而消息的传递有两种类型,主要如下:

  • 一种是点对点的,即一个生产者和一个消费者一一对应。
    image
  • 另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
    image

ActiveMQ和JMS的消息类型对应如下

JMS消息模型 P2P消息模型 Pub/Sub消息模型
ActiveMQ Queue队列 Topic队列
特点 一对一,生产者生产了一个消息,只能由一个消费者进行消费 一对多,生产者生产了一个消息,可以由多个消费者进行消费

接下来将对两种类型的场景进行分别验证。

1.1点对点模式(Queue)

点对点的模式主要建立在某个queue上,消息可以被同步或异步的发送和接收。点对点的消息模式可以有多个发送端,多个接收端,但是每个消息只会给一个Consumer传送一次。

1.1.1引入依赖

1.ActiveMQ依赖

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-all</artifactId>
   <version>5.15.12</version>
</dependency>

2.Springboot-ActiveMQ连接池依赖
如果要启用连接池,且使用springboot2.0+及以下版本的时候,maven配置依赖是:

<dependency> 
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId> 
</dependency>

如果要启用连接池,且使用springboot2.1+的时候,maven配置依赖是:

<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>

1.1.2生产者发布Queue消息

由于ActiveMQ的客户端只能访问Master的Broker,其他处于Slave的Broker不能被访问,所以客户端连接Broker应该使用failover协议,具体代码如下。

String url="failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";//服务地址
String queueName = "queue-testqq";//要创建的消息名称
//1.创建ConnectiongFactory,绑定地址
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.创建Connection
Connection connection = factory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
//第一个参数:是否开启事务
//第二个参数:消息是否自动确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个队列
Destination destination = session.createQueue(queueName);
//6.创建一个生产者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 20; i++) {
    //7.创建消息
    TextMessage textMessage = session.createTextMessage("我是Queue消息生产者:" + i);
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    //8.发送消息
    producer.send(textMessage);
    System.out.println("发送第一组消息:" + i);
}
connection.close();

运行代码后,可在ActiveMQ控制台查看对应的Queue信息,此时有消息待接收。
image

1.1.3消费者接收Queue消息

Consumer 可以使用 MessageConsumer.receive() 同步地接收消息,也可以通过使用MessageConsumer.setMessageListener() 注册一个 MessageListener 实现异步接收。
这里使用异步接收的方式消费消息,具体代码如下:

String url="failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";//服务地址
String queueName="queue-testqq";//要消费的消息名称

//1.创建ConnectiongFactory,绑定地址
ConnectionFactory factory=new ActiveMQConnectionFactory(url);
//2.创建Connection
Connection connection= factory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
/** 第一个参数,是否使用事务
 如果设置true,操作消息队列后,必须使用 session.commit();
 如果设置false,操作消息队列后,不使用session.commit();
 */
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination=session.createQueue(queueName);
//6.创建一个消费者
MessageConsumer consumer=session.createConsumer(destination);
//7.创建一个监听器
consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message arg0) {
        TextMessage textMessage=(TextMessage)arg0;
        try {
            System.out.println("接收消息:"+textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

执行代码后,可以在控制台如下输出:
image
在ActiveMQ控制台查看对应的Queue信息,此时消息已被消费。
image

1.1.4多消费者模式

同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出。
image

image
观察后,得出结论:一条消息只会被一个消费者会接收消费,不可重复消费。同时还发现,多个消费者的情况下消息会被均分,即负载均衡策略。
生产者发送消息情况:
image
消费者1接收消息情况:
image
消费者2接收消息情况:
image

1.2发布/订阅模式(Topic)

Pub/Sub(发布/订阅,Publish/Subscribe)消息域使用topic作为Destination,发布者向topic发送消息,订阅者注册接收来自topic的消息。发送到topic的任何消息都将自动传递给所有订阅者。接收方式(同步和异步)与P2P域相同。

1.2.1生产者发布Topic消息

Pub/Sub模式与P2P模式在代码实现层面基本一样,变化的只有Queue与Topic。生产者发布Topic消息的具体代码如下:

String url="failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";//服务地址,端口默认61616
        String topicName="topic-testqq";//要创建的消息名称

        //1.创建ConnectiongFactory,绑定地址
        ConnectionFactory factory=new ActiveMQConnectionFactory(url);
        //2.创建Connection
        Connection connection= factory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建会话 (参数1:是否启动事务,参数2:消息确认模式)
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建一个目标
        Destination destination=session.createTopic(topicName);
//        Topic topic = session.createTopic(topicName);
        //6.创建一个生产者
        MessageProducer producer=session.createProducer(destination);
        for (int i = 0; i < 15; i++) {
            //7.创建消息
            TextMessage textMessage=session.createTextMessage("我是topic类型消息生产者:"+i);
            //8.发送消息
            producer.send(textMessage);
            System.out.println("发送消息:"+i);
        }
        connection.close();

此时,消息生产者先不启动。Pub/Sub模式下必须先启动sub,否则在启动sub之前发布的消息是不能消费的,就像你今天开始订报纸,那今天之前的报纸你肯定是收不到了,发布/订阅模式与此同理。

1.2.2消费者接收Topic消息

与P2P模式相同,Pub/Sub模式的消息接收方式也有两种:同步接收和异步接收。这里采用异步接收的方式消费Topic消息,具体代码如下:

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";//服务地址,端口默认61616
String topicName = "topic-testqq";//要创建的消息名称

//1.创建ConnectiongFactory,绑定地址
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.创建Connection
Connection connection = factory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createTopic(topicName);
//6.创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);
//7.接收消息,可选择同步接收或者异步接收
/*//消费者同步接收,receive(long timeout)主线程阻塞式等待下一个消息到来,可设置超时时间,超时则返回null。
TextMessage message = (TextMessage) consumer.receive(1000);
System.out.println("同步接收Topic消息: " + message);*/
//消费者异步接收,创建一个监听器
consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message arg0) {
        TextMessage textMessage = (TextMessage) arg0;
        try {
            System.out.println("异步接收Topic消息:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

运行代码,可以看到控制台输出已消费Topic消息。
image

image
ActiveMQ控制台也可以看到对应的Topic发布、订阅信息。如下
image

1.2.3多消费者模式

同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出.
image
两个消费者运行情况如下:
image

image
可以发现:

  • topic发布/订阅模式,一个消息可以被多个消费者消费
  • topic发布/订阅模式要求消费者必须即时消费,即生产者发布消息时,消费者必须同时在线才可接收消费消息。

2.高可用测试

2.1测试方案一

2.1.1测试用例

1.生产者连接集群发送50条消息并设置每发送一条消息,sleep1秒
2.观察生产者发送消息所连接的节点,并将所在的节点停掉
3.观察生产者发送消息日志,查看所有消息是不是正常发送

2.1.2测试代码

1.生产者发布消息

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq";//要创建的消息名称
        
//1.创建ConnectiongFactory,绑定地址
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.创建Connection
Connection connection = factory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
//第一个参数:是否开启事务
//第二个参数:消息是否自动确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createQueue(queueName);
//        Queue queue = session.createQueue("test-Queue");
//6.创建一个生产者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 50; i++) {
//7.创建消息
TextMessage textMessage = session.createTextMessage("我是第一组消息生产者:" + i);
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
}
//8.发送消息
producer.send(textMessage);
System.out.println("发送第一组消息:" + i);
  }
connection.close();

2.1.3测试过程

1.运行生产者发布消息,观察生产者控制台
image
2.停止mq1(61616)节点
image
3.继续观察生产者控制台
(1)此时61616节点已无法连接
image
(2)生产者已成功连接61619节点,并继续发送消息
image

2.2测试方案二

2.2.1测试用例

1.生产者连接集群发送20条消息
2.观察生产者发送消息所连接的节点,并将所在的节点停掉
3.消费者连接集群消费消息,观察消息消费情况

2.2.2测试代码

1.生产者发布消息

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq";//要创建的消息名称
        
//1.创建ConnectiongFactory,绑定地址
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.创建Connection
Connection connection = factory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
//第一个参数:是否开启事务
//第二个参数:消息是否自动确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createQueue(queueName);
//        Queue queue = session.createQueue("test-Queue");
//6.创建一个生产者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 20; i++) {
//7.创建消息
TextMessage textMessage = session.createTextMessage("我是第一组消息生产者:" + i);
//8.发送消息
producer.send(textMessage);
System.out.println("发送第一组消息:" + i);
  }
connection.close();

2.消费者接收消息

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq";//要创建的消息名称

    //1.创建ConnectiongFactory,绑定地址
    ConnectionFactory factory = new ActiveMQConnectionFactory(url);
    //2.创建Connection
    Connection connection = factory.createConnection();
    //3.启动连接
    connection.start();
    //4.创建会话
    /** 第一个参数,是否使用事务
     如果设置true,操作消息队列后,必须使用 session.commit();
     如果设置false,操作消息队列后,不使用session.commit();
     */
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5.创建一个目标
    Destination destination = session.createQueue(queueName);
    //6.创建一个消费者
    MessageConsumer consumer = session.createConsumer(destination);
    //7.创建一个监听器
    consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message arg0) {
            TextMessage textMessage = (TextMessage) arg0;
            try {
                System.out.println("接收消息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });

2.2.3测试过程

1.运行生产者发布消息,观察生产者控制台
image
2.停止mq4(61619)节点
image
3.启动消费者,观察消费情况
image
观察发现,消费者连接61618节点,并成功接收消费消息。

2.3测试结论

经测试,当前集群在启动消息生产者发送消息时,使生产者所在节点宕机的情况下,得出如下结论:
1.高可用架构的ActiveMQ集群,在生产消息的过程中生产者所在节点挂掉,客户端会暂时阻塞无法发送消息,但整体可用性不受影响。
2.高可用架构的ActiveMQ集群,在消息生产者所在节点挂掉后,消费者仍可正常消费消息
3.当前ActiveMQ集群若其中一个节点挂掉,ActiveMQ正常提供服务,不影响服务可用性

3.负载均衡测试

最终的架构就是两个master-slave集群相互连通,两个集群可以相互消费对方的消息,但是如果客户端所连接的集群挂掉客户端依然是不能发送消息的,也就是说activemq的负载均衡只是做到消费的负载均衡,高可用是靠master-slave来保证的。

3.1测试用例

1.启动两个消费者监听相同的queue,且服务地址均配置集群所有节点
2.生产者连接集群向指定的queue连续发送20条消息
3.观察两个生产者消费消息的日志

3.2测试代码

1.生产者发布消息

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq";//要创建的消息名

//1.创建ConnectiongFactory,绑定地址
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.创建Connection
Connection connection = factory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
//第一个参数:是否开启事务
//第二个参数:消息是否自动确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createQueue(queueName);
//Queue queue = session.createQueue("test-Queue");
//6.创建一个生产者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 20; i++) {
//7.创建消息
TextMessage textMessage = session.createTextMessage("我是第一组消息生产者:" + i);
//8.发送消息
producer.send(textMessage);
System.out.println("发送第一组消息:" + i);
}
connection.close();

2.消费者接收消费消息

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618," +
"tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq";//要创建的消息名称
//1.创建ConnectiongFactory,绑定地址
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.创建Connection
Connection connection = factory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
/** 第一个参数,是否使用事务
如果设置true,操作消息队列后,必须使用 session.commit();
如果设置false,操作消息队列后,不使用session.commit();
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createQueue(queueName);
//6.创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);
//7.创建一个监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
TextMessage textMessage = (TextMessage) arg0;
try {
/* if (textMessage.getText().contains("10")) {
System.out.println("======消息异常=======");
throw new Exception();
}*/
System.out.println("接收消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});

3.3测试过程

  1. 同时开启2个消费者,再运行生产者,观察每个消费者控制台的输出。
    image

image

  1. 再运行生产者,观察每个消费者控制台的输出。
    生产者发布消息情况:

image
消费者1接收消息情况:
image
消费者2接收消息情况:
image
观察发现,多个消费者的情况下消息会被均分,即负载均衡策略。且同一条消息只会被一个消费者会接收消费。

3.4测试结论

经测试,当前集群在多个消费者消费相同队列的情况下,可以实现消息消费的负载均衡,从而实现ActiveMQ集群的分流,提高集群吞吐率。

到这里,ActiveMQ集群的功能测试、高可用测试及负载均衡测试已完成,当前ActiveMQ集群高可用+负载均衡功能正常。

相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
1月前
|
SQL 分布式计算 NoSQL
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
41 4
|
1月前
|
分布式计算 Hadoop Shell
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
70 4
|
1月前
|
存储 NoSQL 大数据
大数据-51 Redis 高可用方案CAP-AP 主从复制 一主一从 全量和增量同步 哨兵模式 docker-compose测试
大数据-51 Redis 高可用方案CAP-AP 主从复制 一主一从 全量和增量同步 哨兵模式 docker-compose测试
34 3
|
1月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
40 1
|
1月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
40 1
|
1月前
|
分布式计算 Hadoop Unix
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
42 1
|
1月前
|
存储 大数据 Apache
大数据-146 Apache Kudu 安装运行 Dockerfile 模拟集群 启动测试
大数据-146 Apache Kudu 安装运行 Dockerfile 模拟集群 启动测试
23 0
|
1月前
|
存储 SQL 分布式计算
大数据-135 - ClickHouse 集群 - 数据类型 实际测试
大数据-135 - ClickHouse 集群 - 数据类型 实际测试
38 0
|
1月前
|
存储 SQL 分布式计算
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
42 0
|
2月前
|
Kubernetes Linux API
CentOS 7.6使用kubeadm部署k8s 1.17.2测试集群实战篇
该博客文章详细介绍了在CentOS 7.6操作系统上使用kubeadm工具部署kubernetes 1.17.2版本的测试集群的过程,包括主机环境准备、安装Docker、配置kubelet、初始化集群、添加节点、部署网络插件以及配置k8s node节点管理api server服务器。
121 0
CentOS 7.6使用kubeadm部署k8s 1.17.2测试集群实战篇

热门文章

最新文章

下一篇
无影云桌面