【Redis项目实战】使用Springcloud整合Redis分布式锁+RabbitMQ技术实现高并发预约管理处理系统

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 【Redis项目实战】使用Springcloud整合Redis分布式锁+RabbitMQ技术实现高并发预约管理处理系统

开发目的:


开发一个高并发预约管理处理系统,其中用户可以预约倾听者。由于并发预约可能导致冲突和混乱,需要实现分布式锁来确保同一时间段只有一个用户可以进行预约。为了实现这一目的,使用Redis作为分布式锁的存储介质,并设计一组表来存储倾听者的预约情况。


功能介绍:


  1. 高并发预约管理:系统能够处理大量用户同时预约倾听者的情况,通过使用分布式锁来保证同一时间段只有一个用户可以进行预约,防止冲突和混乱。


  1. 分布式锁实现:系统使用Redis作为分布式锁的存储介质,通过设置键值对来实现分布式锁。具体地,使用一组表来存储倾听者的预约情况,表名由倾听者的ID和日期组成。每个表使用Redis的哈希表结构,其中键表示时间段,值表示该时间段是否已被预约(真或假)。通过对这些表的操作,系统实现了分布式锁的效果。


  1. 预约冲突检测:在用户进行预约时,系统会检查对应倾听者的预约情况表,判断该时间段是否已被其他用户预约。如果时间段已被预约,则系统会阻止当前用户的预约请求,以避免冲突。


  1. 数据持久化:用户的预约信息会被保存到数据库中,以便后续查询和处理。同时,通过RabbitMQ等消息队列技术,系统可以将预约信息发送到其他模块进行处理。


技术实现步骤:


  • 系统中已经集成了Spring Cloud、Redis和RabbitMQ相关依赖。
  • 创建Redis分布式锁的工具类:
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.core.script.ScriptExecutor;
import org.springframework.data.redis.serializer.RedisSerializer;
 
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
 
public class RedisDistributedLock {
 
    private RedisTemplate<String, String> redisTemplate;
    private ScriptExecutor<String> scriptExecutor;
    private RedisSerializer<String> redisSerializer;
 
    public RedisDistributedLock(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.scriptExecutor = this.redisTemplate.getScriptExecutor();
        this.redisSerializer = this.redisTemplate.getStringSerializer();
    }
 
    /**
     * 尝试获取分布式锁
     *
     * @param lockKey     锁的key
     * @param requestId   锁的唯一标识
     * @param expireTime  锁的过期时间(单位:秒)
     * @return 是否成功获取锁
     */
    public boolean tryLock(String lockKey, String requestId, long expireTime) {
        RedisScript<Long> script = new DefaultRedisScript<>(
                "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then " +
                        "return redis.call('expire', KEYS[1], ARGV[2]) " +
                        "else " +
                        "return 0 " +
                        "end",
                Long.class
        );
 
        Long result = scriptExecutor.execute(script, Collections.singletonList(lockKey), requestId, expireTime);
        return result != null && result == 1;
    }
 
    /**
     * 释放分布式锁
     *
     * @param lockKey   锁的key
     * @param requestId 锁的唯一标识
     */
    public void releaseLock(String lockKey, String requestId) {
        RedisScript<Long> script = new DefaultRedisScript<>(
                "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                        "return redis.call('del', KEYS[1]) " +
                        "else " +
                        "return 0 " +
                        "end",
                Long.class
        );
 
        scriptExecutor.execute(script, Collections.singletonList(lockKey), requestId);
    }
 
    /**
     * 生成唯一的锁标识
     *
     * @return 锁的唯一标识
     */
    public String generateRequestId() {
        return UUID.randomUUID().toString();
    }
}
  • 在预约服务中使用分布式锁来保证同一时间段只有一个用户可以进行预约:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
@Service
public class ReservationService {
 
    private RedisDistributedLock distributedLock;
    private ReservationRepository reservationRepository;
    private RabbitTemplate rabbitTemplate;
 
    @Autowired
    public ReservationService(RedisDistributedLock distributedLock, ReservationRepository reservationRepository, RabbitTemplate rabbitTemplate) {
        this.distributedLock = distributedLock;
        this.reservationRepository = reservationRepository;
        this.rabbitTemplate = rabbitTemplate;
    }
 
