你以为高并发很远?一个爆款课程上线时,它离你只有一个点击的距离。
引言:为什么是在线教育?为什么是高并发?
近年来,在线教育经历了爆发式增长。其技术核心,就是一个典型的内容型+交互型网站。它既有电商秒杀般的课程购买场景,又有流媒体般的视频直播/点播需求,同时还包含了社区论坛般的评论、问答互动。
这种业务复杂性,使得在线教育网站成为一个绝佳的全栈实践项目。而其中最大的技术挑战,往往来自于 “高并发” ——当一门热门课程发布、一位名师开讲直播时,瞬间涌入的流量如何平稳承接?这就是我们今天要攻克的堡垒。
本文将手把手带你实现一个业务俱全的在线教育网站,并深度剖析其中“高并发”场景的解决方案。
本项目中用到的所有组件,自行下载,都是以docker形式在linux中运行
本项目中的所有代码都存放在这个gitee网址 :https://gitee.com/xiashaofeng/online_-education_-website ,此网站中也有其他项目,设计同城在线交友等!!!
天机学堂是一个基于微服务架构的生产级的在线教育项目,面向成年人的非学历职业技能培训平台
通过此项目,我们可以学习到学习辅助系统、商品促销系统等知识点,还能学习到微服务开发中的各种热点问题以及解决方案
二端二角色
整体架构
天机学堂目前是一个 B2C(Business to Customer,企业对客户)类型的教育网站,因此分为两个端:
管理端:给老师用,核心业务是课程管理、考试题管理、学习问答管理、营销订单管理
用户端:给学员用,核心业务是买课、学习、知识分享交流
整体架构如下:
核心功能
教师端:
学生端
技术架构
| 微服务名称 | 功能描述 | 完成状态 |
| ------------ | ---- | ----------------------------------- |
| tj-parent | 父工程 | √ |
| tj-common | 通用工程 | √ |
| tj-message | 消息中心 | √ |
| tj-gateway | 网关 | √ |
| tj-auth | 权限服务 | √ |
| tj-user | 用户服务 | √ |
| tj-pay | 支付服务 | √ |
| tj-course | 课程服务 | √ |
| tj-exam | 考试服务 | O |
| tj-search | 搜索服务 | √ |
| tj-trade | 交易服务 | O |
| tj-learning | 学习服务 | X |
| tj-promotion | 促销服务 | X |
| tj-media | 媒资服务 | √ |
| tj-data | 数据服务 | O |
| tj-remark | 评价服务 | X |用户端界面:
管理端界面:
熟悉项目
熟悉项目的第一步是熟悉项目的结构、用到的技术、编码的一些规范等。
项目结构
我们先来看看项目结构,目前企业微服务开发项目结构有两种模式:
- 项目下的每一个微服务,都创建为一个独立的 Project,有独立的 Git 仓库,尽可能降低耦合,适合于大型项目,架构更为复杂,管理和维护成本都比较高
- 项目创建一个 Project,项目下的每一个微服务都是一个 Module,方便管理,适合中小型项目,架构更为简单,管理和维护成本都比较低;
天机学堂采用的正是第二种模式,结构如图:
当我们要创建新的微服务时,也必须以 tjxt 为父工程,创建一个子 module. 例如交易微服务:
微服务 module 中如果有对外暴露的 Feign 接口,需要定义到 tj-api 模块中:
实体类规范
在天机学堂项目中,所有实体类按照所处领域不同,划分为 4 种不同类型:
- DTO:数据传输对象,在客户端与服务端间传递数据,例如微服务之间的请求参数和返回值、前端提交的表单
- PO:持久层对象,与数据库表一一对应,作为查询数据库时的返回值
- VO:视图对象,返回给前端用于封装页面展示的数据
- QUERY:查询对象,一般是用于封装复杂查询条件
例如交易服务:
异常处理
在项目运行过程中,或者业务代码流程中,可能会出现各种类型异常,为了加以区分,我们定义了一些自定义异常对应不同场景
在开发业务的过程中,如果出现对应类型的问题,应该优先使用这些自定义异常。
当微服务抛出这些异常时,需要一个统一的异常处理类,同样在 tj-common 模块中定义了:
@RestControllerAdvice
@Slf4j
public class CommonExceptionAdvice {
@ExceptionHandler(DbException.class)
public Object handleDbException(DbException e) {
log.error("mysql数据库操作异常 -> ", e);
return processResponse(e.getStatus(), e.getCode(), e.getMessage());
}
@ExceptionHandler(CommonException.class)
public Object handleBadRequestException(CommonException e) {
log.error("自定义异常 -> {} , 状态码:{}, 异常原因:{} ",e.getClass().getName(), e.getStatus(), e.getMessage());
log.debug("", e);
return processResponse(e.getStatus(), e.getCode(), e.getMessage());
}
@ExceptionHandler(FeignException.class)
public Object handleFeignException(FeignException e) {
log.error("feign远程调用异常 -> ", e);
return processResponse(e.status(), e.status(), e.contentUTF8());
}
@ExceptionHandler(MethodArgumentNotValidException.class)
public Object handleMethodArgumentNotValidException(MethodArgumentNotValidException e) {
String msg = e.getBindingResult().getAllErrors()
.stream().map(ObjectError::getDefaultMessage)
.collect(Collectors.joining("|"));
log.error("请求参数校验异常 -> {}", msg);
log.debug("", e);
return processResponse(400, 400, msg);
}
@ExceptionHandler(BindException.class)
public Object handleBindException(BindException e) {
log.error("请求参数绑定异常 ->BindException, {}", e.getMessage());
log.debug("", e);
return processResponse(400, 400, "请求参数格式错误");
}
@ExceptionHandler(NestedServletException.class)
public Object handleNestedServletException(NestedServletException e) {
log.error("参数异常 -> NestedServletException,{}", e.getMessage());
log.debug("", e);
return processResponse(400, 400, "请求参数异常");
}
@ExceptionHandler(ConstraintViolationException.class)
public Object handViolationException(ConstraintViolationException e) {
log.error("请求参数异常 -> ConstraintViolationException, {}", e.getMessage());
return processResponse( HttpStatus.OK.value(), HttpStatus.BAD_REQUEST.value(),
e.getConstraintViolations().stream().map(ConstraintViolation::getMessage).distinct().collect(Collectors.joining("|"))
);
}
@ExceptionHandler(Exception.class)
public Object handleRuntimeException(Exception e) {
log.error("其他异常 uri : {} -> ", WebUtils.getRequest().getRequestURI(), e);
return processResponse(500, 500, "服务器内部异常");
}
private Object processResponse(int status, int code, String msg){
// 1.标记响应异常已处理(避免重复处理)
WebUtils.setResponseHeader(Constant.BODY_PROCESSED_MARK_HEADER, "true");
// 2.如果是网关请求,http状态码修改为200返回,前端基于业务状态码code来判断状态
// 如果是微服务请求,http状态码基于异常原样返回,微服务自己做fallback处理
return WebUtils.isGatewayRequest() ?
R.error(code, msg).requestId(MDC.get(Constant.REQUEST_ID_HEADER))
: ResponseEntity.status(status).body(msg);
}
}
配置文件
SpringBoot 的配置文件支持多环境配置,在天机学堂中也基于不同环境有不同配置文件:

