下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:7718
这个微博机器人系统包含完整的OAuth2.0认证流程、定时任务调度、多种内容类型发布和自动回复功能。使用时需要先在新浪微博开放平台申请应用获取API Key,配置好config.json文件后即可运行。
!/usr/bin/env python3
-- coding: utf-8 --
import json
import time
import random
import schedule
from datetime import datetime
from weibo_api import WeiboAPI
from content_manager import ContentManager
class WeiboBot:
def init(self, config_file='config.json'):
with open(config_file) as f:
self.config = json.load(f)
self.api = WeiboAPI(
app_key=self.config['app_key'],
app_secret=self.config['app_secret'],
callback_url=self.config['callback_url']
)
self.content_mgr = ContentManager()
self._setup_schedule()
def _setup_schedule(self):
# 定时任务配置
schedule.every().day.at("09:00").do(self.post_random_content)
schedule.every().day.at("12:00").do(self.post_random_content)
schedule.every().day.at("18:00").do(self.post_random_content)
schedule.every(3).hours.do(self.check_messages)
def post_random_content(self):
content_type = random.choice(['text', 'image', 'video'])
if content_type == 'text':
text = self.content_mgr.get_random_text()
self.api.post_text(text)
elif content_type == 'image':
text = self.content_mgr.get_random_text()
image_path = self.content_mgr.get_random_image()
self.api.post_image(text, image_path)
else:
text = self.content_mgr.get_random_text()
video_path = self.content_mgr.get_random_video()
self.api.post_video(text, video_path)
print(f"[{datetime.now()}] Posted {content_type} content")
def check_messages(self):
messages = self.api.get_unread_messages()
for msg in messages:
if msg['type'] == 'comment':
self._handle_comment(msg)
print(f"[{datetime.now()}] Checked {len(messages)} messages")
def _handle_comment(self, comment):
# 自动回复逻辑
reply = self.content_mgr.get_comment_reply(comment['content'])
self.api.reply_comment(comment['id'], reply)
def run(self):
print("Weibo Bot started...")
while True:
schedule.run_pending()
time.sleep(60)
if name == 'main':
bot = WeiboBot()
bot.run()
requests
import base64
from urllib.parse import quote
class WeiboAPI:
def init(self, app_key, app_secret, callback_url):
self.base_url = "https://api.weibo.com/2/"
self.app_key = app_key
self.app_secret = app_secret
self.callback_url = callback_url
self.access_token = None
self.expires_in = 0
self._load_token()
def _load_token(self):
try:
with open('token.json') as f:
token_data = json.load(f)
self.access_token = token_data['access_token']
self.expires_in = token_data['expires_in']
except FileNotFoundError:
self._get_new_token()
def _get_new_token(self):
auth_url = f"https://api.weibo.com/oauth2/authorize?client_id={self.app_key}&response_type=code&redirect_uri={quote(self.callback_url)}"
print(f"Please visit this URL to authorize: {auth_url}")
code = input("Enter the authorization code: ")
token_url = "https://api.weibo.com/oauth2/access_token"
data = {
'client_id': self.app_key,
'client_secret': self.app_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
response = requests.post(token_url, data=data)
token_data = response.json()
self.access_token = token_data['access_token']
self.expires_in = token_data['expires_in']
with open('token.json', 'w') as f:
json.dump(token_data, f)
def _make_request(self, endpoint, method='GET', params=None, files=None):
if not params:
params = {}
params['access_token'] = self.access_token
url = self.base_url + endpoint
if method == 'GET':
response = requests.get(url, params=params)
else:
response = requests.post(url, data=params, files=files)
if response.status_code == 401: # Token expired
self._get_new_token()
return self._make_request(endpoint, method, params, files)
return response.json()
def post_text(self, text):
endpoint = "statuses/update.json"
params = {'status': text}
return self._make_request(endpoint, 'POST', params)
def post_image(self, text, image_path):
endpoint = "statuses/upload.json"
with open(image_path, 'rb') as f:
files = {'pic': f}
params = {'status': text}
return self._make_request(endpoint, 'POST', params, files)
def post_video(self, text, video_path):
endpoint = "statuses/upload_url_text.json"
with open(video_path, 'rb') as f:
video_data = base64.b64encode(f.read()).decode('utf-8')
params = {
'status': text,
'url': f"data:video/mp4;base64,{video_data}"
}
return self._make_request(endpoint, 'POST', params)
def get_unread_messages(self):
endpoint = "remind/unread_count.json"
return self._make_request(endpoint)['unread']
def reply_comment(self, comment_id, reply_text):
endpoint = "comments/reply.json"
params = {
'cid': comment_id,
'comment': reply_text
}
return self._make_request(endpoint, 'POST', params)