下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:7618
这个框架包含配置管理、消息队列、API客户端和主程序四个主要模块。配置管理负责存储账号信息和设置,消息队列处理待发送消息,API客户端封装抖音接口调用,主程序协调整个流程。使用时需要根据实际抖音API调整接口细节。
import json
import os
class ConfigManager:
def init(self, config_file='config.json'):
self.config_file = config_file
self.config = self._load_config()
def _load_config(self):
if not os.path.exists(self.config_file):
return {
'api_endpoint': 'https://api.douyin.com',
'max_retries': 3,
'request_timeout': 30,
'accounts': [],
'message_queue': [],
'rate_limit': {
'messages_per_minute': 5,
'interval_seconds': 10
}
}
with open(self.config_file, 'r') as f:
return json.load(f)
def save_config(self):
with open(self.config_file, 'w') as f:
json.dump(self.config, f, indent=4)
def add_account(self, username, password, cookies=None):
self.config['accounts'].append({
'username': username,
'password': password,
'cookies': cookies or {},
'last_active': None
})
self.save_config()
def get_accounts(self):
return self.config.get('accounts', [])
time
from queue import Queue
from threading import Lock
class MessageQueue:
def init(self):
self.queue = Queue()
self.lock = Lock()
self.sent_messages = []
self.failed_messages = []
def add_message(self, recipient, content, priority=0):
with self.lock:
self.queue.put({
'recipient': recipient,
'content': content,
'priority': priority,
'timestamp': time.time(),
'attempts': 0,
'status': 'pending'
})
def get_next_message(self):
with self.lock:
if not self.queue.empty():
return self.queue.get()
return None
def mark_as_sent(self, message):
with self.lock:
message['status'] = 'sent'
message['sent_time'] = time.time()
self.sent_messages.append(message)
def mark_as_failed(self, message, reason):
with self.lock:
message['status'] = 'failed'
message['reason'] = reason
message['attempts'] += 1
if message['attempts'] < 3:
self.queue.put(message)
else:
self.failed_messages.append(message)
def get_stats(self):
with self.lock:
return {
'pending': self.queue.qsize(),
'sent': len(self.sent_messages),
'failed': len(self.failed_messages)
}
import requests
import time
import random
from urllib.parse import urlencode
from fake_useragent import UserAgent
class DouyinAPIClient:
def init(self, config):
self.config = config
self.session = requests.Session()
self.ua = UserAgent()
self.last_request_time = 0
def _get_headers(self):
return {
'User-Agent': self.ua.random,
'Accept': 'application/json',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Referer': 'https://www.douyin.com/',
'Origin': 'https://www.douyin.com'
}
def _rate_limit(self):
current_time = time.time()
elapsed = current_time - self.last_request_time
min_interval = 60 / self.config['rate_limit']['messages_per_minute']
if elapsed < min_interval:
time.sleep(min_interval - elapsed + random.uniform(0.1, 0.5))
self.last_request_time = time.time()
def login(self, username, password):
# 模拟登录逻辑
login_url = f"{self.config['api_endpoint']}/login"
payload = {
'username': username,
'password': password
}
try:
self._rate_limit()
response = self.session.post(
login_url,
data=payload,
headers=self._get_headers(),
timeout=self.config['request_timeout']
)
if response.status_code == 200:
return True
return False
except Exception as e:
print(f"Login failed: {str(e)}")
return False
def send_private_message(self, recipient_id, message):
send_url = f"{self.config['api_endpoint']}/message/send"
payload = {
'to_user_id': recipient_id,
'content': message,
'type': 'text'
}
try:
self._rate_limit()
response = self.session.post(
send_url,
data=payload,
headers=self._get_headers(),
timeout=self.config['request_timeout']
)
if response.status_code == 200:
return True
return False
except Exception as e:
print(f"Message send failed: {str(e)}")
return False
time
import threading
from config import ConfigManager
from message_queue import MessageQueue
from api_client import DouyinAPIClient
class DouyinMessenger:
def init(self):
self.config_manager = ConfigManager()
self.message_queue = MessageQueue()
self.is_running = False
self.workers = []
def start(self, worker_count=2):
self.is_running = True
for i in range(worker_count):
worker = threading.Thread(
target=self._worker_loop,
name=f"Worker-{i+1}",
daemon=True
)
worker.start()
self.workers.append(worker)
print(f"Started {worker_count} workers. Press Ctrl+C to stop.")
try:
while self.is_running:
time.sleep(1)
except KeyboardInterrupt:
self.stop()
def stop(self):
self.is_running = False
for worker in self.workers:
worker.join()
print("All workers stopped.")
def _worker_loop(self):
api_client = DouyinAPIClient(self.config_manager.config)
accounts = self.config_manager.get_accounts()
while self.is_running:
message = self.message_queue.get_next_message()
if not message:
time.sleep(1)
continue
for account in accounts:
if not api_client.login(account['username'], account['password']):
print(f"Login failed for account: {account['username']}")
continue
if api_client.send_private_message(message['recipient'], message['content']):
self.message_queue.mark_as_sent(message)
print(f"Message sent to {message['recipient']}")
break
else:
self.message_queue.mark_as_failed(message, "Send failed")
print(f"Failed to send message to {message['recipient']}")
time.sleep(1)
if name == "main":
messenger = DouyinMessenger()
# 示例:添加测试消息
messenger.message_queue.add_message("user123", "你好,这是测试消息1")
messenger.message_queue.add_message("user456", "你好,这是测试消息2", priority=1)
# 启动消息发送服务
messenger.start()