【Redis项目实战】使用Springcloud整合Redis分布式锁+RabbitMQ技术实现高并发预约管理处理系统

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 【Redis项目实战】使用Springcloud整合Redis分布式锁+RabbitMQ技术实现高并发预约管理处理系统

开发目的:


开发一个高并发预约管理处理系统,其中用户可以预约倾听者。由于并发预约可能导致冲突和混乱,需要实现分布式锁来确保同一时间段只有一个用户可以进行预约。为了实现这一目的,使用Redis作为分布式锁的存储介质,并设计一组表来存储倾听者的预约情况。


功能介绍:


  1. 高并发预约管理:系统能够处理大量用户同时预约倾听者的情况,通过使用分布式锁来保证同一时间段只有一个用户可以进行预约,防止冲突和混乱。


  1. 分布式锁实现:系统使用Redis作为分布式锁的存储介质,通过设置键值对来实现分布式锁。具体地,使用一组表来存储倾听者的预约情况,表名由倾听者的ID和日期组成。每个表使用Redis的哈希表结构,其中键表示时间段,值表示该时间段是否已被预约(真或假)。通过对这些表的操作,系统实现了分布式锁的效果。


  1. 预约冲突检测:在用户进行预约时,系统会检查对应倾听者的预约情况表,判断该时间段是否已被其他用户预约。如果时间段已被预约,则系统会阻止当前用户的预约请求,以避免冲突。


  1. 数据持久化:用户的预约信息会被保存到数据库中,以便后续查询和处理。同时,通过RabbitMQ等消息队列技术,系统可以将预约信息发送到其他模块进行处理。


技术实现步骤:


  • 系统中已经集成了Spring Cloud、Redis和RabbitMQ相关依赖。
  • 创建Redis分布式锁的工具类:
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.core.script.ScriptExecutor;
import org.springframework.data.redis.serializer.RedisSerializer;
 
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
 
public class RedisDistributedLock {
 
    private RedisTemplate<String, String> redisTemplate;
    private ScriptExecutor<String> scriptExecutor;
    private RedisSerializer<String> redisSerializer;
 
    public RedisDistributedLock(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.scriptExecutor = this.redisTemplate.getScriptExecutor();
        this.redisSerializer = this.redisTemplate.getStringSerializer();
    }
 
    /**
     * 尝试获取分布式锁
     *
     * @param lockKey     锁的key
     * @param requestId   锁的唯一标识
     * @param expireTime  锁的过期时间(单位:秒)
     * @return 是否成功获取锁
     */
    public boolean tryLock(String lockKey, String requestId, long expireTime) {
        RedisScript<Long> script = new DefaultRedisScript<>(
                "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then " +
                        "return redis.call('expire', KEYS[1], ARGV[2]) " +
                        "else " +
                        "return 0 " +
                        "end",
                Long.class
        );
 
        Long result = scriptExecutor.execute(script, Collections.singletonList(lockKey), requestId, expireTime);
        return result != null && result == 1;
    }
 
    /**
     * 释放分布式锁
     *
     * @param lockKey   锁的key
     * @param requestId 锁的唯一标识
     */
    public void releaseLock(String lockKey, String requestId) {
        RedisScript<Long> script = new DefaultRedisScript<>(
                "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                        "return redis.call('del', KEYS[1]) " +
                        "else " +
                        "return 0 " +
                        "end",
                Long.class
        );
 
        scriptExecutor.execute(script, Collections.singletonList(lockKey), requestId);
    }
 
    /**
     * 生成唯一的锁标识
     *
     * @return 锁的唯一标识
     */
    public String generateRequestId() {
        return UUID.randomUUID().toString();
    }
}
  • 在预约服务中使用分布式锁来保证同一时间段只有一个用户可以进行预约:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
@Service
public class ReservationService {
 
    private RedisDistributedLock distributedLock;
    private ReservationRepository reservationRepository;
    private RabbitTemplate rabbitTemplate;
 
    @Autowired
    public ReservationService(RedisDistributedLock distributedLock, ReservationRepository reservationRepository, RabbitTemplate rabbitTemplate) {
        this.distributedLock = distributedLock;
        this.reservationRepository = reservationRepository;
        this.rabbitTemplate = rabbitTemplate;
    }
 
    @Transactional
    public void makeReservation(String listenerId, String userId, String timeSlot) {
        String lockKey = listenerId + "-" + timeSlot;
        String requestId = distributedLock.generateRequestId();
 
        try {
            // 尝试获取分布式锁,设置锁的过期时间为一定的秒数(例如10秒)
            if (distributedLock.tryLock(lockKey, requestId, 10)) {
                // 获取到锁,可以进行预约操作
 
                // 检查时间段是否已经被预约
                if (!isTimeSlotBooked(listenerId, timeSlot)) {
                    // 保存预约信息到数据库
                    Reservation reservation = new Reservation(listenerId, userId, timeSlot);
                    reservationRepository.save(reservation);
 
                    // 发送预约消息到RabbitMQ
                    rabbitTemplate.convertAndSend("reservation-exchange", "reservation-queue", reservation);
                } else {
                    // 时间段已经被预约,抛出异常或返回错误信息
                    throw new RuntimeException("The time slot is already booked.");
                }
            } else {
                // 未获取到锁,抛出异常或返回错误信息
                throw new RuntimeException("Failed to acquire lock for the time slot.");
            }
        } finally {
            // 释放分布式锁
            distributedLock.releaseLock(lockKey, requestId);
        }
    }
 
