一个Redis实现的分布式锁

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介:   import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.

 

 

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class RedisLock implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisLock.class);
    public static final String REDIS_LOCK = "RedisLock:";


    private static final long DEFAULT_WAIT_LOCK_TIME_OUT = 60;//60s 有慢sql,超时时间设置长一点
    private static final long DEFAULT_EXPIRE = 80;//80s 有慢sql,超时时间设置长一点
    private String key;
    private RedisTemplate redisTemplate;

    public RedisLock(RedisTemplate redisTemplate,String key) {
        this.redisTemplate = redisTemplate;
        this.key = key;
    }

    /**
     * 等待锁的时间,单位为s
     *
     * @param key
     * @param timeout s
     * @param seconds
     */
    public boolean lock(String key, long timeout, TimeUnit seconds) {
        String lockKey = generateLockKey(key);
        long nanoWaitForLock = seconds.toNanos(timeout);
        long start = System.nanoTime();

        try {
            while ((System.nanoTime() - start) < nanoWaitForLock) {
                if (redisTemplate.getConnectionFactory().getConnection().setNX(lockKey.getBytes(), new byte[0])) {
                    redisTemplate.expire(lockKey, DEFAULT_EXPIRE, TimeUnit.SECONDS);//暂设置为80s过期,防止异常中断锁未释放
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("add RedisLock[{}].{}", key, Thread.currentThread());
                    }
                    return true;
                }
                TimeUnit.MILLISECONDS.sleep(1000 + new Random().nextInt(100));//加随机时间防止活锁
            }
        } catch (Exception e) {
            LOGGER.error("{}", e.getMessage(), e);
            unlock();
        }
        return false;
    }

    public void unlock() {
        try {
            String lockKey = generateLockKey(key);
            RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
            connection.del(lockKey.getBytes());
            connection.del(key.getBytes());
            connection.close();
        } catch (Exception e) {
            LOGGER.error("{}", e.getMessage(), e);
        }
    }

    private String generateLockKey(String key) {
        return String.format(REDIS_LOCK + "%s", key);
    }

    public boolean lock() {
        return lock(key, DEFAULT_WAIT_LOCK_TIME_OUT, TimeUnit.SECONDS);
    }

    @Override
    public void close(){
        try {
            String lockKey = generateLockKey(key);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("release RedisLock[" + lockKey + "].");
            }
            RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
            connection.del(lockKey.getBytes());
            connection.close();
        } catch (Exception e) {
            LOGGER.error("{}", e.getMessage(), e);
        }
    }
}

 

 

在高并发的使用场景下,如何让redis里的数据尽量保持一致,可以采用分布式锁。以分布式锁的方式来保证对临界资源的互斥读写。

   redis使用缓存作为分布式锁,性能非常强劲,在一些不错的硬件上,redis可以每秒执行10w次,内网延迟不超过1ms,足够满足绝大部分应用的锁定需求。

   redis常用的分布式锁的实现方式:

一、setbit / getbit

   用索引号为0的第一个比特位来表示锁定状态,其中:0表示未获得锁,1表示已获得锁。

   优势:简单;

   劣势:竞态条件(race condition),死锁。

   获得锁的过程至少需要两步:先getbit判断,后setbit上锁。由于不是原子操作,因此可能存在竞态条件;如果一个客户端使用setbit获取到锁,然后没来得及释放crash掉了,那么其他在等待的客户端将永远无法获得该锁,进而形成了死锁。所以这种形式不太适合实现分布式锁。

二、setnx / del / getset

  redis官网有一篇文章专门谈论了实现分布式锁的话题。基本的原则是:采用setnx尝试获取锁并判断是否获得了锁,setnx设置的值是它想占用锁的时间(预估):

  • 如返回1,则该客户端获得锁,把lock.foo的键值设置为时间值表示该键已被锁定,该客户端最后可以通过DEL lock.foo来释放该锁。
  • 如返回0,表明该锁已被其他客户端取得,这时我们可以先返回或进行重试等对方完成或等待锁超时。

  通过del删除key来释放锁。某个想获得锁的客户端,先采用setnx尝试获取锁,如果获取失败了,那么会通过get命令来获得锁的过期时间以判断该锁的占用是否过期。如果跟当前时间对比,发现过期,那么先执行del,然后执行setnx获取锁。如果整个流程就这样,可能会产生死锁,请参考下面的执行序列:

   所以,在高并发的场景下,如果检测到锁过期,不能简单地进行del并尝试通过setnx获得锁。我们可以通过getset命令来避免这个问题。来看看,如果存在一个用户user4,它通过调用getset命令如何避免这种情况的发生:

 getset设置的过期时间跟上面的setnx设置的相同:

   如果该命令返回的结果跟上一步通过get获得的过期时间一致,则说明这两步之间,没有新的客户端抢占了锁,则该客户端即获得锁。如果该命令返回的结果跟上一步通过get获得的过期时间不一致,则该锁可能已被其他客户端抢先获得,则本次获取锁失败。

   这种实现方式得益于getset命令的原子性,从而有效得避免了竞态条件。并且,通过将比对锁的过期时间作为获取锁逻辑的一部分,从而避免了死锁。

