高并发-【抢红包案例】之四:使用Redis+Lua脚本实现抢红包并异步持久化到数据库

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 高并发-【抢红包案例】之四:使用Redis+Lua脚本实现抢红包并异步持久化到数据库

导读


20181010212453235.png


高并发-【抢红包案例】之一:SSM环境搭建及复现红包超发问题

高并发-【抢红包案例】之二:使用悲观锁方式修复红包超发的bug

高并发-【抢红包案例】之三:使用乐观锁方式修复红包超发的bug


概述


上面三篇博文是使用的MySql数据库来作为数据的载体数据最终会将数据保存到磁盘中,而Redis使用的是内存,内存的速度比磁盘速度肯定要快很多.


对于使用 Redis实现抢红包,首先需要知道的是Redis的功能不如数据库强大,事务也不是很完整.因此要保证数据的正确性数据的正确性可以通过严格的验证得以保证。


而 Redis的 Lua 语言是原子性的,且功能更为强大,所以优先选择使用Lua语言来实现抢红包。


但是无论如何对于数据而言,在 Redis 当中存储,始终都不是长久之计 , 因为 Redis并非一个长久储存数据的地方,更多的时候只是为了提供更为快速的缓存,所以当红包金额为 0 或者红包超时的时候(超时操作可以使用定时机制实,这里暂不讨论), 会将红包数据保存到数据库中, 这样才能够保证数据的安全性和严格性。


所以本篇博文我们将使用Redis + lua脚本来实现抢红包的功能。


实现步骤


注解方式配置 Redis


首先在类 RootConfig 上创建一个 RedisTemplate 对象,并将其装载到 Spring IoC 容器中。

  /**
   * 创建一个 RedisTemplate 对象
   */
  @Bean(name = "redisTemplate")
  public RedisTemplate initRedisTemplate() {
    JedisPoolConfig poolConfig = new JedisPoolConfig();
    // 最大空闲数
    poolConfig.setMaxIdle(50);
    // 最大连接数
    poolConfig.setMaxTotal(100);
    // 最大等待毫秒数
    poolConfig.setMaxWaitMillis(20000);
    // 创建Jedis链接工厂
    JedisConnectionFactory connectionFactory = new JedisConnectionFactory(poolConfig);
    connectionFactory.setHostName("192.168.31.66");
    connectionFactory.setPort(6379);
    // 调用后初始化方法,没有它将抛出异常
    connectionFactory.afterPropertiesSet();
    // 自定Redis序列化器
    RedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer();
    RedisSerializer stringRedisSerializer = new StringRedisSerializer();
    // 定义RedisTemplate,并设置连接工厂
    RedisTemplate redisTemplate = new RedisTemplate();
    redisTemplate.setConnectionFactory(connectionFactory);
    // 设置序列化器
    redisTemplate.setDefaultSerializer(stringRedisSerializer);
    redisTemplate.setKeySerializer(stringRedisSerializer);
    redisTemplate.setValueSerializer(stringRedisSerializer);
    redisTemplate.setHashKeySerializer(stringRedisSerializer);
    redisTemplate.setHashValueSerializer(stringRedisSerializer);
    return redisTemplate;
  }



这样 RedisTemplate 就可以在 Spring 上下文中使用了。


注意, JedisConnectionFactory对象在最后的时候需要自行调用 afterPropertiesSet 方法,它实现了 lnitializingBean 接 口。 如果将其配置在 Spring IoC 容器中, Spring 会自动调用它,但是这里我们是自行创建的, 因此需要自行调用,否则在运用的时候会抛出异常。


lua脚本和异步持久化功能的开发


Redis 并不是一个严格的事务,而且事务的功能也是有限的 。 加上 Redis 本身的命令也比较有限,功能性不强,为了增强功能性,还可以使用 Lua 语言。


Redis 中的 Lua 语言是一种原子性的操作,可以保证数据的一致性 。


依据这个原理可以避免超发现象,完成抢红包的功能,而且对于性能而言, Redis 会比数据库快得多。


第一次运行 Lua 脚本的时候,先在 Redis 中编译和缓存脚本,这样就可以得到一个 SHA1字符串,之后通过 SHAl 字符串和参数就能调用 Lua 脚本了

