【Redis项目实战】使用Springcloud整合Redis分布式锁+RabbitMQ技术实现高并发预约管理处理系统

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 【Redis项目实战】使用Springcloud整合Redis分布式锁+RabbitMQ技术实现高并发预约管理处理系统

开发目的:


开发一个高并发预约管理处理系统,其中用户可以预约倾听者。由于并发预约可能导致冲突和混乱,需要实现分布式锁来确保同一时间段只有一个用户可以进行预约。为了实现这一目的,使用Redis作为分布式锁的存储介质,并设计一组表来存储倾听者的预约情况。


功能介绍:


  1. 高并发预约管理:系统能够处理大量用户同时预约倾听者的情况,通过使用分布式锁来保证同一时间段只有一个用户可以进行预约,防止冲突和混乱。


  1. 分布式锁实现:系统使用Redis作为分布式锁的存储介质,通过设置键值对来实现分布式锁。具体地,使用一组表来存储倾听者的预约情况,表名由倾听者的ID和日期组成。每个表使用Redis的哈希表结构,其中键表示时间段,值表示该时间段是否已被预约(真或假)。通过对这些表的操作,系统实现了分布式锁的效果。


  1. 预约冲突检测:在用户进行预约时,系统会检查对应倾听者的预约情况表,判断该时间段是否已被其他用户预约。如果时间段已被预约,则系统会阻止当前用户的预约请求,以避免冲突。


  1. 数据持久化:用户的预约信息会被保存到数据库中,以便后续查询和处理。同时,通过RabbitMQ等消息队列技术,系统可以将预约信息发送到其他模块进行处理。


技术实现步骤:


  • 系统中已经集成了Spring Cloud、Redis和RabbitMQ相关依赖。
  • 创建Redis分布式锁的工具类:
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.core.script.ScriptExecutor;
import org.springframework.data.redis.serializer.RedisSerializer;
 
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
 
public class RedisDistributedLock {
 
    private RedisTemplate<String, String> redisTemplate;
    private ScriptExecutor<String> scriptExecutor;
    private RedisSerializer<String> redisSerializer;
 
    public RedisDistributedLock(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.scriptExecutor = this.redisTemplate.getScriptExecutor();
        this.redisSerializer = this.redisTemplate.getStringSerializer();
    }
 
    /**
     * 尝试获取分布式锁
     *
     * @param lockKey     锁的key
     * @param requestId   锁的唯一标识
     * @param expireTime  锁的过期时间(单位:秒)
     * @return 是否成功获取锁
     */
    public boolean tryLock(String lockKey, String requestId, long expireTime) {
        RedisScript<Long> script = new DefaultRedisScript<>(
                "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then " +
                        "return redis.call('expire', KEYS[1], ARGV[2]) " +
                        "else " +
                        "return 0 " +
                        "end",
                Long.class
        );
 
        Long result = scriptExecutor.execute(script, Collections.singletonList(lockKey), requestId, expireTime);
        return result != null && result == 1;
    }
 
    /**
     * 释放分布式锁
     *
     * @param lockKey   锁的key
     * @param requestId 锁的唯一标识
     */
    public void releaseLock(String lockKey, String requestId) {
        RedisScript<Long> script = new DefaultRedisScript<>(
                "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                        "return redis.call('del', KEYS[1]) " +
                        "else " +
                        "return 0 " +
                        "end",
                Long.class
        );
 
        scriptExecutor.execute(script, Collections.singletonList(lockKey), requestId);
    }
 
    /**
     * 生成唯一的锁标识
     *
     * @return 锁的唯一标识
     */
    public String generateRequestId() {
        return UUID.randomUUID().toString();
    }
}
  • 在预约服务中使用分布式锁来保证同一时间段只有一个用户可以进行预约:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
@Service
public class ReservationService {
 
    private RedisDistributedLock distributedLock;
    private ReservationRepository reservationRepository;
    private RabbitTemplate rabbitTemplate;
 
    @Autowired
    public ReservationService(RedisDistributedLock distributedLock, ReservationRepository reservationRepository, RabbitTemplate rabbitTemplate) {
        this.distributedLock = distributedLock;
        this.reservationRepository = reservationRepository;
        this.rabbitTemplate = rabbitTemplate;
    }
 
