手拉手Springboot+RocketMQ+Redis抢单实现10W级QPS

本文涉及的产品
云服务器 ECS,每月免费额度200元 3个月
云服务器ECS,u1 2核4GB 1个月
简介: 手拉手Springboot+RocketMQ+Redis抢单实现10W级QPS

环境介绍

技术

springboot+mybatis-plus+mysql+rocketmq

软件

版本

mysql

8

IDEA

IntelliJ IDEA 2022.2.1

JDK

17

Spring Boot

3.1.7

Redis

7.0.4

mybatis-plus

3.5.3.2

rocketmq

4.9.4

QPS:每秒处理请求的数量

高并发:很短时间内处理大量请求

并发:多个请求在同一时间内执行(模拟淘宝抢单活动)

并行:多核CPU说多给任务在同一时刻进行

synchronized (this):同步方法支持一种简单的策略来防止线程受到干扰和内存一致性错误;如果一个对象对多个线程可见,则对该对象变量的所有读取或写入都是通过同步方法完成。通俗点来说就是程序中用于保护线程安全的一种机制。

-十万级QPS

假设:一个用户ID只能抢一个商品

RedisTemplate常用方法

redisTemplate.hasKey(key);                //判断是否有key所对应的值,有则返回true,没有则返回false

redisTemplate.opsForValue().get(key);    //有则取出key值所对应的值

redisTemplate.delete(key);                //删除单个key值

redisTemplate.delete(keys);             //其中keys:Collection<K> keys

redisTemplate.dump(key);                //将当前传入的key值序列化为byte[]类型

redisTemplate.expire(key, timeout, unit);    //设置过期时间

redisTemplate.expireAt(key, date);        //设置过期时间

redisTemplate.keys(pattern);            //查找匹配的key值,返回一个Set集合类型

redisTemplate.rename(oldKey, newKey);    //返回传入key所存储的值的类型

redisTemplate.renameIfAbsent(oldKey, newKey);    //如果旧值存在时,将旧值改为新值

redisTemplate.randomKey();                //从redis中随机取出一个key

redisTemplate.getExpire(key);            //返回当前key所对应的剩余过期时间

redisTemplate.getExpire(key, unit);        //返回剩余过期时间并且指定时间单位

redisTemplate.persist(key);                //将key持久化保存

redisTemplate.move(key, dbIndex);        //将当前数据库的key移动到指定redis中数据库当中

String类型

ValueOperations opsForValue = redisTemplate.opsForValue();

opsForValue.set(key, value);    //设置当前的key以及value值

opsForValue.set(key, value, offset);//用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始

opsForValue.set(key, value, timeout, unit);     //设置当前的key以及value值并且设置过期时间

opsForValue.setBit(key, offset, value);    //将二进制第offset位值变为value

opsForValue.setIfAbsent(key, value);//重新设置key对应的值,如果存在返回false,否则返回true

opsForValue.get(key, start, end);    //返回key中字符串的子字符

opsForValue.getAndSet(key, value);    //将旧的key设置为value,并且返回旧的key

opsForValue.multiGet(keys);            //批量获取值

opsForValue.size(key);                //获取字符串的长度

opsForValue.append(key, value);    //在原有的值基础上新增字符串到末尾

opsForValue.increment(key,double increment);//以增量的方式将double值存储在变量中

opsForValue.increment(key,long increment);    //通过increment(K key, long delta)方法以增量方式存储long值(正值则自增,负值则自减)

Map valueMap = new HashMap();

valueMap.put("valueMap1","map1");

valueMap.put("valueMap2","map2");

valueMap.put("valueMap3","map3");

opsForValue.multiSetIfAbsent(valueMap);     //如果对应的map集合名称不存在,则添加否则不做修改

opsForValue.multiSet(valueMap);                //设置map集合到redis

数据库表

Yml配置

redis:

host: 192.168.68.133

port: 6379

password: xxxxxx

database: 12

mapper

@Mapper

public interface GoodsMapper extends BaseMapper<Goods> {


@Select("SELECT id,stocks FROM goods WHERE `status`= 1 and spike = 1")

List<Goods> getAllGoodsFlash();

}

@Mapper