--缓存抢红包列表信息列表 key
local listKey = 'red_packet_list_'..KEYS[1]  
--当前被抢红包 key
local redPacket = 'red_packet_'..KEYS[1] 
--获取当前红包库存
local stock = tonumber(redis.call('hget', redPacket, 'stock')) 
--没有库存,返回为 0 
if stock <= 0 then 
  return 0 
end 
--库存减 1
stock = stock-1
--保存当前库存
redis.call('hset', redPacket, 'stock', tostring(stock)) 
--往链表中加入当前红包信息
redis.call('rpush', listKey, ARGV[1])  
--如果是最后一个红包,则返回 2 ,表示抢红包已经结束,需要将列表中的数据保存到数据库中
if stock == 0 then 
  return 2 
end  
--如果并非最后一个红包,则返回 l ,表示抢红包成功
return 1


流程:


判断是否存在可抢的库存,如果己经没有可抢夺 的红包,则返回为 0,结束流程

有可抢夺的红包,对于红包的库存减1 ,然后重新设置库存

将抢红包数据保存到 Redis 的链表当中,链表的 key 为 red_packet_list_ {id}

如果当前库存为 0 ,那么返回 2,这说明可以触发数据库对 Redis 链表数据的保存,链表的 key 为 red_packet_ list_ {id},它将保存抢红包的用户名和抢的时间

如果当前库存不为 0 ,那么将返回 1,这说明抢红包信息保存成功。

当返回为 2 的时候,说明红包己经没有库存,会触发数据库对链表数据的保存, 这是一个大数据量的保存。为了不影响最后一次抢红包的响应,在实际的操作中往往会考虑使用 JMS 消息发送到别的服务器进行操作,我们这里选择一种简单的方式来实现,去创建一条新的线程去运行保存 Redis 链表数据到数据库。


那就在Service层写一个持久到数据库的服务类吧


接口

package com.artisan.redpacket.service;
public interface RedisRedPacketService {
  /**
   * 保存redis抢红包列表
   * @param redPacketId --抢红包编号
   * @param unitAmount -- 红包金额
   */
  public void saveUserRedPacketByRedis(Long redPacketId, Double unitAmount);
}

实现类

