正文
一、分布式锁
我们知道,如果需要对某一个共享变量进行多线程同步访问的时候,可以使用我们学到的锁进行处理,使用synchronized关键字或者使用Lock锁,但是这些只是针对单个应用,也就是只能在同一个JVM生效。随着业务发展,单机应用已经不能满足我们的需要,我们需要集群,分布式,这个时候就需要考虑到分布式锁。
二、Zookeeper实现分布式锁的原理
根据zk临时节点的唯一性,当多个请求同时创建相同的节点,只要谁能够创建成功 谁就能够获取到锁。
在创建节点时,如果该节点已经被其他请求创建则进入等待。
只要能够创建节点成功,就认为获取到了锁,则开始进入到正常业务逻辑操作,其他没有获取锁进行等待;
正常业务逻辑流程执行完后,调用zk关闭连接方式释放锁,从而是其他的请求开始进入到获取锁的资源。
三、代码
package com.xiaojie.template; /** * 分布式锁 */ public abstract class DistributeLock { public void getLock() { //获取锁 if (tryLock()) { System.out.println(Thread.currentThread().getName() + "获取锁成功"); } else { //等待锁 waitLock(); //重新获取 getLock(); } } /** * 等待锁 */ protected abstract void waitLock(); /** * 尝试获取锁 */ protected abstract boolean tryLock(); /** * 释放锁 */ public abstract void unLock(); }
package com.xiaojie.template; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import java.util.concurrent.CountDownLatch; /** * 使用Zk获取分布式锁 */ public class ZkDistributeLock extends DistributeLock { //参数1 连接地址 private static final String ADDRESS = "192.168.139.154:2181,192.168.139.154:2182,192.168.139.154:2183"; // 参数2 zk超时时间 private static final int TIMEOUT = 5000; // 创建我们的zk连接 private ZkClient zkClient = new ZkClient(ADDRESS, TIMEOUT); /** * 共同的创建临时节点 */ private String lockPath = "/myLock"; private CountDownLatch countDownLatch = null; @Override protected void waitLock() { /** * 使用事件通知,如果有节点删除,则重新开始竞争锁 */ IZkDataListener iZkDataListener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { } @Override public void handleDataDeleted(String s) throws Exception { if (countDownLatch != null) { //如果删除节点,countDownLatch变为0,开始竞争锁 countDownLatch.countDown(); } } }; zkClient.subscribeDataChanges(lockPath, iZkDataListener); // 2.使用countDownLatch等待 if (countDownLatch == null) { countDownLatch = new CountDownLatch(1); System.out.println("开始等待锁。。。。。。。。。。"); } try { // 如果当前计数器不是为0 就一直等待 countDownLatch.await(); } catch (Exception e) { } // 3. 如果当前节点被删除的情况下,需要重新进入到获取锁 zkClient.unsubscribeDataChanges(lockPath, iZkDataListener); } /** * 尝试获取锁 */ @Override public boolean tryLock() { // 获取锁的思想:多个jvm同时创建临时节点,只要谁能够创建成功 谁能够获取到锁 try { zkClient.createEphemeral(lockPath); return true; } catch (Exception e) { return false; } } /** * 释放锁。关闭临时节点,其他资源去竞争锁 */ @Override public void unLock() { if (zkClient != null) { zkClient.close(); System.out.println(Thread.currentThread().getName() + ",释放了锁>>>"); } } }
package com.xiaojie.utils; /** * 雪花算法生成id */ public class SnowFlake { /** * 起始的时间戳 */ private final static long START_STMP = 1480166465631L; /** * 每一部分占用的位数 */ private final static long SEQUENCE_BIT = 12; //序列号占用的位数 private final static long MACHINE_BIT = 5; //机器标识占用的位数 private final static long DATACENTER_BIT = 5;//数据中心占用的位数 /** * 每一部分的最大值 */ private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT); private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT); private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT); /** * 每一部分向左的位移 */ private final static long MACHINE_LEFT = SEQUENCE_BIT; private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT; private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT; private long datacenterId; //数据中心 private long machineId; //机器标识 private long sequence = 0L; //序列号 private long lastStmp = -1L;//上一次时间戳 public SnowFlake() { } public SnowFlake(long datacenterId, long machineId) { if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) { throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0"); } if (machineId > MAX_MACHINE_NUM || machineId < 0) { throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0"); } this.datacenterId = datacenterId; this.machineId = machineId; } /** * 产生下一个ID * * @return */ public long nextId() { long currStmp = getNewstmp(); if (currStmp < lastStmp) { throw new RuntimeException("Clock moved backwards. Refusing to generate id"); } if (currStmp == lastStmp) { //相同毫秒内,序列号自增 sequence = (sequence + 1) & MAX_SEQUENCE; //同一毫秒的序列数已经达到最大 if (sequence == 0L) { currStmp = getNextMill(); } } else { //不同毫秒内,序列号置为0 sequence = 0L; } lastStmp = currStmp; return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分 | datacenterId << DATACENTER_LEFT //数据中心部分 | machineId << MACHINE_LEFT //机器标识部分 | sequence; //序列号部分 } private long getNextMill() { long mill = getNewstmp(); while (mill <= lastStmp) { mill = getNewstmp(); } return mill; } private long getNewstmp() { return System.currentTimeMillis(); } }
package com.xiaojie.utils; import com.xiaojie.template.DistributeLock; import com.xiaojie.template.ZkDistributeLock; public class IdGenerate implements Runnable { private DistributeLock distributeLock = new ZkDistributeLock(); private SnowFlake snowFlake = new SnowFlake(); @Override public void run() { String s = genId(); } public String genId() { try { distributeLock.getLock(); //获取到锁执行业务,获取不到等待 String id = String.valueOf(snowFlake.nextId()); System.out.println(">>>>>>>>>>>" + id); return id; } catch (Exception e) { e.printStackTrace(); } finally { distributeLock.unLock(); } return null; } }
public class Test { public static void main(String[] args) { // IdGenerate idGenerate = new IdGenerate(); for (int i = 0; i < 100; i++) { new Thread(new IdGenerate()).start(); } } }
参考:https://blog.csdn.net/weixin_44455476/article/details/105397576