三、setnx / del / expire

   这是使用最多的实现方式:setnx的目的同上,用来实现尝试获取锁以及判断是否获取到锁的原子性,del删除key来释放锁,与上面不同的是,使用redis自带的expire命令来防止死锁(可能出现某个客户端获得了锁,但是crash了,永不释放导致死锁)。这算是一种比较简单但粗暴的实现方式:因为,不管实际的情况如何,当你设置expire之后,它一定会在那个时间点删除key。如何当时某个客户端已获得了锁,正在执行临界区内的代码,但执行时间超过了expire的时间,将会导致另一个正在竞争该锁的客户端也获得了该锁,这个问题下面还会谈到。

  我们来看一下宿舍锁的简单实现很简单:

通过一个while(true),在当前线程上进行阻塞等待,并通过一个计数器进行自减操作,防止永久等待。 

http://www.cnblogs.com/moonandstar08/p/5682822.html

 

多节点的部署中,对锁的控制,参考:

http://www.jeffkit.info/2011/07/1000/

直接贴上代码实现,同上一篇文章一样,都是基于AOP

定义注解,标志切入点:

package com.ns.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface RedisLock {
    /**
     * redis的key
     * @return
     */
    String value();
    /**
     * 持锁时间,单位毫秒,默认一分钟
     */
    long keepMills() default 60000;
    /**
     * 当获取失败时候动作
     */
    LockFailAction action() default LockFailAction.GIVEUP;
    
    public enum LockFailAction{
        /**
         * 放弃
         */
        GIVEUP,
        /**
         * 继续
         */
        CONTINUE;
    }
    /**
     * 睡眠时间,设置GIVEUP忽略此项
     * @return
     */
    long sleepMills() default 1000;
}

 

切面实现:

package com.redis.aop;
import java.lang.reflect.Method;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.ns.annotation.RedisLock;
import com.ns.annotation.RedisLock.LockFailAction;
import com.ns.redis.dao.base.BaseRedisDao;
@Aspect
public class RedisLockAspect extends BaseRedisDao<String, Long>{
  private static final Logger log = LoggerFactory.getLogger(RedisLockAspect.class);
  //execution(* com.ns..*(*,..)) and @within(com.ns.annotation.RedisLock)
  
  @Pointcut("execution(* com.ns..*(..)) && @annotation(com.ns.annotation.RedisLock)")
  private void lockPoint(){}
  @Around("lockPoint()")
  public Object arround(ProceedingJoinPoint pjp) throws Throwable{
    MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
    Method method = methodSignature.getMethod();
    RedisLock lockInfo = method.getAnnotation(RedisLock.class);
    boolean lock = false;
    Object obj = null;
    while(!lock){
      long timestamp = System.currentTimeMillis()+lockInfo.keepMills();
      lock = setNX(lockInfo.value(), timestamp);
      //得到锁,已过期并且成功设置后旧的时间戳依然是过期的,可以认为获取到了锁(成功设置防止锁竞争)
      long now = System.currentTimeMillis();
      if(lock || ((now > getLock(lockInfo.value())) && (now > getSet(lockInfo.value(), timestamp)))){
        //得到锁,执行方法,释放锁
        log.info("得到锁...");
        obj = pjp.proceed();
        //不加这一行,对于只能执行一次的定时任务,时间差上不能保证另一个一定正好放弃
        if(lockInfo.action().equals(LockFailAction.CONTINUE)){
          delete(lockInfo.value());
        }
      }else{
        if(lockInfo.action().equals(LockFailAction.CONTINUE)){
          log.info("稍后重新请求锁...");
          Thread.currentThread().sleep(lockInfo.sleepMills());
        }else{
          log.info("放弃锁...");
          break;
        }
      }
    }
    return obj;
  }
  public boolean setNX(String key,Long value){
    return valueOperations.setIfAbsent(key, value);
  }
  public long getLock(String key){
    return valueOperations.get(key);
  }
  public Long getSet(String key,Long value){
    return valueOperations.getAndSet(key, value);
  }
  public void releaseLock(String key){
    delete(key);
  }
}