package com.artisan.redpacket.service.impl;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import com.artisan.redpacket.pojo.UserRedPacket;
import com.artisan.redpacket.service.RedisRedPacketService;
@Service
public class RedisRedPacketServiceImpl implements RedisRedPacketService {
  private static final String PREFIX = "red_packet_list_";
  // 每次取出1000条,避免一次取出消耗太多内存
  private static final int TIME_SIZE = 1000;
  @Autowired
  private RedisTemplate redisTemplate; // RedisTemplate
  @Autowired
  private DataSource dataSource; // 数据源
  @Override
  // 开启新线程运行
  @Async
  public void saveUserRedPacketByRedis(Long redPacketId, Double unitAmount) {
    System.err.println("开始保存数据");
    Long start = System.currentTimeMillis();
    // 获取列表操作对象
    BoundListOperations ops = redisTemplate.boundListOps(PREFIX + redPacketId);
    Long size = ops.size();
    Long times = size % TIME_SIZE == 0 ? size / TIME_SIZE : size / TIME_SIZE + 1;
    int count = 0;
    List<UserRedPacket> userRedPacketList = new ArrayList<UserRedPacket>(TIME_SIZE);
    for (int i = 0; i < times; i++) {
      // 获取至多TIME_SIZE个抢红包信息
      List userIdList = null;
      if (i == 0) {
        userIdList = ops.range(i * TIME_SIZE, (i + 1) * TIME_SIZE);
      } else {
        userIdList = ops.range(i * TIME_SIZE + 1, (i + 1) * TIME_SIZE);
      }
      userRedPacketList.clear();
      // 保存红包信息
      for (int j = 0; j < userIdList.size(); j++) {
        String args = userIdList.get(j).toString();
        String[] arr = args.split("-");
        String userIdStr = arr[0];
        String timeStr = arr[1];
        Long userId = Long.parseLong(userIdStr);
        Long time = Long.parseLong(timeStr);
        // 生成抢红包信息
        UserRedPacket userRedPacket = new UserRedPacket();
        userRedPacket.setRedPacketId(redPacketId);
        userRedPacket.setUserId(userId);
        userRedPacket.setAmount(unitAmount);
        userRedPacket.setGrabTime(new Timestamp(time));
        userRedPacket.setNote("抢红包 " + redPacketId);
        userRedPacketList.add(userRedPacket);
      }
      // 插入抢红包信息
      count += executeBatch(userRedPacketList);
    }
    // 删除Redis列表
    redisTemplate.delete(PREFIX + redPacketId);
    Long end = System.currentTimeMillis();
    System.err.println("保存数据结束,耗时" + (end - start) + "毫秒,共" + count + "条记录被保存。");
  }
  /**
   * 使用JDBC批量处理Redis缓存数据.
   * 
   * @param userRedPacketList
   *            -- 抢红包列表
   * @return 抢红包插入数量.
   */
  private int executeBatch(List<UserRedPacket> userRedPacketList) {
    Connection conn = null;
    Statement stmt = null;
    int[] count = null;
    try {
      conn = dataSource.getConnection();
      conn.setAutoCommit(false);
      stmt = conn.createStatement();
      for (UserRedPacket userRedPacket : userRedPacketList) {
        String sql1 = "update T_RED_PACKET set stock = stock-1 where id=" + userRedPacket.getRedPacketId();
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String sql2 = "insert into T_USER_RED_PACKET(red_packet_id, user_id, " + "amount, grab_time, note)"
            + " values (" + userRedPacket.getRedPacketId() + ", " + userRedPacket.getUserId() + ", "
            + userRedPacket.getAmount() + "," + "'" + df.format(userRedPacket.getGrabTime()) + "'," + "'"
            + userRedPacket.getNote() + "')";
        stmt.addBatch(sql1);
        stmt.addBatch(sql2);
      }
      // 执行批量
      count = stmt.executeBatch();
      // 提交事务
      conn.commit();
    } catch (SQLException e) {
      /********* 错误处理逻辑 ********/
      throw new RuntimeException("抢红包批量执行程序错误");
    } finally {
      try {
        if (conn != null && !conn.isClosed()) {
          conn.close();
        }
      } catch (SQLException e) {
        e.printStackTrace();
      }
    }
    // 返回插入抢红包数据记录
    return count.length / 2;
  }
}



注解@Async 表示让 Spring 自动创建另外一条线程去运行它,这样它便不在抢最后一个红包的线程之内。因为这个方法是一个较长时间的方法,如果在同一个线程内,那么对于最后抢红包的用户需要等待的时间太长,用户体验不好


这里是每次取出 1 000 个抢红包的信息,之所以这样做是为了避免取出 的数据过大 , 导致JVM 消耗过多的内存影响系统性能。


对于大批量的数据操作,这是我们在实际操作中要注意的,最后还会删除 Redis保存的链表信息,这样就帮助 Redis 释放内存了


对于数据库的保存 ,这里采用了 JDBC的批量处理,每 1000 条批量保存一次,使用批量有助于性能的提高。


注解@Async 的前提是提供一个任务池给 Spring 环境,这个时候要在原有的基础上改写配置类 WebConfig

@EnableAsync
public class WebConfig extends AsyncConfigurerSupport { 
  ....
  ....
  ....
  @Override
  public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    taskExecutor.setMaxPoolSize(10);
    taskExecutor.setQueueCapacity(200);
    taskExecutor.initialize();
    return taskExecutor;
  }
}

使用@EnableAsync 表明支持异步调用,而我们实现了接口 AsyncConfigurerSupport 的getAsyncExecutor 方法,它是获取一个任务池,当在 Spring 环境中遇到注解@Async就会启动这个任务池的一条线程去运行对应的方法,这样便能执行异步了。


Service层添加Redis抢红包的逻辑


UserRedPacketService接口新增接口方法grapRedPacketByRedis

/**
   * 通过Redis实现抢红包
   * 
   * @param redPacketId
   *            --红包编号
   * @param userId
   *            -- 用户编号
   * @return 0-没有库存,失败 1--成功,且不是最后一个红包 2--成功,且是最后一个红包
   */
  public Long grapRedPacketByRedis(Long redPacketId, Long userId);


