下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:7792
这个抖音批量上传工具包含以下功能模块:
配置文件(config.py):集中管理账号、视频和上传配置
视频处理模块(video_processor.py):检查视频格式、生成封面、预处理视频
抖音上传模块(dy_uploader.py):实现自动登录、视频上传、批量上传功能
主程序(main.py):协调各模块工作流程
使用说明:
安装依赖:pip install selenium moviepy opencv-python pillow
配置config.py中的账号信息和路径设置
将要上传的视频放入videos目录
运行main.py开始批量上传
import os
抖音账号配置
ACCOUNT = {
'username': 'your_username',
'password': 'your_password',
'cookie_path': './cookies.pkl'
}
视频配置
VIDEO_CONFIG = {
'source_dir': './videos', # 视频源目录
'processed_dir': './processed', # 处理后的视频目录
'max_duration': 300, # 最大时长(秒)
'min_duration': 5, # 最小时长(秒)
'resolution': (1080, 1920), # 分辨率
'cover_dir': './covers' # 封面目录
}
上传配置
UPLOAD_CONFIG = {
'max_retry': 3, # 最大重试次数
'interval': 30, # 上传间隔(秒)
'max_per_day': 100, # 每日最大上传量
'auto_add_watermark': True # 自动添加水印
}
import cv2
import os
from moviepy.editor import VideoFileClip
from PIL import Image
import numpy as np
import shutil
from config import VIDEO_CONFIG
class VideoProcessor:
def init(self):
self.source_dir = VIDEO_CONFIG['source_dir']
self.processed_dir = VIDEO_CONFIG['processed_dir']
self.cover_dir = VIDEO_CONFIG['cover_dir']
os.makedirs(self.processed_dir, exist_ok=True)
os.makedirs(self.cover_dir, exist_ok=True)
def check_video(self, video_path):
try:
clip = VideoFileClip(video_path)
duration = clip.duration
if duration < VIDEO_CONFIG['min_duration']:
return False, f"视频太短(小于{VIDEO_CONFIG['min_duration']}秒)"
if duration > VIDEO_CONFIG['max_duration']:
return False, f"视频太长(大于{VIDEO_CONFIG['max_duration']}秒)"
return True, "视频符合要求"
except Exception as e:
return False, f"视频检查失败: {str(e)}"
def generate_cover(self, video_path, output_path):
try:
clip = VideoFileClip(video_path)
frame = clip.get_frame(1) # 获取第1秒的帧
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
cv2.imwrite(output_path, frame)
return True
except Exception as e:
print(f"生成封面失败: {str(e)}")
return False
def process_video(self, video_file):
src_path = os.path.join(self.source_dir, video_file)
dst_path = os.path.join(self.processed_dir, video_file)
cover_path = os.path.join(self.cover_dir, f"{os.path.splitext(video_file)[0]}.jpg")
# 检查视频
is_valid, msg = self.check_video(src_path)
if not is_valid:
print(f"视频 {video_file} 无效: {msg}")
return False
# 生成封面
if not self.generate_cover(src_path, cover_path):
print(f"为视频 {video_file} 生成封面失败")
return False
# 复制到处理目录
try:
shutil.copy2(src_path, dst_path)
return True
except Exception as e:
print(f"复制视频 {video_file} 失败: {str(e)}")
return False
def batch_process(self):
processed_count = 0
for video_file in os.listdir(self.source_dir):
if video_file.lower().endswith(('.mp4', '.mov', '.avi', '.mkv')):
if self.process_video(video_file):
processed_count += 1
return processed_count
os
import time
import random
import pickle
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from config import ACCOUNT, UPLOAD_CONFIG, VIDEO_CONFIG
class DouyinUploader:
def init(self):
self.username = ACCOUNT['username']
self.password = ACCOUNT['password']
self.cookie_path = ACCOUNT['cookie_path']
self.driver = None
self.logged_in = False
self.init_driver()
def init_driver(self):
options = webdriver.ChromeOptions()
options.add_argument('--disable-gpu')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--window-size=1920,1080')
options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
self.driver = webdriver.Chrome(options=options)
self.driver.set_page_load_timeout(30)
def save_cookies(self):
with open(self.cookie_path, 'wb') as f:
pickle.dump(self.driver.get_cookies(), f)
def load_cookies(self):
try:
with open(self.cookie_path, 'rb') as f:
cookies = pickle.load(f)
for cookie in cookies:
self.driver.add_cookie(cookie)
return True
except:
return False
def login(self):
self.driver.get('https://www.douyin.com')
time.sleep(2)
# 尝试加载cookie
if os.path.exists(self.cookie_path):
if self.load_cookies():
self.driver.refresh()
time.sleep(3)
if self.check_login():
self.logged_in = True
return True
# 手动登录
try:
login_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH, '//button[contains(text(),"登录")]'))
)
login_btn.click()
time.sleep(2)
# 切换到账号密码登录
switch_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH, '//div[contains(text(),"密码登录")]'))
)
switch_btn.click()
time.sleep(1)
# 输入用户名密码
username_input = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.NAME, 'username'))
)
username_input.send_keys(self.username)
password_input = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.NAME, 'password'))
)
password_input.send_keys(self.password)
# 点击登录按钮
submit_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH, '//button[@type="submit"]'))
)
submit_btn.click()
# 等待登录完成
time.sleep(5)
if self.check_login():
self.save_cookies()
self.logged_in = True
return True
return False
except Exception as e:
print(f"登录失败: {str(e)}")
return False
def check_login(self):
try:
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH, '//div[contains(@class,"avatar-wrapper")]'))
)
return True
except:
return False
def upload_video(self, video_path, cover_path=None, description=""):
if not self.logged_in and not self.login():
print("请先登录抖音账号")
return False
try:
# 进入发布页面
self.driver.get('https://creator.douyin.com/creator-micro/content/upload')
time.sleep(3)
# 上传视频
upload_input = WebDriverWait(self.driver, 20).until(
EC.presence_of_element_located((By.XPATH, '//input[@type="file"]'))
)
upload_input.send_keys(os.path.abspath(video_path))
# 等待视频上传完成
WebDriverWait(self.driver, 300).until(
EC.presence_of_element_located((By.XPATH, '//div[contains(text(),"视频上传完成")]'))
)
time.sleep(2)
# 设置封面
if cover_path and os.path.exists(cover_path):
try:
cover_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH, '//div[contains(text(),"编辑封面")]'))
)
cover_btn.click()
time.sleep(1)
upload_cover_input = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH, '//input[@type="file"]'))
)
upload_cover_input.send_keys(os.path.abspath(cover_path))
confirm_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH, '//button[contains(text(),"确定")]'))
)
confirm_btn.click()
time.sleep(1)
except Exception as e:
print(f"设置封面失败: {str(e)}")
# 输入描述
if description:
desc_input = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH, '//textarea[@placeholder="添加描述..."]'))
)
desc_input.send_keys(description)
time.sleep(1)
# 发布视频
publish_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH, '//button[contains(text(),"发布")]'))
)
publish_btn.click()
# 等待发布完成
WebDriverWait(self.driver, 30).until(
EC.presence_of_element_located((By.XPATH, '//div[contains(text(),"发布成功")]'))
)
time.sleep(3)
return True
except Exception as e:
print(f"上传视频失败: {str(e)}")
return False
def batch_upload(self):
if not self.logged_in and not self.login():
print("请先登录抖音账号")
return 0
uploaded_count = 0
video_files = [f for f in os.listdir(VIDEO_CONFIG['processed_dir'])
if f.lower().endswith(('.mp4', '.mov', '.avi', '.mkv'))]
for video_file in video_files:
video_path = os.path.join(VIDEO_CONFIG['processed_dir'], video_file)
cover_file = f"{os.path.splitext(video_file)[0]}.jpg"
cover_path = os.path.join(VIDEO_CONFIG['cover_dir'], cover_file)
if not os.path.exists(cover_path):
cover_path = None
retry = 0
while retry < UPLOAD_CONFIG['max_retry']:
if self.upload_video(video_path, cover_path):
uploaded_count += 1
# 上传成功后删除视频
try:
os.remove(video_path)
if cover_path:
os.remove(cover_path)
except:
pass
break
retry += 1
time.sleep(UPLOAD_CONFIG['interval'])
# 随机间隔防止被封
time.sleep(random.randint(UPLOAD_CONFIG['interval'], UPLOAD_CONFIG['interval']*2))
if uploaded_count >= UPLOAD_CONFIG['max_per_day']:
print(f"已达到每日最大上传量 {UPLOAD_CONFIG['max_per_day']}")
break
return uploaded_count
def close(self):
if self.driver:
self.driver.quit()
import time
from video_processor import VideoProcessor
from dy_uploader import DouyinUploader
from config import UPLOAD_CONFIG
def main():
print("=== 抖音批量上传工具 ===")
# 视频处理
print("开始处理视频...")
processor = VideoProcessor()
processed_count = processor.batch_process()
print(f"成功处理 {processed_count} 个视频")
if processed_count == 0:
print("没有可处理的视频,程序退出")
return
# 视频上传
print("开始上传视频...")
uploader = DouyinUploader()
try:
uploaded_count = uploader.batch_upload()
print(f"成功上传 {uploaded_count} 个视频")
except Exception as e:
print(f"上传过程中发生错误: {str(e)}")
finally:
uploader.close()
print("程序执行完毕")
if name == "main":
main()