分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)(上)

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云原生内存数据库 Tair,内存型 2GB
云数据库 Redis 版,倚天版 1GB 1个月
简介: 分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)

分布式锁解决方案_数据库乐观锁实现的分布式锁



什么是乐观锁


总是假设最好的情况,每次去拿数据的时候都认为别人不会修改, 所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有 去更新这个数据,可以使用版本号机制和CAS算法实现。


编写乐观锁更新语句

<update id="decreaseStockForVersion" parameterType="int" >
        UPDATE product SET count = count - # {count}, version = version + 1 WHERE id = #{id} AND count > 0 AND version = #{version}
</update>


编写创建订单业务层

/**
     * 创建订单 乐观锁
     *
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public String createOrder(Integer productId, Integer count) throws Exception {
        int retryCount = 0;
        int update = 0;
        // 1、根据商品id查询商品信息
        Product product = productMapper.selectById(productId);
        // 2、判断商品是否存在
        if (product == null) {
            throw new RuntimeException("购买商品不存在:" + productId + "不存在");
       }
        // 3、校验库存
        if (count > product.getCount()) {
            throw new Exception("库存不够");
       }
         // 乐观锁更新库存
        // 更新失败,说明其他线程已经修改过数据,本次扣减库存失败,可以重试一定次数或者返回
        // 最多重试3次
        while(retryCount < 3 && update == 0){
            update = this.reduceStock(product.getId(),count);
            retryCount++;
       }
        if (update == 0){
            throw new Exception("库存不够");
       }
        // 6、 创建订单
        TOrder order = new TOrder();
        order.setOrderStatus(1);//待处理
        order.setReceiverName("张三");
        order.setReceiverMobile("18587781068");
        order.setOrderAmount(product.getPrice().multiply(new BigDecimal(count)));//订单价格
        baseMapper.insert(order);
        // 7、 创建订单和商品关系数据
        OrderItem orderItem = new OrderItem();
        orderItem.setOrderId(order.getId());
        orderItem.setProduceId(product.getId());
        orderItem.setPurchasePrice(product.getPrice());
        orderItem.setPurchaseNum(count);
        orderItemMapper.insert(orderItem);
        return order.getId();
   }
    /**
     * 减库存
     * <p>
     * 由于默认的事务隔离级别是可重复读,produce.findById()
     * 得到的数据始终是相同的,所以需要提取 reduceStock方法。每次循环都启动新的事务尝试扣减库存操作。
     */
    @Transactional(rollbackFor = Exception.class)
    public int reduceStock(int gid,int count) {
        int result = 0;
        //1、查询商品库存
        Product product = productMapper.selectById(gid);
        //2、判断库存是否充足
        if (product.getCount() >= count) {
            //3、减库存
            // 乐观锁更新库存
            result = productMapper.decreaseStockForVersion(gid,count, product.getVersion());
       }
        return result;
   }


分布式锁解决方案_Redis实现的分布式锁原理


获取锁


互斥:确保只有一个线程获得锁

# 添加锁 利用setnx的互斥性
127.0.0.1:6379> setnx lock thread1


释放锁


1、手动释放锁

2、超时释放:获取锁时设置一个超时时间

#释放锁 删除即可
127.0.0.1:6379> del lock


超时释放

127.0.0.1:6379> setnx lock tread1
127.0.0.1:6379> expire lock 5
127.0.0.1:6379> ttl lock


两步合成一步

help set
 SET key value [EX seconds] [PX milliseconds] [NX|XX]
 summary: Set the string value of a key
 since: 1.0.0
 group: string
127.0.0.1:6379> get k1
(nil)
127.0.0.1:6379> set lock k1 ex 5 nx
OK
127.0.0.1:6379> set lock k1 ex 5 nx
nil



分布式锁解决方案_Redis实现的分布式锁



引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>


添加Redis配置

spring:
 redis:
   host: localhost
   port: 6379


编写创建订单实现类

