开源jms服务ActiveMQ的负载均衡+高可用部署方案探索

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
EMR Serverless StarRocks,5000CU*H 48000GB*H
应用型负载均衡 ALB,每月750个小时 15LCU
简介:

最近公司做项目需要用到jms消息服务,最终选择了apache的activemq这个开源消息总线,但是在activemq的官网没能找到既满足高可用又满足集群部署的方案,所以探索了其集群+高可用部署方案,经试用验证ok,这里和大家分享下。

一、架构和技术介绍

1、简介

ActiveMQ Apache出品,最流行的,能力强劲的开源消息总线。完全支持JMS1.1J2EE 1.4规范的 JMS Provider实现

2、activemq的特性

1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2. 完全支持JMS1.1J2EE 1.4规范 (持久化,XA消息,事务)

3. Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

4. 通过了常见J2EE服务器( Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resourceadaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4商业服务器上

5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

6. 支持通过JDBCjournal提供高速的消息持久化

7. 从设计上保证了高性能的集群,客户端-服务器,点对点

8. 支持Ajax

9. 支持与Axis的整合

10. 可以很容易得调用内嵌JMS provider,进行测试

3、下载和安装ActiveMQ

1、下载

ActiveMQ的最新版本是5.10.0,但由于我们内网下载存在问题,所以目前通过内网只能下载到5.9.0,下载地址:http://activemq.apache.org/activemq-590-release.html

2、安装

如果是在windows系统中运行,可以直接解压apache-activemq-5.9.0-bin.zip,并运行bin目录下的activemq.bat文件,此时使用的是默认的服务端口:61616和默认的console端口:8161

如果是在linuxunix下运行,在bin目录下执行命令:./activemq setup

3、修改ActiveMQ的服务端口和console端口

A、修改服务端口:打开conf/activemq.xml文件,修改以下红色字体部分

<transportConnectors>

<transportConnector name="openwire" uri="tcp://10.42.220.72:61618"discoveryUri="multicast://default"/>

</transportConnectors>

 

B、修改console的地址和端口:打开conf/jetty.xml文件,修改以下红色字体部分

<bean id="jettyPort"class="org.apache.activemq.web.WebConsolePort"init-method="start">

<property name="port" value="8162"/>

</bean>

4、通过客户端代码试用ActiveMQ

需要提前将activemq解压包中的lib目录下的相关包引入到工程中,再进行如下编码:

1、发送端的代码:

importjavax.jms.Connection;

importjavax.jms.ConnectionFactory;

importjavax.jms.DeliveryMode;

importjavax.jms.Destination;

importjavax.jms.MessageProducer;

importjavax.jms.Session;

importjavax.jms.TextMessage;

importorg.apache.activemq.ActiveMQConnection;

importorg.apache.activemq.ActiveMQConnectionFactory;

publicclass Sender {

privatestaticfinalintSEND_NUMBER = 5;

 

publicstaticvoid main(String[] args) {

// ConnectionFactory:连接工厂,JMS用它创建连接

ConnectionFactory connectionFactory;

// ConnectionJMS客户端到JMS Provider的连接

Connection connection = null;

// Session一个发送或接收消息的线程

Session session;

// Destination:消息的目的地;消息发送给谁.

Destination destination;

// MessageProducer:消息发送者

MessageProducer producer;

// TextMessage message;

//构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar

connectionFactory = new ActiveMQConnectionFactory(

ActiveMQConnection.DEFAULT_USER,

ActiveMQConnection.DEFAULT_PASSWORD,

"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");

try {

//构造从工厂得到连接对象

connection =connectionFactory.createConnection();

//启动

connection.start();

//获取操作连接

session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

//获取session

destination = session.createQueue("FirstQueue");

//得到消息生成者【发送者】

producer =session.createProducer(destination);

//设置不持久化,此处学习,实际根据项目决定

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

//构造消息,此处写死,项目就是参数,或者方法获取

sendMessage(session, producer);

session.commit();

catch (Exception e) {

e.printStackTrace();

finally {

try {

if (null != connection)

connection.close();

catch (Throwable ignore) {

}

}

}

 

publicstaticvoid sendMessage(Session session,MessageProducer producer)

throws Exception {

for (int i = 1; i <=SEND_NUMBER; i++) {

TextMessage message = session

.createTextMessage("ActiveMq发送的消息" + i);

//发送消息到目的地方

System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);

producer.send(message);

}

}

}

 

2、接收端代码:

importjavax.jms.Connection;

importjavax.jms.ConnectionFactory;

importjavax.jms.Destination;

importjavax.jms.MessageConsumer;

importjavax.jms.Session;

importjavax.jms.TextMessage;

importorg.apache.activemq.ActiveMQConnection;

importorg.apache.activemq.ActiveMQConnectionFactory;

 

publicclass Receive {

publicstaticvoid main(String[] args) {

// ConnectionFactory:连接工厂,JMS用它创建连接

ConnectionFactory connectionFactory;

// ConnectionJMS客户端到JMS Provider的连接

Connection connection = null;

// Session一个发送或接收消息的线程

Session session;

// Destination:消息的目的地;消息发送给谁.

Destination destination;

//消费者,消息接收者

MessageConsumer consumer;

connectionFactory = new ActiveMQConnectionFactory(

ActiveMQConnection.DEFAULT_USER,

ActiveMQConnection.DEFAULT_PASSWORD,

"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");

try {

//构造从工厂得到连接对象

connection =connectionFactory.createConnection();

//启动

connection.start();

//获取操作连接

session = connection.createSession(false,

Session.AUTO_ACKNOWLEDGE);

//获取session

destination = session.createQueue("FirstQueue");

consumer =session.createConsumer(destination);

while (true) {

//设置接收者接收消息的时间,为了便于测试,这里谁定为100s

TextMessage message =(TextMessage) consumer.receive(100000);

if (null != message) {

System.out.println("收到消息" + message.getText());

else {

break;

}

}

catch (Exception e) {

e.printStackTrace();

finally {

try {

if (null != connection)

connection.close();

catch (Throwable ignore) {

}

}

}

}

3、通过监控查看消息堆栈的记录:

登陆http://localhost:8162/admin/queues.jsp,默认的用户名和密码:admin/admin

二、ActiveMQ的多种部署方式

单点的ActiveMQ作为企业应用无法满足高可用和集群的需求,所以ActiveMQ提供了master-slavebroker cluster等多种部署方式,但通过分析多种部署方式之后我认为需要将两种部署方式相结合才能满足我们公司分布式和高可用的需求,所以后面就重点将解如何将两种部署方式相结合。

1、Master-Slave部署方式
1)shared filesystem Master-Slave部署方式

主要是通过共享存储目录来实现masterslave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master

多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为master,其他的应用就只能作为slave

2)shared database Master-Slave方式

shared filesystem方式类似,只是共享的存储介质由文件系统改成了数据库而已。

 

3)Replicated LevelDB Store方式

这种主备方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper协调选择一个node作为master。被选择的master broker node开启并接受客户端连接。

其他node转入slave模式,连接master并同步他们的存储状态。slave不接受客户端连接。所有的存储操作都将被复制到连接至Masterslaves

如果master死了,得到了最新更新的slave被允许成为masterfialed node能够重新加入到网络中并连接master进入slave mode。所有需要同步的disk的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。所以,如果你配置了replicas=3,那么法定大小是(3/2)+1=2. Master将会存储并更新然后等待 (2-1)=1slave存储和更新完成,才汇报success。至于为什么是2-1,熟悉Zookeeper的应该知道,有一个node要作为观擦者存在。

单一个新的master被选中,你需要至少保障一个法定node在线以能够找到拥有最新状态的node。这个node将会成为新的master。因此,推荐运行至少3replica nodes,以防止一个node失败了,服务中断。

2、Broker-Cluster部署方式

前面的Master-Slave的方式虽然能解决多服务热备的高可用问题,但无法解决负载均衡和分布式的问题。Broker-Cluster的部署方式就可以解决负载均衡的问题。

Broker-Cluster部署方式中,各个broker通过网络互相连接,并共享queue。当broker-A上面指定的queue-A中接收到一个message处于pending状态,而此时没有consumer连接broker-A时。如果cluster中的broker-B上面由一个consumer在消费queue-A的消息,那么broker-B会先通过内部网络获取到broker-A上面的message,并通知自己的consumer来消费。

1)static Broker-Cluster部署

activemq.xml文件中静态指定Broker需要建立桥连接的其他Broker

1、 首先在Broker-A节点中添加networkConnector节点:

<networkConnectors>

<networkConnector uri="static:(tcp:// 0.0.0.0:61617)"duplex="false"/>

</networkConnectors>

2、 修改Broker-A节点中的服务提供端口为61616

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

3、 Broker-B节点中添加networkConnector节点:

<networkConnectors>

<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

4、 修改Broker-A节点中的服务提供端口为61617

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

5、分别启动Broker-ABroker-B

2)Dynamic Broker-Cluster部署

activemq.xml文件中不直接指定Broker需要建立桥连接的其他Broker,由activemq在启动后动态查找:

1、 首先在Broker-A节点中添加networkConnector节点:

<networkConnectors>

<networkConnectoruri="multicast://default"

dynamicOnly="true"

networkTTL="3"

prefetchSize="1"

decreaseNetworkConsumerPriority="true" />

</networkConnectors>

2、修改Broker-A节点中的服务提供端口为61616

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616? " discoveryUri="multicast://default"/>

</transportConnectors>

3、在Broker-B节点中添加networkConnector节点:

<networkConnectors>

<networkConnectoruri="multicast://default"

dynamicOnly="true"

networkTTL="3"

prefetchSize="1"

decreaseNetworkConsumerPriority="true" />

</networkConnectors>

4、修改Broker-B节点中的服务提供端口为61617

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617" discoveryUri="multicast://default"/>

</transportConnectors>

5、启动Broker-ABroker-B

2、Master-Slave与Broker-Cluster相结合的部署方式

可以看到Master-Slave的部署方式虽然解决了高可用的问题,但不支持负载均衡,Broker-Cluster解决了负载均衡,但当其中一个Broker突然宕掉的话,那么存在于该Broker上处于Pending状态的message将会丢失,无法达到高可用的目的。

由于目前ActiveMQ官网上并没有一个明确的将两种部署方式相结合的部署方案,所以我尝试者把两者结合起来部署:

1、部署的配置修改

这里以Broker-A + Broker-B建立clusterBroker-C作为Broker-Bslave为例:

1)首先在Broker-A节点中添加networkConnector节点:

<networkConnectors>

<networkConnector uri="masterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618)" duplex="false"/>

</networkConnectors>

2)修改Broker-A节点中的服务提供端口为61616

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

