1、断点续传
断点续传指的是在下载或上传时,将下载或上传任务(一个文件或一个压缩包)人为的划分为几个部分,每一个部分采用一个线程进行上传或下载,如果碰到网络故障,可以从已经上传或下载的部分开始继续上传下载未完成的部分,而没有必要从头开始上传下载
,断点续传可以提高节省操作时间,提高用户体验性。
断点续传流程如下图:
2、文件分块与合并
RandomAccessFile: 读写流
package com.xuecheng.media; import javafx.print.Collation; import org.apache.commons.codec.digest.DigestUtils; import org.junit.jupiter.api.Test; import java.io.*; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; public class BigFileTest { //分块测试 @Test public void testChunk() throws IOException { //源文件 File sourceFile = new File("F:\\develop\\video\\3.mp4"); //分块文件存储路径 String chunkFilePath = "F:\\develop\\video\\chunk\\"; //分块文件大小 int chunkSize = 1024 * 1024 * 5; //分块文件个数 int chunkNum = (int) Math.ceil(sourceFile.length() * 1.0 / chunkSize); //使用流从源文件读数据,向分块文件中写数据 RandomAccessFile raf_r = new RandomAccessFile(sourceFile, "r"); //缓存区 byte[] bytes = new byte[1024]; for (int i = 0; i < chunkNum; i++) { File chunkFile = new File(chunkFilePath + i); //分块文件写入流 RandomAccessFile raf_rw = new RandomAccessFile(chunkFile,"rw"); int len = -1; while ((len=raf_r.read(bytes))!=-1){ raf_rw.write(bytes,0,len); if(chunkFile.length()>=chunkSize){ break; } } raf_rw.close(); } raf_r.close(); } //将分块进行合并 @Test public void testMerge() throws IOException { //块文件目录 File chunkFolder = new File("F:\\develop\\video\\chunk"); //源文件 File sourceFile = new File("F:\\develop\\video\\3.mp4"); //合并后的文件 File mergeFile = new File("F:\\develop\\video\\3 _2.mp4"); //取出所有分块文件 File[] files = chunkFolder.listFiles(); //将数组转成list List<File> filesList = Arrays.asList(files); //对分块文件排序 Collections.sort(filesList, new Comparator<File>() { @Override public int compare(File o1, File o2) { return Integer.parseInt(o1.getName())-Integer.parseInt(o2.getName()); } }); //向合并文件写的流 RandomAccessFile raf_rw = new RandomAccessFile(mergeFile, "rw"); //缓存区 byte[] bytes = new byte[1024]; //遍历分块文件,向合并 的文件写 for (File file : filesList) { //读分块的流 RandomAccessFile raf_r = new RandomAccessFile(file, "r"); int len = -1; while ((len=raf_r.read(bytes))!=-1){ raf_rw.write(bytes,0,len); } raf_r.close(); } raf_rw.close(); //合并文件完成后对合并的文件md5校验 FileInputStream fileInputStream_merge = new FileInputStream(mergeFile); FileInputStream fileInputStream_source = new FileInputStream(sourceFile); String md5_merge = DigestUtils.md5Hex(fileInputStream_merge); String md5_source = DigestUtils.md5Hex(fileInputStream_source); if(md5_merge.equals(md5_source)){ System.out.println("文件合并成功"); } } }
3、上传视频–测试MinIO合并分块
流程:
在Java的MinIO库中,ComposeSource是一个用于指定合并操作的源对象的类。它用于构建对象合并(Object Composition)的配置。
MinIO合并分块用到minioClient.composeObject方法
//将分块文件上传到minio @Test public void uploadChunk() throws IOException, ServerException, InsufficientDataException, ErrorResponseException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException { for (int i = 0; i < 33; i++) { //上传文件的参数信息 UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder() .bucket("testbucket")//桶 .filename("F:\\develop\\video\\chunk\\"+i) //指定本地文件路径 .object("chunk/"+i)//对象名 放在子目录下 .build(); //上传文件 minioClient.uploadObject(uploadObjectArgs); System.out.println("上传分块"+i+"成功"); } } //调用minio接口合并分块 @Test public void testMerge() throws Exception { List<ComposeSource> sources = new ArrayList<>(); for (int i = 0; i < 33; i++) { //指定分块文件的信息 ComposeSource composeSource = ComposeSource.builder().bucket("testbucket").object("chunk/" + i).build(); sources.add(composeSource); } // List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(6).map(i -> ComposeSource.builder().bucket("testbucket").object("chunk/" + i).build()).collect(Collectors.toList()); //指定合并后的objectName等信息 ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder() .bucket("testbucket") .object("merge01.mp4") .sources(sources)//指定源文件 .build(); //合并文件, //报错size 1048576 must be greater than 5242880,minio默认的分块文件大小为5M minioClient.composeObject(composeObjectArgs); }
4、上传视频–上传分块
4.1API:
package com.xuecheng.media.api; import com.xuecheng.base.model.RestResponse; import com.xuecheng.media.model.dto.UploadFileParamsDto; import com.xuecheng.media.service.MediaFileService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.multipart.MultipartFile; import java.io.File; @Api(value = "大文件上传接口", tags = "大文件上传接口") @RestController public class BigFilesController { @Autowired MediaFileService mediaFileService; @ApiOperation(value = "文件上传前检查文件") @PostMapping("/upload/checkfile") public RestResponse<Boolean> checkfile( @RequestParam("fileMd5") String fileMd5 ) throws Exception { RestResponse<Boolean> booleanRestResponse = mediaFileService.checkFile(fileMd5); return booleanRestResponse; } @ApiOperation(value = "分块文件上传前的检测") @PostMapping("/upload/checkchunk") public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) throws Exception { RestResponse<Boolean> booleanRestResponse = mediaFileService.checkChunk(fileMd5,chunk); return booleanRestResponse; } @ApiOperation(value = "上传分块文件") @PostMapping("/upload/uploadchunk") public RestResponse uploadchunk(@RequestParam("file") MultipartFile file, @RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) throws Exception { //创建一个临时文件 File tempFile = File.createTempFile("minio", ".temp"); file.transferTo(tempFile); //文件路径 String localFilePath = tempFile.getAbsolutePath(); RestResponse restResponse = mediaFileService.uploadChunk(fileMd5, chunk, localFilePath); return restResponse; } @ApiOperation(value = "合并文件") @PostMapping("/upload/mergechunks") public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5, @RequestParam("fileName") String fileName, @RequestParam("chunkTotal") int chunkTotal) throws Exception { Long companyId = 1232141425L; //文件信息对象 UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto(); uploadFileParamsDto.setFilename(fileName); uploadFileParamsDto.setTags("视频文件"); uploadFileParamsDto.setFileType("001002"); RestResponse restResponse = mediaFileService.mergechunks(1232141425L, fileMd5, chunkTotal, uploadFileParamsDto); return restResponse; } }
4.2上传前检查文件,分块是否存在Service:
@Override public RestResponse<Boolean> checkFile(String fileMd5) { //先查询数据库 MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5); if(mediaFiles!=null){ //桶 String bucket = mediaFiles.getBucket(); //objectname String filePath = mediaFiles.getFilePath(); //如果数据库存在再查询 minio GetObjectArgs getObjectArgs = GetObjectArgs.builder() .bucket(bucket) .object(filePath) .build(); //查询远程服务获取到一个流对象 try { FilterInputStream inputStream = minioClient.getObject(getObjectArgs); if(inputStream!=null){ //文件已存在 return RestResponse.success(true); } } catch (Exception e) { e.printStackTrace(); } } //文件不存在 return RestResponse.success(false); } @Override public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex) { //根据md5得到分块文件所在目录的路径 String chunkFileFolderPath = getChunkFileFolderPath(fileMd5); //如果数据库存在再查询 minio GetObjectArgs getObjectArgs = GetObjectArgs.builder() .bucket(bucket_video) .object(chunkFileFolderPath+chunkIndex) .build(); //查询远程服务获取到一个流对象 try { FilterInputStream inputStream = minioClient.getObject(getObjectArgs); if(inputStream!=null){ //文件已存在 return RestResponse.success(true); } } catch (Exception e) { e.printStackTrace(); } //文件不存在 return RestResponse.success(false); }
4.3上传分块
@Override public RestResponse uploadChunk(String fileMd5, int chunk, String localChunkFilePath) { //分块文件的路径 String chunkFilePath = getChunkFileFolderPath(fileMd5) + chunk; //获取mimeType String mimeType = getMimeType(null); //将分块文件上传到minio boolean b = addMediaFilesToMinIO(localChunkFilePath, mimeType, bucket_video, chunkFilePath); if(!b){ return RestResponse.validfail(false,"上传分块文件失败"); } //上传成功 return RestResponse.success(true); }
4.4合并分块
@Override public RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) { //分块文件所在目录 String chunkFileFolderPath = getChunkFileFolderPath(fileMd5); //找到所有的分块文件 List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> ComposeSource.builder().bucket(bucket_video).object(chunkFileFolderPath + i).build()).collect(Collectors.toList()); //源文件名称 String filename = uploadFileParamsDto.getFilename(); //扩展名 String extension = filename.substring(filename.lastIndexOf(".")); //合并后文件的objectname String objectName = getFilePathByMd5(fileMd5, extension); //指定合并后的objectName等信息 ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder() .bucket(bucket_video) .object(objectName)//合并后的文件的objectname .sources(sources)//指定源文件 .build(); //===========合并文件============ //报错size 1048576 must be greater than 5242880,minio默认的分块文件大小为5M try { minioClient.composeObject(composeObjectArgs); } catch (Exception e) { e.printStackTrace(); log.error("合并文件出错,bucket:{},objectName:{},错误信息:{}",bucket_video,objectName,e.getMessage()); return RestResponse.validfail(false,"合并文件异常"); } //===========校验合并后的和源文件是否一致,视频上传才成功=========== //先下载合并后的文件 File file = downloadFileFromMinIO(bucket_video, objectName); try(FileInputStream fileInputStream = new FileInputStream(file)){ //计算合并后文件的md5 String mergeFile_md5 = DigestUtils.md5Hex(fileInputStream); //比较原始md5和合并后文件的md5 if(!fileMd5.equals(mergeFile_md5)){ log.error("校验合并文件md5值不一致,原始文件:{},合并文件:{}",fileMd5,mergeFile_md5); return RestResponse.validfail(false,"文件校验失败"); } //文件大小 uploadFileParamsDto.setFileSize(file.length()); }catch (Exception e) { return RestResponse.validfail(false,"文件校验失败"); } //==============将文件信息入库============ MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_video, objectName); if(mediaFiles == null){ return RestResponse.validfail(false,"文件入库失败"); } //==========清理分块文件========= clearChunkFiles(chunkFileFolderPath,chunkTotal); return RestResponse.success(true); } /** * 清除分块文件 * @param chunkFileFolderPath 分块文件路径 * @param chunkTotal 分块文件总数 */ private void clearChunkFiles(String chunkFileFolderPath,int chunkTotal){ Iterable<DeleteObject> objects = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> new DeleteObject(chunkFileFolderPath+ i)).collect(Collectors.toList());; RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket(bucket_video).objects(objects).build(); Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs); //要想真正删除 results.forEach(f->{ try { DeleteError deleteError = f.get(); } catch (Exception e) { e.printStackTrace(); } }); } /** * 从minio下载文件 * @param bucket 桶 * @param objectName 对象名称 * @return 下载后的文件 */ public File downloadFileFromMinIO(String bucket,String objectName){ //临时文件 File minioFile = null; FileOutputStream outputStream = null; try{ InputStream stream = minioClient.getObject(GetObjectArgs.builder() .bucket(bucket) .object(objectName) .build()); //创建临时文件 minioFile=File.createTempFile("minio", ".merge"); outputStream = new FileOutputStream(minioFile); IOUtils.copy(stream,outputStream); return minioFile; } catch (Exception e) { e.printStackTrace(); }finally { if(outputStream!=null){ try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } return null; }
5、分布式任务调度XXL-job
分布式任务调度是指将任务分发和执行的过程分布到多个计算节点或计算资源上的一种技术。它可以用于处理大规模任务并实现高效的任务执行和资源利用。
XXL-Job 是一个开源的分布式任务调度框架,它提供了一套完整的任务调度和管理解决方案,适用于各种任务调度场景,如定时任务、数据处理、定时报表等。
以下是 XXL-Job 的主要特点和组成部分:
- 分布式架构:XXL-Job 支持分布式部署,可以在多个节点上运行任务调度器和任务执行器,实现任务的分发和执行。
- 可视化管理界面:XXL-Job 提供了一个可视化的管理界面,方便用户配置和管理任务,包括任务的创建、编辑、暂停、恢复和监控等操作。
- 弹性扩展:XXL-Job 支持动态添加和删除任务调度器和任务执行器,以适应不同的任务负载和计算资源需求。
- 任务调度:XXL-Job 支持多种任务调度方式,包括基于固定时间间隔、CRON 表达式、任务触发等方式,可以满足不同类型任务的调度需求。
- 分片广播:XXL-Job 提供了分片广播机制,可以将一个任务拆分成多个子任务,并并行执行,提高任务执行效率。
- 失败重试和告警机制:XXL-Job 支持任务失败重试和告警功能,可以根据配置的策略进行失败重试,并通过邮件、短信等方式发送告警通知。
- 日志和监控:XXL-Job 提供了任务执行日志和监控数据的收集和展示,方便用户进行任务执行的跟踪和分析。
XXL-Job 是基于 Java 开发的,使用了轻量级的任务调度器 Quartz,并提供了一套可靠的任务调度和执行机制。它具有易用性、可扩展性和高可靠性的特点,广泛应用于各种企业和开发者的任务调度需求。
访问:http://192.168.101.65:8088/xxl-job-admin/
账号和密码:admin/123456
6、xxl-job配置执行器
第一步:
导入相应的Pom,哪个去执行就在哪个模块下配置
<dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> </dependency>
第二步:
配置config:
package com.xuecheng.media.config; import com.xxl.job.core.executor.impl.XxlJobSpringExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } /** * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP; * * 1、引入依赖: * <dependency> * <groupId>org.springframework.cloud</groupId> * <artifactId>spring-cloud-commons</artifactId> * <version>${version}</version> * </dependency> * * 2、配置文件,或者容器启动变量 * spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.' * * 3、获取IP * String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress(); */ }
第三步:
在xxl-job管理界面配置调度中心
第四步:
在nacos改相应的配置,目的是配置调度中心的地址和端口
xxl: job: admin: addresses: http://192.168.101.65:8088/xxl-job-admin executor: appname: testHandler address: ip: port: 9999 logpath: /data/applogs/xxl-job/jobhandler logretentiondays: 30 accessToken: default_token
第五步:
启动项目
7、执行器执行任务
第一步:
写执行的代码
package com.xuecheng.media.service.jobhandler; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.util.Arrays; import java.util.concurrent.TimeUnit; /** * XxlJob开发示例(Bean模式) * * 开发步骤: * 1、任务开发:在Spring Bean实例中,开发Job方法; * 2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。 * 3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志; * 4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果; * * @author xuxueli 2019-12-11 21:52:51 */ @Component public class SampleXxlJob { private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class); /** * 1、简单任务示例(Bean模式) */ @XxlJob("demoJobHandler") public void demoJobHandler() throws Exception { System.out.println("处理视频......."); } @XxlJob("demoJobHandler2") public void demoJobHandler2() throws Exception { System.out.println("处理文档......."); // default success } /** * 2、分片广播任务 */ @XxlJob("shardingJobHandler") public void shardingJobHandler() throws Exception { // 分片参数 int shardIndex = XxlJobHelper.getShardIndex();//执行器的序号,从0开始 int shardTotal = XxlJobHelper.getShardTotal();//执行器总数 System.out.println("shardIndex="+shardIndex+",shardTotal="+shardTotal); } }
第二步:配置任务执行器
第三步:
启动项目
8、xxl-job高级配置参数
8.1配置集群
8.2普通 hash 算法 (取模算法):
在了解一致性哈希算法之前,我们先了解一下缓存中的一个应用场景,了解了这个应用场景之后,再来理解一致性哈希算法,就容易多了,也更能体现出一致性哈希算法的优点,那么,我们先来描述一下这个经典的分布式缓存的应用场景。
1、普通 hash算法 与 使用场景描述:
假设我们有三台缓存服务器,用于缓存图片,我们为这三台缓存服务器编号为 0号、1号、2号,现在有3万张图片需要缓存,我们希望这些图片被均匀的缓存到这3台服务器上,以便它们能够分摊缓存的压力。也就是说,我们希望每台服务器能够缓存1万张左右的图片,那么我们应该怎样做呢?常见的做法是对缓存项的键进行哈希,将hash后的结果对缓存服务器的数量进行取模操作,通过取模后的结果,决定缓存项将会缓存在哪一台服务器上
我们举例说明,以刚才描述的场景为例,假设图片名称是不重复的,那我们就可以使用图片名称作为访问图片的key,使用如下公式,计算出图片应该存放在哪台服务器上。
hash(图片名称)% N
当我们对同一个图片名称做相同的哈希计算时,得出的结果应该是不变的,如果我们有3台服务器,使用哈希后的结果对3求余,那么余数一定是0、1或者2;如果求余的结果为0, 就把当前图片缓存在0号服务器上,如果余数为1,就缓存在1号服务器上,以此类推;同理,当我们访问任意图片时,只要再次对图片名称进行上述运算,即可得出图片应该存放在哪一台缓存服务器上,我们只要在这一台服务器上查找图片即可,如果图片在对应的服务器上不存在,则证明对应的图片没有被缓存,也不用再去遍历其他缓存服务器了,通过这样的方法,即可将3万张图片随机的分布到3台缓存服务器上了,而且下次访问某张图片时,直接能够判断出该图片应该存在于哪台缓存服务器上,我们暂时称上述算法为 HASH 算法或者取模算法,取模算法的过程可以用下图表示:
2、普通 hash 算法的缺陷:
上述HASH算法时,会出现一些缺陷:如果服务器已经不能满足缓存需求,就需要增加服务器数量,假设我们增加了一台缓存服务器,此时如果仍然使用上述方法对同一张图片进行缓存,那么这张图片所在的服务器编号必定与原来3台服务器时所在的服务器编号不同,因为除数由3变为了4,最终导致所有缓存的位置都要发生改变,也就是说,当服务器数量发生改变时,所有缓存在一定时间内是失效的,当应用无法从缓存中获取数据时,则会向后端服务器请求数据;同理,假设突然有一台缓存服务器出现了故障,那么我们则需要将故障机器移除,那么缓存服务器数量从3台变为2台,同样会导致大量缓存在同一时间失效,造成了缓存的雪崩,后端服务器将会承受巨大的压力,整个系统很有可能被压垮。为了解决这种情况,就有了一致性哈希算法。
二、一致性哈希算法:
1、什么是一致性 hash 算法:
一致性哈希算法也是使用取模的方法,但是取模算法是对服务器的数量进行取模,而一致性哈希算法是对 2^32 取模,具体步骤如下:
- 步骤一:一致性哈希算法将整个哈希值空间按照顺时针方向组织成一个虚拟的圆环,称为 Hash 环;
- 步骤二:接着将各个服务器使用 Hash 函数进行哈希,具体可以选择服务器的IP或主机名作为关键字进行哈希,从而确定每台机器在哈希环上的位置
- 步骤三:最后使用算法定位数据访问到相应服务器:将数据key使用相同的函数Hash计算出哈希值,并确定此数据在环上的位置,从此位置沿环顺时针寻找,第一台遇到的服务器就是其应该定位到的服务器
下面我们使用具体案例说明一下一致性哈希算法的具体流程:
(1)步骤一:哈希环的组织:
我们将 2^32 想象成一个圆,像钟表一样,钟表的圆可以理解成由60个点组成的圆,而此处我们把这个圆想象成由2^32个点组成的圆,示意图如下:
圆环的正上方的点代表0,0点右侧的第一个点代表1,以此类推,2、3、4、5、6……直到2^32-1,也就是说0点左侧的第一个点代表2^32-1,我们把这个由 2^32 个点组成的圆环称为hash环。
(2)步骤二:确定服务器在哈希环的位置:
哈希算法:hash(服务器的IP) % 2^32
上述公式的计算结果一定是 0 到 2^32-1 之间的整数,那么上图中的 hash 环上必定有一个点与这个整数对应,所以我们可以使用这个整数代表服务器,也就是服务器就可以映射到这个环上,假设我们有 ABC 三台服务器,那么它们在哈希环上的示意图如下:
(3)步骤三:将数据映射到哈希环上:
我们还是使用图片的名称作为 key,所以我们使用下面算法将图片映射在哈希环上:hash(图片名称) % 2^32,假设我们有4张图片,映射后的示意图如下,其中橘黄色的点表示图片:
那么,怎么算出上图中的图片应该被缓存到哪一台服务上面呢?我们只要从图片的位置开始,沿顺时针方向遇到的第一个服务器就是图片存放的服务器了。最终,1号、2号图片将会被缓存到服务器A上,3号图片将会被缓存到服务器B上,4号图片将会被缓存到服务器C上。
2、一致性 hash 算法的优点:
前面提到,如果简单对服务器数量进行取模,那么当服务器数量发生变化时,会产生缓存的雪崩,从而很有可能导致系统崩溃,而使用一致性哈希算法就可以很好的解决这个问题,因为一致性Hash算法对于节点的增减都只需重定位环空间中的一小部分数据,只有部分缓存会失效,不至于将所有压力都在同一时间集中到后端服务器上,具有较好的容错性和可扩展性。
假设服务器B出现了故障,需要将服务器B移除,那么移除前后的示意图如下图所示:
在服务器B未移除时,图片3应该被缓存到服务器B中,可是当服务器B移除以后,按照之前描述的一致性哈希算法的规则,图片3应该被缓存到服务器C中,因为从图片3的位置出发,沿顺时针方向遇到的第一个缓存服务器节点就是服务器C,也就是说,如果服务器B出现故障被移除时,图片3的缓存位置会发生改变,但是,图片4仍然会被缓存到服务器C中,图片1与图片2仍然会被缓存到服务器A中,这与服务器B移除之前并没有任何区别,这就是一致性哈希算法的优点。
3、hash 环的倾斜与虚拟节点:
一致性哈希算法在服务节点太少的情况下,容易因为节点分部不均匀而造成数据倾斜问题,也就是被缓存的对象大部分集中缓存在某一台服务器上,从而出现数据分布不均匀的情况,这种情况就称为 hash 环的倾斜。如下图所示:
hash 环的倾斜在极端情况下,仍然有可能引起系统的崩溃,为了解决这种数据倾斜问题,一致性哈希算法引入了虚拟节点机制,即对每一个服务节点计算多个哈希,每个计算结果位置都放置一个此服务节点,称为虚拟节点,一个实际物理节点可以对应多个虚拟节点,虚拟节点越多,hash环上的节点就越多,缓存被均匀分布的概率就越大,hash环倾斜所带来的影响就越小,同时数据定位算法不变,只是多了一步虚拟节点到实际节点的映射。具体做法可以在服务器ip或主机名的后面增加编号来实现,加入虚拟节点以后的hash环如下:
9、xxl-job分片广播
前面的那几个负载均衡都是同一个时间只有一个节点在运行,要想真正并行处理,那就需要用到分片广播,而且分片广播是动态扩容的,也就是说当你增加一台服务器时候能动态识别出来。
步骤一:启动两个服务器
首先要修改nacos配置文件,本地配置优先
cloud: config: override-none: true
配置端口:
-Dserver.port=63051 -Dxxl.job.executor.port=9998
步骤二:启动成功标志