@Override
    public String createOrderRedis(Integer productId, Integer count) throws Exception {
        log.info("*************** 进入方法 **********");
        String key = "lock:";
        String value = UUID.randomUUID().toString();
        // 获取分布式锁
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key+productId,String.valueOf(Thread.currentThread().getId()),30,TimeUnit.SECONDS);
        // 判断是否获取锁成功
        if (!result){
            log.info("我进入了锁");
            return "不允许重复下单";
       }
        try {
            // 1、根据商品id查询商品信息
            Product product = productMapper.selectById(productId);
            // 2、判断商品是否存在 if (product == null) {
                throw new RuntimeException("购买商品不存在:" + productId + "不存在");
           }
            // 3、校验库存
            if (count > product.getCount()) {
                throw new RuntimeException("商品" + productId + "仅剩" + product.getCount() + "件,无法购买");
           }
            // 4、计算库存
            Integer leftCount = product.getCount() - count;
            // 5、更新库存
            product.setCount(leftCount);
            productMapper.updateById(product);
            // 6、 创建订单
            TOrder order = new TOrder();
            order.setOrderStatus(1);//待处理
            order.setReceiverName("张三");
            order.setReceiverMobile("18587781068");
            order.setOrderAmount(product.getPrice().multiply(new BigDecimal(count)));//订单价格
            baseMapper.insert(order);
            // 7、 创建订单和商品关系数据
            OrderItem orderItem = new OrderItem();
            orderItem.setOrderId(order.getId());
            orderItem.setProduceId(product.getId());
            orderItem.setPurchasePrice(product.getPrice());
            orderItem.setPurchaseNum(count);
            orderItemMapper.insert(orderItem);
            return order.getId();
       }catch (Exception e){
            e.printStackTrace();
       }finally {
            // 释放锁
          stringRedisTemplate.delete(key+productId);
       }
        return "创建失败";
   }


分布式锁解决方案_Redis分布式锁误删除问题



配置锁标识

private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString().replace("-" ,"");


获取锁

//1、获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 2、获得锁 setnx key   value   time   type
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+produceId, threadId, 30,TimeUnit.SECONDS);


释放锁

// 获取锁标识
  String s = stringRedisTemplate.opsForValue().get(KEY_PREFIX + produceId);
            // 判断标识是否一致
            if (s.equals(threadId)){
                // 释放锁
               stringRedisTemplate.delete(KEY_PREFIX + produceId);
           }


分布式锁解决方案_Redis分布式锁不可重入问题



不可重入问题


如何解决


分布式锁解决方案_基于Redisson实现的分布式锁实现



Redisson介绍


Redisson - 是一个高级的分布式协调Redis客服端,能帮助用户在分 布式环境中轻松实现一些Java的对象,Redisson、Jedis、Lettuce 是三个不同的操作 Redis 的客户端,Jedis、Lettuce 的 API 更侧重 对 Reids 数据库的 CRUD(增删改查),而 Redisson API 侧重于分布式开发。


引入Redisson依赖

<dependency>
     <groupId>org.redisson</groupId>
     <artifactId>redisson-spring-boot-starter</artifactId>
     <version>3.17.2</version>
</dependency>


添加Reids的配置

spring:
 redis:
   host: localhost
   port: 6379


编写Redis分布式锁工具类

package com.itbaizhan.lock.utils;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class DistributedRedisLock {
    @Autowired
    private RedissonClient redissonClient;
    // 加锁
    public Boolean lock(String lockName) {
        if (redissonClient == null) {
            log.info("DistributedRedisLock redissonClient is null");
            return false;
       }
        try {
            RLock lock = redissonClient.getLock(lockName);
            // 锁15秒后自动释放,防止死锁
            lock.lock(15, TimeUnit.SECONDS);
            log.info("Thread [{}] DistributedRedisLock lock [{}] success",Thread.currentThread().getName(), lockName);
            // 加锁成功
            return true;
       } catch (Exception e) {
            log.error("DistributedRedisLocklock [{}] Exception:", lockName, e);
            return false;
       }
   }
    // 释放锁
    public Boolean unlock(String lockName) {
        if (redissonClient == null) {
            log.info("DistributedRedisLock redissonClient is null");
            return false;
       }
        try {
            RLock lock = redissonClient.getLock(lockName);
            lock.unlock();
            log.info("Thread [{}] DistributedRedisLock unlock [{}] success",Thread.currentThread().getName(), lockName);
            // 释放锁成功
            return true;
} catch (Exception e) {
            log.error("DistributedRedisLock unlock [{}] Exception:", lockName, e);
            return false;
       }
   }
}


编写创建订单接口实现

