手拉手Springboot+RocketMQ+Redis抢单实现10W级QPS

简介: 手拉手Springboot+RocketMQ+Redis抢单实现10W级QPS

环境介绍

技术

springboot+mybatis-plus+mysql+rocketmq

软件

版本

mysql

8

IDEA

IntelliJ IDEA 2022.2.1

JDK

17

Spring Boot

3.1.7

Redis

7.0.4

mybatis-plus

3.5.3.2

rocketmq

4.9.4

QPS:每秒处理请求的数量

高并发:很短时间内处理大量请求

并发:多个请求在同一时间内执行(模拟淘宝抢单活动)

并行:多核CPU说多给任务在同一时刻进行

synchronized (this):同步方法支持一种简单的策略来防止线程受到干扰和内存一致性错误;如果一个对象对多个线程可见,则对该对象变量的所有读取或写入都是通过同步方法完成。通俗点来说就是程序中用于保护线程安全的一种机制。

-十万级QPS

假设:一个用户ID只能抢一个商品

RedisTemplate常用方法

redisTemplate.hasKey(key);                //判断是否有key所对应的值,有则返回true,没有则返回false

redisTemplate.opsForValue().get(key);    //有则取出key值所对应的值

redisTemplate.delete(key);                //删除单个key值

redisTemplate.delete(keys);             //其中keys:Collection<K> keys

redisTemplate.dump(key);                //将当前传入的key值序列化为byte[]类型

redisTemplate.expire(key, timeout, unit);    //设置过期时间

redisTemplate.expireAt(key, date);        //设置过期时间

redisTemplate.keys(pattern);            //查找匹配的key值,返回一个Set集合类型

redisTemplate.rename(oldKey, newKey);    //返回传入key所存储的值的类型

redisTemplate.renameIfAbsent(oldKey, newKey);    //如果旧值存在时,将旧值改为新值

redisTemplate.randomKey();                //从redis中随机取出一个key

redisTemplate.getExpire(key);            //返回当前key所对应的剩余过期时间

redisTemplate.getExpire(key, unit);        //返回剩余过期时间并且指定时间单位

redisTemplate.persist(key);                //将key持久化保存

redisTemplate.move(key, dbIndex);        //将当前数据库的key移动到指定redis中数据库当中

String类型

ValueOperations opsForValue = redisTemplate.opsForValue();

opsForValue.set(key, value);    //设置当前的key以及value值

opsForValue.set(key, value, offset);//用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始

opsForValue.set(key, value, timeout, unit);     //设置当前的key以及value值并且设置过期时间

opsForValue.setBit(key, offset, value);    //将二进制第offset位值变为value

opsForValue.setIfAbsent(key, value);//重新设置key对应的值,如果存在返回false,否则返回true

opsForValue.get(key, start, end);    //返回key中字符串的子字符

opsForValue.getAndSet(key, value);    //将旧的key设置为value,并且返回旧的key

opsForValue.multiGet(keys);            //批量获取值

opsForValue.size(key);                //获取字符串的长度

opsForValue.append(key, value);    //在原有的值基础上新增字符串到末尾

opsForValue.increment(key,double increment);//以增量的方式将double值存储在变量中

opsForValue.increment(key,long increment);    //通过increment(K key, long delta)方法以增量方式存储long值(正值则自增,负值则自减)

Map valueMap = new HashMap();

valueMap.put("valueMap1","map1");

valueMap.put("valueMap2","map2");

valueMap.put("valueMap3","map3");

opsForValue.multiSetIfAbsent(valueMap);     //如果对应的map集合名称不存在,则添加否则不做修改

opsForValue.multiSet(valueMap);                //设置map集合到redis

数据库表

Yml配置

redis:

host: 192.168.68.133

port: 6379

password: xxxxxx

database: 12

mapper

@Mapper

public interface GoodsMapper extends BaseMapper<Goods> {


@Select("SELECT id,stocks FROM goods WHERE `status`= 1 and spike = 1")

List<Goods> getAllGoodsFlash();

}

@Mapper

public interface SnlogsMapper extends BaseMapper<Snlogs> {


}

Service

public interface GoodsService extends IService<Goods> {


void FlashSaleBusiness(Integer userID, Integer goodsID);

}

public interface SnlogsService extends IService<Snlogs> {

}

@Service