说明:
| 文件 | 说明 |
|---|---|
| bootstrap.yml | 通用配置属性,包含服务名、端口、日志等等各环境通用信息 |
| bootstrap-dev.yml | 线上开发环境配置属性,虚拟机中部署使用 |
| bootstrap-local.yml | 本地开发环境配置属性,本地开发、测试、部署使用 |
| 项目中的很多共性的配置都放到了 Nacos 配置中心管理: |

例如 mybatis、mq、redis 等,都有对应的 shared-xxx.yaml 共享配置文件。
在微服务中如果用到了相关技术,无需重复配置,只要引用上述共享配置即可:
bootstrap.yml
我们来看看 bootstrap.yml 文件的基本内容:
接下来,我们就分别看看每一个共享的配置文件内容。
shared-spring.yml
spring:
jackson:
default-property-inclusion: non_null # 忽略json处理时的空值字段
main:
allow-bean-definition-overriding: true # 允许同名Bean重复定义
mvc:
pathmatch:
# 解决异常:swagger Failed to start bean 'documentationPluginsBootstrapper'; nested exception is java.lang.NullPointerException
# 因为Springfox使用的路径匹配是基于AntPathMatcher的,而Spring Boot 2.6.X使用的是PathPatternMatcher
matching-strategy: ant_path_matcher
shared-mybatis.yaml
mybatis-plus:
configuration: # 默认的枚举处理器
default-enum-type-handler: com.baomidou.mybatisplus.core.handlers.MybatisEnumTypeHandler
global-config:
field-strategy: 0
db-config:
logic-delete-field: deleted # mybatis逻辑删除字段
id-type: assign_id # 默认的id策略是雪花算法id
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver # 数据库驱动
url: jdbc:mysql://${
tj.jdbc.host:192.168.150.101}:${
tj.jdbc.port:3306}/${
tj.jdbc.database}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
username: ${
tj.jdbc.username:root}
password: ${
tj.jdbc.password:123}
注意到这里把 mybatis 的 datasource 都配置了,不过由于 jdbc 连接时的数据库 ip、端口,数据库名、用户名、密码是不确定的,这里做了参数映射:
| 参数名 | 描述 | 默认值 |
|---|---|---|
| tj.jdbc.host | 主机名 | 192.168.150.101,也就是虚拟机 ip |
| tj.jdbc.port | 数据库端口 | 3306 |
| tj.jdbc.database | 数据库 database 名称 | 无 |
| tj.jdbc.username | 数据库用户名 | root |
| tj.jdbc.password | 数据库密码 | 123 |
除了 tj.jdbc.database 外,其它参数都有默认值,在没有配置的情况下会按照默认值来配置,也可以按照参数名来自定义这些参数值。其中 tj.jdbc.database 是必须自定义的值,例如在交易服务中:
tj:
jdbc:
database: tj_trade
shared-mq.yaml
spring:
rabbitmq:
host: ${
tj.mq.host:192.168.150.101} # mq的IP
port: ${
tj.mq.port:5672}
virtual-host: ${
tj.mq.vhost:/tjxt}
username: ${
tj.mq.username:tjxt}
password: ${
tj.mq.password:123321}
listener:
simple:
retry:
enabled: ${
tj.mq.listener.retry.enable:true} # 开启消费者失败重试
initial-interval: ${
tj.mq.listener.retry.interval:1000ms} # 初始的失败等待时长为1秒
multiplier: ${
tj.mq.listener.retry.multiplier:1} # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: ${
tj.mq.listener.retry.max-attempts:3} # 最大重试次数
stateless: ${
tj.mq.listener.retry.stateless:true} # true无状态;false有状态。如果业务中包含事务,这里改为false
这里配置了 mq 的基本配置,例如地址、端口等,默认就是 tjxt 的地址,不需要修改。另外还配置类消费者的失败重试机制,如有需要可以按需修改。
shared-redis.yaml
spring:
redis:
host: ${
tj.redis.host:192.168.150.101}
password: ${
tj.redis.password:123321}
lettuce:
pool:
max-active: ${
tj.redis.pool.max-active:8}
max-idle: ${
tj.redis.pool.max-idle:8}
min-idle: ${
tj.redis.pool.min-idle:1}
max-wait: ${
tj.redis.pool.max-wait:300}
注意配置了 Redis 的基本地址和连接池配置,省去了我们大部分的工作
shared-feign.yaml
feign:
client:
config:
default: # default全局的配置
loggerLevel: BASIC # 日志级别,BASIC就是基本的请求和响应信息
httpclient:
enabled: true # 开启feign对HttpClient的支持
max-connections: 200 # 最大的连接数
max-connections-per-route: 50 # 每个路径的最大连接数
这里配置了默认的 Feign 日志级别以及连接池配置,一般不需要修改。
shared-xxljob.yaml
tj:
xxl-job:
access-token: tianji
admin:
address: http://192.168.150.101:8880/xxl-job-admin
executor:
appname: ${
spring.application.name}
log-retention-days: 10
logPath: job/${
spring.application.name}
这里配置了 xxl-job 组件的地址等信息,一般不需要修改。
阅读源码
阅读源码是有一定的技巧,一般阅读源码的流程如下:
下面简单做一个需求分析:
接下来,我们来完成本项目的第一个模块:我的课表,我的课表指的是我购买了的课程,在这里先区分两个概念:课程和课表
课程指的是:系统中添加好的提供给人们学习的课
课表则更加强调的是:某位用户购买(拥有)某个课程
数据模型