/**
     * Redis锁实现
     *
     * @param productId
     * @param count
     * @return
     * @throws Exception
     */
    @Override
    public String createOrderRedis(Integer productId, Integer count) throws Exception {
        //获取锁对象
        if (distributedRedisLock.lock(String.valueOf(productId))) {
            try {
                // 1、根据商品id查询商品信息
                Product product = productMapper.selectById(productId);
                // 2、判断商品是否存在
                if (product == null) {
                throw new RuntimeException("购买商品不存在:" + productId + "不存在");
               }
                // 3、校验库存
                if (count > product.getCount())
               {
                    throw new RuntimeException("商品" + productId + "仅剩" + product.getCount() + "件,无法购买");
               }
                // 4、计算库存
                Integer leftCount = product.getCount() - count;
                // 5、更新库存
                product.setCount(leftCount);
                productMapper.updateById(product);
                // 6、 创建订单
                TOrder order = new TOrder();
                order.setOrderStatus(1);//待处理
                order.setReceiverName("张三");
                order.setReceiverMobile("18587781068");
                order.setOrderAmount(product.getPrice().multiply(new BigDecimal(count)));//订单价格
                baseMapper.insert(order);
                // 7、 创建订单和商品关系数据
                OrderItem orderItem = new OrderItem();
                orderItem.setOrderId(order.getId());
                orderItem.setProduceId(product.getId());
                orderItem.setPurchasePrice(product.getPrice());
                orderItem.setPurchaseNum(count);
                orderItemMapper.insert(orderItem);
                return order.getId();
           } catch (Exception e) {
                e.printStackTrace();
           } finally {
               distributedRedisLock.unlock(String.valueOf(productId));
           }
       }
        return "创建失败";
   }


分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)(下):https://developer.aliyun.com/article/1420041

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
26天前
|
存储 缓存 NoSQL
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
redis分布式锁、redisson、可重入、主从一致性、WatchDog、Redlock红锁、zookeeper;Redis集群、主从复制,全量同步、增量同步;哨兵,分片集群,Redis为什么这么快,I/O多路复用模型——用户空间和内核空间、阻塞IO、非阻塞IO、IO多路复用,Redis网络模型
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
|
18天前
|
缓存 NoSQL 关系型数据库
(八)漫谈分布式之缓存篇:唠唠老生常谈的MySQL与Redis数据一致性问题!
本文来聊一个跟实际工作挂钩的老生常谈的问题:分布式系统中的缓存一致性。
72 10
|
12天前
|
SQL 关系型数据库 MySQL
「Go开源」goose:深入学习数据库版本管理工具
「Go开源」goose:深入学习数据库版本管理工具
「Go开源」goose:深入学习数据库版本管理工具
|
20天前
|
NoSQL 算法 Java
(十三)全面理解并发编程之分布式架构下Redis、ZK分布式锁的前世今生
本文探讨了从单体架构下的锁机制到分布式架构下的线程安全问题,并详细分析了分布式锁的实现原理和过程。
|
4天前
|
NoSQL Java Redis
Redis字符串数据类型之INCR命令,通常用于统计网站访问量,文章访问量,实现分布式锁
这篇文章详细解释了Redis的INCR命令,它用于将键的值增加1,通常用于统计网站访问量、文章访问量,以及实现分布式锁,同时提供了Java代码示例和分布式锁的实现思路。
12 0
|
20天前
|
数据采集 存储 NoSQL
Redis 与 Scrapy:无缝集成的分布式爬虫技术
Redis 与 Scrapy:无缝集成的分布式爬虫技术
|
26天前
|
NoSQL 前端开发 算法
Redis问题之Redis分布式锁与Zookeeper分布式锁有何不同
Redis问题之Redis分布式锁与Zookeeper分布式锁有何不同
|
6天前
|
存储 关系型数据库 MySQL
MySQL——数据库备份上传到阿里云OSS存储
MySQL——数据库备份上传到阿里云OSS存储
20 0
|
5天前
|
缓存 NoSQL Redis
一天五道Java面试题----第九天(简述MySQL中索引类型对数据库的性能的影响--------->缓存雪崩、缓存穿透、缓存击穿)
这篇文章是关于Java面试中可能会遇到的五个问题,包括MySQL索引类型及其对数据库性能的影响、Redis的RDB和AOF持久化机制、Redis的过期键删除策略、Redis的单线程模型为何高效,以及缓存雪崩、缓存穿透和缓存击穿的概念及其解决方案。
|
8天前
|
Oracle 关系型数据库 MySQL
Mysql和Oracle数据库死锁查看以及解决
【8月更文挑战第11天】本文介绍了解决MySQL与Oracle数据库死锁的方法。MySQL可通过`SHOW ENGINE INNODB STATUS`查看死锁详情,并自动回滚一个事务解除死锁;也可手动KILL事务。Oracle则通过查询V$LOCK与V$SESSION视图定位死锁,并用`ALTER SYSTEM KILL SESSION`命令终止相关会话。预防措施包括遵循ACID原则、优化索引及拆分大型事务。