    @Transactional
    public void makeReservation(String listenerId, String userId, String timeSlot) {
        String lockKey = listenerId + "-" + timeSlot;
        String requestId = distributedLock.generateRequestId();
 
        try {
            // 尝试获取分布式锁,设置锁的过期时间为一定的秒数(例如10秒)
            if (distributedLock.tryLock(lockKey, requestId, 10)) {
                // 获取到锁,可以进行预约操作
 
                // 检查时间段是否已经被预约
                if (!isTimeSlotBooked(listenerId, timeSlot)) {
                    // 保存预约信息到数据库
                    Reservation reservation = new Reservation(listenerId, userId, timeSlot);
                    reservationRepository.save(reservation);
 
                    // 发送预约消息到RabbitMQ
                    rabbitTemplate.convertAndSend("reservation-exchange", "reservation-queue", reservation);
                } else {
                    // 时间段已经被预约,抛出异常或返回错误信息
                    throw new RuntimeException("The time slot is already booked.");
                }
            } else {
                // 未获取到锁,抛出异常或返回错误信息
                throw new RuntimeException("Failed to acquire lock for the time slot.");
            }
        } finally {
            // 释放分布式锁
            distributedLock.releaseLock(lockKey, requestId);
        }
    }
 
    private boolean isTimeSlotBooked(String listenerId, String timeSlot) {
        // 查询数据库或Redis,判断时间段是否已经被预约
        // 返回true表示已被预约,返回false表示未被预约
    }
}
  • 在配置类中配置RedisTemplate和RabbitTemplate
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
 
@Configuration
public class AppConfig {
 
    private RedisConnectionFactory redisConnectionFactory;
    private ConnectionFactory rabbitConnectionFactory;
 
    @Autowired
    public AppConfig(RedisConnectionFactory redisConnectionFactory, ConnectionFactory rabbitConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
        this.rabbitConnectionFactory = rabbitConnectionFactory;
    }
 
    @Bean
    public RedisTemplate<String, String> redisTemplate() {
        RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
 
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
        rabbitTemplate.setConnectionFactory(rabbitConnectionFactory);
        return rabbitTemplate;
    }
 
    @Bean
    public RedisDistributedLock distributedLock(RedisTemplate<String, String> redisTemplate) {
        return new RedisDistributedLock(redisTemplate);
    }
}

RabbitMQ是一种消息队列系统,可以用于异步处理预约消息,提高系统的可伸缩性和稳定性。以下是实现RabbitMQ部分的步骤:


  • 创建预约消息的实体类:
public class ReservationMessage {
    private String listenerId;
    private String userId;
    private String timeSlot;
 
    // 构造方法、Getter和Setter方法省略
}
  • 创建预约消息的消费者:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
public class ReservationMessageConsumer {
 
    @RabbitListener(queues = "reservation-queue")
    public void processReservationMessage(ReservationMessage reservationMessage) {
        // 处理预约消息,例如发送通知给倾听者或用户
        System.out.println("Received reservation message: " + reservationMessage.toString());
    }
}
  • 在配置类中配置RabbitMQ相关的队列和交换机:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitMQConfig {
 
    @Bean
    public Queue reservationQueue() {
        return new Queue("reservation-queue");
    }
 
    @Bean
    public DirectExchange reservationExchange() {
        return new DirectExchange("reservation-exchange");
    }
 
    @Bean
    public Binding reservationBinding(Queue reservationQueue, DirectExchange reservationExchange) {
        return BindingBuilder.bind(reservationQueue).to(reservationExchange).with("reservation-queue");
    }
}
  • 在预约服务中发送预约消息到RabbitMQ:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
@Service
public class ReservationService {
 
    private RabbitTemplate rabbitTemplate;
 
    @Autowired
    public ReservationService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
 
    public void makeReservation(String listenerId, String userId, String timeSlot) {
        // 创建预约消息
        ReservationMessage reservationMessage = new ReservationMessage(listenerId, userId, timeSlot);
 
        // 发送预约消息到RabbitMQ
        rabbitTemplate.convertAndSend("reservation-exchange", "reservation-queue", reservationMessage);
    }
}


为了实现倾听者预约状态的管理,可以在Redis中建立一个临时状态表,与之前的分布式锁表类似。以下是该表的设计和状态介绍:


  1. 表名设计:使用倾听者的ID和日期作为表名,一次性生成七天的表。例如,表名可以是形如"listener_id_20240201"的格式。
  2. 状态类型:
  • 等待(Waiting):表示用户的预约请求已提交,但倾听者尚未对其进行处理。
  • 接受(Accepted):表示倾听者已接受用户的预约请求。
  • 拒绝(Rejected):表示倾听者已拒绝用户的预约请求。
  • 已处理(Processed):表示该预约请求已被处理,并且不再需要保留在状态表中。
  1. Redis表结构:使用Redis的哈希表结构来存储倾听者的预约状态。表中的键表示时间段,值表示对应时间段的预约状态。


以下是使用RedisTemplate实现倾听者预约状态管理的代码:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import java.util.Map;
 
@Component
public class ReservationStatusManager {
 
    private static final String TABLE_PREFIX = "listener_";
 
    private RedisTemplate<String, Object> redisTemplate;
    private HashOperations<String, String, String> hashOperations;
 
