"""
python 多线程下载大文件,并实现断点续传
"""
```python
"""
python 多线程下载大文件,并实现断点续传
"""
import os
import time
import httpx
from tqdm import tqdm
from threading import Thread
import datetime
import sys
class Logger(object):
def __init__(self, filename='default.log', stream=sys.stdout):
self.terminal = stream
self.log = open(filename, 'w' , encoding = 'utf-8')
def write(self, message):
self.terminal.write(message)
self.log.write(message)
def flush(self):
pass
class DownloadFile(object):
def __init__(self, download_url, data_folder, thread_num):
"""
:param download_url: 文件下载连接
:param data_folder: 文件存储目录
:param thread_num: 开辟线程数量
"""
self.download_url = download_url
self.data_folder = data_folder
self.thread_num = thread_num
self.file_size = None
self.cut_size = None
self.tqdm_obj = None
self.thread_list = []
self.file_path = os.path.join(self.data_folder, download_url.split('/')[-1])
def downloader(self, etag, thread_index, start_index, stop_index, retry=False):
sub_path_file = "{}_{}".format(self.file_path, thread_index)
if os.path.exists(sub_path_file):
temp_size = os.path.getsize(sub_path_file)
if not retry:
self.tqdm_obj.update(temp_size)
else:
temp_size = 0
if stop_index == '-': stop_index = ""
headers = {
'Range': 'bytes={}-{}'.format(start_index + temp_size, stop_index),
'ETag': etag, 'if-Range': etag,
}
down_file = open(sub_path_file, 'ab')
try:
with httpx.stream("GET", self.download_url, headers=headers) as response:
num_bytes_downloaded = response.num_bytes_downloaded
for chunk in response.iter_bytes():
if chunk:
down_file.write(chunk)
self.tqdm_obj.update(response.num_bytes_downloaded - num_bytes_downloaded)
num_bytes_downloaded = response.num_bytes_downloaded
except Exception as e:
print("Thread-{}:请求超时,尝试重连\n报错信息:{}".format(thread_index, e))
self.downloader(etag, thread_index, start_index, stop_index, retry=True)
finally:
down_file.close()
return
def get_file_size(self):
"""
获取预下载文件大小和文件etag
:return:
"""
with httpx.stream("GET", self.download_url) as response2:
etag = ''
total_size = int(response2.headers["Content-Length"])
for tltle in response2.headers.raw:
if tltle[0].decode() == "ETag":
etag = tltle[1].decode()
break
return total_size, etag
def cutting(self):
"""
切割成若干份
:param file_size: 下载文件大小
:param thread_num: 线程数量
:return:
"""
cut_info = {
}
cut_size = self.file_size // self.thread_num
for num in range(1, self.thread_num + 1):
if num != 1:
cut_info[num] = [cut_size, cut_size * (num - 1) + 1, cut_size * num]
else:
cut_info[num] = [cut_size, cut_size * (num - 1), cut_size * num]
if num == self.thread_num:
cut_info[num][2] = '-'
return cut_info, cut_size
def write_file(self):
"""
合并分段下载的文件
:param file_path:
:return:
"""
if os.path.exists(self.file_path):
if len(self.file_path) >= self.file_size:
return
with open(self.file_path, 'ab') as f_count:
for thread_index in range(1, self.thread_num + 1):
with open("{}_{}".format(self.file_path, thread_index), 'rb') as sub_write:
f_count.write(sub_write.read())
os.remove("{}_{}".format(self.file_path, thread_index))
return
def create_thread(self, etag, cut_info):
"""
开辟多线程下载
:param file_path: 文件存储路径
:param etag: headers校验
:param cut_info:
:return:
"""
for thread_index in range(1, self.thread_num + 1):
thread = Thread(target=self.downloader,
args=(etag, thread_index, cut_info[thread_index][1], cut_info[thread_index][2]))
thread.setName('Thread-{}'.format(thread_index))
thread.setDaemon(True)
thread.start()
self.thread_list.append(thread)
for thread in self.thread_list:
thread.join()
return
def check_thread_status(self):
"""
查询线程状态。
:return:
"""
while True:
for thread in self.thread_list:
thread_name = thread.getName()
if not thread.isAlive():
print("{}:已停止".format(thread_name))
time.sleep(1)
def create_data(self):
if not os.path.exists(self.data_folder):
os.mkdir(self.data_folder)
return
def main(self):
self.create_data()
self.file_size, etag = self.get_file_size()
cut_info, self.cut_size = self.cutting()
self.tqdm_obj = tqdm(total=self.file_size, unit_scale=True, desc=self.file_path.split('/')[-1],
unit_divisor=1024,
unit="B")
self.create_thread(etag, cut_info)
self.write_file()
return
if __name__ == '__main__':
sys.stdout = Logger(r'log.txt', sys.stdout)
start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print("开始时间:"+start_time)
print("==" * 20)
download_url = "https://heyulei1.github.io/videos/1.mp4"
data_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'Data')
thread_num = 20
downloader = DownloadFile(download_url, data_folder, thread_num)
downloader.main()
print(download_url,'完成')
end_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print("==" * 20)
print("结束时间:"+end_time+"\n")