下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1653
这个微信群机器人包含主程序、配置文件和工具模块,实现了自动登录、消息监控、违规检测、活跃度统计等功能。需要安装itchat库(pip install itchat)后才能运行。
import itchat
from itchat.content import *
import time
import re
import threading
from collections import defaultdict
class WeChatGroupManager:
    """Group-chat bot built on itchat.

    Counts messages per group, remembers when each group last received a
    message, warns when a message contains a banned keyword, answers a help
    request, and periodically reports groups that have gone quiet.

    Attributes:
        group_monitors: Empty dict created at construction; not used by any
            method of this class.
        message_count: Number of handled messages per group id.
        last_active: Timestamp (``time.time()``) of the most recent handled
            message per group id.
        keywords: Substrings that trigger a warning when found in a message.
        admin_users: Administrator identifiers; not consulted by any method
            of this class.
    """

    def __init__(self):
        """Create the empty statistics tables and the default keyword list."""
        self.group_monitors = {}
        self.message_count = defaultdict(int)
        self.last_active = {}
        self.keywords = ["广告", "推广", "加群"]
        self.admin_users = ["你的微信号"]

    def login(self):
        """Log in with itchat and start its receive loop without blocking."""
        itchat.auto_login(hotReload=True)
        itchat.run(blockThread=False)

    def register_handlers(self):
        """Register itchat callbacks for group text and picture messages.

        Both callbacks forward the message to ``handle_group_message``.
        """
        @itchat.msg_register(TEXT, isGroupChat=True)
        def text_reply(msg):
            self.handle_group_message(msg)

        @itchat.msg_register(PICTURE, isGroupChat=True)
        def image_reply(msg):
            self.handle_group_message(msg)

    def handle_group_message(self, msg):
        """Update statistics for one group message and react to its content.

        Args:
            msg: Mapping with at least ``'FromUserName'`` (used as the group
                id) and ``'ActualUserName'`` (used as the sender id); the
                optional ``'Content'`` entry is scanned for keywords.
        """
        group_id = msg['FromUserName']
        user_id = msg['ActualUserName']
        content = msg['Content'] if 'Content' in msg else "[图片/表情]"
        if not isinstance(content, str):
            # Substring checks below require text; fall back to the same
            # placeholder that is used when the message has no content.
            content = "[图片/表情]"

        # Activity tracking. The timestamp is stored per *group* because
        # check_inactive_groups() treats the keys of last_active as group ids.
        self.message_count[group_id] += 1
        self.last_active[group_id] = time.time()

        # Keyword detection.
        if any(keyword in content for keyword in self.keywords):
            self.warn_user(group_id, user_id, content)

        # Automatic reply.
        if "帮助" in content:
            self.send_help(group_id)

    def warn_user(self, group_id, user_id, content):
        """Post a rule-violation warning quoting ``content`` into the group.

        ``user_id`` is accepted for the caller's convenience but is not part
        of the message that is sent.
        """
        warning = f"检测到违规内容: {content}\n请遵守群规!"
        itchat.send(warning, toUserName=group_id)

    def send_help(self, group_id):
        """Post the list of bot features into the group."""
        help_msg = "本群机器人功能:\n1. 违规检测\n2. 活跃统计\n3. 自动回复"
        itchat.send(help_msg, toUserName=group_id)

    def start_monitoring(self):
        """Run the inactivity check forever, once per hour. Never returns."""
        while True:
            self.check_inactive_groups()
            time.sleep(3600)  # one hour between checks

    def check_inactive_groups(self):
        """Notify every tracked group whose last message is over 7 days old.

        NOTE(review): a group that stays silent is notified again on every
        call, because nothing here resets its timestamp.
        """
        inactive_threshold = time.time() - 7 * 24 * 3600
        # Iterate over a snapshot: the message handler may add entries from
        # another thread, and mutating a dict during iteration raises.
        for group_id, last_msg_time in list(self.last_active.items()):
            if last_msg_time < inactive_threshold:
                self.notify_inactive(group_id)

    def notify_inactive(self, group_id):
        """Post the 7-day inactivity notice into the group."""
        notification = "本群已超过7天无活跃,请管理员注意!"
        itchat.send(notification, toUserName=group_id)
# Script entry point: build the manager, hook up the message handlers, log in,
# and keep the process alive while the background threads do the work.
if __name__ == "__main__":
    manager = WeChatGroupManager()
    # Register the handlers before login() starts the non-blocking receive
    # loop, so messages that arrive right after login already have a handler.
    manager.register_handlers()
    manager.login()
    # Start the inactivity monitor as a daemon thread so that it does not
    # keep the interpreter alive once the main thread exits.
    monitor_thread = threading.Thread(
        target=manager.start_monitoring,
        daemon=True,
    )
    monitor_thread.start()
    # Keep the main thread alive; all real work happens in other threads.
    while True:
        time.sleep(1)
import json
import os
from datetime import datetime
def load_config():
    """Load ``config.json`` from the current working directory.

    Returns:
        The parsed JSON content, or an empty dict when the file does not
        exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    # Open directly instead of checking os.path.exists() first: this avoids
    # the window in which the file can disappear between check and open.
    # The encoding is pinned to UTF-8 so non-ASCII values (for example
    # Chinese names typed by hand) load the same way on every platform.
    try:
        with open("config.json", "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}
def save_config(config):
    """Write ``config`` to ``config.json`` as JSON indented by 4 spaces.

    Args:
        config: JSON-serializable object to store.

    Raises:
        TypeError: If ``config`` contains a value JSON cannot represent. The
            existing file is left untouched in that case.
    """
    # Serialize before opening the file: opening with "w" truncates it, so a
    # serialization error after that point would destroy the previous config.
    payload = json.dumps(config, indent=4)
    with open("config.json", "w", encoding="utf-8") as f:
        f.write(payload)
def format_time(timestamp):
    """Render a POSIX timestamp as local time, ``YYYY-MM-DD HH:MM:SS``.

    Args:
        timestamp: Seconds since the epoch, as accepted by
            ``datetime.fromtimestamp``.

    Returns:
        The formatted local date and time as a string.
    """
    moment = datetime.fromtimestamp(timestamp)
    return f"{moment:%Y-%m-%d %H:%M:%S}"
def is_admin(user_id):
    """Report whether ``user_id`` is listed under ``admin_users`` in the config.

    The configuration is re-read through ``load_config()`` on every call; a
    missing ``admin_users`` entry is treated as an empty list.

    Args:
        user_id: Identifier to look up.

    Returns:
        True if ``user_id`` is in the configured admin list, else False.
    """
    admins = load_config().get("admin_users", [])
    return user_id in admins
def log_message(msg_type, content, group_name, user_name):
    """Append one timestamped line describing a message to ``message.log``.

    The line has the form
    ``<now> - <msg_type> - <group_name> - <user_name>: <content>``.

    Args:
        msg_type: Label for the kind of message.
        content: Message text.
        group_name: Name of the group the message belongs to.
        user_name: Name of the sender.
    """
    log_entry = f"{datetime.now()} - {msg_type} - {group_name} - {user_name}: {content}"
    # Chat text routinely contains Chinese characters and emoji; pin the
    # encoding so writing never fails on a non-UTF-8 locale default.
    with open("message.log", "a", encoding="utf-8") as f:
        f.write(log_entry + "\n")