实现类

@Autowired
  private RedisTemplate redisTemplate;
  @Autowired
  private RedisRedPacketService redisRedPacketService;
  // Lua脚本
  String script = "local listKey = 'red_packet_list_'..KEYS[1] \n" 
      + "local redPacket = 'red_packet_'..KEYS[1] \n"
      + "local stock = tonumber(redis.call('hget', redPacket, 'stock')) \n" 
      + "if stock <= 0 then return 0 end \n"
      + "stock = stock -1 \n" 
      + "redis.call('hset', redPacket, 'stock', tostring(stock)) \n"
      + "redis.call('rpush', listKey, ARGV[1]) \n" 
      + "if stock == 0 then return 2 end \n" 
      + "return 1 \n";
  // 在缓存LUA脚本后,使用该变量保存Redis返回的32位的SHA1编码,使用它去执行缓存的LUA脚本[加入这句话]
  String sha1 = null;
  @Override
  public Long grapRedPacketByRedis(Long redPacketId, Long userId) {
    // 当前抢红包用户和日期信息
    String args = userId + "-" + System.currentTimeMillis();
    Long result = null;
    // 获取底层Redis操作对象
    Jedis jedis = (Jedis) redisTemplate.getConnectionFactory().getConnection().getNativeConnection();
    try {
      // 如果脚本没有加载过,那么进行加载,这样就会返回一个sha1编码
      if (sha1 == null) {
        sha1 = jedis.scriptLoad(script);
      }
      // 执行脚本,返回结果
      Object res = jedis.evalsha(sha1, 1, redPacketId + "", args);
      result = (Long) res;
      // 返回2时为最后一个红包,此时将抢红包信息通过异步保存到数据库中
      if (result == 2) {
        // 获取单个小红包金额
        String unitAmountStr = jedis.hget("red_packet_" + redPacketId, "unit_amount");
        // 触发保存数据库操作
        Double unitAmount = Double.parseDouble(unitAmountStr);
              redisRedPacketService.saveUserRedPacketByRedis(redPacketId, unitAmount);
      }
    } finally {
      // 确保jedis顺利关闭
      if (jedis != null && jedis.isConnected()) {
        jedis.close();
      }
    }
    return result;
  }


这里使用了保存脚本返回 的 SHAl 字符串 ,所以只会发送一次脚本到 Redis 服务器,之后只传输 SHAl 字符串和参数到 Redis 就能执行脚本 了, 当脚本返回为 2 的时候, 表示此时所有的红包都已经被抢光了 ,那么就会触发 redisRedPacketService 的 saveUserRedPacketByRedis 方法。由于在 saveU serRedPacketByRedis 加入注解@Async , 所以 Spring 会创建一条新的线程去运行它 , 这样就不会影响最后抢一个红包用户 的响应时间了 。


Controller层新增路由方法

@RequestMapping(value = "/grapRedPacketByRedis")
  @ResponseBody
  public Map<String, Object> grapRedPacketByRedis(Long redPacketId, Long userId) {
    Map<String, Object> resultMap = new HashMap<String, Object>();
    Long result = userRedPacketService.grapRedPacketByRedis(redPacketId, userId);
    boolean flag = result > 0;
    resultMap.put("result", flag);
    resultMap.put("message", flag ? "抢红包成功" : "抢红包失败");
    return resultMap;
  }


构造模拟数据,测试

先在 Redis 上添加红包信息

127.0.0.1:6379> HMSET red_packet_1 stock 20000 unit_amount 10
OK


初始化了一个编号为1 的大红包,其中库存为 2 万个,每个 10 元. 需要保证数据库的红包表内也有对应的记录才可以。

复制个grapByRedis.jsp,测试吧

<%@ page language="java" contentType="text/html; charset=UTF-8"
  pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>参数</title>
<!-- 加载Query文件-->
<script type="text/javascript"
  src="https://code.jquery.com/jquery-3.2.0.js">
        </script>
