一. 前言
Executors 是一种典型的生产者 - 消费者模式, java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。线程池就是将线程进行池化,需要运行任务时从池中拿一个线程来执行,执行完毕,线程放回池中。
小伙伴们想想: 如果不使用线程池,每个任务都新开一个线程处理 for循环创建线程 这样开销太大,我们希望有固定数量的线程,来执行这1000个线 程,这样就避免了反复创建并销毁线程所带来的开销问题,所以我们引入了线程池。它解决了两个业务痛点:
- 问题一:反复创建线程开销大
- 问题二:过多的线程会占用太多内存
那么它又是怎么解决上述问题的呢?
用少量的线程避免内存占用过多,让这部分线程都保持工作,且可以反复执行任务避免生 命周期的损耗
在开发过程中,合理地使用线程池能够带来3个好处。
- 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
有没有一种优化后的线程池方案可以进行统一的分配、调优和监控呢?今天我就带大家手把手写一个简化版的线程下载器
需求功能点如下:
- 支持文件下载监听成功,失败,暂停,下载进度回调
- 支持无限制添加任务到任务队列
- 支持查看文件总下载大小
- 支持查看已经下载的大小
- 支持查看下载成功后的目标文件路径
- 可以根据下载错误码定位下载失败原因
- 支持取消,暂停,开始下载任务
二. 关键技术
文件下载器不是很复杂,但是前提是你要对Executor一些参数要有简单的了解下面我列了一个表格,对每个参数都详细的介绍了一下
2.1 ThreadPoolExecutor 核心参数介绍
参数名 | 参数作用 |
corePoolSize | 线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。 |
maximumPoolSize | 线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize |
keepAliveTime | 线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用 |
threadFactory | 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-数字-thread-数字” |
handler | 通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。 |
TimeUnit | keepAliveTime的时间单位 |
2.1.1 RejectedExecutionHandler (饱和策阅)
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略: CallerRunsPolicy AbortPolicy DiscardPolicy DiscardOldestPolicy,API文档也给大家列好了,不记得的可以翻一翻
策阅名称 | 策阅作用 |
CallerRunsPolicy | 用调用者所在的线程来执行任务 |
AbortPolicy | 默认的拒绝策略,会 throws RejectedExecutionException |
DiscardPolicy | 丢弃阻塞队列中靠最前的任务,并执行当前任务 |
DiscardOldestPolicy | 丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列 |
2.1.2 workQueue (工作队列)
workQueue必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能
那么什么是阻塞队列呢?
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。Java提供的阻塞队列如下:
阻塞队列名称 | 概念 |
ArrayBlockingQueue | 一个由数组结构组成的有界阻塞队列。 |
LinkedBlockingQueue | 一个由链表结构组成的有界阻塞队列 |
PriorityBlockingQueue | 一个支持优先级排序的无界阻塞队列 |
DelayQueue | 一个使用优先级队列实现的无界阻塞队列 |
SynchronousQueue | 一个不存储元素的阻塞队列 |
LinkedTransferQueue | 一个由链表结构组成的无界阻塞队列 |
LinkedBlockingDeque | 一个由链表结构组成的双向阻塞队列 |
阻塞队列怎么实现增删改查呢?
这里表格也给大家准备好了:
2.2 如何创建一个线程池
所有的API介绍都一一和大家介绍了,那么我们该如何传建线程池呢,创建线程池有五种方式: 定长线程池 , 可缓存的线程池 , 单线程的线程池还有长线程池,具体应用场景如下:
2.2.1 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
2.2.2 根据所需的并发数来动态创建和关闭线程。能够合理的使用CPU进行对任务进行并发操作,所以适合使用在很耗时的任务。
- 注意返回的是ForkJoinPool对象。
2.2.3 根据需求创建线程,对于空闲的可以reuse。对于短周期的异步请求任务使用cachedThreadPool会提高程序的性能。线程池中的线程在60秒内没有被使用的话,就会被终止并且移出pool。
2.2.4 创建一个单线程的线程池
2.2.5. 创建一个定长线程池,支持定时及周期性任务执行。
2.3 ThreadPoolExecutor 源码结构
2.4 线程池的工作原理
- 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)
- 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
- 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
- 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
整体来说: 在 线程池 的内部,我们维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。线程池 内部维护的工作线程会消费 workQueue 中的任务并执行任务,里面是一个 while 循环。
2.5 如何关闭线程池:
2.5.1 shutDown
shutdown调用的是advanceRunState(SHUTDOWN),而shutdownNow调用的是(STOP),即调用后设置的线程池状态不同
2.5.2 shutDownNow
shutdown调用的是中断空闲的Workers,而shutdownNow调用的是中断所有的Workers shutdownNow会把所有任务队列中的任务取出来,返回一个任务列表。而shutdown什么都不返回。
2.6 如何合理的配置线程池
要想合理地配置线程池,就必须首先分析任务特性要想合理地配置线程池,就必须首先分析任务特性
- 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
- 任务的优先级:高、中和低。
- 任务的执行时间:长、中和短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
2.6.1 确定核心线程数
混合型的任务,如果可以分成一个CPU密集型任务和一个 IO密集型(读写数据库、文件、网络读写等) 任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); //核心线程数量大小 private static final int corePoolSize = Math.max(2, Math.min(CPU_COUNT - 1, 4));
2.6.2 确定线程池最大容纳线程数
由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。
private static final int maximumPoolSize = CPU_COUNT * 2 + 1;
2.6.3 线程池的创建
private static ThreadPoolProxy mBackgroundPool = null; private static final Object mBackgroundLock = new Object(); private static ThreadPoolProxy mDownloadPool = null; private static final Object mDownloadLock = new Object(); private static Map<String, ThreadPoolProxy> mMap = new HashMap<>(); private static final Object mSingleLock = new Object();
2.6.3.0 线程池策阅ThreadPoolProxy
public static class ThreadPoolProxy { private ThreadPoolExecutor mPool; private int mCorePoolSize; private int mMaximumPoolSize; private long mKeepAliveTime; private boolean mIsPriority; /** * @param corePoolSize 核心线程数量 * @param maximumPoolSize 最大线程数量 * @param keepAliveTime 空闲线程存活时间,秒 */ private ThreadPoolProxy(int corePoolSize, int maximumPoolSize, long keepAliveTime, boolean isPriority) { mCorePoolSize = corePoolSize; mMaximumPoolSize = maximumPoolSize; mKeepAliveTime = keepAliveTime; mIsPriority = isPriority; }
执行任务,当线程池处于关闭,将会重新创建新的线程池
public synchronized void execute(Runnable run) { if (run == null) { return; } if (mPool == null || mPool.isShutdown()) { //ThreadFactory是每次创建新的线程工厂 if (mIsPriority) {//使用优先级队列 mPool = new ThreadPoolExecutor(mCorePoolSize, mMaximumPoolSize, mKeepAliveTime, TimeUnit.SECONDS, new PriorityBlockingQueue<>(), Executors.defaultThreadFactory(), new AbortPolicy()); } else {//队列任务 mPool = new ThreadPoolExecutor(mCorePoolSize, mMaximumPoolSize, mKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new AbortPolicy()); } } mPool.execute(run); }
取消线程池中某个还未执行的任务
public synchronized void remove(Runnable run) { if (mPool != null && (!mPool.isShutdown() || mPool.isTerminating())) { mPool.getQueue().remove(run); } } /** * 是否包含某个任务 */ public synchronized boolean contains(Runnable run) { if (mPool != null && (!mPool.isShutdown() || mPool.isTerminating())) { return mPool.getQueue().contains(run); } else { return false; } }
关闭线程池
/** * 关闭线程池, * * @param isNow if true 立即终止线程池,并尝试打断正在执行的任务,清空任务缓存队列,返回尚未执行的任务。 * if false ,确保所有已经加入的任务都将会被执行完毕才关闭,后面不接受任务 **/ public synchronized void shutdown(boolean isNow) { if (mPool != null && (!mPool.isShutdown() || mPool.isTerminating())) { if (isNow) { mPool.shutdownNow(); } else { mPool.shutdown(); } } } } }
2.6.3.1 CPU密集型(加密、计算hash等)
- 获取后台线程池,核心线程会一直存活。
CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。
public static ThreadPoolProxy getBackgroundPool() { synchronized (mBackgroundLock) { if (mBackgroundPool == null) { mBackgroundPool = new ThreadPoolProxy(corePoolSize, maximumPoolSize, 60L, false); } return mBackgroundPool; } }
2.6.3.2 并发线程池
获取一个用于文件并发下载的线程池,修改核心线程数和最大线程数
public static ThreadPoolProxy getDownloadPool() { synchronized (mDownloadLock) { if (mDownloadPool == null) { mDownloadPool = new ThreadPoolProxy(4, 12, 60L, true); } return mDownloadPool; } }
2.6.3.3 单线程池
获取一个单线程池,所有任务将会被按照加入的顺序执行,免除了同步开销的问题
public static ThreadPoolProxy getSinglePool(String name) { synchronized (mSingleLock) { ThreadPoolProxy singlePool = mMap.get(name); if (singlePool == null) { singlePool = new ThreadPoolProxy(0, 1, 60L, false); mMap.put(name, singlePool); } return singlePool; } }
三. 简易版下载器实现
3.0.1 定义当前下载任务枚举状态 DownloadStatus
枚举名 | 意义 |
STATUS_NONE | 默认状态,无任务 |
STATUS_WAITING | 等待下载 |
STATUS_DOWNLOADING | 正在下载 |
STATUS_PAUSED | 停止下载 |
STATUS_DOWNLOADED | 下载完成 |
STATUS_DOWNLOADED | 下载完成 |
3.0.2 url,断点下载开关,线程优先级,连接、读取超时时间动态配置
public class DownloadConfig { public String url;//下载地址,唯一标示 public String targetPath;//下载成功后的文件路径 public int intervalTime = 1000;//默认每秒回调一次 public boolean isCallBackInUIThread = true;//默认回调在主线程 public boolean isRange = true;//是否开启断点下载的功能,默认开启 public int timeOutMillisecond = 30 * 1000;//连接、读取超时时间 public int priority;//默认线程优先级0,普通任务0,so库下载优先级8,其他更加情况自定义 public DownloadConfig(String url, String targetPath, boolean isRange, boolean isCallBackInUIThread, int intervalTime, int timeOutMillisecond) { this.url = url; this.targetPath = targetPath; this.isRange = isRange; this.isCallBackInUIThread = isCallBackInUIThread; this.intervalTime = intervalTime; this.timeOutMillisecond = timeOutMillisecond; }
3.0.3 定制化下载任务接口 BaseDownloadTask
方法名 | 作用 | 类型 |
add() | 添加任务 | BaseDownloadTask |
setListener(DownloadListener listener) | 设置监听 | BaseDownloadTask |
start() | 任务开始 | void |
pause() | 暂停任务 | void |
cancel() | 取消还未开始的任务 | void |
getUrl() | 获取文件下载的URL | String |
getErrorCode() | 下载错误码 | int |
getTempFile() | 下载临时文件 | int |
getTargetPath() | 下载成功后的目标文件路径 | String |
getDownloadedSize() | 已经下载的大小 | long |
getTotalSize() | 文件总下载大小 | long |
toString() | 返回信息 | String |
3.0.4 下载器执行文件下载接口化
public interface IDownloader{ void downloadFile(DownloadTask task); }
3.0.5 配置下载监听器,提供监听成功,失败,暂停,下载进度回调 DownloadListener
3.0.4 下载器执行文件下载接口化
public interface IDownloader{ void downloadFile(DownloadTask task); } 复制代码
3.0.5 配置下载监听器,提供监听成功,失败,暂停,下载进度回调 DownloadListener
方法名 | 作用 | 参数介绍 |
onSuccess(BaseDownloadTask task) | 下载成功监听 | 单任务下载回调 |
onError(BaseDownloadTask task, Throwable e) | 下载错误监听 | 单任务下载回调发生Throwable异常 |
onPaused(BaseDownloadTask task, long soFarBytes, long totalBytes) | 暂停错误监听 | 单下载任务暂停,上报已经传输文件大小已经本地文件大小和文件总传输大小 |
onProgress(long localBytes, long soFarBytes, long totalSize) | 暂停进度监听 | 单下载任务进度,上报已经传输文件大小已经本地文件大小和文件总传输大小 |
3.0.6 封装单个下载任务
单任务Task实现了我们的 Runnable ,里面主要是执行我们的任务,Comparable 给我们的任务设定优先级,BaseDownloadTask 就是给任务做一些简单的任务开始,结束,暂停,取消下载等工作了
3.0.6.1 DownloadTask 成员变量
DownloadTask 成员变量需要关注几个点:
字段名 | 作用 | 类型 |
tempSuffex | 临时文件后缀名 | String |
url | 下载地址,唯一标示 | String |
targetPath | 下载成功后的文件路径 | String |
tempFilePath | 下载缓存路径 | String |
fileName | 文件名 | String |
totalSize | 文件传输的总大小 | long |
soFarBytes | 当前线程已下载的size | long |
downloadedSize | 当前本地已下载的size | long |
status | 下载的状态 | DownloadStatus |
errorCode | 下载失败的错误码 | int |
isChunk | 是否分段 | boolean |
priority | 默认线程优先级0。普通任务0,so库下载优先级高些8,其他更加情况自定义 | int |
timeOutMillisecond | 超时时间 | int |
isRange | 是否开启断点下载的功能,默认开启 | boolean |
isCallBackInUIThread | 是否在主线程回调 | boolean |
intervalTime | 默认每秒回调一次 | int |
3.0.6.2 DownloadTask 构造方法
DownloadTask 构造方法有两个,一个是将外部自定义所有参数塞到DownloadConfig,一个是直接将参数传到DownloadTask里面
public DownloadTask(DownloadConfig config) { this(config.url, config.targetPath, config.isRange, config.isCallBackInUIThread, config.intervalTime, config.timeOutMillisecond, config.priority); } public DownloadTask(String url, String targetPath, boolean isRange, boolean isCallBackInUIThread, int intervalTime, int timeOutMillisecond, int priority) { this.url = url; this.targetPath = targetPath; this.isRange = isRange; this.isCallBackInUIThread = isCallBackInUIThread; this.intervalTime = intervalTime; this.timeOutMillisecond = timeOutMillisecond; if (this.intervalTime <= 0) { this.intervalTime = 1000; } if (this.timeOutMillisecond <= 0) { this.timeOutMillisecond = 30 * 1000; } this.status = DownloadStatus.STATUS_NONE; this.tempFilePath = this.targetPath + tempSuffex; this.priority = priority; }
3.0.6.3 DownloadTask 添加任务add()
当添加任务时候,我们给下载任务状态置为等待,然后在这个方法里面初始化下载成功后的文件路径以及对应的文件名
@Override public DownloadTask add() { status = DownloadStatus.STATUS_WAITING; final File file = new File(targetPath); fileName = file.getName(); return this; }
3.0.6.4 DownloadTask run() => 任务开始start()
如果下载缓存路径合法,将该当前本地已下载的size返回业务层
@Override public long getDownloadedSize() { if (!TextUtils.isEmpty(tempFilePath)) { File file = new File(tempFilePath); downloadedSize = file.exists() ? file.length() : 0; } else { downloadedSize = 0; } return downloadedSize; }
3.0.6.5 设置Range断点下载开关
如果配置了开启断点下载,且已经下载过,设置Range.否则关闭断点下载
public boolean isRangeRequest() { return isRange && getDownloadedSize() > 0; }
3.0.6.6 compareTo 比较任务的优先级
@Override public int compareTo(DownloadTask o) {//任务执行优先级排序 if (this.getPriority() < o.priority) { return 1; } if (this.getPriority() > o.priority) { return -1; } return 0; }
3.0.6.7 DownloadTask run() => 开始下载任务start()
执行任务开始的时候,重置任务状态,然后在run()方法里面调用下载器FileDownloader执行任务下载
@Override public void start() { //状态重置 this.status = DownloadStatus.STATUS_NONE; //开始下载 if (downloader == null) { downloader = new FileDownloader(); } downloader.downloadFile(this); }
3.0.6.8 DownloadTask 暂停下载任务 pause()
在暂停下载任务方法里,将下载任务设置为暂停状态,然后用下载监听器将本地问价大小和总文件大小回传给业务层
@Override public void pause() { status = DownloadStatus.STATUS_PAUSED; if (listener != null) { listener.onPaused(this, soFarBytes, totalSize); }
3.0.6.9 DownloadTask 取消下载任务 pause()
取消下载任务非常简单,将下载任务设置为取消,然后删除所有的副本文件
@Override public void cancel() { status = DownloadStatus.STATUS_NONE; //删除临时文件 FileUtils.deleteFile(tempFilePath); }
3.0.7.0 DownloadTask 设置DownloadListener下载监听器 setListener()
@Override public DownloadTask setListener(DownloadListener listener) { this.listener = listener; return this; }
还有一些常用的方法就不一一贴代码了,详细API文档如下:
方法名 | 作用 | 类型 |
toString | - | - |
getDownloadedSize | 获取已经下载的文件大小 | long |
getTotalSize | 获取文件总大小 | long |
getTempFile | 获取文件副本 | File |
getErrorCode | 获取错误码 | int |
getUrl | 获取下载链接 | String |
getPriority | 获取线程优先级 | int |
3.0.7 封装文件下载器FileDownloader,支持断点下载
3.0.7.1 FileDownloader文件下载器 处理单一任务文件下载 downloadFile
downloadFile 整体看起来是比较简单的,主线程的Looper是在这里初始化的,在这里我们默认不走下载成功回调,接着我们看看downloadByHttpURLConn是如何实现的
@Override public void downloadFile(DownloadTask task) { if (task == null) return; if (handler == null) { handler = new Handler(Looper.getMainLooper()); } hasCallbackSuccess = false; downloadByHttpURLConn(task); }
3.0.7.2 downloadByHttpURLConn
downloadByHttpURLConn是 使用系统API下载,如果服务端支持206分段请求,可以断点下载
在这里我们将下载状态置为 STATUS_DOWNLOADING ,并且创建 HttpURLConnection 建立 Http 连接,下载我们默认使用的是 GET 请求,并且整个连接处于Keep-Alive 长连接状态,不设置缓存,可配置化断点下载入口
task.status = DownloadTask.DownloadStatus.STATUS_DOWNLOADING; HttpURLConnection conn = null; InputStream inputStream = null; try { URL url = new URL(task.url); conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod("GET"); conn.setConnectTimeout(task.timeOutMillisecond); conn.setReadTimeout(task.timeOutMillisecond); conn.setRequestProperty("Connection", "Keep-Alive"); conn.setRequestProperty("Cache-Control", "no-cache"); conn.setRequestProperty("pragma", "no-cache"); conn.setRequestProperty("Accept", "*/*"); //是否开启range请求,code=416,表示请求范围不对 String rangStr = ""; if (task.isRangeRequest()) {//isRangeRequest()方法里面会获取最新的本地缓存文件downloadedSize大小 rangStr = "bytes=" + task.downloadedSize + "-"; conn.setRequestProperty("Range", rangStr);//+ task.totalSize //Logger.d(TAG, "isRangeRequest" + task.downloadedSize); }
- 获取服务器返回的状态码
//获取服务器返回的状态码 int code = conn.getResponseCode(); if (200 == code || 206 == code) { //200 请求服务器资源全部返回成功 //206 请求部分服务器资源返回成功 final String length = conn.getHeaderField("Content-Length"); if (!TextUtils.isEmpty(length)) {//总长度 task.totalSize = Long.parseLong(length); } if (task.totalSize == 0) { final String transferEncoding = conn.getHeaderField("Transfer-Encoding"); task.isChunk = (transferEncoding != null && transferEncoding.equals("chunked")); task.totalSize = TOTAL_VALUE_IN_CHUNKED_RESOURCE; } Map<String, List<String>> map = conn.getHeaderFields(); //Logger.d(TAG, "code=" + code + ",length=" + length); inputStream = conn.getInputStream(); startWriteFile(task, code, inputStream, rangStr, map); } else {//出错 task.errorCode = code; if (416 == code) {//请求分段range有误,本地临时文件大小已经是有问题的 //删除临时文件 FileUtils.deleteFile(task.tempFilePath); } handleError(task, new Throwable("net request error code=" + code + "|" + rangStr + "|url:" + task.url + "|tempFile:" + task.targetPath)); }
- 200 请求服务器资源全部返回成功
- 206 请求部分服务器资源返回成功
- Content-Length 文件总大小等于0,我们取消当前文件下载任务
- 非正常码
- 416
- 请求分段range有误,本地临时文件大小已经是有问题的,我们要删除临时文件
- 不要在出错的时候暴力删除本地缓存
private void handleError(final DownloadTask downloadTask, final Throwable e) { if (downloadTask == null) return; downloadTask.status = DownloadTask.DownloadStatus.STATUS_ERROR; DownloadManager.init().removeTask(downloadTask.url); if (downloadTask.isCallBackInUIThread) { handler.post(() -> { Logger.d(TAG, "download error " + e + ",isCallBackInUIThread=" + downloadTask.isCallBackInUIThread); if (downloadTask.listener != null) { downloadTask.listener.onError(downloadTask, e); } }); } else { if (downloadTask.listener != null) { downloadTask.listener.onError(downloadTask, e); } } }
当然我们不要在最好忘记关流和取消http连接
finally { if (conn != null) conn.disconnect(); if (inputStream != null) { try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } }
3.0.7.3 startWriteFile 写文件
在downloadByHttpURLConn 成功返回码有一个写文件的方法startWriteFile,如果服务器支持206分段请求,写文件用RandomAccessFile
写文件的逻辑比较简单,首先得删除目标文件,然后在构建一个副本文件
- 如果是200所以不用做断点续传处理
if (200 == code) {//非range请求 tempFile.delete(); downloadTask.downloadedSize = 0; fileOutputStream = new FileOutputStream(tempFile); }
- 如果是206,断点续传
(206 == code) {//断点续传 currentSize = tempFile.length(); downloadTask.downloadedSize = currentSize; raf = new RandomAccessFile(tempPath, "rw"); raf.seek(currentSize); Logger.d(TAG, "206 range downloadedSize=" + currentSize + ",totalSize=" + downloadTask.totalSize); }
然后执行文件读写工作,中间有三个注意事项
while ((len = inputStream.read(buffer)) != -1 && DownloadTask.DownloadStatus.STATUS_DOWNLOADING == downloadTask.status) { if (200 == code) { fileOutputStream.write(buffer, 0, len); fileOutputStream.flush(); } else if (206 == code) { raf.write(buffer, 0, len); } }
- 每intervalTime毫秒回调一次,完成了下载必须回调
soFarBytes += len; downloadTask.downloadedSize = currentSize + soFarBytes; downloadTask.soFarBytes = soFarBytes; if (System.currentTimeMillis() - start >= downloadTask.intervalTime || soFarBytes == downloadTask.totalSize) { notifyProgress(downloadTask, soFarBytes); start = System.currentTimeMillis(); } }
- 本地文件大小和服务器文件大小一样,说明文件下载完毕执行下载成功回调
if (downloadTask.soFarBytes == downloadTask.totalSize) { handleSuccess(downloadTask); }
- 写文件出错了,停止下载,回调错误.如果是块传输,totalSize不是实际总大小
if (downloadTask.totalSize > 0 && downloadTask.soFarBytes > downloadTask.totalSize && !downloadTask.isChunk) { String errorMsg = ERROR_DOWNLOAD_MORE_SIZE + " localSize=" + downloadTask.downloadedSize + ",soFar=" + downloadTask.soFarBytes + ",total=" + downloadTask.totalSize; Logger.d(TAG, errorMsg); downloadTask.cancel(); handleError(downloadTask, new RuntimeException(errorMsg)); }
- 如果STATUS_DOWNLOADING下载状态的文件,没有回调成功,则回调失败
if (DownloadTask.DownloadStatus.STATUS_DOWNLOADING == downloadTask.status) { if (!hasCallbackSuccess && (!downloadTask.isChunk && downloadTask.soFarBytes == downloadTask.totalSize)) { //非Chunk下载,并且下载的大小和总大小一致回调成功 handleSuccess(downloadTask); } else if (!hasCallbackSuccess && downloadTask.isChunk) { //Chunk下载,暂时可以认为成功 handleSuccess(downloadTask); } else { //失败 StringBuilder headStr = new StringBuilder(); if (null != map) { for (String key : map.keySet()) { headStr.append("|").append(key).append("=").append(map.get(key).toString()); } } handleError(downloadTask, new Throwable("Error WriteFile,hasCallSuc:" + hasCallbackSuccess + "|headerStr:" + headStr + "|bytes:" + downloadTask.soFarBytes + "-" + downloadTask.totalSize + "-" + downloadTask.downloadedSize + "|" + rangStr + "|url:" + downloadTask.url + "|tempFile:" + tempFile.getPath() + "-" + tempFile.length())); } }
3.0.7.4 进度通知回调 notifyProgress
如果当前任务在主线程中运行,那么我们就把下载进度更新操作切换到UI线程
private void notifyProgress(final DownloadTask downloadTask, final long soFarBytes) { if (DownloadTask.DownloadStatus.STATUS_DOWNLOADING == downloadTask.status) { if (downloadTask.isCallBackInUIThread) { handler.post(() -> { if (downloadTask.listener != null) { downloadTask.listener.onProgress(downloadTask.downloadedSize, soFarBytes, downloadTask.totalSize); } }); } else { if (downloadTask.listener != null) { downloadTask.listener.onProgress(downloadTask.downloadedSize, soFarBytes, downloadTask.totalSize); } } }
3.0.7.5 下载成功回调 handleSuccess
如果是分段传输,我们要保证一点下载文件大小要等于总文件大小 下载成功后,重命名,去除.temp临时文件,并且将该链接下载任务从队列移除,然后将成功下载回调给业务层
private void handleSuccess(final DownloadTask downloadTask) { if (downloadTask == null) return; if (hasCallbackSuccess) return; downloadTask.status = DownloadTask.DownloadStatus.STATUS_DOWNLOADED; if (downloadTask.isChunk) { downloadTask.totalSize = downloadTask.downloadedSize;//保证最后完成下载时的文件大小回调 } //完整下载后,重命名,去除.temp临时文件 final File targetFile = new File(downloadTask.targetPath); if (targetFile.exists()) { targetFile.delete(); } final File tempDownloadedFile = new File(downloadTask.tempFilePath); tempDownloadedFile.renameTo(targetFile); //Log.d("dq-handleSuccess", targetFile.getName() + ",url=" + downloadTask.url); DownloadManager.init().removeTask(downloadTask.url); if (downloadTask.isCallBackInUIThread) { handler.post(() -> { if (downloadTask.listener != null) { downloadTask.listener.onSuccess(downloadTask); } }); } else { if (downloadTask.listener != null) { downloadTask.listener.onSuccess(downloadTask); } } hasCallbackSuccess = true; }
3.0.7.6 下载失败回调 handleSuccess
不要在出错的时候暴力删除本地缓存,这样可能其他回调会受到影响
private void handleError(final DownloadTask downloadTask, final Throwable e) { if (downloadTask == null) return; downloadTask.status = DownloadTask.DownloadStatus.STATUS_ERROR; DownloadManager.init().removeTask(downloadTask.url); if (downloadTask.isCallBackInUIThread) { handler.post(() -> { Logger.d(TAG, "download error " + e + ",isCallBackInUIThread=" + downloadTask.isCallBackInUIThread); if (downloadTask.listener != null) { downloadTask.listener.onError(downloadTask, e); } }); } else { if (downloadTask.listener != null) { downloadTask.listener.onError(downloadTask, e); } } }
3.0.8 下载管理器 DownloadManager 封装
3.0.8.1 DownloadManager()
DownloadManager 构造器主要拿到主线程的Looper, 以及初始化普通的下载线程池
private DownloadManager() { mTaskMap = new ConcurrentHashMap<>(); mHandler = new Handler(Looper.getMainLooper()); downloadPool = ThreadManager.getDownloadPool();//普通的下载线程池; }
3.0.8.2 init()
DownloadManager 是一个最简单的单例实现,我们获取DownloadManager 引用直接调用init 即可
public static DownloadManager init() { return SingletonHolder.INSTANCE; }
3.0.8.3 download() 文件下载
初始化文件下载任务,加入下载的线程池,默认在子线程回调的下载任务
/** * 方便外部自定义所有参数 * * @param config 下载参数,可扩展 * @param listener 下载监听,回调 * @return 返回当前下载任务 */ public synchronized DownloadTask download(DownloadConfig config, DownloadListener listener) { if (config == null || TextUtils.isEmpty(config.url) || TextUtils.isEmpty(config.targetPath)) { throw new NullPointerException(); } final DownloadTask downloadTask = new DownloadTask(config) .setListener(listener) .add(); addToDownloader(downloadTask); return downloadTask; } public synchronized DownloadTask download(String url, String targetPath, boolean isRange, boolean isCallBackInUIThread, int intervalTime, int timeOutMillisecond, DownloadListener listener) { if (TextUtils.isEmpty(url) || TextUtils.isEmpty(targetPath)) { throw new NullPointerException(); } final DownloadTask downloadTask = new DownloadTask(url, targetPath, isRange, isCallBackInUIThread, intervalTime, timeOutMillisecond, 0) .setListener(listener) .add(); addToDownloader(downloadTask); return downloadTask; }
3.0.8.4 添加下载任务
如果下载的文件有版本差异,下载前请清除缓存文件,防止出现不同时段写入文件,导致错误
public synchronized void addToDownloader(DownloadTask task) { if (task == null || TextUtils.isEmpty(task.url)) { throw new NullPointerException(); } DownloadTask downloadTask = mTaskMap.get(task.url); if (downloadTask == null) { // 创建一个新的下载任务,放入线程池 mTaskMap.put(task.url, task); downloadPool.execute(task); //Logger.d(TAG, "dq-start download:" + task.getUrl()); } else { downloadTask.pause(); downloadTask.setListener(task.listener);//重新设置监听 downloadPool.remove(downloadTask); downloadPool.execute(downloadTask);//直接执行 } }
3.0.8.5 暂停下载 pause
public synchronized boolean pause(String url) { if (!TextUtils.isEmpty(url)) { DownloadTask downloadTask = mTaskMap.get(url); if (downloadTask != null) { downloadTask.setListener(null); downloadTask.pause(); removeTask(url); //Logger.d(TAG, "pause download:" + downloadTask); return true; } } return false; }
3.0.8.6 取消下载 cancel
取消下载,将监听器置为null,然后清除已经下载文件,最后在线程池池里面remove task 即可
public synchronized void cancel(String url) { if (TextUtils.isEmpty(url)) { throw new NullPointerException(); } DownloadTask downloadTask = mTaskMap.get(url); if (downloadTask != null) { downloadTask.setListener(null); downloadTask.cancel();//会清除已经下载的文件 removeTask(url); //Logger.d(TAG, "cancel download:" + downloadTask); } }
3.0.8.7 清除task缓存 removeTask
清除任务比较简单,直接在线程池里面remove task 即可
public synchronized DownloadTask removeTask(String url) { final DownloadTask downloadTask = mTaskMap.remove(url); if (downloadTask != null) { downloadPool.remove(downloadTask); } return downloadTask; }
3.0.8.8 程序退出时停止下载任务 onAppExit
退出应用程序前务必清除所有任务队列暂停然后清除里面所有的任务
public synchronized void onAppExit() { for (DownloadTask downloadTask : mTaskMap.values()) { downloadTask.setListener(null); downloadTask.pause(); downloadPool.remove(downloadTask); } mTaskMap.clear(); downloadPool.shutdown(true); }