下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:9281
代码说明:这个微信机器人系统包含主程序、群管理模块和多个插件。主程序基于itchat库实现微信登录和消息处理,群管理模块扩展了群聊特有功能,插件系统可灵活扩展功能。使用时需要先安装itchat库(pip install itchat)。
import itchat
import time
import random
from threading import Thread
from queue import Queue
class WeChatBot:
    """Keyword-driven auto-reply bot built on the itchat library.

    The itchat text callback pushes incoming messages onto ``msg_queue``; a
    daemon worker thread pops them and answers via ``message_handler``.

    Attributes set by ``__init__``:
        msg_queue: ``queue.Queue`` of pending itchat messages.
        reply_rules: maps a keyword to either a list of candidate replies or
            a zero-argument callable returning the reply.
        auto_reply_enabled: when False the worker leaves messages queued.
        keywords: fallback keywords that trigger the generic assistant reply.
    """

    def __init__(self):
        """Create the message queue and the default reply rules."""
        # The constructor must be spelled ``__init__``; a plain ``init`` is
        # never invoked by Python and would leave the instance without state.
        self.msg_queue = Queue()
        self.reply_rules = {
            "你好": ["你好呀!", "嗨~", "欢迎找我聊天"],
            "时间": lambda: f"现在是{time.strftime('%Y-%m-%d %H:%M:%S')}",
            "天气": self.get_weather,
        }
        self.auto_reply_enabled = True
        self.keywords = ["机器人", "助手", "智能"]

    def login(self):
        """Log in through itchat (hot reload, QR code rendered in the terminal)."""
        itchat.auto_login(hotReload=True, enableCmdQR=2)
        print("微信机器人登录成功")

    def get_weather(self):
        """Return a placeholder forecast picked at random (no real lookup)."""
        weathers = ["晴天", "多云", "小雨", "大风", "雾霾"]
        return f"今天天气可能是{random.choice(weathers)}"

    def _select_reply(self, content):
        """Return the reply for ``content``, or a falsy value if nothing matches.

        The first rule in ``reply_rules`` whose keyword occurs in ``content``
        wins; if that yields nothing, any entry of ``keywords`` triggers the
        generic assistant reply.
        """
        reply = None
        for keyword, response in self.reply_rules.items():
            if keyword in content:
                reply = response() if callable(response) else random.choice(response)
                break
        if not reply and any(kw in content for kw in self.keywords):
            reply = "我是智能助手,有什么可以帮你的吗?"
        return reply

    def message_handler(self, msg):
        """Log one incoming text message and answer it if a rule matches.

        Messages that are not of type ``Text`` or have empty text are ignored.
        """
        if msg['Type'] != 'Text' or not msg.get('Text'):
            return
        content = msg['Text'].strip()
        # search_friends returns None when the sender is not in the friend
        # list, so fall back to the placeholder name instead of crashing.
        sender = itchat.search_friends(userName=msg['FromUserName'])
        name = sender.get('NickName', '未知用户') if sender else '未知用户'
        print(f"收到来自[{name}]的消息: {content}")
        reply = self._select_reply(content)
        if reply:
            itchat.send(reply, toUserName=msg['FromUserName'])

    def auto_reply_thread(self):
        """Worker loop: poll the queue every 0.1 s and handle pending messages."""
        while True:
            if self.auto_reply_enabled and not self.msg_queue.empty():
                msg = self.msg_queue.get()
                try:
                    self.message_handler(msg)
                except Exception as exc:
                    # Thread boundary: one bad message must not silently kill
                    # the worker and stop all further replies.
                    print(f"处理消息时出错: {exc!r}")
            time.sleep(0.1)

    def run(self):
        """Log in, start the worker and the itchat loop, then block forever."""
        self.login()

        # Register the callback before the receive loop starts so that no
        # early message is dispatched while no handler exists.
        @itchat.msg_register(itchat.content.TEXT)
        def text_reply(msg):
            self.msg_queue.put(msg)

        Thread(target=self.auto_reply_thread, daemon=True).start()
        itchat.run(blockThread=False)
        # Both the itchat loop and the worker run in background threads;
        # keep the main thread alive.
        while True:
            time.sleep(1)
# Script entry point. The guard must compare ``__name__`` with "__main__";
# the bare ``name == "main"`` form raises NameError at import time.
if __name__ == "__main__":
    bot = WeChatBot()
    bot.run()