很显然:课表和用户之间有一个多对多的关系,而反映这个关系的中间表就是用户课程表
当然这个中间表中除了记录多对多关系意外,还有很多其它的字段,比如:有效期限、学习进度等等,下面我们来观察下这张表的详细字段
use tj_learning;
-- 创建表
CREATE TABLE IF NOT EXISTS learning_lesson (id bigint NOT NULL COMMENT '主键',user_id bigint NOT NULL COMMENT '学员id',course_id bigint NOT NULL COMMENT '课程id',status tinyint DEFAULT '0' COMMENT '课程状态,0-未学习,1-学习中,2-已学完,3-已失效',week_freq tinyint DEFAULT NULL COMMENT '每周学习频率,例如每周学习6小节,则频率为6',plan_status tinyint NOT NULL DEFAULT '0' COMMENT '学习计划状态,0-没有计划,1-计划进行中',learned_sections int NOT NULL DEFAULT '0' COMMENT '已学习小节数量',latest_section_id bigint DEFAULT NULL COMMENT '最近一次学习的小节id',latest_learn_time datetime DEFAULT NULL COMMENT '最近一次学习的时间',create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',expire_time datetime DEFAULT NULL COMMENT '过期时间',update_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (id) USING BTREE,
UNIQUE KEY idx_user_id (user_id,course_id) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='学生课表';
-- 添加数据
INSERT INTO learning_lesson (id, user_id, course_id, status, week_freq, plan_status, learned_sections, latest_section_id, latest_learn_time, create_time, expire_time, update_time) VALUES
(1, 2, 2, 2, 6, 1, 12, 16, '2023-04-11 22:34:45', '2022-08-05 20:02:50', '2023-08-05 20:02:29', '2023-04-19 10:29:29'),
(2, 2, 3, 1, 4, 1, 3, 31, '2023-04-19 11:42:50', '2022-08-06 15:16:48', '2023-08-06 15:16:37', '2023-04-19 11:42:50'),
(1585170299127607297, 129, 2, 0, NULL, 0, 0, 16, '2023-04-11 22:37:05', '2022-12-05 23:00:29', '2023-10-26 15:14:54', '2023-04-11 22:37:05'),
(1601061367207464961, 2, 1549025085494521857, 1, 3, 1, 4, 1550383240983875589, '2023-04-11 16:34:44', '2022-12-09 11:49:11', '2023-12-09 11:49:11', '2023-04-11 16:34:43');
业务流程
要做添加课表的功能,首先要搞清楚表中的数据是什么时候添加进去的呢?这就要看一下买课的业务流程了
交易服务:用户在客户端首页浏览课程,加入课程到购物车,完成课程购买,完成支付,发送消息到MQ
学习服务:监听MQ,订阅消息,执行保存课表的动作
其中交易服务已经开发好了,核心代码如下:
在trade-service的OrderController中,有一个报名免费课程的接口:
@ApiOperation("免费课立刻报名接口")
@PostMapping("/freeCourse/{courseId}")
public PlaceOrderResultVO enrolledFreeCourse(@ApiParam("免费课程id") @PathVariable("courseId") Long courseId) {
return orderService.enrolledFreeCourse(courseId);
}
// 5.发送MQ消息,通知报名成功
rabbitMqHelper.send(
MqConstants.Exchange.ORDER_EXCHANGE,
MqConstants.Key.ORDER_PAY_KEY,
OrderBasicDTO.builder()
.orderId(orderId) // 订单id
.userId(userId) // 下单的用户id
.courseIds(cIds) // 购买的课程id集合
.finishTime(order.getFinishTime()) // 支付完成时间
.build()
);
由此,我们可以得知发送消息的Exchange、RoutingKey,以及消息体OrderBasicDTO
由于交易服务已经写好了,因此我们在学习服务中要实现的是:监听MQ, 获取课程信息,将课程加入课表。
接口设计
监听MQ接收消息,保存对应的课表信息
功能实现


LessonChangeListener

//添加课表的监听器,交易服务中用户下单完成后,会投递消息到mq,在此处接收消息
//{"orderId":"1673675057831903234","userId":"2","courseIds":["1549025085494521857"],"finishTime":"2023-06-27 20:50:03"}
@Component
@Slf4j
public class LessonChangeListener {
@Autowired
private ILearningLessonService learningLessonService;
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(name = MqConstants.Exchange.ORDER_EXCHANGE, type = ExchangeTypes.TOPIC), //交换机
value = @Queue(value = "learning.lesson.pay.queue", durable = "true"),//队列
key = MqConstants.Key.ORDER_PAY_KEY //bindingKey
)
)
public void listenLessonPay(OrderBasicDTO orderBasicDTO) {
//1. 参数校验
if(orderBasicDTO == null || orderBasicDTO.getUserId() == null || CollUtils.isEmpty(orderBasicDTO.getCourseIds())){
// 数据有误,无需处理
log.error("接收到MQ消息有误,订单数据为空");
return;
}
//2. 调用service添加课程到课表
log.info("监听到用户{}的订单,需要将课程{}添加到课表", orderBasicDTO.getUserId(), orderBasicDTO.getCourseIds());
learningLessonService.saveLesson(orderBasicDTO);
}
}
ILearningLessonService
public interface ILearningLessonService extends IService {
// 添加课程到课表
void saveLesson(OrderBasicDTO orderBasicDTO);
}
LearningLessonServiceImpl
//1. 根据课程id远程调用tj-course查询课程信息
//2. 遍历课程列表,保存成学员课表
//3. 批量保存
// 将用户购买的课程遍历保存成课表
// 注意: 在设计表的时候使用了用户id和和课程id做联合约束,因此不存在重复购买问题(幂等性解决了)
@Service
public class LearningLessonServiceImpl extends ServiceImpl\ implements ILearningLessonService {
@Autowired
private CourseClient courseClient;
// 添加课程到课表
@Override
public void saveLesson(OrderBasicDTO orderBasicDTO) {
//1. 根据课程id远程调用tj-course查询课程信息
List<CourseSimpleInfoDTO> courseList = courseClient.getSimpleInfoList(orderBasicDTO.getCourseIds());
if (CollUtil.isEmpty(courseList)) {
log.error("课程不存在,无法生成课表");
return;//结束流程
}
//2. 遍历课程列表,保存成学员课表
List<LearningLesson> lessonList = new ArrayList<>(courseList.size());
for (CourseSimpleInfoDTO course : courseList) {
//2-1 创建课表对象
LearningLesson learningLesson = new LearningLesson();
//2-2 开始赋值
learningLesson.setUserId(orderBasicDTO.getUserId()); // 学员id
learningLesson.setCourseId(course.getId());// 课程id
// 过期时间=当前时间+课程有效期
Integer validDuration = course.getValidDuration();
if (validDuration != null && validDuration > 0) {
learningLesson.setExpireTime(LocalDateTime.now().plusMonths(validDuration));
}
//2-3 设置到课表list
lessonList.add(learningLesson);
}
//3. 批量保存
this.saveBatch(lessonList);
}
}
创建学习计划
提交学习记录
业务流程
用户点击某个课程小节时需要记录学习记录。而小节类型分为考试、视频两种:
- 考试:只要提交了就说明这一节学完了,要注意的点:只有本章下的所有小节学习才可以考试
视频:需要记录用户的播放进度,进度超过50%才算学完,要注意的点:视频播放的过程中每间隔15秒提交一次播放进度到服务端