    @Transactional
    public void makeReservation(String listenerId, String userId, String timeSlot) {
        String lockKey = listenerId + "-" + timeSlot;
        String requestId = distributedLock.generateRequestId();
 
        try {
            // 尝试获取分布式锁,设置锁的过期时间为一定的秒数(例如10秒)
            if (distributedLock.tryLock(lockKey, requestId, 10)) {
                // 获取到锁,可以进行预约操作
 
                // 检查时间段是否已经被预约
                if (!isTimeSlotBooked(listenerId, timeSlot)) {
                    // 保存预约信息到数据库
                    Reservation reservation = new Reservation(listenerId, userId, timeSlot);
                    reservationRepository.save(reservation);
 
                    // 发送预约消息到RabbitMQ
                    rabbitTemplate.convertAndSend("reservation-exchange", "reservation-queue", reservation);
                } else {
                    // 时间段已经被预约,抛出异常或返回错误信息
                    throw new RuntimeException("The time slot is already booked.");
                }
            } else {
                // 未获取到锁,抛出异常或返回错误信息
                throw new RuntimeException("Failed to acquire lock for the time slot.");
            }
        } finally {
            // 释放分布式锁
            distributedLock.releaseLock(lockKey, requestId);
        }
    }
 
    private boolean isTimeSlotBooked(String listenerId, String timeSlot) {
        // 查询数据库或Redis,判断时间段是否已经被预约
        // 返回true表示已被预约,返回false表示未被预约
    }
}
  • 在配置类中配置RedisTemplate和RabbitTemplate
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
 
@Configuration
public class AppConfig {
 
    private RedisConnectionFactory redisConnectionFactory;
    private ConnectionFactory rabbitConnectionFactory;
 
    @Autowired
    public AppConfig(RedisConnectionFactory redisConnectionFactory, ConnectionFactory rabbitConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
        this.rabbitConnectionFactory = rabbitConnectionFactory;
    }
 
    @Bean
    public RedisTemplate<String, String> redisTemplate() {
        RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
 
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
        rabbitTemplate.setConnectionFactory(rabbitConnectionFactory);
        return rabbitTemplate;
    }
 
    @Bean
    public RedisDistributedLock distributedLock(RedisTemplate<String, String> redisTemplate) {
        return new RedisDistributedLock(redisTemplate);
    }
}

RabbitMQ是一种消息队列系统,可以用于异步处理预约消息,提高系统的可伸缩性和稳定性。以下是实现RabbitMQ部分的步骤:


  • 创建预约消息的实体类:
public class ReservationMessage {
    private String listenerId;
    private String userId;
    private String timeSlot;
 
    // 构造方法、Getter和Setter方法省略
}
  • 创建预约消息的消费者:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
public class ReservationMessageConsumer {
 
    @RabbitListener(queues = "reservation-queue")
    public void processReservationMessage(ReservationMessage reservationMessage) {
        // 处理预约消息,例如发送通知给倾听者或用户
        System.out.println("Received reservation message: " + reservationMessage.toString());
    }
}
  • 在配置类中配置RabbitMQ相关的队列和交换机:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitMQConfig {
 
    @Bean
    public Queue reservationQueue() {
        return new Queue("reservation-queue");
    }
 
    @Bean
    public DirectExchange reservationExchange() {
        return new DirectExchange("reservation-exchange");
    }
 
    @Bean
    public Binding reservationBinding(Queue reservationQueue, DirectExchange reservationExchange) {
        return BindingBuilder.bind(reservationQueue).to(reservationExchange).with("reservation-queue");
    }
}
  • 在预约服务中发送预约消息到RabbitMQ:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
@Service
public class ReservationService {
 
    private RabbitTemplate rabbitTemplate;
 
    @Autowired
    public ReservationService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
 
    public void makeReservation(String listenerId, String userId, String timeSlot) {
        // 创建预约消息
        ReservationMessage reservationMessage = new ReservationMessage(listenerId, userId, timeSlot);
 
        // 发送预约消息到RabbitMQ
        rabbitTemplate.convertAndSend("reservation-exchange", "reservation-queue", reservationMessage);
    }
}


为了实现倾听者预约状态的管理,可以在Redis中建立一个临时状态表,与之前的分布式锁表类似。以下是该表的设计和状态介绍:


  1. 表名设计:使用倾听者的ID和日期作为表名,一次性生成七天的表。例如,表名可以是形如"listener_id_20240201"的格式。
  2. 状态类型:
  • 等待(Waiting):表示用户的预约请求已提交,但倾听者尚未对其进行处理。
  • 接受(Accepted):表示倾听者已接受用户的预约请求。
  • 拒绝(Rejected):表示倾听者已拒绝用户的预约请求。
  • 已处理(Processed):表示该预约请求已被处理,并且不再需要保留在状态表中。
  1. Redis表结构:使用Redis的哈希表结构来存储倾听者的预约状态。表中的键表示时间段,值表示对应时间段的预约状态。


以下是使用RedisTemplate实现倾听者预约状态管理的代码:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import java.util.Map;
 
@Component
public class ReservationStatusManager {
 
    private static final String TABLE_PREFIX = "listener_";
 