from wechat_bot import WeChatBot
import re
class GroupManager(WeChatBot):
    """WeChatBot extension adding group-chat commands (check-in, polls, mute).

    A command runs when the bot is @-mentioned in a group and the message
    contains one of the keywords in ``group_rules``.

    Attributes set by ``__init__``:
        group_rules: maps a command keyword to its handler method.
        group_members: group id -> set of user ids that checked in today.
        check_in_dates: group id -> date string the check-in set belongs to.
        votes: poll topic -> {'options': list, 'votes': dict, 'voters': set}.
    """

    def __init__(self):
        """Initialise the base bot state and the group command tables."""
        # Must be ``__init__`` (and chain to the base ``__init__``); a method
        # named ``init`` is never called when the object is constructed.
        super().__init__()
        self.group_rules = {
            "签到": self.handle_check_in,
            "投票": self.handle_vote,
            "禁言": self.handle_mute,
        }
        self.group_members = {}
        self.check_in_dates = {}
        self.votes = {}

    def handle_check_in(self, msg):
        """Record the sender's daily check-in and return the reply text."""
        import time  # local import keeps this block self-contained

        group_id = msg['FromUserName']
        user_id = msg['ActualUserName']
        today = time.strftime('%Y-%m-%d')
        # The reply promises a per-day check-in, so start a fresh set whenever
        # the calendar date changes; otherwise nobody could check in twice.
        if self.check_in_dates.get(group_id) != today:
            self.check_in_dates[group_id] = today
            self.group_members[group_id] = set()
        checked_in = self.group_members.setdefault(group_id, set())
        if user_id in checked_in:
            return f"@{msg['ActualNickName']} 您今天已经签到过了哦~"
        checked_in.add(user_id)
        return f"@{msg['ActualNickName']} 签到成功!当前签到人数: {len(checked_in)}"

    def handle_vote(self, msg):
        """Create a poll, or cast a vote in an existing one.

        ``投票 <topic> <opt1> <opt2> ...`` creates a poll. When the topic
        already exists and the remainder is a single number, that number is
        treated as the 1-based option index, as announced in the creation
        reply. Returns the reply text.
        """
        match = re.search(r"投票 (.+?) (.+)", msg['Text'])
        if not match:
            return "投票格式错误,请使用: 投票 主题 选项1 选项2"
        topic = match.group(1)
        # dict.fromkeys drops duplicate options while keeping their order, so
        # the option list and the tally dict always agree.
        options = list(dict.fromkeys(match.group(2).split()))
        if topic in self.votes:
            if len(options) == 1 and options[0].isdecimal():
                return self._cast_vote(topic, int(options[0]), msg)
            return f"投票主题'{topic}'已存在"
        self.votes[topic] = {
            'options': options,
            'votes': {opt: 0 for opt in options},
            'voters': set(),
        }
        options_text = "\n".join(f"{i+1}. {opt}" for i, opt in enumerate(options))
        return f"投票创建成功:\n主题: {topic}\n选项:\n{options_text}\n请回复'投票 主题 选项编号'参与投票"

    def _cast_vote(self, topic, choice, msg):
        """Count one vote for the 1-based ``choice`` in poll ``topic``."""
        poll = self.votes[topic]
        option_count = len(poll['options'])
        if not 1 <= choice <= option_count:
            return f"选项编号无效,请输入1到{option_count}之间的数字"
        voter = msg.get('ActualUserName')
        if voter in poll['voters']:
            return f"@{msg.get('ActualNickName', '')} 您已经投过票了"
        poll['votes'][poll['options'][choice - 1]] += 1
        poll['voters'].add(voter)
        tally = "\n".join(f"{opt}: {poll['votes'][opt]}票" for opt in poll['options'])
        return f"投票成功!\n主题: {topic}\n当前结果:\n{tally}"

    def handle_mute(self, msg):
        """Placeholder for the mute command; always returns a fixed notice."""
        return "禁言功能需要特殊权限"

    def message_handler(self, msg):
        """Dispatch @-mentions in groups to group commands, else defer to base.

        Group messages that do not mention the bot are ignored; private
        messages and @-mentions without a group keyword use the base rules.
        """
        text = msg.get('Text')
        if msg['FromUserName'].startswith('@@'):
            if not msg.get('IsAt') or not isinstance(text, str):
                return
            # NOTE(review): ToUserName looks like an account id rather than
            # the displayed nickname, so this may not strip the mention —
            # confirm. Keyword matching uses substring tests either way.
            content = text.replace(f"@{msg['ToUserName']}", "").strip()
            for keyword, handler in self.group_rules.items():
                if keyword in content:
                    reply = handler(msg)
                    if reply:
                        itchat.send(reply, toUserName=msg['FromUserName'])
                    return
        super().message_handler(msg)

    def run(self):
        """Also queue group-chat text messages, then run the base bot."""
        # The base class registers its callback for friend chats only; group
        # messages need a separate registration with isGroupChat=True.
        @itchat.msg_register(itchat.content.TEXT, isGroupChat=True)
        def group_text(msg):
            self.msg_queue.put(msg)

        super().run()
# Script entry point. The guard must compare ``__name__`` with "__main__";
# the bare ``name == "main"`` form raises NameError at import time.
if __name__ == "__main__":
    manager = GroupManager()
    manager.run()
import json
import os
import random
from datetime import datetime
class AutoReplyPlugin:
def init(self, config_file="reply_config.json"):
self.config_file = config_file
self.reply_config = self.load_config()
self.last_save = datetime.now()
def load_config(self):
if os.path.exists(self.config_file):
with open(self.config_file, 'r', encoding='utf-8') as f:
return json.load(f)
return {"rules": {}, "blacklist": []}
def save_config(self):
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(self.reply_config, f, ensure_ascii=False, indent=2)
def add_rule(self, keyword, replies):
self.reply_config['rules'][keyword] = replies
if (datetime.now() - self.last_save).seconds > 60:
self.save_config()
self.last_save = datetime.now()
def match_reply(self, content):
for keyword, replies in self.reply_config['rules'].items():
if keyword in content:
return random.choice(replies)
return None
def is_blacklisted(self, user_id):
return user_id in self.reply_config['blacklist']