Redis系列-10.Redis分布式锁(下)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Redis系列-10.Redis分布式锁

Redis系列-10.Redis分布式锁(上):https://developer.aliyun.com/article/1414722


Lua保证原子性


前面有一个问题就是 finally块的判断 + del删除操作不是原子性的


为了保证原子性,需要启用lua脚本编写redis分布式锁判断 + 删除判断代码


Redis调用Lua脚本通过eval命令保证代码执行的原子性,直接用return返回脚本执行后的结果值


解决


@Service
@Slf4j
public class InventoryService
{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Value("${server.port}")
    private String port;
    private Lock lock = new ReentrantLock();
    public String sale()
    {
        String retMessage = "";
        String key = "zzyyRedisLock";
        String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
        while(!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS))
        {
            //暂停毫秒
                     try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
        }
        try
        {
            //1 查询库存信息
            String result = stringRedisTemplate.opsForValue().get("inventory001");
            //2 判断库存是否足够
            Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
            //3 扣减库存
            if(inventoryNumber > 0) {
                stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
                retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber+"\t"+uuidValue;
                System.out.println(retMessage);
            }else{
                retMessage = "商品卖完了,o(╥﹏╥)o";
            }
        }finally {
            //V6.0 将判断+删除自己的合并为lua脚本保证原子性
            String luaScript =
                    "if (redis.call('get',KEYS[1]) == ARGV[1]) then " +
                        "return redis.call('del',KEYS[1]) " +
                    "else " +
                        "return 0 " +
                    "end";
            stringRedisTemplate.execute(new DefaultRedisScript<>(luaScript, Boolean.class), Arrays.asList(key), uuidValue);
        }
        return retMessage+"\t"+"服务端口号:"+port;
    }
}


可重入锁 + 设计模型


如何兼顾锁的可重入性问题呢?


可重入锁


可重入锁又名递归锁


是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。


如果是1个有 synchronized 修饰的递归调用方法,程序第2次进入被自己阻塞了岂不是天大的笑话,出现了作茧自缚。


所以Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。


可重入锁的种类


隐式锁(即synchronized关键字使用的锁)默认是可重入锁


指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁,这样的锁就叫做可重入锁。


简单的来说就是:在一个synchronized修饰的方法或代码块的内部调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的


与可重入锁相反,不可重入锁不可递归调用,递归调用就发生死锁。

public class ReEntryLockDemo
{
    public static void main(String[] args)
    {
        final Object objectLockA = new Object();
        new Thread(() -> {
            synchronized (objectLockA)
            {
                System.out.println("-----外层调用");
                synchronized (objectLockA)
                {
                    System.out.println("-----中层调用");
                    synchronized (objectLockA)
                    {
                        System.out.println("-----内层调用");
                    }
                }
            }
        },"a").start();
    }
}
public class ReEntryLockDemo
{
    public synchronized void m1()
    {
        System.out.println("-----m1");
        m2();
    }
    public synchronized void m2()
    {
        System.out.println("-----m2");
        m3();
    }
    public synchronized void m3()
    {
        System.out.println("-----m3");
    }
    public static void main(String[] args)
    {
        ReEntryLockDemo reEntryLockDemo = new ReEntryLockDemo();
        reEntryLockDemo.m1();
    }
}


Synchronized重入的实现机理


每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。


当执行monitorenter时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。


在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么 Java 虚拟机可以将其计数器加1,否则需要等待,直至持有线程释放该锁。


**当执行monitorexit时,Java虚拟机则需将锁对象的计数器减1。**计数器为零代表锁已被释放。


lock/unlock配合可重入锁进行AQS源码分析


如果还是当前线程,则nextc = c + acquires


可重入锁的计数问题,redis中那个数据类型可以代替


K,K,V


Map>


命令


hset key field value


hset redis锁名字(zzyyRedisLock) 某个请求线程的UUID+ThreadID 加锁的次数


总结


setnx,只能解决有无的问题,够用但是不完美


hset,不但解决有无,还解决可重入


思考+设计


