持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第18天, 点击查看活动详情
前言
本文主要讲解了如何下载m3u8的视频文件到本地,加密解密,将ts文件合并为一个mp4文件三个知识点。关于爬虫,欢迎先阅读一下我的前几篇文章😶🌫️😶🌫️😶🌫️:
「Python」爬虫-1.入门知识简介 - 掘金 (juejin.cn)
「Python」爬虫-2.xpath解析和cookie,session - 掘金 (juejin.cn)
「Python」爬虫-3.防盗链处理 - 掘金 (juejin.cn)
「Python」爬虫-4.selenium的使用 - 掘金 (juejin.cn)
参考链接:
视频爬取
先聊一聊视频吧,
现在大多数的视频网站采用的是流媒体传输协议,大致就是把一整个视频,比如周董的《最伟大的作品》,虽然只有几分钟,但是很有可能这整个视频并不是一起加载进来的,而是当播放完某一段的时候,再给你加载下一段的内容。
也就是将一段视频切成无数个小段,这些小段就是ts格式的视频文件。
这样做的好处是为了让用户获得更加流畅的观感体验,因为他会根据网络状况自动切换视频的清晰度,在网络状况不稳定的情况下,对保障流畅播放非常有帮助。
一个视频播放的全过程如下:1.服务器采集编码传输视频到切片器
2.切片器对视频创建索引文件, 并且切割成n个ts文件
3.将ts文件传输到http服务器上
4.网站/客户端根据索引文件查找http服务器上的ts文件,连续播放这n个ts文件,就可以了。
所以我们知道,索引文件非常重要,毕竟没有这个文件你就不知道每个部分的ts文件里面到底存储的是什么信息。
索引文件里面存储的是ts文件的网络url链接。视频播放网站要播放视频,首先需要拿到索引文件,按照文件中的url链接下载存储在http服务器中的ts文件。
拿到了ts文件之后,由于本身这些ts文件就是原视频中的一小段视频,将所有ts文件下载之后并按照顺序播放就可以凑成整个视频了。
而m3u8文件就是索引文件。
也就是说,如果在观看网页视频的时候,能够找到加载该视频的m3u8文件,就可以下载相关视频了!
M3U8其实是M3U文件经过UTF-8的编码后的文件
M3U8文件的格式如下图所示:
其中#EXTINF:10.000
代表的是每一个切片的播放时间,
而下面的cFN8034360001.ts
实际上是一个url的一部分,在请求视频的时候可能需要拼接字符串从而拿到真正的视频地址。
eg:https://www.baidu.com/cFNxxxxxxx
一般的视频处理过程
用户上传 -> 转码(把视频做处理 ,2k,1080,标清) -> 切片处理(把单个的文件进行拆分)60
用户在进行拉动进度条的时候需要一个文件记录:
1.视频的播放顺序
2.视频存放的路径 -
M3U (类似于txt,json ) => 文本
抓取视频步骤
1.找到M3U8(各种手段)
2.通过M3U8下载到ts文件
3.
可以通过各种手段(不仅是编程手段:pr) 把ts文件合并为一个mp4文件
下面以下载https://91kanju.com/ 网站的一个电视《哲仁王后》为例。
这个网站的页面源代码中直接就有m3u8的url,所以这里直接采用正则表达式进行提取即可。
obj = re.compile(r"url: '(?P<url>.*?)',", re.S)
m3u8_url = obj.search(resp.text).group("url")
完整代码如下:
"""
流程:
1.拿到html的页面源代码
2.从源代码中提取到m3u8的url
3.下载m3u8
4.读取m3u8,下载视频
5.合并视频
"""
import requests
import re
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36 Edg/94.0.992.50"
}
obj = re.compile(r"url: '(?P<url>.*?)',", re.S) # 用来提取m3u8的url地址
url = "https://91kanju.com/vod-play/54812-1-1.html"
resp = requests.get(url, headers=headers)
m3u8_url = obj.search(resp.text).group("url") # 拿到m3u8的地址
resp.close()
# 下载m3u8 resp2 = requests.get(m3u8_url, headers=headers)
with open("哲仁王后.m3u8", mode="wb") as f:
f.write(resp2.content)
resp2.close()
print("下载完毕")
通过上述代码已经下载到了当前视频的M3U8文件,接下来就是对该文件进行进一步处理了。
从之前m3u8文件的额格式来看,如果视频没有被加密的话,那么就直接提取以.ts
结尾的就行,然后做一个url的拼接就可以获取到一段一段的ts文件了。
具体代码如下:
import requests
import os
n = 1
with open("哲仁王后.m3u8",mode="r", encoding="utf-8") as f:
for line in f:
line = line.strip() # 先除去空格,空白,换行符
if line.startswith("#"): # 如果以#开头,则舍去
continue
# 下载视频片段
resp3 = requests.get(line)
f = open(f"{n}.ts",mode="wb")
os.chdir("D:/python/workSpace/study_demo/base/book_demos/demo/vedio/v_ts")
f.write(resp3.content)
f.close()
resp3.close()
n += 1
超复杂版本
通过上面的介绍,你一定会觉得上面的内容还是比较的水,并没有什么挑战性。
接下来将模拟从iframe
中拿到m3u8
文件,然后加密的文件进行解密,最后将所有的ts合并为一个MP4文件。
这个网址不大稳定,所以想要自己实战可能需要看点运气(bushi
整体思路:
- 拿到主页面的源代码,找到iframe
- 从iframe源代码中拿到m3u8文件
- 下载第一层m3u8文件 -> 下载第二层m3u8文件(视频存放路径)
- 下载视频
- 下载密钥,进行解密操作
- 合并所有ts文件为一个mp4文件
获取m3u8:
直接在页面源代码中搜iframe
标签,一般情况下可以看到一个.m3u8
为后缀的链接。
获取url的参考代码如下:
def get_first_m3u8_url(iframe_src):
resp = requests.get(url)
obj = re.compile(r'var main = "(?P<m3u8_url>.*?)"', re.S)
m3u8_url = obj.search(resp.text).group("m3u8_url")
return m3u8_url
下载m3u8文件的代码参考如下:
def download_m3u8_file(url, name):
resp = requests.get(url)
with open(name, mode="wb") as f:
f.write(resp.content)
异步:
由于考虑到爬虫的效率问题,这里采用异步下载的方式,下载所有的ts文件。
异步一般在函数定义的前面加上async
即可。然后套用模板即可:
async def xxx(url , session):
async with session.get(url) as resp:
async with aiofiles.open(f"vedio/{name}", mode="wb") as f:
await f.write(xx)
异步下载ts代码参考:
async def download_ts(url, name, session):
async with session.get(url) as resp:
async with aiofiles.open(f"vedio/{name}", mode="wb") as f:
await f.write(await resp.content.read()) # 把下载的内容写入到文件中
print(f"{name}下载完毕")
创建多个异步任务模板:
tasks = [] # 任务池子
task = asyncio.create_task() # 创建异步任务
await asyncio.wait(tasks) # 等待任务结束
完整代码:
async def aio_download(up_url): # https://boba.52kuyun.com/20170906/M0h219zV/hls/
tasks = []
async with aiohttp.ClientSession() as session: # 提前准备好session
async with aiofiles.open("second_m3u8.txt", mode="r", encoding="utf-8") as f:
async for line in f:
if line.startswith("#"):
continue
# line就是 xxxx.ts
line = line.strip() # 除去空格
# 拼接真正的ts路径
ts_url = up_url + line
task = asyncio.create_task(download_ts(ts_url, line, session)) # 创建任务
tasks.append(task)
await asyncio.wait(tasks) # 等待任务结束
解密:
这里定义对单个ts文件进行解密的函数:
def dec_ts(name, key):
aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC)
# b"xx"是以字节的形式打开,16个0的长度与key的长度保持一致
async with aiofiles.open(f"vedio/{name}", mode="rb") as f1, \
aiofiles.open(f"vedio/temp_{name}", mode="wb") as f2:
bs = await f1.read() # 从源文件读取内容
await f2.write(aes.decrypt(bs)) # 把解密好的内容写入文件
print(f"{name}处理完毕")
将所有的ts文件都进行解密操作,并开启异步任务。
def aio_dec(key):
# 解密
tasks = []
async with aiofiles.open("second_m3u8.txt", mode="r", encoding="utf-8") as f:
async for line in f:
if line.startswith("#"):
continue
line = line.strip()
# 开始创建一步任务
task = asyncio.create_task(dec_ts(line, key))
tasks.append(task)
await asyncio.wait(tasks)
合并ts:
合并ts可以采用命令行的方式,
如果是win系统直接copy / b 1.ts+2.ts+3.ts xxx.mp4
命令即可
如果是mac系统,则使用cat 1.ts 2.ts 3.ts > xxx.mp4
即可。
具体代码如下:
def merge_ts():
# mac:cat 1.ts 2.ts 3.ts > xxx.mp4
# windows: copy / b 1.ts+2.ts+3.ts xxx.mp4
lst = []
with open("second_m3u8.txt", mode="r", encoding="utf-8") as f:
for line in f:
if line.startswith("#"):
continue
line = line.strip()
lst.append(f"vedio/temp_{line}")
# s = " ".join(lst)
# os.system(f"cat {s} > movie.mp4") 苹果
# windows
print("over")
完整代码如下:
import requests
import os
import re
from bs4 import BeautifulSoup
import asyncio
import aiohttp
import aiofiles
from Crypto.Cipher import AES
def get_iframe_src(url):
resp = requests.get(url)
main_page = BeautifulSoup(resp.text, "html.parser")
src = main_page.find("iframe").get("src")
# return src
return "https://boba.52kuyun.com/share/xfPs9NPHvYGhNzFp" # 测试使用
def get_first_m3u8_url(iframe_src):
resp = requests.get(url)
obj = re.compile(r'var main = "(?P<m3u8_url>.*?)"', re.S)
m3u8_url = obj.search(resp.text).group("m3u8_url")
return m3u8_url
def download_m3u8_file(url, name):
resp = requests.get(url)
with open(name, mode="wb") as f:
f.write(resp.content)
async def download_ts(url, name, session):
async with session.get(url) as resp:
async with aiofiles.open(f"vedio/{name}", mode="wb") as f:
await f.write(await resp.content.read()) # 把下载的内容写入到文件中
print(f"{name}下载完毕")
async def aio_download(up_url): # https://boba.52kuyun.com/20170906/M0h219zV/hls/
tasks = []
async with aiohttp.ClientSession() as session: # 提前准备好session
async with aiofiles.open("second_m3u8.txt", mode="r", encoding="utf-8") as f:
async for line in f:
if line.startswith("#"):
continue
# line就是 xxxx.ts
line = line.strip() # 除去空格
# 拼接真正的ts路径
ts_url = up_url + line
task = asyncio.create_task(download_ts(ts_url, line, session)) # 创建任务
tasks.append(task)
await asyncio.wait(tasks) # 等待任务结束
def get_key(key_url):
resp = requests.get(url)
return resp.text
def dec_ts(name, key):
aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC) # b"xx"是以字节的形式打开,16个0的长度与key的长度保持一致
async with aiofiles.open(f"vedio/{name}", mode="rb") as f1, \
aiofiles.open(f"vedio/temp_{name}", mode="wb") as f2:
bs = await f1.read() # 从源文件读取内容
await f2.write(aes.decrypt(bs)) # 把解密好的内容写入文件
print(f"{name}处理完毕")
def aio_dec(key):
# 解密
tasks = []
async with aiofiles.open("second_m3u8.txt", mode="r", encoding="utf-8") as f:
async for line in f:
if line.startswith("#"):
continue
line = line.strip()
# 开始创建一步任务
task = asyncio.create_task(dec_ts(line, key))
tasks.append(task)
await asyncio.wait(tasks)
def merge_ts():
# mac:cat 1.ts 2.ts 3.ts > xxx.mp4
# windows: copy / b 1.ts+2.ts+3.ts xxx.mp4
lst = []
with open("second_m3u8.txt", mode="r", encoding="utf-8") as f:
for line in f:
if line.startswith("#"):
continue
line = line.strip()
lst.append(f"vedio/temp_{line}")
# s = " ".join(lst)
# os.system(f"cat {s} > movie.mp4") 苹果
# windows
print("over")
def main(url):
# 1.拿到主页面的页面源代码,找到iframe对应的url
# iframe_src = https://boba.52kuyun.com/share/xfPs9NPHvYGhNzFp
iframe_src = get_iframe_src(url)
# 2.拿到第一层的m3u8文件的下载地址
# first_m3u8_url = /20170906/M0h219zV/index.m3u8?sign=548ae366a075f0f9e7c76af215aa18e1
first_m3u8_url = get_first_m3u8_url(iframe_src)
# 通过iframe_src拿到iframe的域名 => iframe_domain = https://boba.52kuyun.com
iframe_domain = iframe_src.split("/share")[0]
# 拼接出真正的m3u8的下载路径 => https://boba.52kuyun.com/20170906/M0h219zV/index.m3u8?....
first_m3u8_url = iframe_domain + first_m3u8_url
# 3.1下载第一层m3u8文件
# 处理之后得到 => hls/index.m3u8
download_m3u8_file(first_m3u8_url, "first_m3u8.txt")
# 3.2下载第二层m3u8文件
# 第二层的m3u8请求地址 => https://boba.52kuyun.com/20170906/M0h219zV/hls/index.m3u8
with open("first_m3u8.txt", mode="r", encoding="utf-8") as f:
for line in f:
if line.startswith("#"): # 过滤掉#开头的
continue
else:
line = line.strip() # 去掉换行符等
# 准备拼接第二层m3u8的下载路径
# https://boba.52kuyun.com/20170906/M0h219zV/ + hls/index.m3u8
# https://boba.52kuyun.com/20170906/M0h219zV/hls/cFN8o343600.ts
second_m3u8_url = first_m3u8_url.split("index.m3u8")[0] + line
download_m3u8_file(second_m3u8_url, "second_m3u8.txt")
# 4. 下载视频
second_m3u8_url_up = second_m3u8_url.replace("index.m3u8", "")
# 异步协程
aio_download(second_m3u8_url_up)
# 5.1 拿到密钥
key_url = second_m3u8_url_up + "key.key" # key.key => 正常情况下应该去m3u8文件里找
key = get_key(key_url)
# 5.2 解密
asyncio.run(aio_dec(key))
# 6.合并
merge_ts()
if __name__ == '__main__':
url = "https://www.91kanju.com/vod-play/541-2-1.html"
main(url)
本次的文章就暂且码到这儿了⛷️⛷️⛷️,如果对你有帮助的话,就点个,几天后继续更新~
往期好文推荐🪶
「MongoDB」Win10版安装教程