前言
在当今快节奏的数字时代,大文件的下载已经成为我们日常生活中不可或缺的一部分。然而,传统的单线程下载器在面临大文件时往往显得力不从心,下载速度缓慢,用户体验不佳。
老读者应该知道,我最近在研究Java多线程并发编程这一块的内容,故想要编写一个多线程下载工具,一是为了知识的落地实践,二是可以将这个工具运用到平时下载大文件的地方。
1 基础知识回顾
为了照顾一些新来的小伙伴,这里先简单讲解一下Java中几种常用的多线程实现方式。
1.1 线程的创建和启动
在Java中,可以通过以下几种方式创建和启动一个新的线程:
继承Thread类:自定义一个类,继承自Thread类,并重写run()方法。
创建线程对象并调用start()方法启动线程。
/**
 * Example thread created by extending {@link Thread} and overriding {@link #run()}.
 * Start it with {@code new MyThread().start()}.
 */
public class MyThread extends Thread {
    @Override
    public void run() {
        // Code executed by the new thread goes here.
    }
}
实现Runnable接口:自定义一个类,实现Runnable接口,并实现其run()方法。
创建Runnable对象,并将其传递给Thread对象,然后调用start()方法。
/**
 * Example task created by implementing {@link Runnable}.
 * Run it on a new thread with {@code new Thread(new MyRunnable()).start()}.
 */
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        // Code executed by the thread goes here.
    }
}
使用ExecutorService:这是一种更高级的方式,由线程池来管理和复用线程。本次的多线程下载器主要就是用这种方式实现的。
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.execute(new MyRunnable());
// Stop accepting new tasks once submission is done; already-submitted tasks still run.
// Without this, the pool's non-daemon worker threads keep the JVM alive after main() returns.
executor.shutdown();
1.2 线程池的使用
线程池是一种管理线程的更高效的方式,可以避免频繁创建和销毁线程的开销。Java中使用ExecutorService接口来管理线程池。
固定大小的线程池:
创建一个固定大小的线程池,最多同时运行5个线程。
// Pool with exactly 5 worker threads; tasks submitted while all are busy wait in the queue.
final ExecutorService executor = Executors.newFixedThreadPool(5);
单线程的Executor:
创建一个只有一个线程的线程池,所有任务按顺序执行。
// A single worker thread: submitted tasks run one after another, in submission order.
final ExecutorService executor = Executors.newSingleThreadExecutor();
缓存线程池:
创建一个可以根据需要创建新线程的线程池,适合执行短期异步任务。
// Creates threads on demand and reuses idle ones; suited to many short-lived tasks.
final ExecutorService executor = Executors.newCachedThreadPool();
计划任务的ScheduledExecutorService:
创建一个可以定时执行任务的线程池,适合执行周期性任务。
// Scheduling pool with 5 threads.
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
// First run immediately (initial delay 0), then repeat every 10 seconds.
scheduler.scheduleAtFixedRate(new MyRunnable(), 0, 10, TimeUnit.SECONDS);
在本次案例中,使用了Executors.newFixedThreadPool(DOWNLOAD_THREAD_NUM + 1)来创建一个固定大小的线程池,用于管理下载任务和日志线程。这种方式确保了线程的复用,并且能够有效地控制线程的数量。
2.运行环境说明
Maven配置(pom.xml)如下(本项目没有引入第三方依赖,只配置了编译插件和打包插件):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wdbyte</groupId>
    <artifactId>down-bit</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- The sources contain Chinese string literals; pin the source encoding so the build
             does not depend on the platform default charset (e.g. GBK on Chinese Windows). -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- Pin the plugin version for reproducible builds. -->
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.wdbyte.downbit.DownloadMain</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <!-- jar-with-dependencies is a descriptor predefined by the assembly plugin -->
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <!-- Project name -->
                    <finalName>${project.name}</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