目前有两条支线,目的是保证同一个时刻只能有一个线程持有锁进去redis做扣减库存动作


  • 可重入
  • 扣减原子性


Lua脚本


redis命令过程分析



加锁lua脚本lock


先判断redis分布式锁这个key是否存在


EXISTS key


返回零说明不存在,hset新建当前线程属于自己的锁 BY UUID :ThreadID


HSET zzyyRedisLock 0c90d37cb6ec42268861b3d739f8b3a8:1 1


命令 key value = UUID:ThreadID 次数


返回一说明已经有锁,需进一步判断是不是当前线程自己的

返回零说明不是自己的


返回一说明是自己的锁,自增一次表示重入


上述设计修改为Lua脚本


if redis.call('exists','key') == 0 then
  redis.call('hset','key','uuid:threadid',1)
  redis.call('expire','key',30)
  return 1
elseif redis.call('hexists','key','uuid:threadid') == 1 then
  redis.call('hincrby','key','uuid:threadid',1)
  redis.call('expire','key',30)
  return 1
else
  return 0
end
相同部分是否可以替换处理???
hincrby命令可否替代hset命令
if redis.call('exists','key') == 0 or redis.call('hexists','key','uuid:threadid') == 1 then
  redis.call('hincrby','key','uuid:threadid',1)
  redis.call('expire','key',30)
  return 1
else
  return 0
end


测试



解锁lua脚本unlock


设计思路:有锁且还是自己的锁


HEXISTS key uuid:ThreadID


返回零,说明根本没有锁,程序块返回nil


不是零,说明有锁且是自己的锁,直接调用HINCRBY -1,表示每次减个一,解锁一次。直到它变为零表示可以删除该锁key。


全套流程

if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then
 return nil
elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then
 return redis.call('del',KEYS[1])
else
 return 0
end


测试全套流程



将上述Lua脚本整个进入微服务Java程序


通过实现JUC里面的Lock接口,实现Redis分布式锁RedisDistributedLock


RedisDistributedLock

public class RedisDistributedLock implements Lock
{
    private StringRedisTemplate stringRedisTemplate;
    private String lockName;//KEYS[1]
    private String uuidValue;//ARGV[1]
    private long   expireTime;//ARGV[2]
    public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName)
    {
        this.stringRedisTemplate = stringRedisTemplate;
        this.lockName = lockName;
        this.uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();//UUID:ThreadID
        this.expireTime = 30L;
    }
    @Override
    public void lock()
    {
        tryLock();
    }
    @Override
    public boolean tryLock()
    {
        try {tryLock(-1L,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}
        return false;
    }
    /**
     * 干活的,实现加锁功能,实现这一个干活的就OK,全盘通用
     * @param time
     * @param unit
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException{
        if(time != -1L){
            this.expireTime = unit.toSeconds(time);
        }
        String script =
                "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
                        "redis.call('hincrby',KEYS[1],ARGV[1],1) " +
                        "redis.call('expire',KEYS[1],ARGV[2]) " +
                        "return 1 " +
                "else " +
                        "return 0 " +
                "end";
        System.out.println("script: "+script);
        System.out.println("lockName: "+lockName);
        System.out.println("uuidValue: "+uuidValue);
        System.out.println("expireTime: "+expireTime);
        while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) {
            TimeUnit.MILLISECONDS.sleep(50);
        }
        return true;
    }
    /**
     *干活的,实现解锁功能
     */
    @Override
    public void unlock()
    {
        String script =
                "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " +
                "   return nil " +
                "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +
                "   return redis.call('del',KEYS[1]) " +
                "else " +
                "   return 0 " +
                "end";
        // nil = false 1 = true 0 = false
        System.out.println("lockName: "+lockName);
        System.out.println("uuidValue: "+uuidValue);
        System.out.println("expireTime: "+expireTime);
        Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime));
        if(flag == null)
        {
            throw new RuntimeException("This lock doesn't EXIST");
        }
    }
    //===下面的redis分布式锁暂时用不到=======================================
    //===下面的redis分布式锁暂时用不到=======================================
    //===下面的redis分布式锁暂时用不到=======================================
    @Override
    public void lockInterruptibly() throws InterruptedException
    {
    }
    @Override
    public Condition newCondition()
    {
        return null;
    }
}