    private boolean isTimeSlotBooked(String listenerId, String timeSlot) {
        // 查询数据库或Redis,判断时间段是否已经被预约
        // 返回true表示已被预约,返回false表示未被预约
    }
}
  • 在配置类中配置RedisTemplate和RabbitTemplate
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
 
@Configuration
public class AppConfig {
 
    private RedisConnectionFactory redisConnectionFactory;
    private ConnectionFactory rabbitConnectionFactory;
 
    @Autowired
    public AppConfig(RedisConnectionFactory redisConnectionFactory, ConnectionFactory rabbitConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
        this.rabbitConnectionFactory = rabbitConnectionFactory;
    }
 
    @Bean
    public RedisTemplate<String, String> redisTemplate() {
        RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
 
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
        rabbitTemplate.setConnectionFactory(rabbitConnectionFactory);
        return rabbitTemplate;
    }
 
    @Bean
    public RedisDistributedLock distributedLock(RedisTemplate<String, String> redisTemplate) {
        return new RedisDistributedLock(redisTemplate);
    }
}

RabbitMQ是一种消息队列系统,可以用于异步处理预约消息,提高系统的可伸缩性和稳定性。以下是实现RabbitMQ部分的步骤:


  • 创建预约消息的实体类:
public class ReservationMessage {
    private String listenerId;
    private String userId;
    private String timeSlot;
 
    // 构造方法、Getter和Setter方法省略
}
  • 创建预约消息的消费者:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
public class ReservationMessageConsumer {
 
    @RabbitListener(queues = "reservation-queue")
    public void processReservationMessage(ReservationMessage reservationMessage) {
        // 处理预约消息,例如发送通知给倾听者或用户
        System.out.println("Received reservation message: " + reservationMessage.toString());
    }
}
  • 在配置类中配置RabbitMQ相关的队列和交换机:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitMQConfig {
 
    @Bean
    public Queue reservationQueue() {
        return new Queue("reservation-queue");
    }
 
    @Bean
    public DirectExchange reservationExchange() {
        return new DirectExchange("reservation-exchange");
    }
 
    @Bean
    public Binding reservationBinding(Queue reservationQueue, DirectExchange reservationExchange) {
        return BindingBuilder.bind(reservationQueue).to(reservationExchange).with("reservation-queue");
    }
}
  • 在预约服务中发送预约消息到RabbitMQ:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
@Service
public class ReservationService {
 
    private RabbitTemplate rabbitTemplate;
 
    @Autowired
    public ReservationService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
 
    public void makeReservation(String listenerId, String userId, String timeSlot) {
        // 创建预约消息
        ReservationMessage reservationMessage = new ReservationMessage(listenerId, userId, timeSlot);
 
        // 发送预约消息到RabbitMQ
        rabbitTemplate.convertAndSend("reservation-exchange", "reservation-queue", reservationMessage);
    }
}


为了实现倾听者预约状态的管理,可以在Redis中建立一个临时状态表,与之前的分布式锁表类似。以下是该表的设计和状态介绍:


  1. 表名设计:使用倾听者的ID和日期作为表名,一次性生成七天的表。例如,表名可以是形如"listener_id_20240201"的格式。
  2. 状态类型:
  • 等待(Waiting):表示用户的预约请求已提交,但倾听者尚未对其进行处理。
  • 接受(Accepted):表示倾听者已接受用户的预约请求。
  • 拒绝(Rejected):表示倾听者已拒绝用户的预约请求。
  • 已处理(Processed):表示该预约请求已被处理,并且不再需要保留在状态表中。
  1. Redis表结构:使用Redis的哈希表结构来存储倾听者的预约状态。表中的键表示时间段,值表示对应时间段的预约状态。


以下是使用RedisTemplate实现倾听者预约状态管理的代码:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import java.util.Map;
 
@Component
public class ReservationStatusManager {
 
    private static final String TABLE_PREFIX = "listener_";
 
    private RedisTemplate<String, Object> redisTemplate;
    private HashOperations<String, String, String> hashOperations;
 