<script type="text/javascript">
            $(document).ready(function () {
             //模拟30000个异步请求,进行并发
              var max = 30000;
              for (var i = 1; i <= max; i++) {
                $.post({
                    //请求抢id为1的红包
                    //根据自己请求修改对应的url和大红包编号
                    url: "./userRedPacket/grapRedPacketByRedis.do?redPacketId=1&userId=1",
                    //成功后的方法
                    success: function (result) {
                      console.log("OK")
                    }
                });
                }
          });
        </script>
</head>
<body>
</body>
</html>


启动应用,访问 http://localhost:8080/ssm_redpacket/grapByRedis.jsp

20181010231431363.png

2018101023150547.png

结合前几篇的数据统计,使用Redis的方式数据一致性也得到了保证且性能远远高于乐观锁和悲观锁的方式。


代码

https://github.com/yangshangwei/ssm_redpacket


总结


20181015160415539.png


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
19天前
|
存储 Java 关系型数据库
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接创建、分配、复用和释放等操作,并通过电商应用实例展示了如何选择合适的连接池库(如HikariCP)和配置参数,实现高效、稳定的数据库连接管理。
37 2
|
1月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
59 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
24天前
|
存储 数据挖掘 数据库
数据库数据恢复—SQLserver数据库ndf文件大小变为0KB的数据恢复案例
一个运行在存储上的SQLServer数据库,有1000多个文件,大小几十TB。数据库每10天生成一个NDF文件,每个NDF几百GB大小。数据库包含两个LDF文件。 存储损坏,数据库不可用。管理员试图恢复数据库,发现有数个ndf文件大小变为0KB。 虽然NDF文件大小变为0KB,但是NDF文件在磁盘上还可能存在。可以尝试通过扫描&拼接数据库碎片来恢复NDF文件,然后修复数据库。
|
24天前
|
关系型数据库 MySQL 数据库
一个 MySQL 数据库死锁的案例和解决方案
本文介绍了一个 MySQL 数据库死锁的案例和解决方案。
44 3
|
1月前
|
Java 数据库
案例一:去掉数据库某列中的所有英文,利用java正则表达式去做,核心:去掉字符串中的英文
这篇文章介绍了如何使用Java正则表达式从数据库某列中去除所有英文字符。
46 15
|
27天前
|
存储 Oracle 关系型数据库
数据库数据恢复—Oracle ASM磁盘组故障数据恢复案例
Oracle数据库数据恢复环境&故障: Oracle ASM磁盘组由4块磁盘组成。Oracle ASM磁盘组掉线 ,ASM实例不能mount。 Oracle数据库故障分析&恢复方案: 数据库数据恢复工程师对组成ASM磁盘组的磁盘进行分析。对ASM元数据进行分析发现ASM存储元数据损坏,导致磁盘组无法挂载。
|
1月前
|
缓存 分布式计算 NoSQL
大数据-43 Redis 功能扩展 Lua 脚本 对Redis扩展 eval redis.call redis.pcall
大数据-43 Redis 功能扩展 Lua 脚本 对Redis扩展 eval redis.call redis.pcall
29 2
|
1月前
|
NoSQL Java 关系型数据库
阿里 P7二面:Redis 执行 Lua,到底能不能保证原子性?
Redis 和 Lua,两个看似风流马不相及的技术点,为何能产生“爱”的火花,成为工作开发中的黄金搭档?技术面试中更是高频出现,Redis 执行 Lua 到底能不能保证原子性?今天就来聊一聊。 
85 1
|
1月前
|
Oracle 关系型数据库 数据库
oracle数据恢复—Oracle数据库文件损坏导致数据库打不开的数据恢复案例
打开oracle数据库时报错,报错信息:“system01.dbf需要更多的恢复来保持一致性,数据库无法打开”。急需恢复zxfg用户下的数据。 出现上述报错的原因有:控制文件损坏、数据文件损坏、数据文件与控制文件的SCN不一致等。数据恢复工程师对数据库文件做进一步检测分析后发现sysaux01.dbf文件有坏块。修复sysaux01.dbf文件,启动数据库依然有许多查询报错。export和data pump工具无法使用,查询告警日志并分析报错,确认发生上述错误的原因就是sysaux01.dbf文件损坏。由于该文件损坏,从数据库层面无法修复数据库。由于system和用户表空间的数据文件是正常的,
|
5月前
|
消息中间件 NoSQL Java
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
227 0