Python的一个实现

LOCK_TIMEOUT = 3
lock = 0
lock_timeout = 0
lock_key = 'lock.foo'

# 获取锁
while lock != 1:
    now = int(time.time())
    lock_timeout = now + LOCK_TIMEOUT + 1
    lock = redis_client.setnx(lock_key, lock_timeout)
    if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout)):
        break
    else:
        time.sleep(0.001)

# 已获得锁
do_job()

# 释放锁
now = int(time.time())
if now < lock_timeout:
    redis_client.delete(lock_key)

http://blog.csdn.net/lihao21/article/details/49104695

以上有些代码只符合我现在的项目场景,根据实际需要进行调整

http://www.tuicool.com/articles/EzaM7by

 

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
174 0
分布式爬虫框架Scrapy-Redis实战指南
|
3月前
|
NoSQL Java 中间件
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
589 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
|
6天前
|
数据采集 存储 NoSQL
分布式爬虫去重:Python + Redis实现高效URL去重
分布式爬虫去重:Python + Redis实现高效URL去重
|
3月前
|
NoSQL Java Redis
Springboot使用Redis实现分布式锁
通过这些步骤和示例,您可以系统地了解如何在Spring Boot中使用Redis实现分布式锁,并在实际项目中应用。希望这些内容对您的学习和工作有所帮助。
240 83
|
2月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁主要依靠一个SETNX指令实现的 , 这条命令的含义就是“SET if Not Exists”,即不存在的时候才会设置值。 只有在key不存在的情况下,将键key的值设置为value。如果key已经存在,则SETNX命令不做任何操作。 这个命令的返回值如下。 ● 命令在设置成功时返回1。 ● 命令在设置失败时返回0。 假设此时有线程A和线程B同时访问临界区代码,假设线程A首先执行了SETNX命令,并返回结果1,继续向下执行。而此时线程B再次执行SETNX命令时,返回的结果为0,则线程B不能继续向下执行。只有当线程A执行DELETE命令将设置的锁状态删除时,线程B才会成功执行S
|
3月前
|
缓存 NoSQL 搜索推荐
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
本文介绍了如何通过Lua脚本在Redis中实现分布式锁的原子性操作,避免并发问题。首先讲解了Lua脚本的基本概念及其在Redis中的使用方法,包括通过`eval`指令执行Lua脚本和通过`script load`指令缓存脚本。接着详细展示了如何用Lua脚本实现加锁、解锁及可重入锁的功能,确保同一线程可以多次获取锁而不发生死锁。最后,通过代码示例演示了如何在实际业务中调用这些Lua脚本,确保锁操作的原子性和安全性。
175 6
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
|
3月前
|
缓存 NoSQL 中间件
Redis,分布式缓存演化之路
本文介绍了基于Redis的分布式缓存演化,探讨了分布式锁和缓存一致性问题及其解决方案。首先分析了本地缓存和分布式缓存的区别与优劣,接着深入讲解了分布式远程缓存带来的并发、缓存失效(穿透、雪崩、击穿)等问题及应对策略。文章还详细描述了如何使用Redis实现分布式锁,确保高并发场景下的数据一致性和系统稳定性。最后,通过双写模式和失效模式讨论了缓存一致性问题,并提出了多种解决方案,如引入Canal中间件等。希望这些内容能为读者在设计分布式缓存系统时提供有价值的参考。感谢您的阅读!
167 6
Redis,分布式缓存演化之路
|
3月前
|
运维 NoSQL 算法
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
本文深入探讨了基于Redis实现分布式锁时遇到的细节问题及解决方案。首先,针对锁续期问题,提出了通过独立服务、获取锁进程自己续期和异步线程三种方式,并详细介绍了如何利用Lua脚本和守护线程实现自动续期。接着,解决了锁阻塞问题,引入了带超时时间的`tryLock`机制,确保在高并发场景下不会无限等待锁。最后,作为知识扩展,讲解了RedLock算法原理及其在实际业务中的局限性。文章强调,在并发量不高的场景中手写分布式锁可行,但推荐使用更成熟的Redisson框架来实现分布式锁,以保证系统的稳定性和可靠性。
107 0
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
|
7月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
5月前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
353 5

热门文章

最新文章