    @Autowired
    public ReservationStatusManager(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
 
    @PostConstruct
    private void init() {
        hashOperations = redisTemplate.opsForHash();
    }
 
    public void setReservationStatus(String listenerId, String date, String timeSlot, String status) {
        String tableName = TABLE_PREFIX + listenerId + "_" + date;
        hashOperations.put(tableName, timeSlot, status);
    }
 
    public String getReservationStatus(String listenerId, String date, String timeSlot) {
        String tableName = TABLE_PREFIX + listenerId + "_" + date;
        return hashOperations.get(tableName, timeSlot);
    }
 
    public void removeProcessedReservations(String listenerId, String date) {
        String tableName = TABLE_PREFIX + listenerId + "_" + date;
        redisTemplate.delete(tableName);
    }
 
    public Map<String, String> getAllReservationStatus(String listenerId, String date) {
        String tableName = TABLE_PREFIX + listenerId + "_" + date;
        return hashOperations.entries(tableName);
    }
}


我们通过注入RedisTemplate来操作Redis。在ReservationStatusManager类中,我们使用HashOperations来进行哈希表的操作。init()方法使用@PostConstruct注解进行初始化,确保RedisTemplate和HashOperations在对象创建后进行实例化。


setReservationStatus()方法用于设置预约状态,getReservationStatus()方法用于获取预约状态,removeProcessedReservations()方法用于删除已处理的预约状态。另外,getAllReservationStatus()方法可以用于获取某个倾听者在特定日期的所有预约状态。


那么这样就实现了  在多个用户想要同时预约同一个倾听者的同一时间时 所避免的数据混乱


我们这样做保证了数据的一致性 又使用了 以速度性能著称的Redis 和异步处理的RabbitMQ这样就能使得 在预约成功的一瞬间 完成通知倾听者的效果 达到了预期的业务效果


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3天前
|
存储 监控 NoSQL
【Redis】分布式锁及其他常见问题
【Redis】分布式锁及其他常见问题
16 0
|
3天前
|
NoSQL Java Redis
【Redis】Redis实现分布式锁
【Redis】Redis实现分布式锁
7 0
|
3天前
|
NoSQL 数据可视化 Java
【个人博客系统 × Redis】“最后的升级” · 连接Redis · Redis的基本使用
【个人博客系统 × Redis】“最后的升级” · 连接Redis · Redis的基本使用
5 0
|
3天前
|
存储 缓存 监控
利用Redis构建高性能的缓存系统
在现今高负载、高并发的互联网应用中,缓存系统的重要性不言而喻。Redis,作为一款开源的、内存中的数据结构存储系统,它可以用作数据库、缓存和消息代理。本文将深入探讨Redis的核心特性,以及如何利用Redis构建高性能的缓存系统,并通过实际案例展示Redis在提升系统性能方面的巨大潜力。
|
3天前
|
存储 NoSQL 测试技术
Redis数据存储系统为什么快?
Redis的快速并非偶然,而是深思熟虑的设计理念的结果。通过将数据存储于内存、采用单线程模型、实现非阻塞I/O等独特的技术选择,Redis在高并发和低延迟方面展现了卓越的表现。
39 16
|
3天前
|
监控 NoSQL 算法
探秘Redis分布式锁:实战与注意事项
本文介绍了Redis分区容错中的分布式锁概念,包括利用Watch实现乐观锁和使用setnx防止库存超卖。乐观锁通过Watch命令监控键值变化,在事务中执行修改,若键值被改变则事务失败。Java代码示例展示了具体实现。setnx命令用于库存操作,确保无超卖,通过设置锁并检查库存来更新。文章还讨论了分布式锁存在的问题,如客户端阻塞、时钟漂移和单点故障,并提出了RedLock算法来提高可靠性。Redisson作为生产环境的分布式锁实现,提供了可重入锁、读写锁等高级功能。最后,文章对比了Redis、Zookeeper和etcd的分布式锁特性。
134 16
探秘Redis分布式锁:实战与注意事项
|
3天前
|
NoSQL Java 大数据
介绍redis分布式锁
分布式锁是解决多进程在分布式环境中争夺资源的问题,与本地锁相似但适用于不同进程。以Redis为例,通过`setIfAbsent`实现占锁,加锁同时设置过期时间避免死锁。然而,获取锁与设置过期时间非原子性可能导致并发问题,解决方案是使用`setIfAbsent`的超时参数。此外,释放锁前需验证归属,防止误删他人锁,可借助Lua脚本确保原子性。实际应用中还有锁续期、重试机制等复杂问题,现成解决方案如RedisLockRegistry和Redisson。
|
3天前
|
缓存 NoSQL Java
【亮剑】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护,如何使用注解来实现 Redis 分布式锁的功能?
【4月更文挑战第30天】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护。基于 Redis 的分布式锁利用 SETNX 或 SET 命令实现,并考虑自动过期、可重入及原子性以确保可靠性。在 Java Spring Boot 中,可通过 `@EnableCaching`、`@Cacheable` 和 `@CacheEvict` 注解轻松实现 Redis 分布式锁功能。
|
3天前
|
NoSQL Redis 微服务
分布式锁_redis实现
分布式锁_redis实现
|
3天前
|
负载均衡 监控 NoSQL
Redis的几种主要集群方案
【5月更文挑战第15天】Redis集群方案包括主从复制(基础,读写分离,手动故障恢复)、哨兵模式(自动高可用,自动故障转移)和Redis Cluster(官方分布式解决方案,自动分片、容错和扩展)。此外,还有Codis、Redisson和Twemproxy等工具用于代理分片和负载均衡。选择方案需考虑应用场景、数据量和并发需求,权衡可用性、性能和扩展性。
29 2

热门文章

最新文章