浅谈基于 Zookeeper 实现分布式锁对 Maxwell 完成高可用

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 浅谈基于 Zookeeper 实现分布式锁对 Maxwell 完成高可用


1. 背景

麦斯威尔CDC框架使用方法,但后来声称基于筏子的框架实现了很高的可用性,存在MySQL协议进行相关测试试验发现上的问题,然后还是通过性克隆这个框架,通过Zookeeper框架,完成对Maxwell的高可用。

2.原理

2.1.文字介绍

分布式服务通过在代码里约定的路径向动物园管理员中注册自己,注意这里注册需要临时有序的子节点,分布式服务根据自己注册完成的子节点的先后顺序,依次监听自己前置位的子等,当 1.变成子节点的时候消失,且 2. 自己为当前的 Zookeeper 路径下节点号的最小节点的时候,开启自己的服务端。

  • 应该是为了更好地服务于他人的陪伴
  • 临时的目的是为了当前设备由于停机机,能够从动物园管理员撤掉自己,给服务的“腾位置”

2.2. 图示介绍

315f449bc90beb026b8aa9308d979756.png

3.代码实现

3.1.修改pom文件

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.11.1</version>
</dependency>

3.2.修改框架入口类com.zendesk.maxwell.Maxwell的主要函数

public static void main(String[] args) {
  try {
   Logging.setupLogBridging();
   MaxwellConfig config = new MaxwellConfig(args);
   if ( config.log_level != null ) {
    Logging.setLevel(config.log_level);
   }
   final Maxwell maxwell = new Maxwell(config);
   Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
     maxwell.terminate();
     StaticShutdownCallbackRegistry.invoke();
    }
   });
   LOGGER.info("Starting Maxwell. maxMemory: " + Runtime.getRuntime().maxMemory() + " bufferMemoryUsage: " + config.bufferMemoryUsage);
   /*
   if ( config.haMode ) {
    new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
   } else {
    maxwell.start();
   }
    */
   if ( config.haMode ) {
    CuratorUtil curatorUtil = new CuratorUtil(config.zookeeperServers, config.sessionTimeoutMs, config.connectionTimeoutMs, config.baseSleepTimeMs, config.maxRetries);
    curatorUtil.highAvailable();
   }
   maxwell.start();
  } catch ( SQLException e ) {
   // catch SQLException explicitly because we likely don't care about the stacktrace
   LOGGER.error("SQLException: " + e.getLocalizedMessage());
   System.exit(1);
  } catch ( URISyntaxException e ) {
   // catch URISyntaxException explicitly as well to provide more information to the user
   LOGGER.error("Syntax issue with URI, check for misconfigured host, port, database, or JDBC options (see RFC 2396)");
   LOGGER.error("URISyntaxException: " + e.getLocalizedMessage());
   System.exit(1);
  } catch ( ServerException e ) {
   LOGGER.error("Maxwell couldn't find the requested binlog, exiting...");
   System.exit(2);
  } catch ( Exception e ) {
   e.printStackTrace();
   System.exit(1);
  }
 }

3.3.新增代码 com.zendesk.maxwell.util.CuratorUtil

package com.zendesk.maxwell.util;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class CuratorUtil {
 private String zookeeperServers;
 private int sessionTimeoutMs;
 private int connectionTimeoutMs;
 private int baseSleepTimeMs;
 private int maxRetries;
 private CuratorFramework client;
 private String lockPath = "/maxwell/ha/lock";
 private String leaderPath = "/maxwell/ha/leader";
 public CuratorUtil(String zookeeperServers,int sessionTimeoutMs,int connectionTimeoutMs,int baseSleepTimeMs,int maxRetries){
  this.zookeeperServers = zookeeperServers;
  this.sessionTimeoutMs = sessionTimeoutMs;
  this.connectionTimeoutMs = connectionTimeoutMs;
  this.baseSleepTimeMs = baseSleepTimeMs;
  this.maxRetries = maxRetries;
 }
 /*
  * 构造 zookeeper 客户端,并连接 zookeeper 集群
  */
 public void start(){
  ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(this.baseSleepTimeMs, this.maxRetries);
  client = CuratorFrameworkFactory.newClient(
    this.zookeeperServers,
    this.sessionTimeoutMs,
    this.connectionTimeoutMs,
    retryPolicy
  );
  client.start();
 }
 /*
  * 实现分布式锁
  */
 public void highAvailable(){
  // 1.连接 Zookeeper 客户端
  this.start();
  // 2.向 zookeeper 注册自己
  InterProcessMutex lock = new InterProcessMutex(client, lockPath);
  try {
   // 3.获取锁
   lock.acquire();
   // 4.将自己信息注册到 leader 路径
   client.create()
     .withMode(CreateMode.EPHEMERAL)
     .forPath(leaderPath);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

3.4.修改代码com.zendesk.maxwell.MaxwellConfig

// 类新增属性
public String zookeeperServers;
public int sessionTimeoutMs;
public int connectionTimeoutMs;
public int baseSleepTimeMs;
public int maxRetries;
// 函数 MaxwellOptionParser 新增代码
parser.accepts( "zookeeper", "zookeeper servers support maxwell high available" )
    .withRequiredArg();
parser.accepts( "session_timeout_ms", "session timeout ms with zookeeper" )
    .withRequiredArg();
parser.accepts( "connection_timeout_ms", "connection timeout ms with zookeeper" )
    .withRequiredArg();
parser.accepts( "base_sleep_time_ms", "base sleep time ms if retry" )
    .withRequiredArg();
parser.accepts( "max_retries", "max retry times" )
    .withRequiredArg();
// 函数 setup 新增代码
this.haMode = fetchBooleanOption("ha", options, properties, false);
this.zookeeperServers = fetchStringOption("zookeeper",options, properties, null);
this.sessionTimeoutMs = fetchIntegerOption("session_timeout_ms",options, properties, 5000);
this.connectionTimeoutMs = fetchIntegerOption("connection_timeout_ms",options, properties, 5000);
this.baseSleepTimeMs = fetchIntegerOption("base_sleep_time_ms",options, properties, 5000);
this.maxRetries = fetchIntegerOption("max_retries",options, properties, 3);
if (haMode){
 if (zookeeperServers == null){
  LOGGER.warn("you must specify --zookeeper because you want to use maxwell in ha mode");
 }
}

4.说明

需要修改源代码是基于 1.29.2 完成对源代码的相关版本,使用高版本,按照相同的步骤对源代码进行修改。

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
2月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
2月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
1月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
45 2
|
1月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
48 1
|
2月前
分布式-Zookeeper-数据订阅
分布式-Zookeeper-数据订阅
|
2月前
|
监控
分布式-Zookeeper-Zab协议
分布式-Zookeeper-Zab协议
|
1月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
49 0
|
2月前
|
Java
分布式-Zookeeper-分布式锁
分布式-Zookeeper-分布式锁
|
2月前
|
存储 负载均衡 算法
分布式-Zookeeper-Master选举
分布式-Zookeeper-Master选举
|
2月前
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)
下一篇
无影云桌面