此功能实现需要二张表:
学习记录learning_record表记录的是每一个小节的学习进度,而在learning_lesson表也需要记录一些学习进度相关字段latest_section_id:最近一次学习的小节id,每当有一个小节被学习,此字段更新
latest_learn_time:最近一次学习时间,每当有一个小节被学习,此字段更新
learned_sections:已学习小节数量
考试只会被参加一次,考试提交则小节学完,learned_sections累加1
视频可以被重复播放,只有在第一次学完一个视频时,learned_sections才需要累加1,第一次学完应当满足两个条件:
- 视频播放进度超过50%
- 目前学习记录的状态为未学完
status:
- 当已学习小节数量等于课程的总小节数,这就意味着当前课程被全部学完了。那么课程状态需要从 "学习中" 变更为 "已学完"。
- 当用户第一次更新课表时,需要将课程状态从“未学习”便成为“学习中”
下面是完整的流程图
## 接口设计
需求:在课程学习页面播放视频时或考试后,需要提交学习记录信息到服务端保存
LearningRecordController
前端提交的数据,可以使用LearningRecordFormDTO接收,此DTO已经在api模块提供
@PostMapping
@ApiOperation("提交学习记录")
public void addLearningRecord(@RequestBody LearningRecordFormDTO learningRecordFormDTO){
learningRecordService.addLearningRecord(learningRecordFormDTO);
}
ILearningRecordService
// 提交学习记录
void addLearningRecord(LearningRecordFormDTO learningRecordFormDTO);
LearningRecordServiceImpl
//1. 提交学习记录
//2. 更新课表
// 抽取处理考试记录逻辑方法
private Boolean handlerExamRecord(LearningRecordFormDTO learningRecordFormDTO) {
return null;}
// 抽取处理视频记录逻辑方法
private Boolean handlerVideoRecord(LearningRecordFormDTO learningRecordFormDTO) {
return null;}
// 抽取处理更新课表逻辑方法
private void handlerLearningLessonChanges(LearningRecordFormDTO learningRecordFormDTO, Boolean finished) {
}
@Autowired
private CourseClient courseClient;
// 提交学习记录
@Override
public void addLearningRecord(LearningRecordFormDTO learningRecordFormDTO) {
//1. 提交学习记录
Boolean finished = false;
// 判断记录考试还是视频
if (learningRecordFormDTO.getSectionType() == SectionType.EXAM.getValue()) {
//1-1 考试记录
finished = handlerExamRecord(learningRecordFormDTO);
} else {
//1-2 视频记录
finished = handlerVideoRecord(learningRecordFormDTO);
}
//2. 更新课表
handlerLearningLessonChanges(learningRecordFormDTO, finished);
}
// 抽取处理考试记录逻辑方法
private Boolean handlerExamRecord(LearningRecordFormDTO learningRecordFormDTO) {
//1. dto->po
LearningRecord learningRecord = BeanUtil.copyProperties(learningRecordFormDTO, LearningRecord.class);
//2. 补充属性
learningRecord.setUserId(UserContext.getUser());
learningRecord.setFinished(true);
learningRecord.setFinishTime(learningRecordFormDTO.getCommitTime());
//3. 保存到数据库
this.save(learningRecord);
//4. 返回已学完
return true;
}
// 抽取处理视频记录逻辑方法
private Boolean handlerVideoRecord(LearningRecordFormDTO learningRecordFormDTO) {
//1. 查询学习记录是否存在
LearningRecord learningRecord = this.lambdaQuery()
.eq(LearningRecord::getUserId, UserContext.getUser())
.eq(LearningRecord::getLessonId, learningRecordFormDTO.getLessonId())
.eq(LearningRecord::getSectionId, learningRecordFormDTO.getSectionId())
.one();
if (learningRecord == null) {
//2. 如果不存在,就新增一条
//2-1 dto->po
learningRecord = BeanUtil.copyProperties(learningRecordFormDTO, LearningRecord.class);
//2-2 补充属性
learningRecord.setUserId(UserContext.getUser());
//2-3 保存到数据库
this.save(learningRecord);
//2-4 返回未学完
return false;
} else {
//3. 如果存在,就更新一条
//3-1 判断是否是一次看完(时长>50%、finished=false)
boolean finished = learningRecord.getFinished() == false && learningRecordFormDTO.getMoment() * 2 > learningRecordFormDTO.getDuration();
if (finished) {
// 第一次看完需要更新视频完成状态和完成时间
learningRecord.setFinished(true);
learningRecord.setFinishTime(learningRecordFormDTO.getCommitTime());
}
//3-2 填充播放时长,更新数据库
learningRecord.setMoment(learningRecordFormDTO.getMoment());
this.updateById(learningRecord);
//3-3 返回标识
return finished;
}
}
// 抽取处理更新课表逻辑方法
private void handlerLearningLessonChanges(LearningRecordFormDTO learningRecordFormDTO, Boolean finished) {
//1. 查询课表
LearningLesson learningLesson = learningLessonService.getById(learningRecordFormDTO.getLessonId());
//2. 查询课程
CourseFullInfoDTO courseFullInfoDTO =
courseClient.getCourseInfoById(learningLesson.getCourseId(), false, false);
//3. 更新学习状态:未学习->学习中
if (learningLesson.getStatus().getValue() == LessonStatus.NOT_BEGIN.getValue()) {
learningLesson.setStatus(LessonStatus.LEARNING);//学习中
}
//4. 更新最近学习小节和时间
learningLesson.setLatestSectionId(learningRecordFormDTO.getSectionId());
learningLesson.setLatestLearnTime(learningRecordFormDTO.getCommitTime());
//5. 判断小结是否第一次学完
if (finished) {
// 如果是,课表学习小节+1
learningLesson.setLearnedSections(learningLesson.getLearnedSections() + 1);
// 判断是否学完全部小节
if (learningLesson.getLearnedSections() >= courseFullInfoDTO.getSectionNum()) {
// 如果是更新课程状态为已学完
learningLesson.setStatus(LessonStatus.FINISHED);
}
}
//6. 更新数据库
learningLessonService.updateById(learningLesson);
}
高并发场景
高并发问题
我们实现了一个学习进度的保存功能,实现的思路是:前端每隔15秒就发起一次请求,将播放记录写入数据库
在这种设计之下,当一个用户在观看视频的时候,前端就会不停的向后端发请求,那么如果有很多用户都在看视频呢?
很显然,后端就会接收到大量的请求,这就会给我们的后端服务器和数据库带来很大的压力,甚至压崩服务器
我们把这种同时有大量请求访问后端,从而导致后端崩溃的问题称为高并发问题,而今天我们要学习的就是如何来解决高并发问题
注意:我们在这讲的是通用方案,它是具有通用性的,方案学习完了之后,我们再加以变动去解决我们上面的问题
高并发方案
在机器性能一定的情况下,提高单机并发能力就是要尽可能缩短业务的响应时间,而对响应时间影响最大的往往是对数据库的操作。
而对数据库的操作主要就是读或写两种类型,因此我们就要从这两种类型上做优化即可
### 同步变异步
假如一个业务比较复杂,需要有多次数据库的写业务,如图所示:
由于各个业务之间是同步串行执行,因此整个业务的响应时间就是每一次数据库写业务的响应时间之和,并发能力肯定不会太好。
优化的思路很简单,利用MQ可以把同步业务变成异步,从而提高效率
>使用MQ进行异步处理业务的优缺点如下
| 优点 | 缺点 |
|---|---|
| 1. 无需等待,大大节省了响应时间 | 1. 高度依赖MQ |
| 2. 流量削峰,利用MQ队列抗住激增请求 | 2. 无法降低数据库的写次数,仅仅降低频率 |
| 3. 降低频率,降低了数据库写频率,减轻数据库压力 |
合并写请求
合并写请求就是指当写数据库并发较高时,不再直接写到数据库。而是先将数据缓存到Redis,然后定期将缓存中的数据批量写入数据库
由于Redis是内存操作,写效率也非常高,这样每次请求的处理速度大大提高,响应时间大大缩短,并发能力肯定有很大的提升
而且由于数据都缓存到Redis了,积累一些数据后再批量写入数据库,这样数据库的写频率、写次数都大大减少,对数据库压力小了非常多
## redis结构选择
确定使用Redis缓存播放进度之后,接下来要确定下要缓存的数据有哪些,然后再确定使用Redis的哪种数据结构来保存
要记录的数据:首先是前端传过来的播放进度数据(课表id、小节id、播放进度),除此之外,还要记录播放记录id、播放状态(finished)
既然一个小节要保存多个字段,是不是可以考虑使用Hash结构来保存这些数据,如下图所示:
不过,这样设计有一个问题。课程有很多,每个课程的小节也非常多。每个小节都是一个独立的KEY,需要创建的KEY也会非常多,浪费大量内存。
而且,用户学习视频的过程中,可能会在多个视频之间来回跳转,这就会导致频繁的创建缓存、缓存过期,影响到最终的业务性能。该如何解决呢?
可以把一个课程的多个小节作为一个KEY来缓存,如图:
注意:一个课表只能属于一个用户,因此key中不需要加入userId
延迟队列
前面添加缓存以后,学习记录提交的业务流程就需要发生一些变化了,需要使用缓存完成如下2个优化
- 小节频繁查询:前面已经加入了小节的缓存,完全可以使用查询缓存解决小节频繁查询数据库问题
- 学习记录的进度频繁写:先将播放进度缓存到redis,新的播放进度会覆盖旧的,最终记录的就是最后一次播放进度,然后写入数据库即可
那么这时候又出现了一个新的问题:我们怎么知道一条记录是不是最后一次小节播放进度呢?
这时候,我们需要分析下用户播放视频的情况:
- 只要用户前端一直在播放,那么最长每隔15s,Redis中的播放数据都会更新一下
- 用户一旦是暂停播放或者关闭播放器,Redis中播放进度数据将不会再更新
因此,我们只要能判断Redis中的播放进度在15s之后是否变化即可,这时候就可以适应延迟任务来实现
每当前端提交播放记录时,先将播放记录保存到redis中,然后启动一个延迟任务(20S后执行),检查Redis中的缓存的进度与任务中的进度是否一致
- 不一致:说明持续在提交,无需处理
- 一致:说明是最后一次提交,更新学习记录、更新课表最近学习小节和时间到数据库中

