小红书批量发布协议, 抖音自动批量发布软件脚本,笔记作品视频自动发布工具【python】

简介: 这个工具框架包含了小红书和抖音的批量发布功能,支持图片和视频处理、定时发布等功能

下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:1133

这个工具框架包含了小红书和抖音的批量发布功能,支持图片和视频处理、定时发布等功能。使用时需要先配置config.json文件,填入从平台获取的API凭证。代码包含了错误处理和请求间隔,避免被封禁。

import os
import time
import json
import requests
from typing import List, Dict
from datetime import datetime
from PIL import Image, ImageFilter
from moviepy.editor import VideoFileClip

class SocialMediaPoster:
def init(self, config_path: str = "config.json"):
self.config = self._load_config(config_path)
self.xiaohongshu_api = "https://api.xiaohongshu.com"
self.douyin_api = "https://open.douyin.com"
self.session = requests.Session()

def _load_config(self, path: str) -> Dict:
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)

def _get_xiaohongshu_token(self) -> str:
    url = f"{self.xiaohongshu_api}/oauth/access_token"
    payload = {
        "client_id": self.config["xiaohongshu"]["client_id"],
        "client_secret": self.config["xiaohongshu"]["client_secret"],
        "grant_type": "client_credentials"
    }
    response = self.session.post(url, data=payload)
    return response.json().get("access_token")

def _get_douyin_token(self) -> str:
    url = f"{self.douyin_api}/oauth/access_token/"
    params = {
        "client_key": self.config["douyin"]["client_key"],
        "client_secret": self.config["douyin"]["client_secret"],
        "grant_type": "client_credential"
    }
    response = self.session.get(url, params=params)
    return response.json().get("access_token")

def _process_image(self, image_path: str) -> str:
    img = Image.open(image_path)
    if img.mode != 'RGB':
        img = img.convert('RGB')
    img = img.filter(ImageFilter.SHARPEN)
    output_path = f"processed_{os.path.basename(image_path)}"
    img.save(output_path)
    return output_path

def _process_video(self, video_path: str) -> str:
    clip = VideoFileClip(video_path)
    if clip.duration > 60:  # 超过60秒的视频需要裁剪
        clip = clip.subclip(0, 60)
    output_path = f"processed_{os.path.basename(video_path)}"
    clip.write_videofile(output_path)
    return output_path

def post_to_xiaohongshu(self, content: str, media_paths: List[str], 
                       scheduled_time: datetime = None) -> Dict:
    token = self._get_xiaohongshu_token()
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    processed_media = []
    for path in media_paths:
        if path.lower().endswith(('.png', '.jpg', '.jpeg')):
            processed_media.append(self._process_image(path))
        elif path.lower().endswith(('.mp4', '.mov')):
            processed_media.append(self._process_video(path))

    payload = {
        "content": content,
        "media_urls": processed_media,
        "visibility": self.config["xiaohongshu"].get("visibility", "PUBLIC")
    }

    if scheduled_time:
        payload["scheduled_publish_time"] = scheduled_time.isoformat()

    url = f"{self.xiaohongshu_api}/api/v1/notes"
    response = self.session.post(url, headers=headers, json=payload)
    return response.json()

def post_to_douyin(self, content: str, video_path: str, 
                  scheduled_time: datetime = None) -> Dict:
    token = self._get_douyin_token()
    headers = {
        "access-token": token,
        "Content-Type": "application/json"
    }

    processed_video = self._process_video(video_path)

    payload = {
        "text": content,
        "video_url": processed_video,
        "privacy_level": self.config["douyin"].get("privacy_level", 0)
    }

    if scheduled_time:
        payload["publish_time"] = scheduled_time.timestamp()

    url = f"{self.douyin_api}/api/v1/video/upload"
    response = self.session.post(url, headers=headers, json=payload)
    return response.json()

def batch_post(self, platform: str, posts: List[Dict]) -> List[Dict]:
    results = []
    for post in posts:
        try:
            if platform.lower() == "xiaohongshu":
                result = self.post_to_xiaohongshu(
                    post["content"],
                    post["media_paths"],
                    post.get("scheduled_time")
                )
            elif platform.lower() == "douyin":
                result = self.post_to_douyin(
                    post["content"],
                    post["video_path"],
                    post.get("scheduled_time")
                )
            else:
                raise ValueError(f"Unsupported platform: {platform}")

            results.append({
                "status": "success",
                "platform": platform,
                "post_id": result.get("id"),
                "response": result
            })
            time.sleep(5)  # 防止请求过于频繁
        except Exception as e:
            results.append({
                "status": "failed",
                "platform": platform,
                "error": str(e)
            })
    return results


‘from datetime import datetime, timedelta
from social_media_poster import SocialMediaPoster

def main():
poster = SocialMediaPoster()

