# 下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:1133
# 快手批量上传工具提供了完整的视频和图文上传功能,包含登录验证、文件上传、标题设置、标签添加等功能。代码结构清晰,包含详细的日志记录和错误处理机制。使用前需要配置 config.json 文件并安装必要的 Python 依赖库。
import os
import time
import random
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from PIL import Image
import cv2
import numpy as np
import json
import mimetypes
import logging
from datetime import datetime
class KuaishouUploader:
    """Selenium-driven uploader for Kuaishou videos and photo posts.

    The instance reads a JSON configuration file, starts a Chrome WebDriver
    and exposes methods to log in and to publish single or batched media.

    Configuration keys read by this class:
        headless (bool, optional): run Chrome with ``--headless``.
        user_data_dir (str, optional): Chrome ``--user-data-dir`` value.
        chrome_driver_path (str, optional): path to the chromedriver binary.
        phone_number (str): typed into the login form by ``login``.
        password (str): typed into the login form by ``login``.

    NOTE(review): every XPath below targets the Kuaishou web UI as the
    original author saw it; they cannot be verified from this file alone.
    """

    def __init__(self, config_file='config.json'):
        """Set up logging, load the configuration and start the browser.

        Args:
            config_file: Path to the JSON configuration file.

        Raises:
            Exception: Whatever ``_load_config`` or ``_init_webdriver``
                re-raise after logging the failure.
        """
        self.logger = self._setup_logger()
        self.config = self._load_config(config_file)
        self.driver = self._init_webdriver()
        # Shared explicit wait with a 30 second timeout.
        self.wait = WebDriverWait(self.driver, 30)

    def _setup_logger(self):
        """Return the 'KuaishouUploader' logger with file and console output.

        Handlers are attached only once: the named logger is process-wide,
        so adding handlers on every instantiation would duplicate each line.
        """
        logger = logging.getLogger('KuaishouUploader')
        logger.setLevel(logging.INFO)
        if logger.handlers:
            return logger

        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        # File log; explicit UTF-8 because the messages are Chinese and the
        # platform default encoding may not be able to represent them.
        file_handler = logging.FileHandler('kuaishou_upload.log', encoding='utf-8')
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        # Console log.
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
        return logger

    def _load_config(self, config_file):
        """Parse ``config_file`` as UTF-8 JSON and return the result.

        Raises:
            Exception: Any error from opening or parsing the file is logged
                and then re-raised unchanged.
        """
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
            self.logger.info("配置文件加载成功")
            return config
        except Exception as e:
            self.logger.error(f"加载配置文件失败: {str(e)}")
            raise

    def _init_webdriver(self):
        """Create and return a Chrome WebDriver built from the configuration.

        Raises:
            Exception: Any error raised while starting Chrome is logged and
                then re-raised unchanged.
        """
        # Imported locally so the file's top-level import block is untouched.
        from selenium.webdriver.chrome.service import Service

        chrome_options = Options()
        if self.config.get('headless', False):
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--window-size=1920,1080')
        chrome_options.add_argument('--disable-notifications')
        chrome_options.add_argument('--lang=zh-CN')
        # Optional persistent browser profile.
        if self.config.get('user_data_dir'):
            chrome_options.add_argument(
                f'--user-data-dir={self.config["user_data_dir"]}'
            )

        try:
            # The ``executable_path`` keyword of webdriver.Chrome was removed
            # in Selenium 4; the driver location is passed through Service.
            # Without a configured path Selenium resolves the driver itself.
            driver_path = self.config.get('chrome_driver_path')
            service = Service(executable_path=driver_path) if driver_path else Service()
            driver = webdriver.Chrome(service=service, options=chrome_options)
            self.logger.info("WebDriver初始化成功")
            return driver
        except Exception as e:
            self.logger.error(f"WebDriver初始化失败: {str(e)}")
            raise

    def login(self):
        """Log in with the phone number and password from the configuration.

        Returns:
            bool: True once the post-login element appears, False if any
            step raised (the error is logged, not propagated).
        """
        self.logger.info("开始登录流程")
        self.driver.get('https://www.kuaishou.com')
        try:
            # Wait for the login button and open the login dialog.
            login_btn = self.wait.until(
                EC.element_to_be_clickable((By.XPATH, '//div[contains(@class, "login-btn")]'))
            )
            login_btn.click()
            # Switch to phone-number login.
            phone_login = self.wait.until(
                EC.element_to_be_clickable((By.XPATH, '//div[contains(text(), "手机号登录")]'))
            )
            phone_login.click()
            # Enter the phone number.
            phone_input = self.wait.until(
                EC.presence_of_element_located((By.XPATH, '//input[@placeholder="请输入手机号"]'))
            )
            phone_input.send_keys(self.config['phone_number'])
            # Enter the password.
            password_input = self.driver.find_element(By.XPATH, '//input[@placeholder="请输入密码"]')
            password_input.send_keys(self.config['password'])
            # Submit the form.
            submit_btn = self.driver.find_element(By.XPATH, '//button[contains(text(), "登录")]')
            submit_btn.click()
            # The user-info element is treated as the success signal.
            self.wait.until(
                EC.presence_of_element_located((By.XPATH, '//div[contains(@class, "user-info")]'))
            )
            self.logger.info("登录成功")
            return True
        except Exception as e:
            self.logger.error(f"登录失败: {str(e)}")
            return False

    def _set_title(self, title):
        """Replace the caption field's content with ``title`` if it is non-empty."""
        if not title:
            return
        title_input = self.driver.find_element(
            By.XPATH, '//textarea[@placeholder="填写标题更容易上热门哦"]'
        )
        title_input.clear()
        title_input.send_keys(title)
        self.logger.info(f"标题已设置: {title}")

    def _add_tags(self, tags):
        """Type each tag into the tag field, confirming each one with ENTER."""
        if not tags:
            return
        tag_input = self.driver.find_element(By.XPATH, '//input[@placeholder="添加标签"]')
        for tag in tags:
            tag_input.send_keys(tag)
            tag_input.send_keys(Keys.ENTER)
            time.sleep(1)  # pause between tags, as in the original flow
        self.logger.info(f"标签已添加: {', '.join(tags)}")

    def _set_cover(self, cover_path):
        """Upload a custom cover image; failures are logged as warnings only."""
        try:
            cover_btn = self.driver.find_element(By.XPATH, '//div[contains(text(), "自定义封面")]')
            cover_btn.click()
            upload_cover = self.wait.until(
                EC.presence_of_element_located((By.XPATH, '//input[@type="file" and @accept="image/*"]'))
            )
            upload_cover.send_keys(os.path.abspath(cover_path))
            confirm_btn = self.wait.until(
                EC.element_to_be_clickable((By.XPATH, '//button[contains(text(), "确定")]'))
            )
            confirm_btn.click()
            self.logger.info("自定义封面已设置")
        except Exception as e:
            # Best effort: a missing cover must not abort the upload.
            self.logger.warning(f"设置封面失败: {str(e)}")

    def _publish(self):
        """Click the publish button and wait for the success message."""
        publish_btn = self.wait.until(
            EC.element_to_be_clickable((By.XPATH, '//button[contains(text(), "发布")]'))
        )
        publish_btn.click()
        self.wait.until(
            EC.presence_of_element_located((By.XPATH, '//div[contains(text(), "发布成功")]'))
        )

    def upload_video(self, video_path, title='', tags=None, cover_path=None):
        """Upload and publish a single video.

        Args:
            video_path: Path to the video file; converted to an absolute path.
            title: Optional caption text.
            tags: Optional iterable of tag strings (None means no tags).
            cover_path: Optional path to a custom cover image.

        Returns:
            bool: True when the success message appeared, False if any step
            raised (the error is logged, not propagated).
        """
        self.logger.info(f"开始上传视频: {video_path}")
        try:
            # Open the upload page and hand the file to the file input.
            self.driver.get('https://www.kuaishou.com/upload')
            upload_btn = self.wait.until(
                EC.presence_of_element_located((By.XPATH, '//input[@type="file"]'))
            )
            upload_btn.send_keys(os.path.abspath(video_path))
            self.logger.info("视频文件已选择")
            # Wait for the preview, then a fixed extra pause for full loading.
            self.wait.until(
                EC.presence_of_element_located((By.XPATH, '//div[contains(@class, "video-preview")]'))
            )
            time.sleep(5)
            self._set_title(title)
            self._add_tags(tags)
            if cover_path:
                self._set_cover(cover_path)
            self._publish()
            self.logger.info("视频发布成功")
            return True
        except Exception as e:
            self.logger.error(f"视频上传失败: {str(e)}")
            return False

    def upload_photo(self, image_path, title='', tags=None):
        """Upload and publish a single image as a photo post.

        Args:
            image_path: Path to the image file; converted to an absolute path.
            title: Optional caption text.
            tags: Optional iterable of tag strings (None means no tags).

        Returns:
            bool: True when the success message appeared, False if any step
            raised (the error is logged, not propagated).
        """
        self.logger.info(f"开始上传图片: {image_path}")
        try:
            self.driver.get('https://www.kuaishou.com/upload')
            # Switch the upload page to photo-post mode.
            photo_tab = self.wait.until(
                EC.element_to_be_clickable((By.XPATH, '//div[contains(text(), "图文")]'))
            )
            photo_tab.click()
            upload_btn = self.wait.until(
                EC.presence_of_element_located((By.XPATH, '//input[@type="file"]'))
            )
            upload_btn.send_keys(os.path.abspath(image_path))
            self.logger.info("图片文件已选择")
            # Wait for the preview, then a fixed extra pause.
            self.wait.until(
                EC.presence_of_element_located((By.XPATH, '//div[contains(@class, "image-preview")]'))
            )
            time.sleep(3)
            self._set_title(title)
            self._add_tags(tags)
            self._publish()
            self.logger.info("图文发布成功")
            return True
        except Exception as e:
            self.logger.error(f"图文上传失败: {str(e)}")
            return False

    def _list_media_files(self, directory, mime_prefix):
        """Return sorted names of regular files whose guessed MIME type starts with ``mime_prefix``.

        Sorting makes the ``{index}`` used in titles reproducible, since
        ``os.listdir`` returns entries in arbitrary order.
        """
        names = []
        for name in sorted(os.listdir(directory)):
            if not os.path.isfile(os.path.join(directory, name)):
                continue
            mime_type = mimetypes.guess_type(name)[0]
            if mime_type and mime_type.startswith(mime_prefix):
                names.append(name)
        return names

    def batch_upload_videos(self, video_dir, title_template='视频{index}', tags=None):
        """Upload every video file found directly inside ``video_dir``.

        Args:
            video_dir: Directory to scan (not recursive).
            title_template: ``str.format`` template; ``{index}`` (1-based)
                and ``{filename}`` (name without extension) are supplied.
            tags: Optional iterable of tag strings applied to every upload.

        Returns:
            bool: True if at least one upload succeeded; False when the
            directory is missing, holds no videos, or every upload failed.
        """
        self.logger.info(f"开始批量上传视频,目录: {video_dir}")
        if not os.path.isdir(video_dir):
            self.logger.error(f"目录不存在: {video_dir}")
            return False
        video_files = self._list_media_files(video_dir, 'video/')
        if not video_files:
            self.logger.warning("目录中没有找到视频文件")
            return False

        success_count = 0
        for index, video_file in enumerate(video_files, 1):
            video_path = os.path.join(video_dir, video_file)
            title = title_template.format(
                index=index, filename=os.path.splitext(video_file)[0]
            )
            try:
                if self.upload_video(video_path, title, tags):
                    success_count += 1
                # Random pause so consecutive uploads are not too frequent.
                time.sleep(random.randint(10, 30))
            except Exception as e:
                self.logger.error(f"上传视频 {video_file} 时出错: {str(e)}")
        self.logger.info(f"批量上传完成,成功 {success_count}/{len(video_files)} 个视频")
        return success_count > 0

    def batch_upload_photos(self, image_dir, title_template='图片{index}', tags=None):
        """Upload every image file found directly inside ``image_dir``.

        Args:
            image_dir: Directory to scan (not recursive).
            title_template: ``str.format`` template; ``{index}`` (1-based)
                and ``{filename}`` (name without extension) are supplied.
            tags: Optional iterable of tag strings applied to every upload.

        Returns:
            bool: True if at least one upload succeeded; False when the
            directory is missing, holds no images, or every upload failed.
        """
        self.logger.info(f"开始批量上传图片,目录: {image_dir}")
        if not os.path.isdir(image_dir):
            self.logger.error(f"目录不存在: {image_dir}")
            return False
        image_files = self._list_media_files(image_dir, 'image/')
        if not image_files:
            self.logger.warning("目录中没有找到图片文件")
            return False

        success_count = 0
        for index, image_file in enumerate(image_files, 1):
            image_path = os.path.join(image_dir, image_file)
            title = title_template.format(
                index=index, filename=os.path.splitext(image_file)[0]
            )
            try:
                if self.upload_photo(image_path, title, tags):
                    success_count += 1
                # Random pause so consecutive uploads are not too frequent.
                time.sleep(random.randint(5, 15))
            except Exception as e:
                self.logger.error(f"上传图片 {image_file} 时出错: {str(e)}")
        self.logger.info(f"批量上传完成,成功 {success_count}/{len(image_files)} 张图片")
        return success_count > 0

    def close(self):
        """Quit the browser; any error while doing so is logged, not raised."""
        try:
            self.driver.quit()
            self.logger.info("浏览器已关闭")
        except Exception as e:
            self.logger.error(f"关闭浏览器时出错: {str(e)}")
# Script entry point. The guard must compare the module's ``__name__`` with
# '__main__'; the bare ``name == 'main'`` form raises NameError on import.
if __name__ == '__main__':
    # Example usage.
    uploader = KuaishouUploader('config.json')
    try:
        if uploader.login():
            # Batch-upload videos.
            uploader.batch_upload_videos(
                video_dir='./videos',
                title_template='日常视频{index}',
                tags=['生活', '日常', '记录']
            )
            # Batch-upload images.
            uploader.batch_upload_photos(
                image_dir='./images',
                title_template='美图分享{index}',
                tags=['摄影', '美图', '分享']
            )
    finally:
        # Always release the browser, even if login or an upload raised.
        uploader.close()