这样设计的问题?

考虑扩展,本次是redis实现分布式锁,以后是zookeeper、mysql呢?


引入工厂模式改造


DistributedLockFactory

@Component
public class DistributedLockFactory
{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    private String lockName;
    public Lock getDistributedLock(String lockType)
    {
        if(lockType == null) return null;
        if(lockType.equalsIgnoreCase("REDIS")){
            lockName = "zzyyRedisLock";
            return new RedisDistributedLock(stringRedisTemplate,lockName);
        } else if(lockType.equalsIgnoreCase("ZOOKEEPER")){
            //TODO zookeeper版本的分布式锁实现
            return new ZookeeperDistributedLock();
        } else if(lockType.equalsIgnoreCase("MYSQL")){
            //TODO mysql版本的分布式锁实现
            return null;
        }
        return null;
    }
}

RedisDistributedLock

public class RedisDistributedLock implements Lock
{
    private StringRedisTemplate stringRedisTemplate;
    private String lockName;//KEYS[1]
    private String uuidValue;//ARGV[1]
    private long   expireTime;//ARGV[2]
    public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName){
        this.stringRedisTemplate = stringRedisTemplate;
        this.lockName = lockName;
        this.uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();//UUID:ThreadID
        this.expireTime = 30L;
    }
    @Override
    public void lock(){
        tryLock();
    }
    @Override
    public boolean tryLock(){
        try {tryLock(-1L,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}
        return false;
    }
    /**
     * 干活的,实现加锁功能,实现这一个干活的就OK,全盘通用
     * @param time
     * @param unit
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException{
        if(time != -1L){
            this.expireTime = unit.toSeconds(time);
        }
        String script =
                "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
                        "redis.call('hincrby',KEYS[1],ARGV[1],1) " +
                        "redis.call('expire',KEYS[1],ARGV[2]) " +
                        "return 1 " +
                "else " +
                        "return 0 " +
                "end";
        System.out.println("script: "+script);
        System.out.println("lockName: "+lockName);
        System.out.println("uuidValue: "+uuidValue);
        System.out.println("expireTime: "+expireTime);
        while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) {
            TimeUnit.MILLISECONDS.sleep(50);
        }
        return true;
    }
    /**
     *干活的,实现解锁功能
     */
    @Override
    public void unlock()
    {
        String script =
                "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " +
                "   return nil " +
                "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +
                "   return redis.call('del',KEYS[1]) " +
                "else " +
                "   return 0 " +
                "end";
        // nil = false 1 = true 0 = false
        System.out.println("lockName: "+lockName);
        System.out.println("uuidValue: "+uuidValue);
        System.out.println("expireTime: "+expireTime);
        Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime));
        if(flag == null)
        {
            throw new RuntimeException("This lock doesn't EXIST");
        }
    }
    //===下面的redis分布式锁暂时用不到=======================================
    //===下面的redis分布式锁暂时用不到=======================================
    //===下面的redis分布式锁暂时用不到=======================================
    @Override
    public void lockInterruptibly() throws InterruptedException
    {
    }
    @Override
    public Condition newCondition()
    {
        return null;
    }
}

InventoryService

@Service
@Slf4j
public class InventoryService
{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Value("${server.port}")
    private String port;
    @Autowired
    private DistributedLockFactory distributedLockFactory;
    public String sale()
    {
        String retMessage = "";
        Lock redisLock = distributedLockFactory.getDistributedLock("redis");
        redisLock.lock();
        try
        {
            //1 查询库存信息
            String result = stringRedisTemplate.opsForValue().get("inventory001");
            //2 判断库存是否足够
            Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
            //3 扣减库存
            if(inventoryNumber > 0)
            {
                inventoryNumber = inventoryNumber - 1;
                stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(inventoryNumber));
                retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber+"\t服务端口:" +port;
                System.out.println(retMessage);
                return retMessage;
            }
            retMessage = "商品卖完了,o(╥﹏╥)o"+"\t服务端口:" +port;
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            redisLock.unlock();
        }
        return retMessage;
    }
}

