下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1882
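该无人直播助手系统包含三个主要模块:主程序(LiveStreamAssistant)负责模拟用户交互,视频处理模块(VideoProcessor)用于准备直播内容,评论分析模块(CommentAnalyzer)用于处理观众反馈。使用前需要先安装代码中导入的依赖库:opencv-python、numpy、pyautogui、pynput、moviepy、jieba。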
import cv2
import numpy as np
import time
import pyautogui
import random
from pynput import keyboard
from pynput.mouse import Controller
from threading import Thread
class LiveStreamAssistant:
    """Simulate mouse movement, clicks and typed comments until ESC is pressed.

    ``start_stream`` launches two threads: one runs the interaction loop
    (``_run_stream``) and one listens for the ESC key
    (``_monitor_keyboard``).  ``stop_stream`` clears ``is_running`` so the
    interaction loop exits on its next iteration.
    """

    def __init__(self):
        """Initialise the mouse controller, timing state and canned comments."""
        self.is_running = False
        self.mouse = Controller()
        # Minimum number of seconds between two simulated mouse interactions.
        self.interaction_interval = 30
        # time.time() of the last interaction; starting at 0 makes the first
        # loop iteration trigger an interaction immediately.
        self.last_interaction = 0
        # Pool of messages that _send_comment picks from at random.
        self.comments = [
            "感谢大家观看!",
            "喜欢主播的点个关注~",
            "新人求支持!",
            "礼物走一波!",
            "谢谢大家的支持!",
        ]

    def start_stream(self):
        """Mark the assistant as running and start both worker threads."""
        print("开始无人直播...")
        self.is_running = True
        Thread(target=self._monitor_keyboard).start()
        Thread(target=self._run_stream).start()

    def _run_stream(self):
        """Run the once-per-second interaction loop while ``is_running`` is set."""
        while self.is_running:
            current_time = time.time()
            # Move (and maybe click) the mouse once per interaction interval.
            if current_time - self.last_interaction > self.interaction_interval:
                self._simulate_interaction()
                self.last_interaction = current_time
            # Send a random comment with 10% probability on each tick.
            if random.random() < 0.1:
                self._send_comment()
            time.sleep(1)

    def _simulate_interaction(self):
        """Move the pointer to a random spot and click with 30% probability."""
        x = random.randint(100, 500)
        y = random.randint(100, 500)
        self.mouse.position = (x, y)
        if random.random() < 0.3:
            pyautogui.click()

    def _send_comment(self):
        """Type a randomly chosen comment and press Enter."""
        comment = random.choice(self.comments)
        # NOTE(review): pyautogui.typewrite appears to emit only characters
        # that map to physical keys, so these Chinese comments may not be
        # typed at all -- confirm, and consider a clipboard-paste approach.
        pyautogui.typewrite(comment)
        pyautogui.press('enter')
        print(f"发送评论: {comment}")

    def _monitor_keyboard(self):
        """Block on a keyboard listener and stop the stream when ESC is pressed."""

        def on_press(key):
            if key == keyboard.Key.esc:
                self.stop_stream()
                # Returning False from the callback stops the listener.
                return False

        with keyboard.Listener(on_press=on_press) as listener:
            listener.join()

    def stop_stream(self):
        """Clear ``is_running`` so the interaction loop terminates."""
        print("停止无人直播...")
        self.is_running = False
# Script entry point: only start the assistant when this file is executed
# directly, never when it is imported as a module.
if __name__ == "__main__":
    assistant = LiveStreamAssistant()
    print("按ESC键停止直播")
    assistant.start_stream()
cv2
import numpy as np
import os
from moviepy.editor import VideoFileClip, concatenate_videoclips
class VideoProcessor:
    """Load video clips from a folder, concatenate them, and add text watermarks."""

    # File extensions (lower-case) that load_videos treats as video files.
    _VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov')

    def __init__(self):
        """Start with an empty list of loaded clips."""
        self.video_clips = []

    def load_videos(self, folder_path):
        """Append a ``VideoFileClip`` for every video file found in *folder_path*.

        Files are visited in sorted name order so the resulting clip order
        (and therefore the concatenated output) is deterministic; the
        extension check is case-insensitive.
        """
        for filename in sorted(os.listdir(folder_path)):
            if filename.lower().endswith(self._VIDEO_EXTENSIONS):
                filepath = os.path.join(folder_path, filename)
                self.video_clips.append(VideoFileClip(filepath))

    def concatenate_videos(self, output_path):
        """Join all loaded clips and write the result to *output_path*.

        Raises:
            ValueError: if no clips have been loaded.
        """
        if not self.video_clips:
            raise ValueError("没有加载视频文件")
        final_clip = concatenate_videoclips(self.video_clips)
        final_clip.write_videofile(output_path, codec='libx264')
        print(f"视频已合并保存至: {output_path}")

    def add_watermark(self, video_path, watermark_text, output_path):
        """Copy *video_path* to *output_path*, drawing *watermark_text* on each frame.

        The text is drawn in white near the bottom-right corner.

        Raises:
            IOError: if *video_path* cannot be opened.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            cap.release()
            raise IOError(f"无法打开视频文件: {video_path}")

        out = None
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            # Clamp x so the text origin never falls left of the frame on
            # videos narrower than 200 pixels.
            origin = (max(width - 200, 0), height - 30)
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                cv2.putText(frame, watermark_text, origin,
                            cv2.FONT_HERSHEY_SIMPLEX,
                            0.8, (255, 255, 255), 2)
                out.write(frame)
        finally:
            # Always release native handles, even if a frame fails to process.
            cap.release()
            if out is not None:
                out.release()
        print(f"带水印视频已保存至: {output_path}")
import re
from collections import Counter
import jieba
import jieba.analyse
class CommentAnalyzer:
    """Keyword-based sentiment counts, keyword extraction and spam filtering."""

    # Substrings whose presence marks a comment as positive / negative.
    _POSITIVE_WORDS = ('好', '喜欢', '棒', '支持', '赞')
    _NEGATIVE_WORDS = ('差', '不喜欢', '垃圾', '无聊', '退')

    # Spam patterns, compiled once instead of on every comment.
    _SPAM_PATTERNS = tuple(
        re.compile(pattern)
        for pattern in (
            r'加薇.*信',
            r'扣扣群',
            r'微信号',
            r'点击链接',
            r'http[s]?://',
        )
    )

    def __init__(self):
        """Initialise jieba up front."""
        jieba.initialize()

    def analyze_comments(self, comments):
        """Summarise *comments*.

        A comment counts as positive if it contains any positive word;
        otherwise it counts as negative if it contains any negative word.

        Returns:
            dict with keys ``'positive'`` and ``'negative'`` (int counts),
            ``'keywords'`` (result of ``jieba.analyse.extract_tags`` with
            ``topK=10`` on all comments joined by spaces) and
            ``'top_words'`` (the 10 most common ``jieba.lcut`` tokens with
            their counts).
        """
        # Materialise once so a one-shot iterable survives the multiple passes.
        comments = list(comments)

        positive_count = 0
        negative_count = 0
        for comment in comments:
            if any(word in comment for word in self._POSITIVE_WORDS):
                positive_count += 1
            elif any(word in comment for word in self._NEGATIVE_WORDS):
                negative_count += 1

        # Keyword extraction over the whole corpus.
        keywords = jieba.analyse.extract_tags(' '.join(comments), topK=10)

        # Token frequency across all comments.
        word_counts = Counter()
        for comment in comments:
            word_counts.update(jieba.lcut(comment))

        return {
            'positive': positive_count,
            'negative': negative_count,
            'keywords': keywords,
            'top_words': word_counts.most_common(10),
        }

    def filter_spam(self, comments):
        """Return the comments that match none of the spam patterns, in order."""
        return [
            comment
            for comment in comments
            if not any(pattern.search(comment) for pattern in self._SPAM_PATTERNS)
        ]