public class GoodsServiceImpl extends ServiceImpl<GoodsMapper, Goods>

implements GoodsService{

@Resource

private GoodsMapper goodsMapper;

@Resource

private SnlogsMapper snlogsMapper;


@Override

@Transactional(rollbackFor = Exception.class)

public void FlashSaleBusiness(Integer userID, Integer goodsID) {

//减库存

Goods goods = goodsMapper.selectById(goodsID);

Integer stocks = goods.getStocks()-1;

if (stocks < 0){

throw new RuntimeException("商品ID"+goods.getId()+"用户ID:"+userID+"抢单失败");

}

goods.setStocks(stocks);

goods.setUpdateTime(new Date());

goodsMapper.updateById(goods);

//生成订单

Snlogs snlogs = new Snlogs();

snlogs.setCreateTime(new Date());

snlogs.setUserId(userID);

snlogs.setGoodsId(goodsID);

String sn = String.valueOf(UUID.randomUUID());

snlogs.setOrderSn(userID+"-"+goodsID+"-"+sn);

snlogsMapper.insert(snlogs);

}

}

@Service

public class SnlogsServiceImpl extends ServiceImpl<SnlogsMapper, Snlogs>

implements SnlogsService{

}

初始化Mysql同步Redis

@Component

public class DataSync {

//定时任务,将mysql库存定时同步到redis

//@Scheduled(initialDelay = 1000,fixedDelay = 1000) initialDelay初始化一秒后执行,fixedDelay每间隔1秒后执行

//@Scheduled(cron = "0 0 8 0 0 ?") //分时年月 每天8点执行

//public void MysqlSyncRedis(){

//

//}

@Resource

private GoodsMapper goodsMapper;


@Resource

private RedisTemplate redisTemplate;


/**

* 项目启动后,将mysql库存同步到redis

* 初始化(PostConstruct,InitializingBean,BeanPostProcessor)

*/

@PostConstruct

public void initGoodsData(){

List<Goods> goods =goodsMapper.getAllGoodsFlash();

if (!CollectionUtils.isEmpty(goods)){

goods.forEach(good->{

redisTemplate.opsForValue().set("goodsId:"+String.valueOf(good.getId()),String.valueOf(good.getStocks()));

});

}

}

}

生产者Controller

@RestController

@RequestMapping("/A")

public class FlashSaleController {


@Resource

private RedisTemplate redisTemplate;


@Resource

private RocketMQTemplate rocketMQTemplate;



/**

* 1.去重(查询该用户是否已经参与秒杀活动)

* 2.扣减库存

* 3.消息传入MQ

* @param goodsId

* @return

*/

@GetMapping("/flashSaleA")

public String flashSaleTest(Integer goodsId){

//模拟用户id

int userid = 200019987;

/**

* 1.查询去重-- 每个商品只能抢一次

* userid + ":" + goodsId;

*/

String uKey = userid + ":" + goodsId;

System.out.println("uKey:"+uKey);

//redis存入唯一标识

Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(uKey, "");

if (!aBoolean) {

return "你已经秒杀过了";

}


/**s

*2.扣减redis库存

*/

Long count = redisTemplate.opsForValue().decrement("goodsId:"+goodsId);

if (count < 0) {

return "库存消耗完了";

}

/**

*3.发送至MQ

*/

rocketMQTemplate.asyncSend("FlashSaleTopic", uKey, new SendCallback() {

@Override

public void onSuccess(SendResult sendResult) {

System.out.println("发送成功");

}


@Override

public void onException(Throwable throwable) {

System.out.println("发送失败:"+throwable.getMessage());

}

});


return "抢单成功,请稍后查看清单信息";

}



}

消费者FlashMQListener

@Component

@RocketMQMessageListener(

topic = "FlashSaleTopic",

consumerGroup = "FlashSaleConsumerGroup",

consumeMode = ConsumeMode.CONCURRENTLY,

consumeThreadMax = 10)

public class FlashMQListener implements RocketMQListener<MessageExt> {

@Resource

private GoodsService goodsService;

/**

* onMessage 消费者方法

* @param messages 消息内容

*/

@Override

public void onMessage(MessageExt messages) {

//userid + ":" + goodsId;

System.out.println("接收到消息:"+new String(messages.getBody()));

String msg = new String(messages.getBody());

Integer userID = Integer.valueOf(msg.split(":")[0]);

Integer goodsID = Integer.valueOf(msg.split(":")[1]);

//4.减库存,添加订单信息--1、业务代码事务外加锁可实现线程安全 或2、数据库语句行锁update goods set stocks = stocks -1,update_time = now() where id = 1;

synchronized (this){

goodsService.FlashSaleBusiness(userID,goodsID);

}


}

}