进行可重入测试发现出现问题

DistributedLockFactory

@Component
public class DistributedLockFactory
{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    private String lockName;
    private String uuidValue;
    public DistributedLockFactory()
    {
        this.uuidValue = IdUtil.simpleUUID();//UUID
    }
    public Lock getDistributedLock(String lockType)
    {
        if(lockType == null) return null;
        if(lockType.equalsIgnoreCase("REDIS")){
            lockName = "zzyyRedisLock";
            return new RedisDistributedLock(stringRedisTemplate,lockName,uuidValue);
        } else if(lockType.equalsIgnoreCase("ZOOKEEPER")){
            //TODO zookeeper版本的分布式锁实现
            return new ZookeeperDistributedLock();
        } else if(lockType.equalsIgnoreCase("MYSQL")){
            //TODO mysql版本的分布式锁实现
            return null;
        }
        return null;
    }
}

RedisDistributedLock

public class RedisDistributedLock implements Lock
{
    private StringRedisTemplate stringRedisTemplate;
    private String lockName;
    private String uuidValue;
    private long   expireTime;
    public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName,String uuidValue)
    {
        this.stringRedisTemplate = stringRedisTemplate;
        this.lockName = lockName;
        this.uuidValue = uuidValue+":"+Thread.currentThread().getId();
        this.expireTime = 30L;
    }
    @Override
    public void lock()
    {
        this.tryLock();
    }
    @Override
    public boolean tryLock()
    {
        try
        {
            return this.tryLock(-1L,TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
    {
        if(time != -1L)
        {
            expireTime = unit.toSeconds(time);
        }
        String script =
                "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
                    "redis.call('hincrby',KEYS[1],ARGV[1],1) " +
                    "redis.call('expire',KEYS[1],ARGV[2]) " +
                    "return 1 " +
                "else " +
                    "return 0 " +
                "end";
        System.out.println("lockName: "+lockName+"\t"+"uuidValue: "+uuidValue);
        while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime)))
        {
            try { TimeUnit.MILLISECONDS.sleep(60); } catch (InterruptedException e) { e.printStackTrace(); }
        }
        return true;
    }
    @Override
    public void unlock()
    {
        String script =
                "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " +
                    "return nil " +
                "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +
                    "return redis.call('del',KEYS[1]) " +
                "else " +
                        "return 0 " +
                "end";
        System.out.println("lockName: "+lockName+"\t"+"uuidValue: "+uuidValue);
        Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
        if(flag == null)
        {
            throw new RuntimeException("没有这个锁,HEXISTS查询无");
        }
    }
    //=========================================================
    @Override
    public void lockInterruptibly() throws InterruptedException
    {
    }
    @Override
    public Condition newCondition()
    {
        return null;
    }
}


自动续期


Redis分布式锁还存在一个问题就是续期问题。

//==============自动续期
if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then
  return redis.call('expire',KEYS[1],ARGV[2])
else
  return 0
end


RedisDistributedLock