public interface SnlogsMapper extends BaseMapper<Snlogs> {


}

Service

public interface GoodsService extends IService<Goods> {


void FlashSaleBusiness(Integer userID, Integer goodsID);

}

public interface SnlogsService extends IService<Snlogs> {

}

@Service

public class GoodsServiceImpl extends ServiceImpl<GoodsMapper, Goods>

implements GoodsService{

@Resource

private GoodsMapper goodsMapper;

@Resource

private SnlogsMapper snlogsMapper;


@Override

@Transactional(rollbackFor = Exception.class)

public void FlashSaleBusiness(Integer userID, Integer goodsID) {

//减库存

Goods goods = goodsMapper.selectById(goodsID);

Integer stocks = goods.getStocks()-1;

if (stocks < 0){

throw new RuntimeException("商品ID"+goods.getId()+"用户ID:"+userID+"抢单失败");

}

goods.setStocks(stocks);

goods.setUpdateTime(new Date());

goodsMapper.updateById(goods);

//生成订单

Snlogs snlogs = new Snlogs();

snlogs.setCreateTime(new Date());

snlogs.setUserId(userID);

snlogs.setGoodsId(goodsID);

String sn = String.valueOf(UUID.randomUUID());

snlogs.setOrderSn(userID+"-"+goodsID+"-"+sn);

snlogsMapper.insert(snlogs);

}

}

@Service

public class SnlogsServiceImpl extends ServiceImpl<SnlogsMapper, Snlogs>

implements SnlogsService{

}

初始化Mysql同步Redis

@Component

public class DataSync {

//定时任务,将mysql库存定时同步到redis

//@Scheduled(initialDelay = 1000,fixedDelay = 1000) initialDelay初始化一秒后执行,fixedDelay每间隔1秒后执行

//@Scheduled(cron = "0 0 8 0 0 ?") //分时年月 每天8点执行

//public void MysqlSyncRedis(){

//

//}

@Resource

private GoodsMapper goodsMapper;


@Resource

private RedisTemplate redisTemplate;


/**

* 项目启动后,将mysql库存同步到redis

* 初始化(PostConstruct,InitializingBean,BeanPostProcessor)

*/

@PostConstruct

public void initGoodsData(){

List<Goods> goods =goodsMapper.getAllGoodsFlash();

if (!CollectionUtils.isEmpty(goods)){

goods.forEach(good->{

redisTemplate.opsForValue().set("goodsId:"+String.valueOf(good.getId()),String.valueOf(good.getStocks()));

});

}

}

}

生产者Controller

@RestController

@RequestMapping("/A")

public class FlashSaleController {


@Resource

private RedisTemplate redisTemplate;


@Resource

private RocketMQTemplate rocketMQTemplate;



/**

* 1.去重(查询该用户是否已经参与秒杀活动)

* 2.扣减库存

* 3.消息传入MQ

* @param goodsId

* @return

*/

@GetMapping("/flashSaleA")

public String flashSaleTest(Integer goodsId){

//模拟用户id

int userid = 200019987;

/**

* 1.查询去重-- 每个商品只能抢一次

* userid + ":" + goodsId;

*/

String uKey = userid + ":" + goodsId;

System.out.println("uKey:"+uKey);

//redis存入唯一标识

Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(uKey, "");

if (!aBoolean) {

return "你已经秒杀过了";

}


/**s

*2.扣减redis库存

*/

Long count = redisTemplate.opsForValue().decrement("goodsId:"+goodsId);

if (count < 0) {

return "库存消耗完了";

}

/**

*3.发送至MQ

*/

rocketMQTemplate.asyncSend("FlashSaleTopic", uKey, new SendCallback() {

@Override

public void onSuccess(SendResult sendResult) {

System.out.println("发送成功");

}


@Override

public void onException(Throwable throwable) {

System.out.println("发送失败:"+throwable.getMessage());

}

});


return "抢单成功,请稍后查看清单信息";

}



}

消费者FlashMQListener

@Component

@RocketMQMessageListener(

topic = "FlashSaleTopic",

consumerGroup = "FlashSaleConsumerGroup",

consumeMode = ConsumeMode.CONCURRENTLY,

consumeThreadMax = 10)

