我们引入中间件是为了解决我们应用场景中的某些特定问题
Zookeeer leader选举
在分布式计算中,leader election是很重要的一个功能,这个选举过程是这样子的:指派一个进程作为组织者,将任务分发给各节点。在任务开始前,哪个节点都不知道谁是leader或者coordinator。当选举算法开始执行后,每个节点最终会得到一个唯一的节点作为任务leader。除此之外,选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来。
接下来会介绍Curator基于Zookeeper封装的Leader选举工具类LeaderLatch与LeaderSelector的使用及原理分析
Leader Latch
利用临时有序节点
Leader Selector
利用-> curator-recepis 中锁的实现
zookeeper里面有leader和follower的概念,但是它本身也可以帮助其它进行leader选举,例如kafka、nacos。
那么怎么基于Zookeeper来实现选举呢?创建临时有序节点
那么具体的判断方式有哪些,比如基于最小的来判断(kafka)
Leader Latch 的实现其实是基于 临时有序节点,没有抢到的节点都会去监听它前一个节点,如果前一个节点删除以后,它会重新去抢主。
实际应用
public class QuartzJob extends QuartzJobBean{ @Override protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { System.out.println("[QuartzJob]-----:"+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); } } @Configuration public class QuartzConfig { @Bean public JobDetail jobDetail(){ return JobBuilder.newJob(QuartzJob.class).storeDurably().build(); } @Bean public Trigger trigger(){ SimpleScheduleBuilder simpleScheduleBuilder= SimpleScheduleBuilder.simpleSchedule(). withIntervalInSeconds(1).repeatForever(); return TriggerBuilder.newTrigger().forJob(jobDetail()). withSchedule(simpleScheduleBuilder).build(); } }
单体应用下,实际上一个定时任务就启动了。如果多节点下呢?比如说复制一下,修改下端口,同时启动,发现两个项目都会同时执行。那么如何去解决呢?在这里可以用到leader选举,两个节点,谁是leader,谁有资格执行。
定时调度里面有一个 SchedulerFactoryBean ,我们可以继承这个 SchedulerFactoryBean,让它不自动启动,并且按照我们的意愿去启动
public class ZkSchedulerFactoryBean extends SchedulerFactoryBean{ private static CuratorFramework zkClient; private static String ZOOKEEPER_CONNECTION_STRING="192.168.216.128:2181"; private LeaderLatch leaderLatch; //leader选举的api private static final String LEADER_PATH="/leader"; Logger LOG= LoggerFactory.getLogger(ZkSchedulerFactoryBean.class); public ZkSchedulerFactoryBean() throws Exception { this.setAutoStartup(false); //设置为非自动启动 此时不会去启动默认的定时任务 leaderLatch=new LeaderLatch(getClient(),LEADER_PATH); leaderLatch.addListener(new ZkJobLeaderLatchListener(getIp(),this)); leaderLatch.start(); //表示当前节点参与到leader选举中来 } @Override protected void startScheduler(Scheduler scheduler, int startupDelay) throws SchedulerException { if(this.isAutoStartup()){//默认情况下,是true super.startScheduler(scheduler,startupDelay); } } @Override public void destroy() throws SchedulerException { CloseableUtils.closeQuietly(leaderLatch); super.destroy(); } //初始化连接 private CuratorFramework getClient(){ // 重试策略 RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,3); zkClient = CuratorFrameworkFactory.builder(). connectString(ZOOKEEPER_CONNECTION_STRING).retryPolicy(retryPolicy).build(); zkClient.start(); return zkClient; } private String getIp(){ String host=null; try { host= InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { e.printStackTrace(); } return host; } class ZkJobLeaderLatchListener implements LeaderLatchListener{ private String ip; private SchedulerFactoryBean schedulerFactoryBean; public ZkJobLeaderLatchListener(String ip) { this.ip = ip; } public ZkJobLeaderLatchListener(String ip, SchedulerFactoryBean schedulerFactoryBean) { this.ip = ip; this.schedulerFactoryBean = schedulerFactoryBean; } @Override public void isLeader() { LOG.info("ip:{} 成为leader,执行scheduler~",ip); schedulerFactoryBean.setAutoStartup(true); schedulerFactoryBean.start(); //启动(抢占到leader的节点去执行任务) } @Override public void notLeader() { LOG.info("ip:{} 不是leader,停止scheduler~",ip); schedulerFactoryBean.setAutoStartup(false); schedulerFactoryBean.stop(); //启动(抢占到leader的节点去执行任务) } } }
此时再次修改QuartzConfig
@Configuration public class QuartzConfig { @Bean public ZkSchedulerFactoryBean schedulerFactoryBean() throws Exception { ZkSchedulerFactoryBean schedulerFactoryBean=new ZkSchedulerFactoryBean(); schedulerFactoryBean.setJobDetails(jobDetail()); schedulerFactoryBean.setTriggers(trigger()); return schedulerFactoryBean; } @Bean public JobDetail jobDetail(){ return JobBuilder.newJob(QuartzJob.class).storeDurably().build(); } @Bean public Trigger trigger(){ SimpleScheduleBuilder simpleScheduleBuilder= SimpleScheduleBuilder.simpleSchedule(). withIntervalInSeconds(1).repeatForever(); return TriggerBuilder.newTrigger().forJob(jobDetail()). withSchedule(simpleScheduleBuilder).build(); } }
此时运行两个节点,只有leader才可以执行定时任务,当leader节点停止的时候,此时非leader节点就会接收,然后执行定时任务。
Leader Latch和Leader Selector是Apache Curator提供的两种用于协调分布式系统中领导者选举的机制。
Leader Latch是一种简单的机制,它允许一个进程获得锁并成为领导者,直到它显式地释放该锁为止。一旦该进程释放了锁,其他进程可以争取获得领导者锁。这种机制适用于那些只允许一个进程作为领导者的场景。
Leader Selector是另一种机制,它允许多个进程在同一时间成为领导者。每个进程都会尝试获得领导者标记,并在成功后执行操作,直到它被另一个进程替代。这种机制适用于那些需要多个进程参与计算的场景,例如MapReduce等。
其实际应用有很多,比如:
- dubbo +zookeeper 注册中心、注册服务
- dubbo+ zookeeper 配置中心、元数据管理
- 实现分布式锁(Curator)
- leader选举(定时任务的互斥执行)
1.leader latch(kafka的leader选取基于此)
Kafka的leader选举机制是基于Leader Latch的。在Kafka集群中,每个分区都会有一个领导者(leader)和若干个副本(replica),领导者负责处理读写请求,而副本则用于数据冗余和故障转移。
当领导者出现故障或网络分区等情况时,Kafka需要进行新的领导者选举。这时,每个副本都可以争取成为新的领导者,但只有其中一个副本能够成功地获得领导者锁。获得领导者锁的副本将成为新的领导者,并负责处理读写请求,直到它自己出现故障或被替换为止。
Kafka使用ZooKeeper来实现领导者选举的机制。每个分区的领导者选举都会在ZooKeeper中创建一个临时节点,并竞争成为该节点的拥有者。只有拥有该节点的副本才能成为领导者,其他副本则成为副本。当领导者出现故障时,其对应的临时节点将被删除,触发新一轮的领导者选举过程。
2.leader selector
Zookeeper的实现原理分析
数据一致性模型
- 弱一致性模型
- 2pc协议
- 过半提交
zookeeper是一个顺序一致性模型。由于zookeeper设计出来是提供分布式锁服务,那么意味着它本身需要实现顺序一致性。顺序一致性是在分布式环境中实现分布式锁的基本要求,比如当一个多个程序来争抢锁,如果clientA获得锁以后,后续所有来争抢锁的程序看到的锁的状态都应该是被clientA 锁定了,而不是其他状态。
什么是顺序一致性呢?
在讲顺序一致性之前,咱们思考一个问题,假如说zookeeper是一个最终一致性模型,那么他会发生什么情况
ClientA/B/C假设只串行执行, clientA更新zookeeper上的一个值x。ClientB和clientC分别读取集群的不同副本,返回的x的值是不一样的。clientC的读取操作是发生在clientB之后,但是却读到了过期的值。很明显,这是一种弱一致模型。如果用它来实现锁机制是有问题的。
顺序一致性提供了更强的一致性保证,我们来观察下面这个图,从时间轴来看,B0发生在A0之前,读取的值是0,B2发生在A0之后,读取到的x的值为1.而读操作B1/C0/C1和写操作A0在时间轴上有重叠,因此他们可能读到旧的值为0,也可能读到新的值1. 但是在强顺序一致性模型中,如果B1得到的x的值为1,那么C1看到的值也一定是1.
需要注意的是:由于网络的延迟以及系统本身执行请求的不确定性,会导致请求发起的早的客户端不一定会在服务端执行得早。最终以服务端执行的结果为准。
简单来说:顺序一致性是针对单个操作,单个数据对象。属于CAP中C这个范畴。一个数据被更新后,能够立马被后续的读操作读到。
但是zookeeper的顺序一致性实现是缩水版的,zookeeper不保证在每个实例中,两个不同的客户端具有相同的zookeeper数据视图,由于网络延迟等因素,一个客户端可能会在另外一个客户端收到更改通知之前执行更新。
考虑到2个客户端A和B的场景,如果A把znode /a的值从0设置为1,然后告诉客户端B读取 /a, 则客户端B可能会读取到旧的值0,具体取决于他连接到那个服务器,如果客户端A和B要读取必须要读取到相同的值,那么client B在读取操作之前执行sync方法。 zooKeeper.sync();
具体详解:https://zookeeper.apache.org/doc/r3.6.1/zookeeperProgrammers.html#ch_zkGuarantees
ZAB
ZAB(Zookeeper Atomic Broadcast) 协议是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广播协议。在 ZooKeeper 中,主要依赖 ZAB 协议来实现分布式数据一致性,基于该协议,ZooKeeper 实现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性。
- 崩溃恢复
- 原子广播
这个协议的本质
消息广播
其存在一个全局唯一的64位的事务id,zxid,zxid随着数据的更新也会不断地变化,zxid越大,也就决定着数据的新和旧。
上图是这样做的,leader会为每一个follower准备一个队列,其实在整个zookeeper中存在着很多的生产者消费者模型,其本质上是fifo队列,所有的请求都是基于这个队列去做一个数据同步,其会在每个消息中带一个zxid,它会将消息 与 zxid分发到所有的follower节点上,这个过程就叫做 propose事务提案。
follower节点收到这个提案的时候,会先将其写入本地磁盘,写入之后再返回ack,收到ack后会判断合法数量,如果超过了半数,就commit提交本地的事务
- Observer(不参与投票和ack,只和leader保持数据同步)
崩溃恢复
- 选举出新leader(选举谁作为leader)
- 数据同步
选举谁作为leader?需要怎么考虑?
- 已经被处理的消息不能丢失
当leader收到合法数量的ack之后,会向follower发送commit请求,follower会提交本地的事务。
但是如果在follower节点收到commit命令之前,leader挂掉了,也就是 server3没有commit,所以我们需要保证,已经被提交的消息不能被丢失。
所以如果server2想成为leader,那么必须发起一个commit给server3
- 被丢弃的消息不能再次出现
也就是没有提交的消息不能被使用。
深入理解Zookeeper系列-3.Zookeeper实现原理及Leader选举源码分析(下):https://developer.aliyun.com/article/1413780