浅谈基于 Zookeeper 实现分布式锁对 Maxwell 完成高可用

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 浅谈基于 Zookeeper 实现分布式锁对 Maxwell 完成高可用


1. 背景

麦斯威尔CDC框架使用方法,但后来声称基于筏子的框架实现了很高的可用性,存在MySQL协议进行相关测试试验发现上的问题,然后还是通过性克隆这个框架,通过Zookeeper框架,完成对Maxwell的高可用。

2.原理

2.1.文字介绍

分布式服务通过在代码里约定的路径向动物园管理员中注册自己,注意这里注册需要临时有序的子节点,分布式服务根据自己注册完成的子节点的先后顺序,依次监听自己前置位的子等,当 1.变成子节点的时候消失,且 2. 自己为当前的 Zookeeper 路径下节点号的最小节点的时候,开启自己的服务端。

  • 应该是为了更好地服务于他人的陪伴
  • 临时的目的是为了当前设备由于停机机,能够从动物园管理员撤掉自己,给服务的“腾位置”

2.2. 图示介绍

315f449bc90beb026b8aa9308d979756.png

3.代码实现

3.1.修改pom文件

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.11.1</version>
</dependency>

3.2.修改框架入口类com.zendesk.maxwell.Maxwell的主要函数

public static void main(String[] args) {
  try {
   Logging.setupLogBridging();
   MaxwellConfig config = new MaxwellConfig(args);
   if ( config.log_level != null ) {
    Logging.setLevel(config.log_level);
   }
   final Maxwell maxwell = new Maxwell(config);
   Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
     maxwell.terminate();
     StaticShutdownCallbackRegistry.invoke();
    }
   });
   LOGGER.info("Starting Maxwell. maxMemory: " + Runtime.getRuntime().maxMemory() + " bufferMemoryUsage: " + config.bufferMemoryUsage);
   /*
   if ( config.haMode ) {
    new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
   } else {
    maxwell.start();
   }
    */
   if ( config.haMode ) {
    CuratorUtil curatorUtil = new CuratorUtil(config.zookeeperServers, config.sessionTimeoutMs, config.connectionTimeoutMs, config.baseSleepTimeMs, config.maxRetries);
    curatorUtil.highAvailable();
   }
   maxwell.start();
  } catch ( SQLException e ) {
   // catch SQLException explicitly because we likely don't care about the stacktrace
   LOGGER.error("SQLException: " + e.getLocalizedMessage());
   System.exit(1);
  } catch ( URISyntaxException e ) {
   // catch URISyntaxException explicitly as well to provide more information to the user
   LOGGER.error("Syntax issue with URI, check for misconfigured host, port, database, or JDBC options (see RFC 2396)");
   LOGGER.error("URISyntaxException: " + e.getLocalizedMessage());
   System.exit(1);
  } catch ( ServerException e ) {
   LOGGER.error("Maxwell couldn't find the requested binlog, exiting...");
   System.exit(2);
  } catch ( Exception e ) {
   e.printStackTrace();
   System.exit(1);
  }
 }

3.3.新增代码 com.zendesk.maxwell.util.CuratorUtil

