下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:1133
这个工具实现了四大平台的视频批量上传功能,包含完整的异常处理和日志记录。使用时需要配置Chrome用户数据目录实现免登录,支持自定义上传间隔防止被封禁。
import os
import time
import random
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException
from PIL import Image
import cv2
import numpy as np
import json
import mimetypes
import logging
from datetime import datetime
class SocialMediaUploader:
def init(self, config_path='config.json'):
self.logger = self._setup_logger()
self.config = self._load_config(config_path)
self.driver = None
self.platform_handlers = {
'douyin': self._upload_to_douyin,
'kuaishou': self._upload_to_kuaishou,
'xiaohongshu': self._upload_to_xiaohongshu,
'bilibili': self._upload_to_bilibili
}
def _setup_logger(self):
logger = logging.getLogger('SocialMediaUploader')
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 控制台输出
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
# 文件输出
fh = logging.FileHandler('uploader.log')
fh.setFormatter(formatter)
logger.addHandler(fh)
return logger
def _load_config(self, config_path):
try:
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
self.logger.error(f"加载配置文件失败: {e}")
raise
def _init_driver(self):
chrome_options = Options()
if self.config.get('headless', False):
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--window-size=1920,1080')
# 加载用户数据目录实现免登录
if self.config.get('user_data_dir'):
chrome_options.add_argument(f'--user-data-dir={self.config["user_data_dir"]}')
self.driver = webdriver.Chrome(options=chrome_options)
self.driver.implicitly_wait(10)
def _random_sleep(self, min_sec=1, max_sec=3):
time.sleep(random.uniform(min_sec, max_sec))
def _wait_element(self, by, value, timeout=30):
return WebDriverWait(self.driver, timeout).until(
EC.presence_of_element_located((by, value))
)
def _upload_to_douyin(self, content):
self.logger.info("开始上传到抖音...")
self.driver.get('https://creator.douyin.com/creator-micro/content/upload')
self._random_sleep(3, 5)
# 上传视频
upload_btn = self._wait_element(By.XPATH, '//input[@type="file"]')
upload_btn.send_keys(content['video_path'])
self.logger.info("视频上传中...")
# 等待上传完成
self._wait_element(By.XPATH, '//div[contains(text(),"上传完成")]', timeout=300)
self._random_sleep(2, 4)
# 填写描述
if content.get('description'):
desc_input = self._wait_element(By.XPATH, '//textarea[@placeholder="填写更全面的描述,让更多人看到你"]')
desc_input.clear()
desc_input.send_keys(content['description'])
self._random_sleep()
# 选择封面
if content.get('cover_path'):
self._wait_element(By.XPATH, '//div[contains(text(),"选择封面")]').click()
self._random_sleep(1, 2)
# 上传封面
cover_input = self._wait_element(By.XPATH, '//input[@accept="image/jpeg,image/png"]')
cover_input.send_keys(content['cover_path'])
self._random_sleep(2, 3)
# 确认封面
self._wait_element(By.XPATH, '//span[contains(text(),"确认")]').click()
self._random_sleep()
# 发布
publish_btn = self._wait_element(By.XPATH, '//button[contains(text(),"发布")]')
publish_btn.click()
self.logger.info("抖音发布请求已提交")
self._random_sleep(5, 8)
def _upload_to_kuaishou(self, content):
self.logger.info("开始上传到快手...")
self.driver.get('https://cp.kuaishou.com/article/publish/video')
self._random_sleep(3, 5)
# 上传视频
upload_btn = self._wait_element(By.XPATH, '//input[@type="file"]')
upload_btn.send_keys(content['video_path'])
self.logger.info("视频上传中...")
# 等待上传完成
self._wait_element(By.XPATH, '//div[contains(text(),"上传成功")]', timeout=300)
self._random_sleep(2, 4)
# 填写描述
if content.get('description'):
desc_input = self._wait_element(By.XPATH, '//textarea[@placeholder="填写标题会有更多赞哦~"]')
desc_input.clear()
desc_input.send_keys(content['description'])
self._random_sleep()
# 选择封面
if content.get('cover_path'):
self._wait_element(By.XPATH, '//div[contains(text(),"编辑封面")]').click()
self._random_sleep(1, 2)
# 上传封面
cover_input = self._wait_element(By.XPATH, '//input[@type="file"]')
cover_input.send_keys(content['cover_path'])
self._random_sleep(2, 3)
# 确认封面
self._wait_element(By.XPATH, '//button[contains(text(),"确定")]').click()
self._random_sleep()
# 发布
publish_btn = self._wait_element(By.XPATH, '//button[contains(text(),"发布")]')
publish_btn.click()
self.logger.info("快手发布请求已提交")
self._random_sleep(5, 8)
def _upload_to_xiaohongshu(self, content):
self.logger.info("开始上传到小红书...")
self.driver.get('https://creator.xiaohongshu.com/publish/publish')
self._random_sleep(3, 5)
# 上传视频
upload_btn = self._wait_element(By.XPATH, '//input[@type="file"]')
upload_btn.send_keys(content['video_path'])
self.logger.info("视频上传中...")
# 等待上传完成
self._wait_element(By.XPATH, '//div[contains(text(),"上传完成")]', timeout=300)
self._random_sleep(2, 4)
# 填写描述
if content.get('description'):
desc_input = self._wait_element(By.XPATH, '//textarea[@placeholder="填写标题会有更多赞哦~"]')
desc_input.clear()
desc_input.send_keys(content['description'])
self._random_sleep()
# 添加标签
if content.get('tags'):
for tag in content['tags']:
self._wait_element(By.XPATH, '//input[@placeholder="添加标签"]').send_keys(tag)
self._random_sleep(0.5, 1)
self.driver.find_element(By.XPATH, '//input[@placeholder="添加标签"]').send_keys(Keys.ENTER)
self._random_sleep(0.5, 1)
# 发布
publish_btn = self._wait_element(By.XPATH, '//button[contains(text(),"发布")]')
publish_btn.click()
self.logger.info("小红书发布请求已提交")
self._random_sleep(5, 8)
def _upload_to_bilibili(self, content):
self.logger.info("开始上传到哔哩哔哩...")
self.driver.get('https://member.bilibili.com/platform/upload/video/frame')
self._random_sleep(3, 5)
# 上传视频
upload_btn = self._wait_element(By.XPATH, '//input[@type="file"]')
upload_btn.send_keys(content['video_path'])
self.logger.info("视频上传中...")
# 等待上传完成
self._wait_element(By.XPATH, '//span[contains(text(),"上传完成")]', timeout=300)
self._random_sleep(2, 4)
# 填写标题
if content.get('title'):
title_input = self._wait_element(By.XPATH, '//input[@placeholder="请输入视频标题"]')
title_input.clear()
title_input.send_keys(content['title'])
self._random_sleep()
# 填写描述
if content.get('description'):
desc_input = self._wait_element(By.XPATH, '//textarea[@placeholder="请输入视频简介"]')
desc_input.clear()
desc_input.send_keys(content['description'])
self._random_sleep()
# 选择分区
if content.get('category'):
category_btn = self._wait_element(By.XPATH, '//div[@class="category-btn"]')
category_btn.click()
self._random_sleep()
# 选择主分区
main_category = self._wait_element(By.XPATH, f'//li[contains(text(),"{content["category"]["main"]}")]')
main_category.click()
self._random_sleep()
# 选择子分区
sub_category = self._wait_element(By.XPATH, f'//li[contains(text(),"{content["category"]["sub"]}")]')
sub_category.click()
self._random_sleep()
# 发布
publish_btn = self._wait_element(By.XPATH, '//button[contains(text(),"立即投稿")]')
publish_btn.click()
self.logger.info("哔哩哔哩发布请求已提交")
self._random_sleep(5, 8)
def upload_content(self, platform, content):
if platform not in self.platform_handlers:
self.logger.error(f"不支持的平台: {platform}")
return False
try:
if not self.driver:
self._init_driver()
handler = self.platform_handlers[platform]
handler(content)
return True
except Exception as e:
self.logger.error(f"上传到{platform}失败: {e}")
return False
finally:
# 每次上传后随机等待一段时间
self._random_sleep(10, 20)
def batch_upload(self, contents):
results = {}
for content in contents:
platform = content['platform']
try:
success = self.upload_content(platform, content)
results[platform] = {
'success': success,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
results[platform] = {
'success': False,
'error': str(e),
'timestamp': datetime.now().isoformat()
}
# 平台间随机等待
self._random_sleep(30, 60)
return results
def close(self):
if self.driver:
self.driver.quit()
self.driver = None
if name == 'main':
# 示例配置
config = {
"headless": False,
"user_data_dir": "/path/to/chrome/profile",
"platforms": ["douyin", "kuaishou", "xiaohongshu", "bilibili"]
}
# 示例内容
contents = [
{
"platform": "douyin",
"video_path": "/path/to/video1.mp4",
"cover_path": "/path/to/cover1.jpg",
"description": "这是一个测试视频 #测试 #抖音"
},
{
"platform": "kuaishou",
"video_path": "/path/to/video1.mp4",
"description": "快手测试视频 #快手"
}
]
uploader = SocialMediaUploader()
try:
results = uploader.batch_upload(contents)
print("批量上传结果:", results)
finally:
uploader.close()