    private RedisTemplate<String, Object> redisTemplate;
    private HashOperations<String, String, String> hashOperations;
 
    @Autowired
    public ReservationStatusManager(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
 
    @PostConstruct
    private void init() {
        hashOperations = redisTemplate.opsForHash();
    }
 
    public void setReservationStatus(String listenerId, String date, String timeSlot, String status) {
        String tableName = TABLE_PREFIX + listenerId + "_" + date;
        hashOperations.put(tableName, timeSlot, status);
    }
 
    public String getReservationStatus(String listenerId, String date, String timeSlot) {
        String tableName = TABLE_PREFIX + listenerId + "_" + date;
        return hashOperations.get(tableName, timeSlot);
    }
 
    public void removeProcessedReservations(String listenerId, String date) {
        String tableName = TABLE_PREFIX + listenerId + "_" + date;
        redisTemplate.delete(tableName);
    }
 
    public Map<String, String> getAllReservationStatus(String listenerId, String date) {
        String tableName = TABLE_PREFIX + listenerId + "_" + date;
        return hashOperations.entries(tableName);
    }
}


我们通过注入RedisTemplate来操作Redis。在ReservationStatusManager类中,我们使用HashOperations来进行哈希表的操作。init()方法使用@PostConstruct注解进行初始化,确保RedisTemplate和HashOperations在对象创建后进行实例化。


setReservationStatus()方法用于设置预约状态,getReservationStatus()方法用于获取预约状态,removeProcessedReservations()方法用于删除已处理的预约状态。另外,getAllReservationStatus()方法可以用于获取某个倾听者在特定日期的所有预约状态。


那么这样就实现了  在多个用户想要同时预约同一个倾听者的同一时间时 所避免的数据混乱


我们这样做保证了数据的一致性 又使用了 以速度性能著称的Redis 和异步处理的RabbitMQ这样就能使得 在预约成功的一瞬间 完成通知倾听者的效果 达到了预期的业务效果


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
29天前
|
存储 NoSQL 算法
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)(二)
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)
47 0
|
29天前
|
NoSQL 数据处理 调度
【Redis深度专题】「踩坑技术提升」探索Redis 6.0为何必须启用多线程以提升性能与效率
【Redis深度专题】「踩坑技术提升」探索Redis 6.0为何必须启用多线程以提升性能与效率
218 0
|
3天前
|
存储 缓存 运维
软件体系结构 - 缓存技术(5)Redis Cluster
【4月更文挑战第20天】软件体系结构 - 缓存技术(5)Redis Cluster
136 10
|
29天前
|
存储 NoSQL Redis
作者推荐 |【Redis技术进阶之路】「原理系列开篇」揭秘高效存储模型与数据结构底层实现(SDS)(三)
作者推荐 |【Redis技术进阶之路】「原理系列开篇」揭秘高效存储模型与数据结构底层实现(SDS)
31 0
|
29天前
|
NoSQL Java Redis
【Redis深度专题】「踩坑技术提升」一文教会你如何在支持Redis在低版本Jedis情况下兼容Redis的ACL机制
【Redis深度专题】「踩坑技术提升」一文教会你如何在支持Redis在低版本Jedis情况下兼容Redis的ACL机制
234 0
|
29天前
|
缓存 NoSQL Shell
【Redis深度专题】「核心技术提升」探究Redis服务启动的过程机制的技术原理和流程分析的指南(持久化功能分析)
【Redis深度专题】「核心技术提升」探究Redis服务启动的过程机制的技术原理和流程分析的指南(持久化功能分析)
159 0
|
29天前
|
存储 缓存 NoSQL
【Redis深度专题】「核心技术提升」探究Redis服务启动的过程机制的技术原理和流程分析的指南(集群功能分析)(一)
【Redis深度专题】「核心技术提升」探究Redis服务启动的过程机制的技术原理和流程分析的指南(集群功能分析)
352 0
|
1月前
|
NoSQL Java 数据库
优惠券秒杀案例 - CAS、Redis+Lua脚本解决高并发并行
优惠券秒杀案例 - CAS、Redis+Lua脚本解决高并发并行
|
1月前
|
消息中间件 存储 Cloud Native
深度剖析 RocketMQ 5.0,IoT 消息:物联网需要什么样的消息技术?
本文来学习一个典型的物联网技术架构,以及在这个技术架构里面,消息队列所发挥的作用。在物联网的场景里面,对消息技术的要求和面向服务端应用的消息技术有什么区别?学习 RocketMQ 5.0 的子产品 MQTT,是如何解决这些物联网技术难题的。
90841 4
|
1月前
|
NoSQL Redis
Redis企业项目实战--登录校验拦截器
Redis企业项目实战--登录校验拦截器