public class FlashMQListener implements RocketMQListener<MessageExt> {

@Resource

private GoodsService goodsService;

/**

* onMessage 消费者方法

* @param messages 消息内容

*/

@Override

public void onMessage(MessageExt messages) {

//userid + ":" + goodsId;

System.out.println("接收到消息:"+new String(messages.getBody()));

String msg = new String(messages.getBody());

Integer userID = Integer.valueOf(msg.split(":")[0]);

Integer goodsID = Integer.valueOf(msg.split(":")[1]);

//4.减库存,添加订单信息--1、业务代码事务外加锁可实现线程安全 或2、数据库语句行锁update goods set stocks = stocks -1,update_time = now() where id = 1;

synchronized (this){

goodsService.FlashSaleBusiness(userID,goodsID);

}


}

}

验证

压力测试

压力测试工具apache-jmeter

https://dlcdn.apache.org//jmeter/binaries/apache-jmeter-5.6.3.zip

消息堆积问题

单条队列消息差值大于5w条,算消息堆积问题,根据引用场景定义

消息堆积的情况:

一、生产者生产太快

处理方式:

1、 生产者做业务限流

2、动态增加消费者数量(rocketmq-dashboard-1.0.0.jar)

3、增加消费者数量,但是消费者线程数量要<=队列数。队列确定后不建议更改,根据实际场景修改

IO型:逻辑处理器数*2, cpu型:逻辑处理器数+1

consumeThreadMax = 24

二、消费者产问题(程序奔溃,BUG等)

处理方式:

1、重启消费者程序,新增消费者

2、重置消费位点(再次消费)

3、 跳过堆积

消息丢失

1、生产者使用同步发送模式,MQ消息中间件返回确认后,执行业务程序,数据写入消息状态和创建时间

2、消费者消费后 ,修改数据状态

3、开启MQ的trace机制,消息跟踪机制

4、使用集群模式,主倍模式,将消息持久化在不同硬盘

5、MQ的刷盘机制设置为同步刷盘,性能相对不高(磁盘:随机读写,顺序读写),机械(随机读写比固态快)

6、数据库持久化,log日记

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
10天前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
24 1
|
10天前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的广播交换机(SpringAMQP 实现Fanout广播交换机)
SpringBoot实现RabbitMQ的广播交换机(SpringAMQP 实现Fanout广播交换机)
19 2
|
10天前
|
消息中间件 JSON Java
RabbitMQ的springboot项目集成使用-01
RabbitMQ的springboot项目集成使用-01
|
10天前
|
消息中间件 Java Spring
Springboot 集成Rabbitmq之延时队列
Springboot 集成Rabbitmq之延时队列
15 0
|
10天前
|
JSON NoSQL Java
深入浅出Redis(十三):SpringBoot整合Redis客户端
深入浅出Redis(十三):SpringBoot整合Redis客户端
|
10天前
|
缓存 NoSQL Java
springboot业务开发--springboot集成redis解决缓存雪崩穿透问题
该文介绍了缓存使用中可能出现的三个问题及解决方案:缓存穿透、缓存击穿和缓存雪崩。为防止缓存穿透,可校验请求数据并缓存空值;缓存击穿可采用限流、热点数据预加载或加锁策略;缓存雪崩则需避免同一时间大量缓存失效,可设置随机过期时间。文章还提及了Spring Boot中Redis缓存的配置,包括缓存null值、使用前缀和自定义过期时间,并提供了改造代码以实现缓存到期时间的个性化设置。
|
10天前
|
存储 NoSQL Java
Spring Boot与Redis:整合与实战
【4月更文挑战第29天】Redis,作为一个高性能的键值存储数据库,广泛应用于缓存、消息队列、会话存储等多种场景中。在Spring Boot应用中整合Redis可以显著提高数据处理的效率和应用的响应速度。
31 0
|
10天前
|
消息中间件 Java
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
42 1
|
10天前
|
消息中间件 安全 Java
SpringBoot基于RabbitMQ实现消息可靠性
SpringBoot基于RabbitMQ实现消息可靠性
46 0
|
10天前
|
消息中间件 Java
SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)
SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)
15 1