下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:1131
企业微信机器人包含完整的消息处理流程,支持文本消息自动回复、事件处理、消息加密解密等功能。使用时需要替换配置文件中的企业微信相关参数,并部署到服务器上。
import json
import time
import requests
from flask import Flask, request, jsonify
# Flask needs the importing module's name to locate resources; the original
# `Flask(name)` referenced an undefined variable (the double underscores of
# `__name__` were lost).
app = Flask(__name__)
class WeChatWorkRobot:
    """Small client for sending WeCom (WeChat Work) application text messages.

    The access token returned by the ``gettoken`` endpoint is cached on the
    instance and reused until shortly before it expires.
    """

    # Base URL shared by every endpoint this client calls.
    API_BASE = "https://qyapi.weixin.qq.com/cgi-bin"
    # Seconds to wait for an HTTP response; without a timeout a stalled
    # connection would block the calling request handler forever.
    REQUEST_TIMEOUT = 10
    # Refresh the token this many seconds before the server-reported expiry.
    TOKEN_REFRESH_MARGIN = 300

    def __init__(self, corp_id, corp_secret, agent_id):
        """Store the credentials and start with an empty token cache.

        Args:
            corp_id: Corp id sent as ``corpid`` when requesting a token.
            corp_secret: Secret sent as ``corpsecret`` when requesting a token.
            agent_id: Agent id placed in the ``agentid`` field of messages.
        """
        self.corp_id = corp_id
        self.corp_secret = corp_secret
        self.agent_id = agent_id
        self.access_token = None
        self.token_expire_time = 0

    def get_access_token(self):
        """Return a usable access token, fetching a new one when needed.

        Returns:
            The cached token while it is still valid, otherwise a freshly
            requested one.

        Raises:
            RuntimeError: If the API response does not carry ``errcode == 0``.
        """
        if self.access_token and time.time() < self.token_expire_time:
            return self.access_token

        # Let requests build the query string so the credentials are
        # URL-encoded instead of being interpolated verbatim.
        response = requests.get(
            f"{self.API_BASE}/gettoken",
            params={"corpid": self.corp_id, "corpsecret": self.corp_secret},
            timeout=self.REQUEST_TIMEOUT,
        )
        data = response.json()
        if data.get('errcode') != 0:
            raise RuntimeError(f"获取access_token失败: {data}")

        self.access_token = data['access_token']
        self.token_expire_time = (
            time.time() + data['expires_in'] - self.TOKEN_REFRESH_MARGIN
        )
        return self.access_token

    def send_message(self, user_id, content):
        """Send a plain-text application message to ``user_id``.

        Args:
            user_id: Value placed in the ``touser`` field.
            content: Text placed in ``text.content``.

        Returns:
            The decoded JSON body of the API response.
        """
        payload = {
            "touser": user_id,
            "msgtype": "text",
            "agentid": self.agent_id,
            "text": {"content": content},
            "safe": 0,
        }
        response = requests.post(
            f"{self.API_BASE}/message/send",
            params={"access_token": self.get_access_token()},
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        return response.json()

    def handle_message(self, msg_data):
        """Reply to an incoming message dict.

        Only ``MsgType == 'text'`` is handled: the keyword ``帮助`` gets a
        help hint, anything else is echoed back to the sender.

        Args:
            msg_data: Mapping with ``MsgType``, ``FromUserName`` and, for
                text messages, ``Content``.

        Returns:
            The ``send_message`` result for text messages, ``None`` for every
            other message type.
        """
        if msg_data.get('MsgType') != 'text':
            return None

        user_id = msg_data.get('FromUserName')
        # `or ''` also covers an explicit null Content, which would otherwise
        # crash on `.strip()`.
        content = (msg_data.get('Content') or '').strip()
        if content == '帮助':
            return self.send_message(user_id, "输入关键词获取帮助信息")
        return self.send_message(user_id, f"已收到您的消息: {content}")
AI 代码解读
# Module-level client used by the Flask handlers. Replace the placeholder
# values with the real credentials before deploying.
robot = WeChatWorkRobot(
    corp_id="YOUR_CORP_ID",
    corp_secret="YOUR_CORP_SECRET",
    # Placeholder agent id. The original passed the bare name YOUR_AGENT_ID,
    # which is undefined and raised NameError as soon as the module loaded.
    agent_id=1000002,
)
@app.route('/callback', methods=['POST'])
def callback():
    """Handle a POSTed JSON callback.

    A ``subscribe`` event triggers a welcome message to the sender; any other
    payload is passed to ``robot.handle_message``.

    Returns:
        ``{"status": "success"}`` once the payload has been processed, or a
        400 response with ``{"status": "error", ...}`` when the body is not a
        JSON object.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so bad requests get a clean 400 rather than crashing on
    # `.get` further down.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"status": "error", "message": "invalid JSON payload"}), 400

    if data.get('Event') == 'subscribe':
        robot.send_message(data.get('FromUserName'), "欢迎关注企业微信机器人")
    else:
        robot.handle_message(data)
    return jsonify({"status": "success"})
# Script entry point. The original compared the undefined name `name` with
# 'main' (double underscores lost), so the server could never start.
if __name__ == '__main__':
    # Listens on all interfaces, port 5000.
    app.run(host='0.0.0.0', port=5000)
企业微信配置
# WeCom connection settings. Every "YOUR_..." value is a placeholder that
# must be replaced before use.
WECHAT_CONFIG = {
    # Credentials of the corp / application.
    "corp_id": "YOUR_CORP_ID",
    "corp_secret": "YOUR_CORP_SECRET",
    "agent_id": 1000002,
    # Callback settings: token for signature checks, key for AES.
    "token": "YOUR_TOKEN",
    "encoding_aes_key": "YOUR_ENCODING_AES_KEY",
}
自动回复规则
# Keyword -> canned reply table used for automatic text responses.
# Insertion order matters to consumers that scan the table sequentially.
REPLY_RULES = {
    "帮助": "输入以下关键词获取帮助:\n1. 天气\n2. 新闻\n3. 会议",
    "天气": "今日天气晴朗,气温25-32℃",
    "新闻": "最新新闻请访问公司内网",
    "会议": "今日会议安排:\n10:00 部门例会\n15:00 项目评审",
}
import re
from config import REPLY_RULES
class MessageHandler:
    """Turn incoming text and event messages into reply strings."""

    # Compiled once: matches questions about the current time.
    _TIME_QUERY = re.compile(r'时间|几点')

    @staticmethod
    def process_text_message(content):
        """Pick a reply for a text message.

        Matching is tried in this order: exact keyword from ``REPLY_RULES``,
        keyword contained anywhere in the text, a question about the current
        time, and finally a fallback apology.

        Args:
            content: Message text; ``None`` is treated as an empty message.

        Returns:
            The reply string.
        """
        normalized = (content or "").strip().lower()
        # The input is lowercased, so lowercase the keywords too; otherwise
        # a rule whose key contains uppercase letters could never match.
        rules = {str(keyword).lower(): reply for keyword, reply in REPLY_RULES.items()}

        # Exact match.
        if normalized in rules:
            return rules[normalized]

        # Substring match, in rule order. An empty keyword is skipped because
        # "" is contained in every string and would shadow all later rules.
        for keyword, reply in rules.items():
            if keyword and keyword in normalized:
                return reply

        # Regex match for time questions.
        if MessageHandler._TIME_QUERY.search(normalized):
            from datetime import datetime
            return f"当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"

        return "抱歉,我不理解您的请求。输入'帮助'获取支持信息。"

    @staticmethod
    def process_event_message(event):
        """Return the reply text for an event name.

        Args:
            event: Event name such as ``'subscribe'`` or ``'unsubscribe'``.

        Returns:
            A welcome string for ``'subscribe'``, an empty string for
            ``'unsubscribe'``, and ``None`` for any other event.
        """
        if event == 'subscribe':
            return "欢迎关注企业微信机器人!"
        if event == 'unsubscribe':
            return ""
        return None
AI 代码解读
import base64
import hashlib
import hmac
import random
import string
import time

from Crypto.Cipher import AES

from config import WECHAT_CONFIG
class WeChatUtils:
    """Signature and AES helpers for WeCom callbacks, driven by WECHAT_CONFIG.

    Encrypted payloads use AES-CBC with the key decoded from
    ``encoding_aes_key`` and the first 16 key bytes as IV. The plaintext
    frame is ``random(16) + msg_len(4, big-endian) + msg + receiveid``,
    PKCS#7-padded to a multiple of 32 bytes.
    """

    _PAD_BLOCK = 32      # padding granularity of the frame
    _RANDOM_LEN = 16     # random prefix length
    _LENGTH_LEN = 4      # size of the big-endian message-length field

    @staticmethod
    def _aes_key():
        """Decode the AES key from the base64 ``encoding_aes_key`` setting."""
        return base64.b64decode(WECHAT_CONFIG['encoding_aes_key'] + "=")

    @staticmethod
    def _pkcs7_pad(data):
        """Append PKCS#7 padding so ``len(result)`` is a multiple of 32."""
        pad_len = WeChatUtils._PAD_BLOCK - (len(data) % WeChatUtils._PAD_BLOCK)
        return data + bytes([pad_len]) * pad_len

    @staticmethod
    def _pkcs7_unpad(data):
        """Strip PKCS#7 padding; raise ValueError if the pad byte is invalid."""
        if not data:
            raise ValueError("cannot unpad empty data")
        pad_len = data[-1]
        if not 1 <= pad_len <= WeChatUtils._PAD_BLOCK or pad_len > len(data):
            raise ValueError(f"invalid padding length: {pad_len}")
        return data[:-pad_len]

    @staticmethod
    def verify_signature(signature, timestamp, nonce, encrypt_msg=None):
        """Check a callback signature.

        The expected value is the SHA-1 hex digest of the lexicographically
        sorted, concatenated parts.

        Args:
            signature: Signature received with the request.
            timestamp: ``timestamp`` request parameter.
            nonce: ``nonce`` request parameter.
            encrypt_msg: Encrypted payload. Pass it when verifying a
                ``msg_signature``, which also covers the ciphertext; leave
                it ``None`` for the three-part signature.

        Returns:
            ``True`` when the signature matches, ``False`` otherwise
            (including when a required value is missing).
        """
        parts = [WECHAT_CONFIG['token'], timestamp, nonce]
        if encrypt_msg is not None:
            parts.append(encrypt_msg)
        if signature is None or any(part is None for part in parts):
            return False

        joined = ''.join(sorted(str(part) for part in parts))
        expected = hashlib.sha1(joined.encode('utf-8')).hexdigest()
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        return hmac.compare_digest(
            expected.encode('ascii'), str(signature).encode('utf-8')
        )

    @staticmethod
    def decrypt_message(encrypt_msg):
        """Decrypt a base64 callback payload and return the message text.

        Args:
            encrypt_msg: Base64-encoded ciphertext.

        Returns:
            The UTF-8 decoded message, without the random prefix, the length
            field or the trailing receiveid.

        Raises:
            ValueError: If the padding or the embedded length is inconsistent
                with the decrypted data.
        """
        aes_key = WeChatUtils._aes_key()
        cipher = AES.new(aes_key, AES.MODE_CBC, aes_key[:16])
        frame = WeChatUtils._pkcs7_unpad(cipher.decrypt(base64.b64decode(encrypt_msg)))

        body_start = WeChatUtils._RANDOM_LEN + WeChatUtils._LENGTH_LEN
        if len(frame) < body_start:
            raise ValueError("decrypted frame is too short")
        msg_len = int.from_bytes(frame[WeChatUtils._RANDOM_LEN:body_start], "big")
        body_end = body_start + msg_len
        if body_end > len(frame):
            raise ValueError("message length exceeds decrypted frame")
        # NOTE(review): bytes after body_end are the receiveid; it is not
        # validated here — consider comparing it with the configured corp_id.
        return frame[body_start:body_end].decode('utf-8')

    @staticmethod
    def encrypt_message(reply_msg):
        """Encrypt a reply and return it base64-encoded.

        Args:
            reply_msg: Reply text to encrypt.

        Returns:
            Base64 string of the AES-CBC ciphertext of the padded frame.
        """
        aes_key = WeChatUtils._aes_key()
        cipher = AES.new(aes_key, AES.MODE_CBC, aes_key[:16])

        alphabet = string.ascii_letters + string.digits
        prefix = ''.join(random.choices(alphabet, k=WeChatUtils._RANDOM_LEN))
        msg_bytes = reply_msg.encode('utf-8')
        receive_id = str(WECHAT_CONFIG['corp_id']).encode('utf-8')

        # Build the frame in bytes so the length field and the padding are
        # computed over exactly what gets encrypted.
        frame = (
            prefix.encode('ascii')
            + len(msg_bytes).to_bytes(WeChatUtils._LENGTH_LEN, "big")
            + msg_bytes
            + receive_id
        )
        encrypted = cipher.encrypt(WeChatUtils._pkcs7_pad(frame))
        return base64.b64encode(encrypted).decode('utf-8')
AI 代码解读