项目整体结构如下
3.核心模块实现
3.1下载线程的实现
/** * 多线程下载工具类 * @author 牵着猫散步的鼠鼠-LiuShiJie */ public class DownloadThread implements Callable<Boolean> { /** * 每次读取的数据块大小 */ private static int BYTE_SIZE = 1024 * 100; /** * 下载链接 */ private String url; /** * 下载开始位置 */ private long startPos; /** * 要下载的文件区块大小 */ private Long endPos; /** * 标识多线程下载切分的第几部分 */ private Integer part; /** * 文件总大小 */ private Long contentLenth; public DownloadThread(String url, long startPos, Long endPos, Integer part, Long contentLenth) { this.url = url; this.startPos = startPos; this.endPos = endPos; this.part = part; this.contentLenth = contentLenth; } @Override public Boolean call() throws Exception { if (url == null || url.trim() == "") { throw new RuntimeException("下载路径不正确"); } // 文件名 String httpFileName = HttpUtls.getHttpFileName(url); if (part != null) { httpFileName = httpFileName + DownloadMain.FILE_TEMP_SUFFIX + part; } // 本地文件大小 Long localFileContentLength = FileUtils.getFileContentLength(httpFileName); LogThread.LOCAL_FINISH_SIZE.addAndGet(localFileContentLength); if (localFileContentLength >= endPos - startPos) { LogUtils.info("{} 已经下载完毕,无需重复下载", httpFileName); LogThread.DOWNLOAD_FINISH_THREAD.addAndGet(1); return true; } if (endPos.equals(contentLenth)) { endPos = null; } HttpURLConnection httpUrlConnection = HttpUtls.getHttpUrlConnection(url, startPos + localFileContentLength, endPos); // 获得输入流 try (InputStream input = httpUrlConnection.getInputStream(); BufferedInputStream bis = new BufferedInputStream(input); RandomAccessFile oSavedFile = new RandomAccessFile(httpFileName, "rw")) { oSavedFile.seek(localFileContentLength); byte[] buffer = new byte[BYTE_SIZE]; int len = -1; // 读到文件末尾则返回-1 while ((len = bis.read(buffer)) != -1) { oSavedFile.write(buffer, 0, len); LogThread.DOWNLOAD_SIZE.addAndGet(len); } } catch (FileNotFoundException e) { LogUtils.error("ERROR! 
要下载的文件路径不存在 {} ", url); return false; } catch (Exception e) { LogUtils.error("下载出现异常"); e.printStackTrace(); return false; } finally { httpUrlConnection.disconnect(); LogThread.DOWNLOAD_FINISH_THREAD.addAndGet(1); } return true; } }
3.2日志线程的实现
/** * 多线程下载日志记录 * @author 牵着猫散步的鼠鼠-LiuShiJie */ public class LogThread implements Callable<Boolean> { // 本地下载的文件大小 public static AtomicLong LOCAL_FINISH_SIZE = new AtomicLong(); // 已经下载的文件大小 public static AtomicLong DOWNLOAD_SIZE = new AtomicLong(); // 下载完成的线程数 public static AtomicLong DOWNLOAD_FINISH_THREAD = new AtomicLong(); // 待下载的文件总大小 private long httpFileContentLength; public LogThread(long httpFileContentLength) { this.httpFileContentLength = httpFileContentLength; } @Override public Boolean call() throws Exception { int[] downSizeArr = new int[5]; int i = 0; double size = 0; double mb = 1024d * 1024d; // 文件总大小 String httpFileSize = String.format("%.2f", httpFileContentLength / mb); while (DOWNLOAD_FINISH_THREAD.get() != DownloadMain.DOWNLOAD_THREAD_NUM) { double downloadSize = DOWNLOAD_SIZE.get(); downSizeArr[++i % 5] = Double.valueOf(downloadSize - size).intValue(); size = downloadSize; // 每秒速度 double fiveSecDownloadSize = Arrays.stream(downSizeArr).sum(); int speed = (int)((fiveSecDownloadSize / 1024d) / (i < 5d ? i : 5d)); // 剩余时间 double surplusSize = httpFileContentLength - downloadSize - LOCAL_FINISH_SIZE.get(); String surplusTime = String.format("%.1f", surplusSize / 1024d / speed); if (surplusTime.equals("Infinity")) { surplusTime = "-"; } // 已下大小 String currentFileSize = String.format("%.2f", downloadSize / mb + LOCAL_FINISH_SIZE.get() / mb); String speedLog = String.format("> 已下载 %smb/%smb,速度 %skb/s,剩余时间 %ss", currentFileSize, httpFileSize, speed, surplusTime); System.out.print("\r"); System.out.print(speedLog); // 一秒更新一次日志 Thread.sleep(1000); } System.out.println(); return true; } }
3.3相关工具类的实现
文件操作工具类FileUtils,主要用来获取本地文件的大小(字节数)。
/**
 * Local file helpers.
 */
public class FileUtils {
    /**
     * Returns the size in bytes of the regular file at {@code name}.
     *
     * @param name path of the file
     * @return the file length, or 0 when the path does not exist or is not a regular file
     *         (for example a directory)
     */
    public static long getFileContentLength(String name) {
        File target = new File(name);
        // isFile() is true only for an existing, normal file.
        if (!target.isFile()) {
            return 0;
        }
        return target.length();
    }
}
网络请求操作工具类HttpUtls,封装了一些常用的HTTP操作(建立连接、设置Range请求头、获取文件大小、ETag以及文件名)。
/**
 * HTTP helpers used by the downloader.
 *
 * @author 牵着猫散步的鼠鼠-LiuShiJie
 */
public class HttpUtls {
    /**
     * Opens an HTTP connection to {@code url} with a browser-like User-Agent header.
     *
     * @param url absolute http(s) URL
     * @return the (not yet used) connection
     * @throws IOException if the URL is malformed or the connection cannot be opened
     */
    public static HttpURLConnection getHttpUrlConnection(String url) throws IOException {
        URL httpUrl = new URL(url);
        HttpURLConnection httpConnection = (HttpURLConnection) httpUrl.openConnection();
        httpConnection.setRequestProperty("User-Agent",
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36");
        return httpConnection;
    }

    /**
     * Opens an HTTP connection that requests the byte range {@code start..end} (inclusive).
     *
     * @param url   absolute http(s) URL
     * @param start first byte index to request
     * @param end   last byte index to request, or null for an open-ended range ("bytes=start-")
     * @return the connection; its response headers have already been read for debug logging
     * @throws IOException if the connection cannot be opened
     */
    public static HttpURLConnection getHttpUrlConnection(String url, long start, Long end) throws IOException {
        HttpURLConnection httpUrlConnection = getHttpUrlConnection(url);
        LogUtils.debug("此线程下载内容区间 {}-{}", start, end);
        if (end != null) {
            httpUrlConnection.setRequestProperty("RANGE", "bytes=" + start + "-" + end);
        } else {
            httpUrlConnection.setRequestProperty("RANGE", "bytes=" + start + "-");
        }
        Map<String, List<String>> headerFields = httpUrlConnection.getHeaderFields();
        for (Map.Entry<String, List<String>> header : headerFields.entrySet()) {
            LogUtils.debug("此线程相应头{}:{}", header.getKey(), header.getValue());
        }
        return httpUrlConnection;
    }

    /**
     * Returns the size of the remote file in bytes.
     *
     * @param url absolute http(s) URL
     * @return the Content-Length, or -1 when the server does not report it
     * @throws IOException if the request fails
     */
    public static long getHttpFileContentLength(String url) throws IOException {
        HttpURLConnection httpUrlConnection = getHttpUrlConnection(url);
        try {
            // getContentLength() returns an int and reports -1 for bodies larger than
            // Integer.MAX_VALUE (about 2 GiB); the long variant does not have that limit.
            return httpUrlConnection.getContentLengthLong();
        } finally {
            httpUrlConnection.disconnect();
        }
    }

    /**
     * Returns the ETag of the remote file.
     *
     * @param url absolute http(s) URL
     * @return the ETag header value, or null if the server sent none
     * @throws IOException if the request fails
     */
    public static String getHttpFileEtag(String url) throws IOException {
        HttpURLConnection httpUrlConnection = getHttpUrlConnection(url);
        try {
            // getHeaderField(name) yields null when the header is absent, instead of a
            // NullPointerException from getHeaderFields().get("ETag").get(0); the map returned
            // by getHeaderFields() may also be case-sensitive, missing e.g. "etag".
            return httpUrlConnection.getHeaderField("ETag");
        } finally {
            httpUrlConnection.disconnect();
        }
    }

    /**
     * Derives a local file name from the last path segment of {@code url}.
     *
     * <p>Any query string ("?...") and fragment ("#...") are removed first. Otherwise a URL
     * such as {@code .../a.exe?v=1} would produce a file name containing '?', which is not a
     * valid file name on Windows.
     *
     * @param url absolute URL
     * @return the text after the last '/' of the URL path (empty if the path ends with '/')
     */
    public static String getHttpFileName(String url) {
        String path = url;
        int query = path.indexOf('?');
        if (query >= 0) {
            path = path.substring(0, query);
        }
        int fragment = path.indexOf('#');
        if (fragment >= 0) {
            path = path.substring(0, fragment);
        }
        return path.substring(path.lastIndexOf('/') + 1);
    }
}
日志工具类LogUtils,主要负责日志的格式化输出。
/**
 * Minimal console logger.
 *
 * <p>Each line is {@code <timestamp> <thread name><level><message>}. Messages may contain
 * SLF4J-style {@code {}} placeholders, which are replaced in order by the arguments.
 *
 * @author 牵着猫散步的鼠鼠-LiuShiJie
 */
public class LogUtils {
    /** When true, {@link #debug} messages are printed; otherwise they are dropped. */
    public static boolean DEBUG = false;

    /** Timestamp format. {@code HH} is the 24-hour clock; {@code hh} would be 12-hour with no AM/PM marker. */
    static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void info(String msg, Object... arg) {
        print(msg, " -INFO- ", arg);
    }

    public static void error(String msg, Object... arg) {
        print(msg, " -ERROR-", arg);
    }

    public static void debug(String msg, Object... arg) {
        if (DEBUG) {
            print(msg, " -DEBUG-", arg);
        }
    }

    private static void print(String msg, String level, Object... arg) {
        if (arg != null && arg.length > 0) {
            msg = fillPlaceholders(msg, arg);
        }
        String thread = Thread.currentThread().getName();
        System.out.println(LocalDateTime.now().format(dateTimeFormatter) + " " + thread + level + msg);
    }

    /**
     * Replaces each "{}" in {@code template}, left to right, with the next argument.
     *
     * <p>Unlike routing the message through String.format, a literal '%' (e.g. "%20" in a URL)
     * is kept as-is. Surplus "{}" markers are left untouched instead of throwing, and surplus
     * arguments are ignored.
     */
    private static String fillPlaceholders(String template, Object[] args) {
        StringBuilder out = new StringBuilder(template.length() + 16 * args.length);
        int from = 0;
        for (Object value : args) {
            int at = template.indexOf("{}", from);
            if (at < 0) {
                break;
            }
            out.append(template, from, at).append(value);
            from = at + 2;
        }
        return out.append(template, from, template.length()).toString();
    }
}
迅雷链接转换工具类ThunderUtils:迅雷链接(thunder://)与普通链接不同,其内容是经过Base64编码的真实下载地址,需要先解码转换成普通的HTTP链接。
/** * 迅雷链接转换工具 * @author 牵着猫散步的鼠鼠-LiuShiJie */ public class ThunderUtils { private static String THUNDER = "thunder://"; /** * 判断是否是迅雷链接 * * @param url * @return */ public static boolean isThunderLink(String url) { return url.startsWith(THUNDER); } /** * 转换成 HTTP URL * * @param url * @return */ public static String toHttpUrl(String url) { if (!isThunderLink(url)) { return url; } LogUtils.info("当前链接是迅雷链接,开始转换..."); url = url.replaceFirst(THUNDER, ""); try { // base 64 转换 url = new String(Base64.getDecoder().decode(url.getBytes()), "UTF-8"); // url 解码 url = URLDecoder.decode(url, "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } // 去头去尾 if (url.startsWith("AA")) { url = url.substring(2); } if (url.endsWith("ZZ")) { url = url.substring(0, url.length() - 2); } LogUtils.info("当前链接是迅雷链接,转换结果:{}", url); return url; } }
3.4核心业务实现
/** * 多线程下载 * 断点续传下载 demo * @author 牵着猫散步的鼠鼠-LiuShiJie */ public class DownloadMain { // 下载线程数量 public static int DOWNLOAD_THREAD_NUM = 5; // 下载线程池 private static ExecutorService executor = Executors.newFixedThreadPool(DOWNLOAD_THREAD_NUM + 1); // 临时文件后缀 public static String FILE_TEMP_SUFFIX = ".temp"; // 支持的 URL 协议 private static HashSet<String> PROTOCAL_SET = new HashSet(); static { PROTOCAL_SET.add("thunder://"); PROTOCAL_SET.add("http://"); PROTOCAL_SET.add("https://"); } public static void main(String[] args) throws Exception { Scanner scanner = new Scanner(System.in); System.out.println("请输入要下载的链接:"); String url = scanner.nextLine(); long count = PROTOCAL_SET.stream().filter(prefix -> url.startsWith(prefix)).count(); if (count == 0) { LogUtils.info("不支持的协议类型"); return; } LogUtils.info("要下载的链接是:{}", url); new DownloadMain().download(ThunderUtils.toHttpUrl(url)); } public void download(String url) throws Exception { String fileName = HttpUtls.getHttpFileName(url); long localFileSize = FileUtils.getFileContentLength(fileName); // 获取网络文件具体大小 long httpFileContentLength = HttpUtls.getHttpFileContentLength(url); if (localFileSize >= httpFileContentLength) { LogUtils.info("{}已经下载完毕,无需重新下载", fileName); return; } List<Future<Boolean>> futureList = new ArrayList<>(); if (localFileSize > 0) { LogUtils.info("开始断点续传 {}", fileName); } else { LogUtils.info("开始下载文件 {}", fileName); } LogUtils.info("开始下载时间 {}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"))); long startTime = System.currentTimeMillis(); // 任务切分 splitDownload(url, futureList); LogThread logThread = new LogThread(httpFileContentLength); Future<Boolean> future = executor.submit(logThread); futureList.add(future); // 开始下载 for (Future<Boolean> booleanFuture : futureList) { booleanFuture.get(); } LogUtils.info("文件下载完毕 {},本次下载耗时:{}", fileName, (System.currentTimeMillis() - startTime) / 1000 + "s"); LogUtils.info("结束下载时间 {}", 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"))); // 文件合并 boolean merge = merge(fileName); if (merge) { // 清理分段文件 clearTemp(fileName); } LogUtils.info("本次文件下载结束,下载位置为" + fileName); System.exit(0); } /** * 切分下载任务到多个线程 * * @param url * @param futureList * @throws IOException */ public void splitDownload(String url, List<Future<Boolean>> futureList) throws IOException { long httpFileContentLength = HttpUtls.getHttpFileContentLength(url); // 任务切分 long size = httpFileContentLength / DOWNLOAD_THREAD_NUM; long lastSize = httpFileContentLength - (httpFileContentLength / DOWNLOAD_THREAD_NUM * (DOWNLOAD_THREAD_NUM - 1)); for (int i = 0; i < DOWNLOAD_THREAD_NUM; i++) { long start = i * size; Long downloadWindow = (i == DOWNLOAD_THREAD_NUM - 1) ? lastSize : size; Long end = start + downloadWindow; if (start != 0) { start++; } DownloadThread downloadThread = new DownloadThread(url, start, end, i, httpFileContentLength); Future<Boolean> future = executor.submit(downloadThread); futureList.add(future); } } public boolean merge(String fileName) throws IOException { LogUtils.info("开始合并文件 {}", fileName); byte[] buffer = new byte[1024 * 10]; int len = -1; try (RandomAccessFile oSavedFile = new RandomAccessFile(fileName, "rw")) { for (int i = 0; i < DOWNLOAD_THREAD_NUM; i++) { try (BufferedInputStream bis = new BufferedInputStream( new FileInputStream(fileName + FILE_TEMP_SUFFIX + i))) { while ((len = bis.read(buffer)) != -1) { // 读到文件末尾则返回-1 oSavedFile.write(buffer, 0, len); } } } LogUtils.info("文件合并完毕 {}", fileName); } catch (Exception e) { e.printStackTrace(); return false; } return true; } public boolean clearTemp(String fileName) { LogUtils.info("开始清理临时文件 {}{}0-{}", fileName, FILE_TEMP_SUFFIX, (DOWNLOAD_THREAD_NUM - 1)); for (int i = 0; i < DOWNLOAD_THREAD_NUM; i++) { File file = new File(fileName + FILE_TEMP_SUFFIX + i); file.delete(); } LogUtils.info("临时文件清理完毕 {}{}0-{}", fileName, FILE_TEMP_SUFFIX, (DOWNLOAD_THREAD_NUM - 1)); return true; } /** * 
使用CheckedInputStream计算CRC */ public static Long getCRC32(String filepath) throws IOException { InputStream inputStream = new BufferedInputStream(new FileInputStream(filepath)); CRC32 crc = new CRC32(); byte[] bytes = new byte[1024]; int cnt; while ((cnt = inputStream.read(bytes)) != -1) { crc.update(bytes, 0, cnt); } inputStream.close(); return crc.getValue(); } }
4.功能测试
启动main程序,输入下载链接,我们这里使用腾讯QQ安装包的CDN链接(https://dldir1.qq.com/qqfile/qq/PCQQ9.7.17/QQ9.7.17.29225.exe)来测试下载功能
文件下载成功,保存在程序运行时的当前工作目录下(本例中为项目的上级目录),文件完整,能够正常使用。
总结
本文通过详细解析一个多线程下载器的实现,深入探讨了Java多线程编程的基础知识、工具类的使用以及核心模块的实现。我们学习了如何创建和启动线程、如何使用原子类(AtomicLong)在多个线程之间安全地共享计数,以及线程池的使用,这些都是多线程下载器的基础。同时,我们了解了如何使用Java的标准库来实现文件操作、网络请求、日志记录等功能。
希望文章对您的学习有帮助,有时间会继续出Java并发编程相关的内容~