延迟任务的实现方案有很多,常见的有四类:
| DelayQueue | Redisson | MQ | 时间轮 | |
|---|---|---|---|---|
| 原理 | JDK自带延迟队列 | 基于Redis数据结构模拟JDK的DelayQueue实现 | 利用MQ的特性(RabbitMQ、EMQ) | 时间轮算法 |
| 优点 | 不依赖第三方服务 | 分布式系统下可用 | 分布式系统下可以 | 不依赖第三方服务,且性能优异 |
| 缺点 | 只能单机使用 | 依赖第三方服务Redis | 依赖第三方服务MQ | 只能单机使用 |
综合比较
DelayQueue和时间轮: 性能好,而且不依赖任何第三方服务,减少了网络交互,但缺点也很明显,就是只能单机使用
Redisson和MQ: 性能很好,而且支持分布式,缺点就是依赖于第三方软件
Redisson
Redisson是Redis的一个Java客户端,为开发者提供了一系列具有分布式特性的常用工具类,大大降低了设计和研发大规模分布式系统的难度。
在企业中使用场景:1. 分布式锁 2. 延迟队列
用法演示
首先定义一个Delayed类型的延迟任务类,要能保持任务数据。
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class RedissonDelayTask {
@Autowired
private RedissonClient redissonClient;
private static final String QUEUE_NAME = "queue.delay";
/*
Redisson实现延迟队底层通过二种数据结构
中转队列(ZSet):这是一个延迟队列,将数据按照延迟时间排序,过期后转移到List
目标队列(List):这是一个阻塞队列,按照先进先出规则存放数据,等待消费者获取
*/
// 生产者:向队列保存内容
public void setQueue(String value, Integer delay) {
// 设置List
RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(QUEUE_NAME);
// 设置ZSet
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
// 向延迟队列存储数据
delayedQueue.offer(value, delay, TimeUnit.SECONDS);
}
// 消费者:从队列获取内容
@PostConstruct // 对象创建完成后会自动执行此方法
public void getQueue() {
new Thread(new Runnable() {
@Override
public void run() {
// 获取List
RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(QUEUE_NAME);
// 持续监听
while (true) {
String value = blockingDeque.poll();
if (StrUtil.isNotEmpty(value)) {
System.out.println(value);
}
}
}
}).start();
}
}
测试代码代码:
import com.tianji.learning.delaytask.RedissonDelayTask;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RedissonDelayTaskTest {
@Autowired
private RedissonDelayTask redissonDelayTask;
@Test
public void test() throws Exception {
redissonDelayTask.setQueue("8s数据", 8);
redissonDelayTask.setQueue("5s数据", 5);
redissonDelayTask.setQueue("3s数据", 3);
System.in.read();// 保持ioc容器运行
}
}
测试输出的结果:

代码改造
延迟任务工具类

我们要定义一个工具类来操作redis,根据上图,此工具类应该具备上述4个方法:
① 添加播放记录到Redis,并添加一个检测任务到DelayQueue
② 查询Redis缓存中的指定小节的播放记录
③ 删除Redis缓存中的指定小节的播放记录
④ 异步执行DelayQueue中的延迟检测任务,检测播放进度是否变化,如果无变化则写入数据库
工具类初始代码如下:
@Component
@Slf4j
public class LearningRecordDelayTaskHandler {
@Autowired
private RedissonClient redissonClient;
@Autowired
private StringRedisTemplate stringRedisTemplate;
// 1)添加播放记录到Redis,并添加一个检测任务到DelayQueue
public void writeCache(LearningRecord learningRecord) {
}
// 2)查询Redis缓存中的指定小节的播放记录
public LearningRecord readCache(Long lessonId, Long sectionId) {
return null;}
// 3)删除Redis缓存中的指定小节的播放记录
public void cleanCache(Long lessonId, Long sectionId) {
}
// 4)异步执行DelayQueue中的检测任务,检测播放进度是否变化,如果无变化则写入数据库
public void executeTask() {
}
}
工具类完整码如下:
package com.tianji.learning.delaytask;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.tianji.learning.domain.po.LearningLesson;
import com.tianji.learning.domain.po.LearningRecord;
import com.tianji.learning.mapper.LearningLessonMapper;
import com.tianji.learning.mapper.LearningRecordMapper;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.expression.operators.arithmetic.Concat;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class LearningRecordDelayTaskHandler {
@Autowired
private RedissonClient redissonClient;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private LearningRecordMapper learningRecordMapper;
@Autowired
private LearningLessonMapper learningLessonMapper;
public String getKey(Long lessonId) {
return "learning:record:" + lessonId;
}
// 1)添加播放记录到Redis,并添加一个检测任务到DelayQueue
public void writeCache(LearningRecord learningRecord) {
// 添加播放记录到Redis
String key = getKey(learningRecord.getLessonId());
String hkey = learningRecord.getSectionId().toString();
String json = JSONUtil.toJsonStr(learningRecord);
stringRedisTemplate.opsForHash().put(key, hkey, json);
// 并添加一个检测任务到DelayQueue
RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque("record.queue");
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
delayedQueue.offer(json, 20, TimeUnit.SECONDS);
}
// 2)查询Redis缓存中的指定小节的播放记录
public LearningRecord readCache(Long lessonId, Long sectionId) {
String json = (String) stringRedisTemplate.opsForHash().get(getKey(lessonId), sectionId.toString());
if (StrUtil.isEmpty(json)) {
return null;
}
return JSONUtil.toBean(json, LearningRecord.class);
}
// 3)删除Redis缓存中的指定小节的播放记录
public void cleanCache(Long lessonId, Long sectionId) {
stringRedisTemplate.opsForHash().delete(getKey(lessonId), sectionId.toString());
}
// 4)异步执行DelayQueue中的检测任务,检测播放进度是否变化,如果无变化则写入数据库
public void executeTask() {
while (true) {
//1. 从延迟队列获取数据
RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque("record.queue");
String poll = blockingDeque.poll();
if (StrUtil.isEmpty(poll)) {
continue;
}
LearningRecord recordFromQueue = JSONUtil.toBean(poll, LearningRecord.class);
//2. 从redis获取数据
LearningRecord recordFromRedis = readCache(recordFromQueue.getLessonId(), recordFromQueue.getSectionId());
if (recordFromRedis == null) {
continue;
}
//3. 比较播放时长是否一致,如果不一致,说明用户继续观看,不做操作
if (!ObjectUtil.equals(recordFromQueue.getMoment(), recordFromRedis.getMoment())) {
continue;
}
//4. 比较播放时长是否一致,如果一致,说明用户停止观看,同步数据库
//4-1 更新学习记录
recordFromQueue.setFinished(null);// 忽略完成状态修改
learningRecordMapper.updateById(recordFromQueue);
//4-2 更新课表
LearningLesson learningLesson = new LearningLesson();
learningLesson.setId(recordFromQueue.getLessonId());
learningLesson.setLatestSectionId(recordFromQueue.getSectionId()); // 最近播放小节id
learningLesson.setLatestLearnTime(LocalDateTime.now().minusSeconds(20));// 最近播放时间
learningLessonMapper.updateById(learningLesson);
}
}
@PostConstruct // 对象创建完毕后,执行此方法
public void init() {
// 创建一个新线程,处理延迟队列
new Thread(() -> {
executeTask();
}).start();
}
}
改造提交学习记录
@Autowired
private LearningRecordDelayTaskHandler taskHandler;
// 抽取处理视频记录逻辑方法
private Boolean handlerVideoRecord(LearningRecordFormDTO learningRecordFormDTO) {
//1. 查询学习记录是否存在
// 线程缓存中查询,如果没有再从数据库查询,同步redis
LearningRecord learningRecord = taskHandler.readCache(learningRecordFormDTO.getLessonId(), learningRecordFormDTO.getSectionId());
if (learningRecord == null) {
learningRecord = this.lambdaQuery()
.eq(LearningRecord::getLessonId, learningRecordFormDTO.getLessonId())
.eq(LearningRecord::getSectionId, learningRecordFormDTO.getSectionId())
.one();
if (learningRecord != null) {
taskHandler.writeCache(learningRecord);
}
}
if (learningRecord == null) {//2. 如果不存在,就新增一条
//2-1 dto->po
learningRecord = BeanUtil.copyProperties(learningRecordFormDTO, LearningRecord.class);
//2-2 补充属性
learningRecord.setUserId(UserContext.getUser());
//2-3 保存到数据库
this.save(learningRecord);
//2-4 返回未学完
return false;
} else {//3. 如果存在,就更新一条
//3-1 判断是否是一次看完(时长>50%、finished=false)
boolean finished = learningRecord.getFinished() == false && learningRecordFormDTO.getMoment() * 2 > learningRecordFormDTO.getDuration();
if (finished) {
//3-2 第一次看完
learningRecord.setFinished(true); // 已完成
learningRecord.setFinishTime(learningRecordFormDTO.getCommitTime()); // 完成时间
learningRecord.setMoment(learningRecordFormDTO.getMoment()); // 视频进度
// 更新数据库
this.updateById(learningRecord);
// 清理redis缓存
taskHandler.cleanCache(learningRecord.getLessonId(), learningRecord.getSectionId());
} else {
//3-3 不是第一次看完
// 填充播放时长
learningRecord.setMoment(learningRecordFormDTO.getMoment());
//更新redis
taskHandler.writeCache(learningRecord);
}
//3-3 返回标识
return finished;
}
}