public class RedisDistributedLock implements Lock
{
    private StringRedisTemplate stringRedisTemplate;
    private String lockName;//KEYS[1]
    private String uuidValue;//ARGV[1]
    private long   expireTime;//ARGV[2]
    public RedisDistributedLock(StringRedisTemplate stringRedisTemplate,String lockName,String uuidValue)
    {
        this.stringRedisTemplate = stringRedisTemplate;
        this.lockName = lockName;
        this.uuidValue = uuidValue+":"+Thread.currentThread().getId();
        this.expireTime = 30L;
    }
    @Override
    public void lock()
    {
        tryLock();
    }
    @Override
    public boolean tryLock()
    {
        try {tryLock(-1L,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}
        return false;
    }
    /**
     * 干活的,实现加锁功能,实现这一个干活的就OK,全盘通用
     * @param time
     * @param unit
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
    {
        if(time != -1L)
        {
            this.expireTime = unit.toSeconds(time);
        }
        String script =
                "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
                        "redis.call('hincrby',KEYS[1],ARGV[1],1) " +
                        "redis.call('expire',KEYS[1],ARGV[2]) " +
                        "return 1 " +
                        "else " +
                        "return 0 " +
                        "end";
        System.out.println("script: "+script);
        System.out.println("lockName: "+lockName);
        System.out.println("uuidValue: "+uuidValue);
        System.out.println("expireTime: "+expireTime);
        while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) {
            TimeUnit.MILLISECONDS.sleep(50);
        }
        this.renewExpire();
        return true;
    }
    /**
     *干活的,实现解锁功能
     */
    @Override
    public void unlock()
    {
        String script =
                "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " +
                        "   return nil " +
                        "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +
                        "   return redis.call('del',KEYS[1]) " +
                        "else " +
                        "   return 0 " +
                        "end";
        // nil = false 1 = true 0 = false
        System.out.println("lockName: "+lockName);
        System.out.println("uuidValue: "+uuidValue);
        System.out.println("expireTime: "+expireTime);
        Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime));
        if(flag == null)
        {
            throw new RuntimeException("This lock doesn't EXIST");
        }
    }
  // 上锁的情况
    private void renewExpire()
    {
        String script =
                "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then " +
                        "return redis.call('expire',KEYS[1],ARGV[2]) " +
                        "else " +
                        "return 0 " +
                        "end";
        new Timer().schedule(new TimerTask()
        {
            @Override
            public void run()
            {
                if (stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) {
                    renewExpire();
                }
            }
        },(this.expireTime * 1000)/3);
    }
    //===下面的redis分布式锁暂时用不到=======================================
    //===下面的redis分布式锁暂时用不到=======================================
    //===下面的redis分布式锁暂时用不到=======================================
    @Override
    public void lockInterruptibly() throws InterruptedException
    {
    }
    @Override
    public Condition newCondition()
    {
        return null;
    }
}


总结


synchronized单机版可以,但是上分布式死翘翘


nginx分布式微服务单机锁不行


如果取消宕机所,可以考虑上redis分布式锁setnx


但是只是加了锁,没有释放锁,出现异常的话,可能无法释放锁,必须要在代码层面finally释放锁


宕机了,部署了微服务代码层面根本没有走到finally这块,没办法保证解锁,这个key没有被删除,需要有lockkey的过期时间设定


为redis的分布式锁key,增加过期时间,此外,还必须要setnx+过期时间在同一行,保证原子性


且必须规定只能自己删除自己的锁,你不能把别人的锁删除了,防止张冠李戴


同时unlock变为Lua脚本保证


最后还需要考虑锁重入问题,使用hset替代setnx+lock变为lua脚本办证,以及自动续期问题


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
7天前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
40 16
|
1月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
59 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
1月前
|
NoSQL Redis 数据库
计数器 分布式锁 redis实现
【10月更文挑战第5天】
47 1
|
1月前
|
NoSQL 算法 关系型数据库
Redis分布式锁
【10月更文挑战第1天】分布式锁用于在多进程环境中保护共享资源,防止并发冲突。通常借助外部系统如Redis或Zookeeper实现。通过`SETNX`命令加锁,并设置过期时间防止死锁。为避免误删他人锁,加锁时附带唯一标识,解锁前验证。面对锁提前过期的问题,可使用守护线程自动续期。在Redis集群中,需考虑主从同步延迟导致的锁丢失问题,Redlock算法可提高锁的可靠性。
73 4
|
1月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
55 3
|
1月前
|
缓存 NoSQL 算法
面试题:Redis如何实现分布式锁!
面试题:Redis如何实现分布式锁!
|
3月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
110 2
基于Redis的高可用分布式锁——RedLock
|
3月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
|
1月前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
60 4