下载地址:https://www.pan38.com/share.php?code=pvvmX 提取码:7786 【仅供学习】
核心类结构:
DouyinLiveCrawler:主控制类,负责初始化参数和协调各模块
DataProcessor:数据处理类,负责数据存储和分析
关键方法说明:
get_live_info():通过HTTP请求获取直播间基础信息
parse_response():使用正则表达式提取页面中 RENDER_DATA 脚本标签内的数据,先做URL解码,再解析为JSON
connect_websocket():建立WebSocket长连接接收实时消息
monitor_comments():持续监听弹幕和互动消息
数据采集类型:
用户UID(需自行做匿名哈希处理;示例代码中直接输出消息里的原始UID)
实时弹幕内容
点赞行为数据
用户活跃度统计
技术特点:
双协议支持(HTTP+WebSocket)
自动UID生成机制(对当前时间戳做MD5哈希,作为握手时使用的匿名UID)
数据持久化存储
异常处理机制
注意事项:
需要自行处理抖音的反爬机制
WebSocket协议可能随版本更新变化
高频请求可能导致IP封禁
用户数据需脱敏处理
import hashlib
import json
import re
import time
from urllib.parse import unquote

import requests
from websocket import create_connection
class DouyinLiveCrawler:
    """Fetch basic info and real-time messages for one Douyin live room.

    The room page is downloaded over HTTP and interaction messages are
    read from a WebSocket connection.

    NOTE(review): the handshake payload and the JSON message schema
    (``type`` / ``uid`` / ``content``) are kept exactly as the original
    code had them; confirm both against the live protocol.
    """

    # Captures the URL-encoded JSON blob embedded in the room page.
    # Compiled once at class level; DOTALL lets the payload span lines.
    _RENDER_DATA_RE = re.compile(
        r'<script id="RENDER_DATA" type="application/json">(.*?)</script>',
        re.DOTALL,
    )

    def __init__(self, room_id, timeout=10.0):
        """Store the room ID and the request settings.

        Args:
            room_id: Identifier of the live room; interpolated into the
                page URL and sent in the WebSocket handshake.
            timeout: Seconds to wait for the HTTP request before giving
                up. Without it a stalled server blocks forever.
        """
        self.room_id = room_id
        self.timeout = timeout
        # Placeholder values: replace with a real User-Agent and cookie.
        self.headers = {
            'User-Agent': 'Mozilla/5.0...',
            'Cookie': 'your_cookie_here',
        }
        self.ws_url = "wss://webcast3-ws-web-hl.douyin.com/webcast/im/push/"

    def get_live_info(self):
        """Download the room page and return its embedded JSON data.

        Returns:
            The decoded JSON object, or ``None`` when the request fails,
            the server answers with an error status, or the page holds
            no parsable RENDER_DATA block.
        """
        url = f"https://live.douyin.com/{self.room_id}"
        try:
            response = requests.get(
                url, headers=self.headers, timeout=self.timeout
            )
            # Treat 4xx/5xx as failures instead of parsing an error page.
            response.raise_for_status()
            return self.parse_response(response.text)
        except Exception as e:
            # Deliberate best-effort boundary: report and return None.
            print(f"获取直播信息失败: {str(e)}")
            return None

    def parse_response(self, html):
        """Extract and decode the RENDER_DATA JSON from a room page.

        Args:
            html: Page source as text.

        Returns:
            The parsed JSON value, or ``None`` when ``html`` is empty or
            contains no RENDER_DATA script tag.

        Raises:
            ValueError: If the tag content is not valid JSON after
                URL-decoding (``json.JSONDecodeError`` is a subclass).
        """
        if not html:
            return None
        match = self._RENDER_DATA_RE.search(html)
        if match is None:
            return None
        # The payload is percent-encoded inside the script tag.
        return json.loads(unquote(match.group(1)))

    def connect_websocket(self):
        """Open the WebSocket and send the login handshake.

        Returns:
            The connected WebSocket object; the caller must close it.
        """
        ws = create_connection(self.ws_url)
        try:
            ws.send(json.dumps({
                "type": "login_req",
                "roomid": self.room_id,
                "uid": self.generate_uid(),
                "protover": 1
            }))
        except Exception:
            # Do not leak the socket when the handshake cannot be sent.
            ws.close()
            raise
        return ws

    def generate_uid(self):
        """Return an anonymous UID: the MD5 hex digest of the current time."""
        return hashlib.md5(str(time.time()).encode()).hexdigest()

    def monitor_comments(self):
        """Print comment and like messages until interrupted.

        Frames that are not valid JSON, or whose payload is not a JSON
        object, are skipped. The socket is closed however the loop ends:
        Ctrl+C, a dropped connection, or any other error.
        """
        ws = self.connect_websocket()
        try:
            while True:
                msg = ws.recv()
                try:
                    data = json.loads(msg)
                except ValueError:
                    # Undecodable or non-JSON frame; ignore it.
                    continue
                if not isinstance(data, dict):
                    continue
                msg_type = data.get('type')
                if msg_type == 'comment':
                    print(f"用户{data.get('uid')}评论: {data.get('content')}")
                elif msg_type == 'like':
                    print(f"用户{data.get('uid')}点赞")
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("监控结束")
        finally:
            ws.close()
# Script entry point: run the monitor only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    crawler = DouyinLiveCrawler("123456789")  # replace with a real room ID
    crawler.monitor_comments()
import pandas as pd
from datetime import datetime
class DataProcessor:
    """Collect comment and like events in memory and export them to CSV."""

    # Explicit column order so exported files always carry a header,
    # even when no events were collected.
    _COMMENT_COLUMNS = ['uid', 'content', 'timestamp']
    _LIKE_COLUMNS = ['uid', 'timestamp']

    def __init__(self):
        """Start with empty comment, like and user collections."""
        self.comments = []
        self.likes = []
        # Distinct UIDs seen in either comments or likes.
        self.users = set()

    def add_comment(self, uid, content, timestamp):
        """Record one comment and remember its author.

        Args:
            uid: Identifier of the commenting user.
            content: Comment text.
            timestamp: Time of the comment, stored as given.
        """
        self.comments.append({
            'uid': uid,
            'content': content,
            'timestamp': timestamp
        })
        self.users.add(uid)

    def add_like(self, uid, timestamp):
        """Record one like and remember the user who sent it.

        Args:
            uid: Identifier of the liking user.
            timestamp: Time of the like, stored as given.
        """
        self.likes.append({
            'uid': uid,
            'timestamp': timestamp
        })
        self.users.add(uid)

    def save_to_csv(self):
        """Write comments and likes to two timestamped CSV files.

        The files are created in the current working directory as
        ``comments_<YYYYmmdd_HHMMSS>.csv`` and
        ``likes_<YYYYmmdd_HHMMSS>.csv``, then a summary line is printed.
        """
        df_comments = pd.DataFrame(self.comments, columns=self._COMMENT_COLUMNS)
        df_likes = pd.DataFrame(self.likes, columns=self._LIKE_COLUMNS)
        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
        df_comments.to_csv(f"comments_{current_time}.csv", index=False)
        df_likes.to_csv(f"likes_{current_time}.csv", index=False)
        print(f"数据已保存,共收集{len(self.comments)}条评论和{len(self.likes)}次点赞")