下载地址:http://m.pan38.com/download.php?code=ENROLN 提取码:5589
代码功能说明:
多线程架构实现弹幕监听和发送分离
支持特殊用户识别和定制回复
包含关键词触发和随机回复机制
发送频率控制防止被封禁
完整的异常处理和日志记录
可扩展的配置系统支持自定义回复
import requests
import time
import random
import threading
from queue import Queue
from datetime import datetime
class LiveRoomBot:
def init(self, room_id):
self.room_id = room_id
self.session = requests.Session()
self.msg_queue = Queue()
self.is_running = False
self.last_sent_time = 0
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
'Referer': f'https://live.bilibili.com/{room_id}'
}
self.response_list = [
"666", "主播真厉害", "关注了",
"再来一个", "哈哈哈笑死", "前方高能预警"
]
self.special_users = {
"房管": ["欢迎房管大大!", "房管晚上好"],
"主播": ["主播辛苦了", "主播今天状态不错"]
}
def fetch_danmu(self):
while self.is_running:
try:
url = f'https://api.live.bilibili.com/xlive/web-room/v1/dM/gethistory'
params = {'roomid': self.room_id, 'room_type': 0}
resp = self.session.get(url, headers=self.headers, params=params, timeout=10)
data = resp.json()
if data['code'] == 0:
for msg in data['data']['room']:
self.process_message(msg)
time.sleep(3)
except Exception as e:
print(f"[ERROR] 获取弹幕失败: {e}")
time.sleep(10)
def process_message(self, msg):
nickname = msg['nickname']
text = msg['text']
uid = msg['uid']
timestamp = msg['timeline']
print(f"[{datetime.fromtimestamp(timestamp)}] {nickname}: {text}")
# 特殊用户处理
for role in self.special_users:
if role in text or role == nickname:
reply = random.choice(self.special_users[role])
self.msg_queue.put((uid, reply))
return
# 关键词触发
triggers = {
"你好": ["你好呀", "欢迎来到直播间"],
"几点": [f"现在是{datetime.now().strftime('%H:%M')}"],
"天气": ["今天天气不错", "记得带伞哦"]
}
for keyword in triggers:
if keyword in text:
reply = random.choice(triggers[keyword])
self.msg_queue.put((uid, reply))
return
# 随机互动
if random.random() < 0.1:
reply = random.choice(self.response_list)
self.msg_queue.put((uid, reply))
def send_danmu(self):
while self.is_running:
if not self.msg_queue.empty() and time.time() - self.last_sent_time > 5:
uid, text = self.msg_queue.get()
try:
url = 'https://api.live.bilibili.com/msg/send'
data = {
'bubble': '0',
'msg': text,
'color': '16777215',
'mode': '1',
'fontsize': '25',
'rnd': int(time.time()),
'roomid': self.room_id,
'csrf': '',
'csrf_token': ''
}
resp = self.session.post(url, data=data, headers=self.headers)
if resp.json()['code'] == 0:
print(f"[SENT] {text}")
self.last_sent_time = time.time()
else:
print(f"[ERROR] 发送失败: {resp.text}")
except Exception as e:
print(f"[ERROR] 发送弹幕异常: {e}")
time.sleep(1)
def start(self):
self.is_running = True
fetch_thread = threading.Thread(target=self.fetch_danmu, daemon=True)
send_thread = threading.Thread(target=self.send_danmu, daemon=True)
fetch_thread.start()
send_thread.start()
print(f"机器人已启动,监听房间: {self.room_id}")
def stop(self):
self.is_running = False
print("机器人已停止")
if name == 'main':
room_id = input("请输入直播间ID: ")
bot = LiveRoomBot(room_id)
bot.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
bot.stop()
[settings]
room_id = 123456
min_interval = 5
max_interval = 15
response_chance = 0.15
[responses]
default = 你好呀,欢迎来到直播间
default = 666
default = 主播加油
special = 房管: 欢迎房管大大!
special = 主播: 主播今天状态不错
[keywords]
天气 = 今天天气不错哦
时间 = 现在是%H:%M
价格 = 具体价格请查看商品详情