    @Autowired
    public ReservationStatusManager(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
 
    @PostConstruct
    private void init() {
        hashOperations = redisTemplate.opsForHash();
    }
 
    public void setReservationStatus(String listenerId, String date, String timeSlot, String status) {
        String tableName = TABLE_PREFIX + listenerId + "_" + date;
        hashOperations.put(tableName, timeSlot, status);
    }
 
    public String getReservationStatus(String listenerId, String date, String timeSlot) {
        String tableName = TABLE_PREFIX + listenerId + "_" + date;
        return hashOperations.get(tableName, timeSlot);
    }
 
    public void removeProcessedReservations(String listenerId, String date) {
        String tableName = TABLE_PREFIX + listenerId + "_" + date;
        redisTemplate.delete(tableName);
    }
 
    public Map<String, String> getAllReservationStatus(String listenerId, String date) {
        String tableName = TABLE_PREFIX + listenerId + "_" + date;
        return hashOperations.entries(tableName);
    }
}


我们通过注入RedisTemplate来操作Redis。在ReservationStatusManager类中,我们使用HashOperations来进行哈希表的操作。init()方法使用@PostConstruct注解进行初始化,确保RedisTemplate和HashOperations在对象创建后进行实例化。


setReservationStatus()方法用于设置预约状态,getReservationStatus()方法用于获取预约状态,removeProcessedReservations()方法用于删除已处理的预约状态。另外,getAllReservationStatus()方法可以用于获取某个倾听者在特定日期的所有预约状态。


那么这样就实现了  在多个用户想要同时预约同一个倾听者的同一时间时 所避免的数据混乱


我们这样做保证了数据的一致性 又使用了 以速度性能著称的Redis 和异步处理的RabbitMQ这样就能使得 在预约成功的一瞬间 完成通知倾听者的效果 达到了预期的业务效果


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
24天前
|
消息中间件 缓存 NoSQL
Redis 高并发竞争 key ,如何解决这个难点?
本文主要探讨 Redis 在高并发场景下的并发竞争 Key 问题,以及较为常用的两种解决方案(分布式锁+时间戳、利用消息队列)。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
Redis 高并发竞争 key ,如何解决这个难点?
|
26天前
|
存储 运维 负载均衡
构建高可用性GraphRAG系统:分布式部署与容错机制
【10月更文挑战第28天】作为一名数据科学家和系统架构师,我在构建和维护大规模分布式系统方面有着丰富的经验。最近,我负责了一个基于GraphRAG(Graph Retrieval-Augmented Generation)模型的项目,该模型用于构建一个高可用性的问答系统。在这个过程中,我深刻体会到分布式部署和容错机制的重要性。本文将详细介绍如何在生产环境中构建一个高可用性的GraphRAG系统,包括分布式部署方案、负载均衡、故障检测与恢复机制等方面的内容。
84 4
构建高可用性GraphRAG系统:分布式部署与容错机制
|
26天前
|
缓存 NoSQL 中间件
redis高并发缓存中间件总结!
本文档详细介绍了高并发缓存中间件Redis的原理、高级操作及其在电商架构中的应用。通过阿里云的角度,分析了Redis与架构的关系,并展示了无Redis和使用Redis缓存的架构图。文档还涵盖了Redis的基本特性、应用场景、安装部署步骤、配置文件详解、启动和关闭方法、systemctl管理脚本的生成以及日志警告处理等内容。适合初学者和有一定经验的技术人员参考学习。
130 7
|
21天前
|
NoSQL Java Redis
springCloud中将redis共用到common模块
通过将Redis配置和操作服务提取到Common模块,可以在Spring Cloud微服务架构中实现高效的代码复用和统一管理。这种设计不仅简化了各个服务的配置和依赖管理,还提高了代码的可维护性和可读性。希望本文对你在Spring Cloud项目中集成和使用Redis有所帮助。
27 0
|
2月前
|
消息中间件 中间件 数据库
NServiceBus:打造企业级服务总线的利器——深度解析这一面向消息中间件如何革新分布式应用开发与提升系统可靠性
【10月更文挑战第9天】NServiceBus 是一个面向消息的中间件,专为构建分布式应用程序设计,特别适用于企业级服务总线(ESB)。它通过消息队列实现服务间的解耦,提高系统的可扩展性和容错性。在 .NET 生态中,NServiceBus 提供了强大的功能,支持多种传输方式如 RabbitMQ 和 Azure Service Bus。通过异步消息传递模式,各组件可以独立运作,即使某部分出现故障也不会影响整体系统。 示例代码展示了如何使用 NServiceBus 发送和接收消息,简化了系统的设计和维护。
52 3
|
2月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现
消息队列系统中的确认机制在分布式系统中如何实现
|
2月前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
66 4
|
2月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
57 3
|
2月前
|
消息中间件 存储 监控
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
|
2月前
|
存储 开发框架 .NET
C#语言如何搭建分布式文件存储系统
C#语言如何搭建分布式文件存储系统
74 2