1. Background
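❝Each distributed service instance registers itself with ZooKeeper under a path agreed upon in the code. Note that it must register as an "ephemeral sequential" child node. In the order in which their child nodes were created, each instance watches the child node immediately ahead of its own. An instance starts its service only when 1. the child node it watches has disappeared, and 2. its own node has the smallest sequence number under the current ZooKeeper path.
- Sequential: so that the services queue up in a well-defined order.
- Ephemeral: so that when the current machine goes down, its node is removed from ZooKeeper and it "makes room" for the services behind it.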
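The queueing rule above can be sketched without a ZooKeeper connection. This is a minimal illustration, not the implementation used later in this article: `ElectionSketch`, `sequenceOf`, `predecessorOf`, and the `maxwell-` node names are hypothetical, and the only ZooKeeper behavior assumed is that sequential nodes get a 10-digit, zero-padded counter appended to their name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Pure-Java illustration of the "watch your predecessor" rule; no ZooKeeper needed. */
final class ElectionSketch {

    /** Extracts the 10-digit counter that ZooKeeper appends to sequential node names. */
    static int sequenceOf(String nodeName) {
        return Integer.parseInt(nodeName.substring(nodeName.length() - 10));
    }

    /**
     * Returns the node that self should watch, or null when self has the
     * smallest sequence number and may therefore start its service.
     */
    static String predecessorOf(String self, List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingInt(ElectionSketch::sequenceOf));
        int index = sorted.indexOf(self);
        if (index < 0) {
            throw new IllegalArgumentException(self + " is not registered");
        }
        return index == 0 ? null : sorted.get(index - 1);
    }

    public static void main(String[] args) {
        // Hypothetical child nodes, listed in arbitrary order.
        List<String> children = Arrays.asList(
                "maxwell-0000000007", "maxwell-0000000003", "maxwell-0000000005");
        String self = "maxwell-0000000005";

        // While node 3 is alive, node 5 waits behind it.
        System.out.println("watch: " + predecessorOf(self, children));

        // Node 3's session ends, so its ephemeral node disappears.
        List<String> remaining = Arrays.asList("maxwell-0000000007", "maxwell-0000000005");
        System.out.println("leader: " + (predecessorOf(self, remaining) == null));
    }
}
```

Running `main` prints `watch: maxwell-0000000003` followed by `leader: true`. Watching only the immediate predecessor, rather than the whole parent path, means a node's disappearance wakes up a single waiter instead of every registered instance.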
2.2. Diagram
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.11.1</version>
    </dependency>
    public static void main(String[] args) {
        try {
            Logging.setupLogBridging();
            MaxwellConfig config = new MaxwellConfig(args);

            if ( config.log_level != null ) {
                Logging.setLevel(config.log_level);
            }

            final Maxwell maxwell = new Maxwell(config);

            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    maxwell.terminate();
                    StaticShutdownCallbackRegistry.invoke();
                }
            });

            LOGGER.info("Starting Maxwell. maxMemory: " + Runtime.getRuntime().maxMemory() + " bufferMemoryUsage: " + config.bufferMemoryUsage);

            /*
            if ( config.haMode ) {
                new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
            } else {
                maxwell.start();
            }
            */

            if ( config.haMode ) {
                CuratorUtil curatorUtil = new CuratorUtil(config.zookeeperServers, config.sessionTimeoutMs, config.connectionTimeoutMs, config.baseSleepTimeMs, config.maxRetries);
                curatorUtil.highAvailable();
            }
            maxwell.start();
        } catch ( SQLException e ) {
            // catch SQLException explicitly because we likely don't care about the stacktrace
            LOGGER.error("SQLException: " + e.getLocalizedMessage());
            System.exit(1);
        } catch ( URISyntaxException e ) {
            // catch URISyntaxException explicitly as well to provide more information to the user
            LOGGER.error("Syntax issue with URI, check for misconfigured host, port, database, or JDBC options (see RFC 2396)");
            LOGGER.error("URISyntaxException: " + e.getLocalizedMessage());
            System.exit(1);
        } catch ( ServerException e ) {
            LOGGER.error("Maxwell couldn't find the requested binlog, exiting...");
            System.exit(2);
        } catch ( Exception e ) {
            e.printStackTrace();
            System.exit(1);
        }
    }
3.3. New code: com.zendesk.maxwell.util.CuratorUtil
    package com.zendesk.maxwell.util;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;

    public class CuratorUtil {
        private String zookeeperServers;
        private int sessionTimeoutMs;
        private int connectionTimeoutMs;
        private int baseSleepTimeMs;
        private int maxRetries;
        private CuratorFramework client;
        private String lockPath = "/maxwell/ha/lock";
        private String leaderPath = "/maxwell/ha/leader";

        public CuratorUtil(String zookeeperServers, int sessionTimeoutMs, int connectionTimeoutMs, int baseSleepTimeMs, int maxRetries) {
            this.zookeeperServers = zookeeperServers;
            this.sessionTimeoutMs = sessionTimeoutMs;
            this.connectionTimeoutMs = connectionTimeoutMs;
            this.baseSleepTimeMs = baseSleepTimeMs;
            this.maxRetries = maxRetries;
        }

        /*
         * Build the ZooKeeper client and connect to the ZooKeeper cluster
         */
        public void start() {
            ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(this.baseSleepTimeMs, this.maxRetries);
            client = CuratorFrameworkFactory.newClient(
                this.zookeeperServers,
                this.sessionTimeoutMs,
                this.connectionTimeoutMs,
                retryPolicy
            );
            client.start();
        }

        /*
         * Implement the distributed lock
         */
        public void highAvailable() {
            // 1. Connect the ZooKeeper client
            this.start();
            // 2. Register this instance with ZooKeeper
            InterProcessMutex lock = new InterProcessMutex(client, lockPath);
            try {
                // 3. Acquire the lock; this blocks until this instance holds it
                lock.acquire();
                // 4. Register this instance under the leader path
                client.create()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(leaderPath);
            } catch (Exception e) {
                // Fail fast: if the exception were only printed, the caller would
                // go on to start Maxwell without holding the lock.
                throw new IllegalStateException("Failed to acquire the Maxwell HA lock", e);
            }
        }
    }
    // New class fields
    public String zookeeperServers;
    public int sessionTimeoutMs;
    public int connectionTimeoutMs;
    public int baseSleepTimeMs;
    public int maxRetries;

    // Code added to the MaxwellOptionParser function
    parser.accepts( "zookeeper", "zookeeper servers support maxwell high available" )
        .withRequiredArg();
    parser.accepts( "session_timeout_ms", "session timeout ms with zookeeper" )
        .withRequiredArg();
    parser.accepts( "connection_timeout_ms", "connection timeout ms with zookeeper" )
        .withRequiredArg();
    parser.accepts( "base_sleep_time_ms", "base sleep time ms if retry" )
        .withRequiredArg();
    parser.accepts( "max_retries", "max retry times" )
        .withRequiredArg();

    // Code added to the setup function
    this.haMode = fetchBooleanOption("ha", options, properties, false);
    this.zookeeperServers = fetchStringOption("zookeeper", options, properties, null);
    this.sessionTimeoutMs = fetchIntegerOption("session_timeout_ms", options, properties, 5000);
    this.connectionTimeoutMs = fetchIntegerOption("connection_timeout_ms", options, properties, 5000);
    this.baseSleepTimeMs = fetchIntegerOption("base_sleep_time_ms", options, properties, 5000);
    this.maxRetries = fetchIntegerOption("max_retries", options, properties, 3);
    if (haMode) {
        if (zookeeperServers == null) {
            LOGGER.warn("you must specify --zookeeper because you want to use maxwell in ha mode");
        }
    }
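To get a feel for what the `base_sleep_time_ms` and `max_retries` defaults above imply, the sketch below estimates how long the retry policy may sleep in total. It rests on an assumption about Curator's `ExponentialBackoffRetry`: that the sleep before retry number n (counting from 0) is the base sleep time multiplied by a random factor between 1 and 2^(n+1) - 1. The class `BackoffSketch` and its methods are hypothetical and do not call Curator; verify the formula against the Curator version you actually build with.

```java
/** Estimates sleep bounds for an exponential backoff policy; does not use Curator. */
final class BackoffSketch {

    /**
     * Longest sleep before retry number retryCount (0-based), ASSUMING the
     * random multiplier lies in [1, 2^(retryCount + 1) - 1].
     */
    static long maxSleepMs(int baseSleepTimeMs, int retryCount) {
        return (long) baseSleepTimeMs * ((1L << (retryCount + 1)) - 1);
    }

    /** Worst-case total sleep across all retries. */
    static long worstCaseTotalMs(int baseSleepTimeMs, int maxRetries) {
        long total = 0;
        for (int retry = 0; retry < maxRetries; retry++) {
            total += maxSleepMs(baseSleepTimeMs, retry);
        }
        return total;
    }

    /** Best-case total sleep: the multiplier is 1 on every retry. */
    static long bestCaseTotalMs(int baseSleepTimeMs, int maxRetries) {
        return (long) baseSleepTimeMs * maxRetries;
    }

    public static void main(String[] args) {
        int baseSleepTimeMs = 5000; // default of base_sleep_time_ms above
        int maxRetries = 3;         // default of max_retries above
        System.out.println("best case ms:  " + bestCaseTotalMs(baseSleepTimeMs, maxRetries));
        System.out.println("worst case ms: " + worstCaseTotalMs(baseSleepTimeMs, maxRetries));
    }
}
```

Under that assumption the defaults give between 15000 ms and 55000 ms of sleeping before the client gives up on an operation, so a smaller `base_sleep_time_ms` may be worth considering if faster failure detection matters.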
❝The source code changes above were made against Maxwell 1.29.2. If you use a later version, apply the same steps to that version's source code.