1、 视频处理-技术方案
1. 作业分片方案
掌握了xxl-job的分片广播调度方式,下边思考如何分布式去执行学成在线平台中的视频处理任务。
任务添加成功后,对于要处理的任务会添加到待处理任务表中,现在启动多个执行器实例去查询这些待处理任务,此时如何保证多个执行器不会查询到重复的任务呢?
XXL-JOB并不直接提供数据处理的功能,它只会给执行器分配好分片序号,在向执行器任务调度的同时下发分片总数以及分片序号等参数,执行器收到这些参数根据自己的业务需求去利用这些参数。
下图表示了多个执行器获取视频处理任务的结构:
每个执行器收到广播任务有两个参数:分片总数、分片序号。每个执行从数据表取任务时可以让任务id 模上 分片总数,如果等于分片序号则执行此任务。
上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:
1 % 2 = 1 执行器2执行
2 % 2 = 0 执行器1执行
3 % 2 = 1 执行器2执行
以此类推.
1.2 保证任务不重复执行
通过作业分片方案保证了执行器之间查询到不重复的任务,如果一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办?
首先配置调度过期策略:
查看文档如下:
- 调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等;
- 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
- 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
- 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
这里我们选择忽略,如果立即执行一次就可能重复执行相同的任务。
其次,再看阻塞处理策略,阻塞处理策略就是当前执行器正在执行任务还没有结束时调度中心进行任务调度,此时该如何处理。
查看文档如下:
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
这里如果选择覆盖之前调度则可能重复执行任务,这里选择 丢弃后续调度或单机串行方式来避免任务重复执行。
只做这些配置可以保证任务不会重复执行吗?
做不到,还需要保证任务处理的幂等性,什么是任务的幂等性?
任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的。在本项目中要实现的是不论多少次任务调度同一个视频只执行一次成功的转码。
什么是幂等性?
它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果。
幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。
解决幂等性常用的方案:
1)数据库约束,比如:唯一索引,主键。
2)乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。
3)唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。
基于以上分析,在执行器接收调度请求去执行视频处理任务时要实现视频处理的幂等性,要有办法去判断该视频是否处理完成,如果正在处理中或处理完则不再处理。这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理。
1.3 视频处理方案
确定了分片方案,下边梳理整个视频上传及处理的业务流程。
上传视频成功向视频处理待处理表添加记录。
视频处理的详细流程如下:
1、任务调度中心广播作业分片。
2、执行器收到广播作业分片,从数据库读取待处理任务,读取未处理及处理失败的任务。
3、执行器更新任务为处理中,根据任务内容从MinIO下载要处理的文件。
4、执行器启动多线程去处理任务。
5、任务处理完成,上传处理后的视频到MinIO。
6、将更新任务处理结果,如果视频处理完成除了更新任务处理结果以外还要将文件的访问地址更新至任务处理表及文件表中,最后将任务完成记录写入历史表。
2、查询待处理任务
/** * 添加待处理任务 * @param mediaFiles 媒资文件信息 */ private void addWaitingTask(MediaFiles mediaFiles){ //文件名称 String filename = mediaFiles.getFilename(); //文件扩展名 String extension = filename.substring(filename.lastIndexOf(".")); //获取文件的 mimeType String mimeType = getMimeType(extension); if(mimeType.equals("video/x-msvideo")){//如果是avi视频写入待处理任务 MediaProcess mediaProcess = new MediaProcess(); BeanUtils.copyProperties(mediaFiles,mediaProcess); //状态是未处理 mediaProcess.setStatus("1"); mediaProcess.setCreateDate(LocalDateTime.now()); mediaProcess.setFailCount(0);//失败次数默认0 mediaProcess.setUrl(null); mediaProcessMapper.insert(mediaProcess); } }
定时调度:
@Override public List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count) { List<MediaProcess> mediaProcesses = mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count); return mediaProcesses; }
@Select("select * from media_process t where t.id % #{shardTotal} = #{shardIndex} and (t.status=1 or t.status=3) and t.fail_count<3 limit #{count}") List<MediaProcess> selectListByShardIndex(@Param("shardTotal") int shardTotal, @Param("shardIndex") int shardIndex, @Param("count") int count);
3、分布式锁开启任务
前边分析了保证任务不重复执行的方案,理论上每个执行器分到的任务是不重复的,但是当在执行器弹性扩容时无法绝对避免任务不重复执行,比如:原来有四个执行器正在执行任务,由于网络问题原有的0、1号执行器无法与调度中心通信,调度中心就会对执行器重新编号,原来的3、4执行器可能就会执行和0、1号执行器相同的任务。
为了避免多线程去争抢同一个任务可以使用synchronized同步锁去解决,如下代码:
Java synchronized(锁对象){ 执行任务... }
synchronized只能保证同一个虚拟机中多个线程去争抢锁。
如果是多个执行器分布式部署,并不能保证同一个视频只有一个执行器去处理。
现在要实现分布式环境下所有虚拟机中的线程去同步执行就需要让多个虚拟机去共用一个锁,虚拟机可以分布式部署,锁也可以分布式部署,如下图:
虚拟机都去抢占同一个锁,锁是一个单独的程序提供加锁、解锁服务。
该锁已不属于某个虚拟机,而是分布式部署,由多个虚拟机所共享,这种锁叫分布式锁。
实现分布式锁的方案有很多,常用的如下:
1、基于数据库实现分布锁
利用数据库主键唯一性的特点,或利用数据库唯一索引、行级锁的特点,比如:多个线程同时向数据库插入主键相同的同一条记录,谁插入成功谁就获取锁,多个线程同时去更新相同的记录,谁更新成功谁就抢到锁。
2、基于redis实现锁
redis提供了分布式锁的实现方案,比如:SETNX、set nx、redisson等。
拿SETNX举例说明,SETNX命令的工作过程是去set一个不存在的key,多个线程去设置同一个key只会有一个线程设置成功,设置成功的的线程拿到锁。
3、使用zookeeper实现
zookeeper是一个分布式协调服务,主要解决分布式程序之间的同步的问题。zookeeper的结构类似的文件目录,多线程向zookeeper创建一个子目录(节点)只会有一个创建成功,利用此特点可以实现分布式锁,谁创建该结点成功谁就获得锁。
本次我们选用数据库实现分布锁,后边的模块会选用其它方案到时再详细介绍。
public boolean startTask(long id) { int result = mediaProcessMapper.startTask(id); return result<=0?false:true; }
@Update("update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=#{id}") int startTask(@Param("id") long id);
4、保存任务处理结果
@Override public void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) { //要更新的任务 MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId); if(mediaProcess == null){ return ; } //如果任务执行失败 if(status.equals("3")){ //更新MediaProcess表的状态 mediaProcess.setStatus("3"); mediaProcess.setFailCount(mediaProcess.getFailCount()+1);//失败次数加1 mediaProcess.setErrormsg(errorMsg); mediaProcessMapper.updateById(mediaProcess); //更高效的更新方式 // mediaProcessMapper.update() //todo:将上边的更新方式更改为效的更新方式 return; } //======如果任务执行成功====== //文件表记录 MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId); //更新media_file表中的url mediaFiles.setUrl(url); mediaFilesMapper.updateById(mediaFiles); //更新MediaProcess表的状态 mediaProcess.setStatus("2"); mediaProcess.setFinishDate(LocalDateTime.now()); mediaProcess.setUrl(url); mediaProcessMapper.updateById(mediaProcess); //将MediaProcess表记录插入到MediaProcessHistory表 MediaProcessHistory mediaProcessHistory = new MediaProcessHistory(); BeanUtils.copyProperties(mediaProcess,mediaProcessHistory); mediaProcessHistoryMapper.insert(mediaProcessHistory); //从MediaProcess删除当前任务 mediaProcessMapper.deleteById(taskId); }
5、视频处理任务类
先去查询任务数,然后执行任务,用多线程的线程池去执行,并采取计数器countdownlatch,过程大致是先从MinIO拉取avi视频文件,然后保存临时文件,转码成mp4,然后更新数据库,再上传到MinIO
package com.xuecheng.media.service.jobhandler; import com.xuecheng.base.utils.Mp4VideoUtil; import com.xuecheng.media.model.po.MediaProcess; import com.xuecheng.media.service.MediaFileProcessService; import com.xuecheng.media.service.MediaFileService; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.File; import java.io.IOException; import java.util.List; import java.util.concurrent.*; /** * 任务处理类 */ @Slf4j @Component public class VideoTask { @Autowired MediaFileProcessService mediaFileProcessService; @Autowired MediaFileService mediaFileService; //ffmpeg的路径 @Value("${videoprocess.ffmpegpath}") private String ffmpegpath; /** * 视频处理任务 */ @XxlJob("videoJobHandler") public void videoJobHandler() throws Exception { // 分片参数 int shardIndex = XxlJobHelper.getShardIndex();//执行器的序号,从0开始 int shardTotal = XxlJobHelper.getShardTotal();//执行器总数 //确定cpu的核心数 int processors = Runtime.getRuntime().availableProcessors(); //查询待处理的任务 List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors); //任务数量 int size = mediaProcessList.size(); log.debug("取到视频处理任务数:"+size); if(size<=0){ return; } //创建一个线程池 ExecutorService executorService = Executors.newFixedThreadPool(size); //使用的计数器 CountDownLatch countDownLatch = new CountDownLatch(size); mediaProcessList.forEach(mediaProcess -> { //将任务加入线程池 executorService.execute(()->{ try { //任务id Long taskId = mediaProcess.getId(); //文件id就是md5 String fileId = mediaProcess.getFileId(); //开启任务 boolean b = mediaFileProcessService.startTask(taskId); if (!b) { log.debug("抢占任务失败,任务id:{}", taskId); return; } //桶 String bucket = mediaProcess.getBucket(); //objectName String objectName = mediaProcess.getFilePath(); //下载minio视频到本地 File file = mediaFileService.downloadFileFromMinIO(bucket, objectName); if (file == null) { log.debug("下载视频出错,任务id:{},bucket:{},objectName:{}", taskId, bucket, objectName); //保存任务处理失败的结果 mediaFileProcessService.saveProcessFinishStatus(taskId, "3", fileId, null, "下载视频到本地失败"); return; } //源avi视频的路径 String video_path = file.getAbsolutePath(); //转换后mp4文件的名称 String mp4_name = fileId + ".mp4"; //转换后mp4文件的路径 //先创建一个临时文件,作为转换后的文件 File mp4File = null; try { mp4File = File.createTempFile("minio", ".mp4"); } catch (IOException e) { log.debug("创建临时文件异常,{}", e.getMessage()); //保存任务处理失败的结果 mediaFileProcessService.saveProcessFinishStatus(taskId, "3", fileId, null, "创建临时文件异常"); return; } String mp4_path = mp4File.getAbsolutePath(); //创建工具类对象 Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpegpath, video_path, mp4_name, mp4_path); //开始视频转换,成功将返回success,失败返回失败原因 String result = videoUtil.generateMp4(); if (!result.equals("success")) { log.debug("视频转码失败,原因:{},bucket:{},objectName:{},", result, bucket, objectName); mediaFileProcessService.saveProcessFinishStatus(taskId, "3", fileId, null, result); return; } //上传到minio boolean b1 = mediaFileService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), "video/mp4", bucket, objectName); if (!b1) { log.debug("上传mp4到minio失败,taskid:{}", taskId); mediaFileProcessService.saveProcessFinishStatus(taskId, "3", fileId, null, "上传mp4到minio失败"); return; } //mp4文件的url String url = getFilePath(fileId, ".mp4"); //更新任务状态为成功 mediaFileProcessService.saveProcessFinishStatus(taskId, "2", fileId, url, "创建临时文件异常"); }finally { //计算器减去1 countDownLatch.countDown(); } }); }); //阻塞,指定最大限制的等待时间,阻塞最多等待一定的时间后就解除阻塞 countDownLatch.await(30, TimeUnit.MINUTES); } private String getFilePath(String fileMd5,String fileExt){ return fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt; } }
6、绑定媒资
@Transactional @Override public void associationMedia(BindTeachplanMediaDto bindTeachplanMediaDto) { //课程计划id Long teachplanId = bindTeachplanMediaDto.getTeachplanId(); Teachplan teachplan = teachplanMapper.selectById(teachplanId); if(teachplan == null){ XueChengPlusException.cast("课程计划不存在"); } //先删除原有记录,根据课程计划id删除它所绑定的媒资 int delete = teachplanMediaMapper.delete(new LambdaQueryWrapper<TeachplanMedia>().eq(TeachplanMedia::getTeachplanId, bindTeachplanMediaDto.getTeachplanId())); //再添加新记录 TeachplanMedia teachplanMedia = new TeachplanMedia(); BeanUtils.copyProperties(bindTeachplanMediaDto,teachplanMedia); teachplanMedia.setCourseId(teachplan.getCourseId()); teachplanMedia.setMediaFilename(bindTeachplanMediaDto.getFileName()); teachplanMediaMapper.insert(teachplanMedia); }
7、做到这里项目已经一半完成了~~~
8、Freemarker
点击某课程数据后的预览链接,即可对该课程进行预览,可以看到发布后的详情页面效果。
下图是课程详情首页,显示了课程的基本信息。
9、部署门户
修改本机hosts文件,加入127.0.0.1 www.51xuecheng.cn 51xuecheng.cn ucenter.51xuecheng.cn teacher.51xuecheng.cn file.51xuecheng.cn。
window10操作系统hosts文件在C:\Windows\System32\drivers\etc下
Centos7操作系统的hosts文件在/etc目录下。
在hosts文件加入如下配置
127.0.0.1 www.51xuecheng.cn 51xuecheng.cn ucenter.51xuecheng.cn teacher.51xuecheng.cn file.51xuecheng.cn
修改nginx配置文件
重新启动nginx
nginx.exe -s reload