下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:8872
代码说明:这个微信机器人系统包含三个主要模块:主程序基于itchat实现微信消息监听和自动回复功能,支持文本、图片等多种消息类型;增强功能模块提供消息日志、好友管理和规则配置等高级功能;配置文件集中管理各项参数。系统整合了图灵机器人API实现智能对话,并支持关键词触发和定时任务。
import itchat
from itchat.content import TEXT, PICTURE, MAP, CARD, NOTE, SHARING
import requests
import time
import threading
图灵机器人配置
TULING_API_KEY = "485712b8079e44e1bc4af10872b08319" # 替换为你的API KEY
TULING_URL = "http://www.tuling123.com/openapi/api"
登录状态回调
def login_callback():
print("微信登录成功!")
itchat.send("微信机器人已启动", toUserName="filehelper")
def exit_callback():
print("微信已退出登录")
获取图灵机器人回复
def get_tuling_response(msg):
data = {
'key': TULING_API_KEY,
'info': msg,
'userid': 'wechat-robot'
}
try:
r = requests.post(TULING_URL, data=data).json()
return r.get('text', '我听不懂你在说什么')
except Exception as e:
print("图灵API请求失败:", e)
return "网络开小差了,请稍后再试"
关键词自动回复配置
REPLY_RULES = {
"你好": ["你好呀!", "嗨~", "您好!"],
"在吗": ["我在呢", "随时为您服务"],
"谢谢": ["不客气~", "这是我的荣幸"],
"再见": ["再见,祝您愉快", "下次再见哦"]
}
消息处理函数
@itchat.msg_register([TEXT, PICTURE, MAP, CARD, NOTE, SHARING])
def handle_message(msg):
# 消息类型判断
if msg['Type'] == TEXT:
content = msg['Text']
sender = msg['User']['NickName']
print(f"收到{sender}的文本消息: {content}")
# 关键词匹配回复
for keyword in REPLY_RULES:
if keyword in content:
return random.choice(REPLY_RULES[keyword])
# 图灵机器人回复
return get_tuling_response(content)
elif msg['Type'] == PICTURE:
return "[自动回复]收到图片,稍后查看"
elif msg['Type'] == SHARING:
return "[自动回复]收到分享链接,感谢分享"
else:
return "[自动回复]暂不支持此类型消息"
定时任务
def schedule_task():
while True:
# 每小时发送一次提醒
current_hour = time.localtime().tm_hour
if 9 <= current_hour <= 18:
itchat.send("工作时段自动提醒:记得喝水休息哦", toUserName="filehelper")
time.sleep(3600)
if name == 'main':
# 登录微信(热登录模式)
itchat.auto_login(
hotReload=True,
loginCallback=login_callback,
exitCallback=exit_callback,
statusNotify=True
)
# 启动定时任务线程
threading.Thread(target=schedule_task, daemon=True).start()
# 保持运行
itchat.run()
sqlite3
from datetime import datetime
class MessageLogger:
def init(self):
self.conn = sqlite3.connect('wechat_msg.db')
self.create_table()
def create_table(self):
cursor = self.conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender TEXT NOT NULL,
content TEXT,
msg_type TEXT,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
self.conn.commit()
def log_message(self, sender, content, msg_type):
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO messages (sender, content, msg_type)
VALUES (?, ?, ?)
''', (sender, content, msg_type))
self.conn.commit()
def get_message_count(self, sender=None):
cursor = self.conn.cursor()
if sender:
cursor.execute('SELECT COUNT(*) FROM messages WHERE sender=?', (sender,))
else:
cursor.execute('SELECT COUNT(*) FROM messages')
return cursor.fetchone()[0]
class AutoReplyManager:
def init(self):
self.rules = {}
self.load_rules()
def add_rule(self, keyword, replies):
self.rules[keyword] = replies
self.save_rules()
def remove_rule(self, keyword):
if keyword in self.rules:
del self.rules[keyword]
self.save_rules()
def load_rules(self):
# 从文件或数据库加载规则
self.rules = REPLY_RULES # 示例中使用内存规则
def save_rules(self):
# 保存规则到文件或数据库
pass
class FriendManager:
@staticmethod
def get_friends():
return itchat.get_friends(update=True)
@staticmethod
def search_friend(name):
return itchat.search_friends(name=name)
@staticmethod
def send_to_friend(name, content):
friend = itchat.search_friends(name=name)
if friend:
itchat.send(content, toUserName=friend[0]['UserName'])
return True
return False
微信机器人配置
CONFIG = {
"auto_reply": {
"enable": True,
"reply_delay": 1.5, # 回复延迟(秒)
"night_mode": {
"enable": True,
"start_time": "23:00",
"end_time": "08:00",
"reply_msg": "夜间模式自动回复:我已休息,明天回复您"
}
},
"tuling": {
"api_key": "your_api_key_here",
"timeout": 10
},
"database": {
"path": "wechat_data.db",
"backup_interval": 86400 # 每天备份一次
}
}
关键词回复规则
KEYWORD_REPLIES = {
"你好": ["你好!", "Hi~", "您好"],
"帮助": ["输入以下关键词获取帮助:\n1. 天气\n2. 新闻\n3. 笑话"],
"时间": lambda: f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
}