# 小红书批量发布
xhs_posts = [
    {
        "content": "今天分享一个美食教程 #美食 #教程",
        "media_paths": ["food1.jpg", "food2.jpg"],
        "scheduled_time": datetime.now() + timedelta(hours=1)
    },
    {
        "content": "旅行日记 #旅行 #摄影",
        "media_paths": ["travel1.jpg", "travel2.jpg", "travel_video.mp4"]
    }
]
xhs_results = poster.batch_post("xiaohongshu", xhs_posts)
print("小红书发布结果:", xhs_results)

# 抖音批量发布
dy_posts = [
    {
        "content": "搞笑视频来啦 #搞笑 #娱乐",
        "video_path": "funny_video.mp4",
        "scheduled_time": datetime.now() + timedelta(hours=2)
    }
]
dy_results = poster.batch_post("douyin", dy_posts)
print("抖音发布结果:", dy_results)

if name == "main":
main()

from datetime import datetime, timedelta
from social_media_poster import SocialMediaPoster

def main():
poster = SocialMediaPoster()

# 小红书批量发布
xhs_posts = [
    {
        "content": "今天分享一个美食教程 #美食 #教程",
        "media_paths": ["food1.jpg", "food2.jpg"],
        "scheduled_time": datetime.now() + timedelta(hours=1)
    },
    {
        "content": "旅行日记 #旅行 #摄影",
        "media_paths": ["travel1.jpg", "travel2.jpg", "travel_video.mp4"]
    }
]
xhs_results = poster.batch_post("xiaohongshu", xhs_posts)
print("小红书发布结果:", xhs_results)

# 抖音批量发布
dy_posts = [
    {
        "content": "搞笑视频来啦 #搞笑 #娱乐",
        "video_path": "funny_video.mp4",
        "scheduled_time": datetime.now() + timedelta(hours=2)
    }
]
dy_results = poster.batch_post("douyin", dy_posts)
print("抖音发布结果:", dy_results)

if name == "main":
main()

相关文章
|
15天前
|
数据采集 自然语言处理 数据可视化
Python爬取B站视频评论区情感分析:从数据采集到价值挖掘
B站作为年轻人聚集地,评论蕴含丰富情感与趋势。本文详解如何用Python爬取评论,结合SnowNLP与jieba进行中文情感分析,并通过可视化挖掘用户情绪、消费意愿与内容反馈,助力精准运营与决策。
127 0
|
3月前
|
API 数据安全/隐私保护 Python
拼多多批量上架软件, 电商一键上货发布工具,python电商框架分享
多线程批量上传架构,支持并发处理商品数据 完整的拼多多API签名和token管理机制
|
3月前
|
安全 API 数据安全/隐私保护
|
3月前
|
Java API 数据安全/隐私保护
淘宝一键上货发布软件,淘宝批量发布上架工具, 淘宝批量上架脚本【python】
这个Python脚本实现了以下功能: 完整的淘宝API调用封装
|
3月前
|
Java API 数据处理
淘宝批量上架软件脚本,电商一键上货软件, 淘宝一键铺货软件【python】
核心功能:实现淘宝商品批量上传,包含登录认证、商品数据处理、图片处理和API调用 多线程处理
|
3月前
|
机器人 数据安全/隐私保护 Python
淘宝批量发货发布工具, 淘宝批量上传商品软件, 淘宝批量上架软件【python】
使用Selenium实现自动化操作淘宝卖家后台 支持三种核心功能
|
3月前
|
Linux 数据安全/隐私保护 Python
一键修改电脑机器码, 软件机器码一键修改工具, 机器码重置工具【python】
该工具实现了完整的机器码生成、加密、验证功能 使用系统硬件信息生成唯一机器码
|
Linux 区块链 Python
Python实用记录(十三):python脚本打包exe文件并运行
这篇文章介绍了如何使用PyInstaller将Python脚本打包成可执行文件(exe),并提供了详细的步骤和注意事项。
504 1
Python实用记录(十三):python脚本打包exe文件并运行
|
存储 Shell 区块链
怎么把Python脚本打包成可执行程序?
该文档介绍了如何将Python脚本及其运行环境打包成EXE可执行文件,以便在不具备Python环境的计算机上运行。首先确保Python脚本能够正常运行,然后通过安装PyInstaller并使用`--onefile`参数将脚本打包成独立的EXE文件。此外,还提供了去除命令行窗口和指定可执行文件图标的详细方法。这些步骤帮助用户轻松地将Python程序分发给最终用户。
174 3
怎么把Python脚本打包成可执行程序?
|
存储 区块链 Python
怎么把Python脚本打包成可执行程序?
最近根据用户提的需求用python做了一个小工具,但是在给客户使用的时候不能直接发送python文件,毕竟让客户去安装python环境,那就离了大谱了。所以这时候就需要把多个py文件带着运行环境打包成EXE可执行文件。
怎么把Python脚本打包成可执行程序?

推荐镜像

更多
下一篇
开通oss服务