一、前言
前段时间出去耍,没得时间写博客,最近没有什么系统性的文章,写一些实际工作中遇到的问题。我个人项目在公司服务器是部署到两台机子上,但是在消费资源的时候,出现数据同步问题。我们原来用的是class的实例做同步出来,这个实例对于单台机子拥有,对两台机子就失去作用了。这个时候就需要分布式锁实现同步处理。
二、redis 实现分布式锁
首先我想分享一点,当然在网上看的redis 的源码,自己没有仔细分析,redis 的同步是支持多进程多线程的,最后都是用文件来做同步,这样就摆脱单线程问题。既然它本身解决了分布式锁的问题,我们就可以用它来解决我们事物同步问题。对于redis不了解的同学不重要,毕竟只是一个数据库,你不是dba一样用mysql是不是??
2.1 编写redis 锁
对于 redis锁的主要步骤如下,思路首先要清晰
- 首先初始化redis获取redis客户端jedis(或者SharedJedis)
- 制定锁定时间参数,锁等待与锁超时
- 编写锁方法
- 关闭锁
注:再次我们就要贴出代码了, 代理里面前面的都是构造方法,最后两个方法才是关键(申明:代码来自网络)
package com.github.jedis.lock; import redis.clients.jedis.Jedis; /** * Redis distributed lock implementation. */ public class JedisLock { Jedis jedis; /** * Lock key path. */ String lockKey; /** * Lock expiration in miliseconds. * 锁超时,防止线程在入锁以后,无限的执行等待 */ int expireMsecs = 60 * 1000; /** * Acquire timeout in miliseconds. * 锁等待,防止线程饥饿 */ int timeoutMsecs = 10 * 1000; boolean locked = false; /** * Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000 msecs. * * @param jedis * @param lockKey lock key (ex. account:1, ...) */ public JedisLock(Jedis jedis, String lockKey) { this.jedis = jedis; this.lockKey = lockKey; } /** * Detailed constructor with default lock expiration of 60000 msecs. * * @param jedis * @param lockKey lock key (ex. account:1, ...) * @param timeoutMsecs acquire timeout in miliseconds (default: 10000 msecs) */ public JedisLock(Jedis jedis, String lockKey, int timeoutMsecs) { this(jedis, lockKey); this.timeoutMsecs = timeoutMsecs; } /** * Detailed constructor. * * @param jedis * @param lockKey lock key (ex. account:1, ...) * @param timeoutMsecs acquire timeout in miliseconds (default: 10000 msecs) * @param expireMsecs lock expiration in miliseconds (default: 60000 msecs) */ public JedisLock(Jedis jedis, String lockKey, int timeoutMsecs, int expireMsecs) { this(jedis, lockKey, timeoutMsecs); this.expireMsecs = expireMsecs; } /** * Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000 msecs. * * @param lockKey lock key (ex. account:1, ...) */ public JedisLock(String lockKey) { this(null, lockKey); } /** * Detailed constructor with default lock expiration of 60000 msecs. * * @param lockKey lock key (ex. account:1, ...) * @param timeoutMsecs acquire timeout in miliseconds (default: 10000 msecs) */ public JedisLock(String lockKey, int timeoutMsecs) { this(null, lockKey, timeoutMsecs); } /** * Detailed constructor. * * @param lockKey lock key (ex. account:1, ...) * @param timeoutMsecs acquire timeout in miliseconds (default: 10000 msecs) * @param expireMsecs lock expiration in miliseconds (default: 60000 msecs) */ public JedisLock(String lockKey, int timeoutMsecs, int expireMsecs) { this(null, lockKey, timeoutMsecs, expireMsecs); } /** * @return lock key */ public String getLockKey() { return lockKey; } /** * Acquire lock. * * @return true if lock is acquired, false acquire timeouted * @throws InterruptedException in case of thread interruption */ public synchronized boolean acquire() throws InterruptedException { return acquire(jedis); } /** * Acquire lock. * * @param jedis * @return true if lock is acquired, false acquire timeouted * @throws InterruptedException in case of thread interruption */ public synchronized boolean acquire(Jedis jedis) throws InterruptedException { //锁等待,防止线程饥饿 int timeout = timeoutMsecs; while (timeout >= 0) { //锁超时,防止线程在入锁以后,无限的执行等待(为什么加1:因为可能有相同时间的操作,这样做不完美,但是实用) long expires = System.currentTimeMillis() + expireMsecs + 1; String expiresStr = String.valueOf(expires); //锁到期时间 if (jedis.setnx(lockKey, expiresStr) == 1) { // lock acquired locked = true; return true; } String currentValueStr = jedis.get(lockKey); //redis里的时间 if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) { //判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的 // lock is expired String oldValueStr = jedis.getSet(lockKey, expiresStr); //获取上一个锁到期时间,并设置现在的锁到期时间, //只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的 if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { //如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁 // lock acquired locked = true; return true; } } timeout -= 100; Thread.sleep(100); } return false; } /** * Acqurired lock release. */ public synchronized void release() { release(jedis); } /** * Acqurired lock release. */ public synchronized void release(Jedis jedis) { if (locked) { jedis.del(lockKey); locked = false; } } }
以上锁定原理:就是锁定一个键,时间为自己定义,为了防止其他操作修改值 特意判断如下
Long.parseLong(currentValueStr) < System.currentTimeMillis()保证了每次都是原来的值。
2.2 测试锁
package com.chinaso.phl.concurrent; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import com.github.jedis.lock.JedisLock; /** * @author come from network kk @date 2014-3-1 */ public class JedisLockTest { private static ExecutorService executor = Executors.newFixedThreadPool(100); /** * @param args * @author piaohailin * @date 2014-3-1 */ public static void main(String[] args) { // final JedisPool pool = new JedisPool("192.168.56.2", 6379); GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); final JedisPool pool = new JedisPool(poolConfig, "192.168.142.237", 6379, 3000); //最后一个参数为密码 final String key = "h"; for (int i = 0; i < 5; i++) { executor.execute(new Runnable() { @Override public void run() { try { Jedis jedis = pool.getResource(); JedisLock lock = new JedisLock(jedis, key, 3000, 10000); if (lock.acquire()) { //如果锁上了 try { Thread.sleep(1000); System.out.println(System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); } finally { lock.release();//则解锁 } } } catch (Throwable t) { t.printStackTrace(); } } }); } } }
测试结果:
1459417921818 1459417922871 1459417923922
2.2.1 观看不能锁的情况
package com.chinaso.phl.concurrent; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import com.github.jedis.lock.JedisLock; /** * @author come from network kk @date 2014-3-1 */ public class JedisLockTest { private static ExecutorService executor = Executors.newFixedThreadPool(100); /** * @param args * @author piaohailin * @date 2014-3-1 */ public static void main(String[] args) { // final JedisPool pool = new JedisPool("192.168.56.2", 6379); GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); final JedisPool pool = new JedisPool(poolConfig, "192.168.142.237", 6379, 3000); //最后一个参数为密码 final String key = "h"; for (int i = 0; i < 3; i++) { executor.execute(new Runnable() { @Override public void run() { try { Jedis jedis = pool.getResource(); JedisLock lock = new JedisLock(jedis, key, 3000, 10000); if (true) { //如果锁上了 try { Thread.sleep(1000); System.out.println(System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); } finally { lock.release();//则解锁 } } } catch (Throwable t) { t.printStackTrace(); } } }); } } }
测试结果
1459418004056 1459418004056 1459418004056