在分布式系统中,为了保证数据的一致性,往往需要进行同步控制,比如减库存、唯一流水号生成等。Curator对Zookeeper进行了封装,实现了分布式锁的功能,提供了线程的同步控制。同时,Curator也提供了多种锁机制。下面对通过时间戳生成流水号的场景进行逐步分析。
普通示例
先看一个简单的程序:
package com.secbro.learn.curator; import java.text.SimpleDateFormat; import java.util.Date; /** * Created by zhuzs on 2017/5/4. */ public class CreateOrderNo { public static void main(String[] args) { for(int i=0; i< 10; i++){ SimpleDateFormat sdf = new SimpleDateFormat("yyyyDDmm HH:mm:ss|SSS"); String orderNo = sdf.format(new Date()); System.out.println(orderNo); } } }
以上代码通过一个循环连续打印出10个时间戳。这里没有使用多线程,但分析下面的打印结果就会发现,其实在同一时刻会生成多个相同的流水号,运行时间在毫秒级别。
201712457 18:57:29|262 201712457 18:57:29|263 201712457 18:57:29|263 201712457 18:57:29|263 201712457 18:57:29|263 201712457 18:57:29|263 201712457 18:57:29|263 201712457 18:57:29|263 201712457 18:57:29|263 201712457 18:57:29|264
如果业务量不大,没有并发情况,上面生成的流水号重复的可能性不大,一旦出现高并发,那么重复的订单号就会大量出现,当然也有其他方案进行解决,本篇文章就不再进行讨论。下说说如何通过分布式锁来解决此问题。
分布式锁示例
下面的代码利用Curator的分布式锁来实现在同一时刻只会生成一个唯一的流水号。
package com.secbro.learn.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch; /** * Created by zhuzs on 2017/5/4. */ public class CreateOrderNoWithZK { private static final String path = "/lock_path"; public static void main(String[] args) { CuratorFramework client = getClient(); final InterProcessMutex lock = new InterProcessMutex(client, path); final CountDownLatch countDownLatch = new CountDownLatch(1); final long startTime = new Date().getTime(); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { try { countDownLatch.await(); lock.acquire(); } catch (Exception e) { e.printStackTrace(); } SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss|SSS"); System.out.println(sdf.format(new Date())); try { lock.release(); } catch (Exception e) { e.printStackTrace(); } System.out.println("显示此线程大概花费时间(等待+执行):" + (new Date().getTime() - startTime) + "ms"); } }).start(); } System.out.println("创建线程花费时间:" + (new Date().getTime() - startTime) + "ms"); countDownLatch.countDown(); } private static CuratorFramework getClient() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") .retryPolicy(retryPolicy) .sessionTimeoutMs(6000) .connectionTimeoutMs(3000) .namespace("demo") .build(); client.start(); return client; } }
打印结果为:
创建线程花费时间:1ms 20170504 19:02:13|178 显示此线程大概花费时间(等待+执行):210ms 20170504 19:02:13|386 显示此线程大概花费时间(等待+执行):416ms 20170504 19:02:13|574 显示此线程大概花费时间(等待+执行):629ms 20170504 19:02:13|660 显示此线程大概花费时间(等待+执行):678ms 20170504 19:02:13|769 显示此线程大概花费时间(等待+执行):787ms 20170504 19:02:13|804 显示此线程大概花费时间(等待+执行):814ms 20170504 19:02:13|851 显示此线程大概花费时间(等待+执行):881ms 20170504 19:02:13|899 显示此线程大概花费时间(等待+执行):927ms 20170504 19:02:13|946 显示此线程大概花费时间(等待+执行):955ms 20170504 19:02:13|976 显示此线程大概花费时间(等待+执行):993ms
仔细观察可发现,通过多线程的访问,打印的时间戳却是唯一的。这里使用InterProcessMutex类来进行处理分布式锁,实现了一个生产唯一流水号的功能。
注意事项
在上面的代码中,打印了每步操作的时间,其中访问的zookeeper服务器是远程服务器。从打印的时间我们可以看出,通过这种方式生成唯一流水号并不能支撑很大的并发量。每次操作都需要通过网络访问,zookeeper的节点操作等,会花费大量的时间。另外,由于精确到毫秒,因此一秒钟最多也只能处理999个请求。
同时,在分布式环境中上面的示例还是会出现重复的可能性的,比如两个服务器的时间不一致,即两个服务器相差10ms,恰好第一个执行完,第二个执行的间隙也是10ms,那么第二个生成的订单号还是有可能跟第一个重复的,虽然这种概率及其小。
以上通过示例演示了Curator的分布式锁功能,根据具体的业务需求可选择不同的业务场景来使用。