package com.zendesk.maxwell.util;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class CuratorUtil {
 private String zookeeperServers;
 private int sessionTimeoutMs;
 private int connectionTimeoutMs;
 private int baseSleepTimeMs;
 private int maxRetries;
 private CuratorFramework client;
 private String lockPath = "/maxwell/ha/lock";
 private String leaderPath = "/maxwell/ha/leader";
 public CuratorUtil(String zookeeperServers,int sessionTimeoutMs,int connectionTimeoutMs,int baseSleepTimeMs,int maxRetries){
  this.zookeeperServers = zookeeperServers;
  this.sessionTimeoutMs = sessionTimeoutMs;
  this.connectionTimeoutMs = connectionTimeoutMs;
  this.baseSleepTimeMs = baseSleepTimeMs;
  this.maxRetries = maxRetries;
 }
 /*
  * 构造 zookeeper 客户端,并连接 zookeeper 集群
  */
 public void start(){
  ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(this.baseSleepTimeMs, this.maxRetries);
  client = CuratorFrameworkFactory.newClient(
    this.zookeeperServers,
    this.sessionTimeoutMs,
    this.connectionTimeoutMs,
    retryPolicy
  );
  client.start();
 }
 /*
  * 实现分布式锁
  */
 public void highAvailable(){
  // 1.连接 Zookeeper 客户端
  this.start();
  // 2.向 zookeeper 注册自己
  InterProcessMutex lock = new InterProcessMutex(client, lockPath);
  try {
   // 3.获取锁
   lock.acquire();
   // 4.将自己信息注册到 leader 路径
   client.create()
     .withMode(CreateMode.EPHEMERAL)
     .forPath(leaderPath);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

3.4.修改代码com.zendesk.maxwell.MaxwellConfig

// 类新增属性
public String zookeeperServers;
public int sessionTimeoutMs;
public int connectionTimeoutMs;
public int baseSleepTimeMs;
public int maxRetries;
// 函数 MaxwellOptionParser 新增代码
parser.accepts( "zookeeper", "zookeeper servers support maxwell high available" )
    .withRequiredArg();
parser.accepts( "session_timeout_ms", "session timeout ms with zookeeper" )
    .withRequiredArg();
parser.accepts( "connection_timeout_ms", "connection timeout ms with zookeeper" )
    .withRequiredArg();
parser.accepts( "base_sleep_time_ms", "base sleep time ms if retry" )
    .withRequiredArg();
parser.accepts( "max_retries", "max retry times" )
    .withRequiredArg();
// 函数 setup 新增代码
this.haMode = fetchBooleanOption("ha", options, properties, false);
this.zookeeperServers = fetchStringOption("zookeeper",options, properties, null);
this.sessionTimeoutMs = fetchIntegerOption("session_timeout_ms",options, properties, 5000);
this.connectionTimeoutMs = fetchIntegerOption("connection_timeout_ms",options, properties, 5000);
this.baseSleepTimeMs = fetchIntegerOption("base_sleep_time_ms",options, properties, 5000);
this.maxRetries = fetchIntegerOption("max_retries",options, properties, 3);
if (haMode){
 if (zookeeperServers == null){
  LOGGER.warn("you must specify --zookeeper because you want to use maxwell in ha mode");
 }
}

4.说明

需要修改源代码是基于 1.29.2 完成对源代码的相关版本,使用高版本,按照相同的步骤对源代码进行修改。

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
1月前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
423 2
|
1月前
|
监控 Dubbo 前端开发
快速入门分布式系统与Dubbo+zookeeper Demo
快速入门分布式系统与Dubbo+zookeeper Demo
428 0
|
1月前
|
监控 NoSQL Java
Zookeeper分布式锁
Zookeeper分布式锁
513 1
|
6天前
|
Shell 虚拟化
分布式系统详解--框架(Zookeeper-基本shell命令)
分布式系统详解--框架(Zookeeper-基本shell命令)
11 1
|
6天前
|
Java 网络安全
分布式系统详解--框架(Zookeeper-简介和集群搭建)
分布式系统详解--框架(Zookeeper-简介和集群搭建)
19 0
|
13天前
|
存储 监控 负载均衡
Zookeeper 详解:分布式协调服务的核心概念与实践
Zookeeper 详解:分布式协调服务的核心概念与实践
14 0
|
1月前
|
分布式计算 Ubuntu Hadoop
【分布式计算框架】hadoop全分布式及高可用搭建
【分布式计算框架】hadoop全分布式及高可用搭建
50 1
|
1月前
|
存储 大数据 Apache
深入理解ZooKeeper:分布式协调服务的核心与实践
【5月更文挑战第7天】ZooKeeper是Apache的分布式协调服务,确保大规模分布式系统中的数据一致性与高可用性。其特点包括强一致性、高可用性、可靠性、顺序性和实时性。使用ZooKeeper涉及安装配置、启动服务、客户端连接及执行操作。实际应用中,面临性能瓶颈、不可伸缩性和单点故障等问题,可通过水平扩展、集成其他服务和多集群备份来解决。理解ZooKeeper原理和实践,有助于构建高效分布式系统。
|
1月前
|
前端开发 JavaScript 算法
分布式系统的一致性级别划分及Zookeeper一致性级别分析
分布式系统的一致性级别划分及Zookeeper一致性级别分析
|
1月前
|
算法 Go 分布式数据库
构建高可用的分布式数据库集群:使用Go语言与Raft共识算法
随着数据量的爆炸式增长,单一数据库服务器已难以满足高可用性和可扩展性的需求。在本文中,我们将探讨如何使用Go语言结合Raft共识算法来构建一个高可用的分布式数据库集群。我们不仅会介绍Raft算法的基本原理,还会详细阐述如何利用Go语言的并发特性和网络编程能力来实现这一目标。此外,我们还将分析构建过程中可能遇到的挑战和解决方案,为读者提供一个完整的实践指南。

热门文章

最新文章