验证

压力测试

压力测试工具apache-jmeter

https://dlcdn.apache.org//jmeter/binaries/apache-jmeter-5.6.3.zip

消息堆积问题

单条队列消息差值大于5w条,算消息堆积问题,根据引用场景定义

消息堆积的情况:

一、生产者生产太快

处理方式:

1、 生产者做业务限流

2、动态增加消费者数量(rocketmq-dashboard-1.0.0.jar)

3、增加消费者数量,但是消费者线程数量要<=队列数。队列确定后不建议更改,根据实际场景修改

IO型:逻辑处理器数*2, cpu型:逻辑处理器数+1

consumeThreadMax = 24

二、消费者产问题(程序奔溃,BUG等)

处理方式:

1、重启消费者程序,新增消费者

2、重置消费位点(再次消费)

3、 跳过堆积

消息丢失

1、生产者使用同步发送模式,MQ消息中间件返回确认后,执行业务程序,数据写入消息状态和创建时间

2、消费者消费后 ,修改数据状态

3、开启MQ的trace机制,消息跟踪机制

4、使用集群模式,主倍模式,将消息持久化在不同硬盘

5、MQ的刷盘机制设置为同步刷盘,性能相对不高(磁盘:随机读写,顺序读写),机械(随机读写比固态快)

6、数据库持久化,log日记

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
9天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
88 7
|
16天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
31 6
|
1月前
|
NoSQL Java API
springboot项目Redis统计在线用户
通过本文的介绍,您可以在Spring Boot项目中使用Redis实现在线用户统计。通过合理配置Redis和实现用户登录、注销及统计逻辑,您可以高效地管理在线用户。希望本文的详细解释和代码示例能帮助您在实际项目中成功应用这一技术。
32 4
|
1月前
|
消息中间件 NoSQL Java
Spring Boot整合Redis
通过Spring Boot整合Redis,可以显著提升应用的性能和响应速度。在本文中,我们详细介绍了如何配置和使用Redis,包括基本的CRUD操作和具有过期时间的值设置方法。希望本文能帮助你在实际项目中高效地整合和使用Redis。
48 2
|
2月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
64 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
2月前
|
缓存 NoSQL Java
springboot的缓存和redis缓存,入门级别教程
本文介绍了Spring Boot中的缓存机制,包括使用默认的JVM缓存和集成Redis缓存,以及如何配置和使用缓存来提高应用程序性能。
124 1
springboot的缓存和redis缓存,入门级别教程
|
2月前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
117 2
|
2月前
|
JSON NoSQL Java
springBoot:jwt&redis&文件操作&常见请求错误代码&参数注解 (九)
该文档涵盖JWT(JSON Web Token)的组成、依赖、工具类创建及拦截器配置,并介绍了Redis的依赖配置与文件操作相关功能,包括文件上传、下载、删除及批量删除的方法。同时,文档还列举了常见的HTTP请求错误代码及其含义,并详细解释了@RequestParam与@PathVariable等参数注解的区别与用法。
|
2月前
|
NoSQL Java Redis
shiro学习四:使用springboot整合shiro,正常的企业级后端开发shiro认证鉴权流程。使用redis做token的过滤。md5做密码的加密。
这篇文章介绍了如何使用Spring Boot整合Apache Shiro框架进行后端开发,包括认证和授权流程,并使用Redis存储Token以及MD5加密用户密码。
37 0
shiro学习四:使用springboot整合shiro,正常的企业级后端开发shiro认证鉴权流程。使用redis做token的过滤。md5做密码的加密。
|
1月前
|
JavaScript NoSQL Java
CC-ADMIN后台简介一个基于 Spring Boot 2.1.3 、SpringBootMybatis plus、JWT、Shiro、Redis、Vue quasar 的前后端分离的后台管理系统
CC-ADMIN后台简介一个基于 Spring Boot 2.1.3 、SpringBootMybatis plus、JWT、Shiro、Redis、Vue quasar 的前后端分离的后台管理系统
42 0