3)Broker-B节点中添加networkConnector节点:

<networkConnectors>

<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

4)修改Broker-B节点中的服务提供端口为61617

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

5)修改Broker-B节点中的持久化方式:

<persistenceAdapter>

<kahaDB directory="/localhost/kahadb"/>

</persistenceAdapter>

6)Broker-C节点中添加networkConnector节点:

<networkConnectors>

<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

7)修改Broker-C节点中的服务提供端口为61618

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

8)修改Broker-B节点中的持久化方式:

<persistenceAdapter>

<kahaDB directory="/localhost/kahadb"/>

</persistenceAdapter>

9)分别启动broker-Abroker-Bbroker-C,因为是broker-B先启动,所以“/localhost/kahadb”目录被lock住,broker-C将一直处于挂起状态,当人为停掉broker-B之后,broker-C将获取目录“/localhost/kahadb”的控制权,重新与broker-A组成cluster提供服务。


相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
目录
相关文章
|
17天前
|
负载均衡 Kubernetes 区块链
随机密码生成器+阿里k8s负载均衡型服务加证书方法+移动终端设计+ico生成器等
随机密码生成器+阿里k8s负载均衡型服务加证书方法+移动终端设计+ico生成器等
43 1
|
2月前
|
存储 设计模式 缓存
OpenFeign集成Ribbon负载均衡-过滤和选择服务核心实现
该文章主要介绍了如何在OpenFeign中集成Ribbon以实现负载均衡,并详细分析了Ribbon中服务选择和服务过滤的核心实现过程。文章还涉及了Ribbon中负载均衡器(ILoadBalancer)和负载均衡策略(IRule)的初始化方式。
OpenFeign集成Ribbon负载均衡-过滤和选择服务核心实现
|
2月前
|
负载均衡 jenkins 应用服务中间件
大规模部署下的 Jenkins 高可用性与负载均衡
【8月更文第31天】随着软件开发流程的加速,持续集成/持续交付(CI/CD)工具的重要性日益凸显。Jenkins 作为最受欢迎的 CI/CD 平台之一,为企业提供了强大的自动化构建和部署功能。然而,在大规模部署场景下,单一的 Jenkins 实例可能无法满足高可用性和性能的需求。本文将探讨如何设计和实施 Jenkins 高可用集群,以支持大型组织的需求,并通过负载均衡技术来提高系统的稳定性和响应速度。
189 0
|
2月前
|
负载均衡 Kubernetes 开发工具
k8s相关服务与负载均衡
k8s相关服务与负载均衡
38 0
|
3月前
|
负载均衡 监控 算法
|
2月前
|
负载均衡 Dubbo 算法
Dubbo服务负载均衡原理
该文章主要介绍了Dubbo服务负载均衡的原理,包括Dubbo中负载均衡的实现位置、为什么需要负载均衡机制、Dubbo支持的负载均衡算法以及随机负载均衡策略的源码分析。
|
3月前
|
负载均衡 监控 Kubernetes
Service Mesh 是一种用于处理服务间通信的基础设施层,它通常与微服务架构一起使用,以提供诸如服务发现、负载均衡、熔断、监控、追踪和安全性等功能。
Service Mesh 是一种用于处理服务间通信的基础设施层,它通常与微服务架构一起使用,以提供诸如服务发现、负载均衡、熔断、监控、追踪和安全性等功能。
|
3月前
|
负载均衡 NoSQL 应用服务中间件
搭建高可用及负载均衡的Redis
【7月更文挑战第10天】
109 1
|
3月前
|
Kubernetes Cloud Native 微服务
企业级容器部署实战:基于ACK与ALB灵活构建云原生应用架构
这篇内容概述了云原生架构的优势,特别是通过阿里云容器服务Kubernetes版(ACK)和应用负载均衡器(ALB)实现的解决方案。它强调了ACK相对于自建Kubernetes的便利性,包括优化的云服务集成、自动化管理和更强的生态系统支持。文章提供了部署云原生应用的步骤,包括一键部署和手动部署的流程,并指出手动部署更适合有技术背景的用户。作者建议在预算允许的情况下使用ACK,因为它能提供高效、便捷的管理体验。同时,文章也提出了对文档改进的建议,如添加更多技术细节和解释,以帮助用户更好地理解和实施解决方案。最后,展望了ACK未来在智能化、安全性与边缘计算等方面的潜在发展。水文一篇,太忙了,见谅!
|
3月前
|
负载均衡 安全 Cloud Native
云上负载均衡:构建高可用、高性能的网络应用架构
与云原生技术深度融合:随着云原生技术的普及和发展未来的云上负载均衡将更加紧密地与云原生技术相结合。例如与Kubernetes等容器编排平台集成实现自动化的服务发现和路由管理;与Serverless架构结合提供无缝的流量接入和请求处理能力。 安全性能提升:面对日益严峻的网络安全威胁云上负载均衡将更加注重安全性能的提升。通过引入加密传输、访问控制、DDoS防护等安全措施确保网络流量的安全性和隐私性;同时还将建立完善的安全监控和应急响应机制以应对各种安全事件和突发事件。 支持多协议和多场景:未来的云上负载均衡将支持更多种类的网络协议和应用场景以满足不同用户和业务的需求。例如支持HTTP/2、
193 0