跨境电商多账号iMessage群发短信架构设计
iMessage群发短信作为跨境电商触达海外用户的重要渠道,凭借其高送达率、低拦截率和原生体验优势,逐渐成为外贸企业客户维护与营销推广的首选方案。然而,随着跨境电商业务规模的扩大,单账号发送能力受限、账号风控风险高、消息发送效率低等问题日益凸显。本文将从实际业务需求出发,详细介绍一套基于阿里云生态构建的多账号iMessage群发短信系统架构,重点解决多账号管理、负载均衡、消息可靠性和系统可扩展性等核心技术问题。
一、跨境电商场景下iMessage群发短信的技术挑战
跨境电商业务对iMessage群发短信系统有着特殊的要求。首先,海外用户分布在不同时区,需要支持定时发送和批量处理能力。其次,苹果对iMessage账号有着严格的风控机制,单账号发送频率过高或内容相似性过强都会导致账号被封禁。此外,跨境网络延迟和不稳定性会影响消息发送的成功率,需要完善的失败重试机制。最后,随着业务增长,系统需要能够平滑扩展,支持数千甚至上万个账号同时工作。
```# 跨境电商iMessage发送业务核心约束定义
class BusinessConstraints:
# 单账号每日最大发送量
MAX_DAILY_MESSAGES_PER_ACCOUNT = 200
# 单账号每分钟最大发送量
MAX_MINUTE_MESSAGES_PER_ACCOUNT = 5
# 消息发送间隔(秒)
MESSAGE_SEND_INTERVAL = 12
# 账号冷却时间(小时)
ACCOUNT_COOLDOWN_HOURS = 24
# 消息内容相似度阈值
CONTENT_SIMILARITY_THRESHOLD = 0.7
# 最大重试次数
MAX_RETRY_COUNT = 3
# 重试间隔(秒)
RETRY_INTERVALS = [60, 300, 900]
# 时区支持列表
SUPPORTED_TIMEZONES = ["America/New_York", "Europe/London", "Asia/Tokyo", "Australia/Sydney"]
账号状态枚举
from enum import Enum
class AccountStatus(Enum):
ACTIVE = "active"
COOLDOWN = "cooldown"
BANNED = "banned"
MAINTENANCE = "maintenance"
消息状态枚举
class MessageStatus(Enum):
PENDING = "pending"
SENDING = "sending"
SENT = "sent"
DELIVERED = "delivered"
FAILED = "failed"
RETRYING = "retrying"
# 二、多账号iMessage群发短信系统整体架构设计
本系统采用分层架构设计,从上到下分为接入层、业务逻辑层、数据层和基础设施层。接入层负责接收外部请求并进行身份验证和参数校验。业务逻辑层是系统的核心,包含账号管理、消息调度、发送执行、状态追踪等模块。数据层使用阿里云RDS存储结构化数据,Redis缓存热点数据和分布式锁。基础设施层基于阿里云ECS部署应用服务,使用阿里云消息队列RocketMQ实现异步消息处理,阿里云日志服务SLS进行日志收集和监控。
```# 系统整体架构配置类
import os
from datetime import datetime
import pytz
class SystemConfig:
# 阿里云服务配置
ALIYUN_ACCESS_KEY = os.getenv("ALIYUN_ACCESS_KEY")
ALIYUN_SECRET_KEY = os.getenv("ALIYUN_SECRET_KEY")
ALIYUN_REGION = "cn-hangzhou"
# 数据库配置
DB_HOST = os.getenv("DB_HOST", "rm-xxxx.mysql.rds.aliyuncs.com")
DB_PORT = int(os.getenv("DB_PORT", 3306))
DB_USER = os.getenv("DB_USER", "imessage_user")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_NAME", "imessage_system")
# Redis配置
REDIS_HOST = os.getenv("REDIS_HOST", "r-xxxx.redis.rds.aliyuncs.com")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
REDIS_DB = int(os.getenv("REDIS_DB", 0))
# RocketMQ配置
ROCKETMQ_ENDPOINT = os.getenv("ROCKETMQ_ENDPOINT")
ROCKETMQ_TOPIC = "imessage_send_topic"
ROCKETMQ_GROUP_ID = "imessage_send_group"
# 日志服务配置
SLS_PROJECT = "imessage-monitor-project"
SLS_LOGSTORE = "imessage-send-logs"
# 系统参数
WORKER_NUM = int(os.getenv("WORKER_NUM", 4))
BATCH_SIZE = int(os.getenv("BATCH_SIZE", 100))
SCHEDULER_INTERVAL = int(os.getenv("SCHEDULER_INTERVAL", 10))
# 系统启动入口
def main():
print(f"[{datetime.now(pytz.UTC)}] Starting iMessage multi-account sending system...")
print(f"[{datetime.now(pytz.UTC)}] System configuration loaded successfully")
print(f"[{datetime.now(pytz.UTC)}] Worker number: {SystemConfig.WORKER_NUM}")
print(f"[{datetime.now(pytz.UTC)}] Batch size: {SystemConfig.BATCH_SIZE}")
# 初始化各模块
from database import init_database
from redis_client import init_redis
from mq_producer import init_mq_producer
from scheduler import MessageScheduler
from worker import MessageWorker
init_database()
init_redis()
init_mq_producer()
# 启动调度器和工作线程
scheduler = MessageScheduler()
scheduler.start()
workers = []
for i in range(SystemConfig.WORKER_NUM):
worker = MessageWorker(worker_id=i)
worker.start()
workers.append(worker)
print(f"[{datetime.now(pytz.UTC)}] System started successfully")
# 等待所有线程结束
try:
for worker in workers:
worker.join()
scheduler.join()
except KeyboardInterrupt:
print(f"[{datetime.now(pytz.UTC)}] System shutting down...")
if __name__ == "__main__":
main()
三、基于阿里云ECS的账号池管理模块实现
账号池管理是多账号iMessage群发短信系统的核心模块之一。我们将所有可用的iMessage账号存储在阿里云RDS数据库中,并通过Redis缓存账号的实时状态和发送统计信息。账号池采用分布式锁机制保证账号分配的原子性,避免多个工作线程同时获取同一个账号。同时,系统会自动监控账号的发送量和状态,当账号达到每日发送上限或进入冷却期时,会自动将其从可用账号池中移除。
```import redis
import pymysql
import json
import time
from datetime import datetime, timedelta
import pytz
from threading import Lock
class AccountPoolManager:
def init(self):
self.db_connection = pymysql.connect(
host=SystemConfig.DB_HOST,
port=SystemConfig.DB_PORT,
user=SystemConfig.DB_USER,
password=SystemConfig.DB_PASSWORD,
database=SystemConfig.DB_NAME,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
self.redis_client = redis.Redis(
host=SystemConfig.REDIS_HOST,
port=SystemConfig.REDIS_PORT,
password=SystemConfig.REDIS_PASSWORD,
db=SystemConfig.REDIS_DB,
decode_responses=True
)
self.lock = Lock()
self.account_cache_key = "available_accounts"
self.account_stats_prefix = "account_stats:"
self.account_lock_prefix = "account_lock:"
self.lock_timeout = 30 # 锁超时时间(秒)
def load_accounts_from_db(self):
"""从数据库加载所有活跃账号"""
with self.db_connection.cursor() as cursor:
sql = """
SELECT id, apple_id, password, device_token, status, daily_sent_count,
last_sent_time, created_at, updated_at
FROM imessage_accounts
WHERE status = %s
"""
cursor.execute(sql, (AccountStatus.ACTIVE.value,))
accounts = cursor.fetchall()
# 清空缓存并重新加载
self.redis_client.delete(self.account_cache_key)
for account in accounts:
account_data = json.dumps(account)
self.redis_client.sadd(self.account_cache_key, account_data)
print(f"[{datetime.now(pytz.UTC)}] Loaded {len(accounts)} active accounts from database")
return accounts
def get_available_account(self):
"""获取一个可用的账号(带分布式锁)"""
while True:
# 从缓存中随机获取一个账号
account_data = self.redis_client.srandmember(self.account_cache_key)
if not account_data:
# 缓存中没有可用账号,重新从数据库加载
self.load_accounts_from_db()
time.sleep(1)
continue
account = json.loads(account_data)
account_id = account['id']
# 尝试获取分布式锁
lock_key = f"{self.account_lock_prefix}{account_id}"
if self.redis_client.set(lock_key, "1", ex=self.lock_timeout, nx=True):
try:
# 检查账号是否真的可用
if self._is_account_available(account_id):
return account
else:
# 账号不可用,从缓存中移除
self.redis_client.srem(self.account_cache_key, account_data)
finally:
# 释放锁
self.redis_client.delete(lock_key)
time.sleep(0.1)
def _is_account_available(self, account_id):
"""检查账号是否可用"""
# 获取账号今日发送量
today = datetime.now(pytz.UTC).strftime("%Y-%m-%d")
stats_key = f"{self.account_stats_prefix}{account_id}:{today}"
daily_sent = int(self.redis_client.get(stats_key) or 0)
if daily_sent >= BusinessConstraints.MAX_DAILY_MESSAGES_PER_ACCOUNT:
return False
# 获取账号最后发送时间
last_sent_time_str = self.redis_client.get(f"account:last_sent:{account_id}")
if last_sent_time_str:
last_sent_time = datetime.fromisoformat(last_sent_time_str)
if datetime.now(pytz.UTC) - last_sent_time < timedelta(seconds=BusinessConstraints.MESSAGE_SEND_INTERVAL):
return False
return True
def update_account_stats(self, account_id, success=True):
"""更新账号发送统计信息"""
today = datetime.now(pytz.UTC).strftime("%Y-%m-%d")
stats_key = f"{self.account_stats_prefix}{account_id}:{today}"
with self.lock:
# 增加今日发送量
self.redis_client.incr(stats_key)
# 设置过期时间为24小时
self.redis_client.expire(stats_key, 86400)
# 更新最后发送时间
self.redis_client.set(f"account:last_sent:{account_id}", datetime.now(pytz.UTC).isoformat())
# 如果达到每日上限,更新数据库状态
daily_sent = int(self.redis_client.get(stats_key))
if daily_sent >= BusinessConstraints.MAX_DAILY_MESSAGES_PER_ACCOUNT:
with self.db_connection.cursor() as cursor:
sql = """
UPDATE imessage_accounts
SET status = %s, cooldown_until = %s, updated_at = %s
WHERE id = %s
"""
cooldown_until = datetime.now(pytz.UTC) + timedelta(hours=BusinessConstraints.ACCOUNT_COOLDOWN_HOURS)
cursor.execute(sql, (
AccountStatus.COOLDOWN.value,
cooldown_until,
datetime.now(pytz.UTC),
account_id
))
self.db_connection.commit()
# 从缓存中移除该账号
self._remove_account_from_cache(account_id)
print(f"[{datetime.now(pytz.UTC)}] Account {account_id} reached daily limit, entering cooldown")
def _remove_account_from_cache(self, account_id):
"""从缓存中移除指定账号"""
accounts = self.redis_client.smembers(self.account_cache_key)
for account_data in accounts:
account = json.loads(account_data)
if account['id'] == account_id:
self.redis_client.srem(self.account_cache_key, account_data)
break
def recover_cooldown_accounts(self):
"""恢复冷却期结束的账号"""
with self.db_connection.cursor() as cursor:
sql = """
SELECT id FROM imessage_accounts
WHERE status = %s AND cooldown_until <= %s
"""
cursor.execute(sql, (
AccountStatus.COOLDOWN.value,
datetime.now(pytz.UTC)
))
accounts_to_recover = cursor.fetchall()
for account in accounts_to_recover:
account_id = account['id']
# 更新数据库状态
update_sql = """
UPDATE imessage_accounts
SET status = %s, daily_sent_count = 0, updated_at = %s
WHERE id = %s
"""
cursor.execute(update_sql, (
AccountStatus.ACTIVE.value,
datetime.now(pytz.UTC),
account_id
))
# 清除统计信息
today = datetime.now(pytz.UTC).strftime("%Y-%m-%d")
stats_key = f"{self.account_stats_prefix}{account_id}:{today}"
self.redis_client.delete(stats_key)
print(f"[{datetime.now(pytz.UTC)}] Account {account_id} recovered from cooldown")
self.db_connection.commit()
if accounts_to_recover:
# 重新加载账号到缓存
self.load_accounts_from_db()
# 四、消息队列与异步发送机制的设计与实现
为了提高系统的并发处理能力和可靠性,我们采用阿里云RocketMQ作为消息中间件,实现消息的异步发送。当用户提交批量发送任务时,系统会将任务拆分成单个消息并发送到RocketMQ队列中。多个工作线程从队列中消费消息,获取可用账号后执行实际的发送操作。这种设计可以有效解耦消息提交和消息发送过程,提高系统的吞吐量和容错能力。
```import json
import time
from datetime import datetime
import pytz
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
from aliyunsdkrocketmq.request.v20190214 import SendMessageRequest, ConsumeMessageRequest
class MQProducer:
def __init__(self):
self.client = AcsClient(
SystemConfig.ALIYUN_ACCESS_KEY,
SystemConfig.ALIYUN_SECRET_KEY,
SystemConfig.ALIYUN_REGION
)
self.topic = SystemConfig.ROCKETMQ_TOPIC
def send_message(self, message_data):
"""发送单条消息到队列"""
try:
request = SendMessageRequest.SendMessageRequest()
request.set_InstanceId("MQ_INST_xxxx")
request.set_Topic(self.topic)
request.set_Tag("imessage")
request.set_Body(json.dumps(message_data))
request.set_Key(str(message_data['message_id']))
response = self.client.do_action_with_exception(request)
response_data = json.loads(response)
if response_data['Code'] == 'Success':
print(f"[{datetime.now(pytz.UTC)}] Message {message_data['message_id']} sent to MQ successfully")
return True
else:
print(f"[{datetime.now(pytz.UTC)}] Failed to send message {message_data['message_id']} to MQ: {response_data['Message']}")
return False
except Exception as e:
print(f"[{datetime.now(pytz.UTC)}] Exception sending message to MQ: {str(e)}")
return False
def send_batch_messages(self, messages):
"""批量发送消息到队列"""
success_count = 0
for message in messages:
if self.send_message(message):
success_count += 1
time.sleep(0.01) # 避免发送过快
print(f"[{datetime.now(pytz.UTC)}] Batch send completed: {success_count}/{len(messages)} messages sent to MQ")
return success_count
class MQConsumer:
def __init__(self, worker_id):
self.worker_id = worker_id
self.client = AcsClient(
SystemConfig.ALIYUN_ACCESS_KEY,
SystemConfig.ALIYUN_SECRET_KEY,
SystemConfig.ALIYUN_REGION
)
self.topic = SystemConfig.ROCKETMQ_TOPIC
self.group_id = SystemConfig.ROCKETMQ_GROUP_ID
self.running = True
def start_consuming(self, message_handler):
"""开始消费消息"""
print(f"[{datetime.now(pytz.UTC)}] Worker {self.worker_id} started consuming messages")
while self.running:
try:
request = ConsumeMessageRequest.ConsumeMessageRequest()
request.set_InstanceId("MQ_INST_xxxx")
request.set_Topic(self.topic)
request.set_ConsumerId(self.group_id)
request.set_MaxMessageNum(SystemConfig.BATCH_SIZE)
request.set_WaitSeconds(10)
response = self.client.do_action_with_exception(request)
response_data = json.loads(response)
if response_data['Code'] == 'Success' and 'Messages' in response_data:
messages = response_data['Messages']
if messages:
print(f"[{datetime.now(pytz.UTC)}] Worker {self.worker_id} received {len(messages)} messages")
for msg in messages:
message_body = json.loads(msg['Body'])
receipt_handle = msg['ReceiptHandle']
# 处理消息
success = message_handler(message_body)
# 确认消息
if success:
self.ack_message(receipt_handle)
else:
# 消息处理失败,稍后重新消费
print(f"[{datetime.now(pytz.UTC)}] Message {message_body['message_id']} processing failed, will retry later")
time.sleep(1)
except Exception as e:
print(f"[{datetime.now(pytz.UTC)}] Exception consuming messages: {str(e)}")
time.sleep(5)
def ack_message(self, receipt_handle):
"""确认消息消费成功"""
try:
from aliyunsdkrocketmq.request.v20190214 import AckMessageRequest
request = AckMessageRequest.AckMessageRequest()
request.set_InstanceId("MQ_INST_xxxx")
request.set_Topic(self.topic)
request.set_ConsumerId(self.group_id)
request.set_ReceiptHandles([receipt_handle])
response = self.client.do_action_with_exception(request)
response_data = json.loads(response)
if response_data['Code'] != 'Success':
print(f"[{datetime.now(pytz.UTC)}] Failed to ack message: {response_data['Message']}")
except Exception as e:
print(f"[{datetime.now(pytz.UTC)}] Exception acking message: {str(e)}")
def stop(self):
"""停止消费"""
self.running = False
print(f"[{datetime.now(pytz.UTC)}] Worker {self.worker_id} stopped consuming messages")
五、多账号负载均衡与限流策略代码实现
多账号负载均衡是保证系统稳定运行的关键。我们采用加权轮询算法进行账号分配,根据账号的历史发送成功率和剩余发送量动态调整权重。同时,系统实现了多级限流策略,包括账号级限流、IP级限流和系统级限流,防止发送频率过高触发苹果的风控机制。此外,我们还引入了消息内容去重机制,避免短时间内发送大量相似内容。
```import hashlib
import json
import time
from datetime import datetime, timedelta
import pytz
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
class LoadBalancer:
def init(self, account_pool_manager):
self.account_pool_manager = account_pool_manager
self.vectorizer = TfidfVectorizer()
self.content_cache = {}
self.content_cache_expiry = 3600 # 1小时
def select_account(self):
"""使用加权轮询算法选择账号"""
# 获取所有可用账号
accounts = self._get_available_accounts_with_weights()
if not accounts:
return None
# 计算总权重
total_weight = sum(account['weight'] for account in accounts)
if total_weight == 0:
return self.account_pool_manager.get_available_account()
# 加权轮询选择
import random
rand = random.uniform(0, total_weight)
current_sum = 0
for account in accounts:
current_sum += account['weight']
if current_sum >= rand:
return account['data']
return accounts[0]['data']
def _get_available_accounts_with_weights(self):
"""获取带权重的可用账号列表"""
accounts_data = self.account_pool_manager.redis_client.smembers(self.account_pool_manager.account_cache_key)
weighted_accounts = []
for account_data in accounts_data:
account = json.loads(account_data)
account_id = account['id']
# 计算基础权重
today = datetime.now(pytz.UTC).strftime("%Y-%m-%d")
stats_key = f"{self.account_pool_manager.account_stats_prefix}{account_id}:{today}"
daily_sent = int(self.account_pool_manager.redis_client.get(stats_key) or 0)
remaining = BusinessConstraints.MAX_DAILY_MESSAGES_PER_ACCOUNT - daily_sent
if remaining <= 0:
continue
# 基础权重与剩余发送量成正比
base_weight = remaining / BusinessConstraints.MAX_DAILY_MESSAGES_PER_ACCOUNT
# 成功率权重
success_rate = self._get_account_success_rate(account_id)
success_weight = success_rate * 0.6
# 活跃度权重(最近发送过的账号权重稍低)
last_sent_time_str = self.account_pool_manager.redis_client.get(f"account:last_sent:{account_id}")
if last_sent_time_str:
last_sent_time = datetime.fromisoformat(last_sent_time_str)
time_since_last = (datetime.now(pytz.UTC) - last_sent_time).total_seconds()
activity_weight = min(time_since_last / BusinessConstraints.MESSAGE_SEND_INTERVAL, 1.0) * 0.2
else:
activity_weight = 0.2
# 总权重
total_weight = base_weight * 0.2 + success_weight + activity_weight
weighted_accounts.append({
'data': account,
'weight': total_weight
})
return weighted_accounts
def _get_account_success_rate(self, account_id):
"""获取账号历史发送成功率"""
success_key = f"account:success:{account_id}"
fail_key = f"account:fail:{account_id}"
success_count = int(self.account_pool_manager.redis_client.get(success_key) or 0)
fail_count = int(self.account_pool_manager.redis_client.get(fail_key) or 0)
if success_count + fail_count == 0:
return 1.0 # 新账号默认成功率100%
return success_count / (success_count + fail_count)
def update_account_success_rate(self, account_id, success):
"""更新账号成功率统计"""
success_key = f"account:success:{account_id}"
fail_key = f"account:fail:{account_id}"
if success:
self.account_pool_manager.redis_client.incr(success_key)
else:
self.account_pool_manager.redis_client.incr(fail_key)
# 设置过期时间为7天
self.account_pool_manager.redis_client.expire(success_key, 604800)
self.account_pool_manager.redis_client.expire(fail_key, 604800)
class RateLimiter:
def init(self, redis_client):
self.redis_client = redis_client
def check_account_rate_limit(self, account_id):
"""检查账号级限流"""
minute_key = f"rate:account:minute:{account_id}:{datetime.now(pytz.UTC).strftime('%Y%m%d%H%M')}"
hour_key = f"rate:account:hour:{account_id}:{datetime.now(pytz.UTC).strftime('%Y%m%d%H')}"
minute_count = self.redis_client.incr(minute_key)
hour_count = self.redis_client.incr(hour_key)
# 设置过期时间
self.redis_client.expire(minute_key, 60)
self.redis_client.expire(hour_key, 3600)
if minute_count > BusinessConstraints.MAX_MINUTE_MESSAGES_PER_ACCOUNT:
return False
return True
def check_ip_rate_limit(self, ip_address):
"""检查IP级限流"""
minute_key = f"rate:ip:minute:{ip_address}:{datetime.now(pytz.UTC).strftime('%Y%m%d%H%M')}"
hour_key = f"rate:ip:hour:{ip_address}:{datetime.now(pytz.UTC).strftime('%Y%m%d%H')}"
minute_count = self.redis_client.incr(minute_key)
hour_count = self.redis_client.incr(hour_key)
self.redis_client.expire(minute_key, 60)
self.redis_client.expire(hour_key, 3600)
# 单IP每分钟最多发送50条
if minute_count > 50:
return False
# 单IP每小时最多发送500条
if hour_count > 500:
return False
return True
def check_system_rate_limit(self):
"""检查系统级限流"""
minute_key = f"rate:system:minute:{datetime.now(pytz.UTC).strftime('%Y%m%d%H%M')}"
count = self.redis_client.incr(minute_key)
self.redis_client.expire(minute_key, 60)
# 系统每分钟最多发送1000条
return count <= 1000
class ContentFilter:
def init(self, redis_client):
self.redis_client = redis_client
self.vectorizer = TfidfVectorizer(stop_words='english')
def check_content_similarity(self, content, account_id):
"""检查内容相似度"""
# 计算内容哈希
content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
# 检查是否有完全相同的内容在最近1小时内发送过
duplicate_key = f"content:duplicate:{content_hash}"
if self.redis_client.exists(duplicate_key):
return False
# 检查与该账号最近发送内容的相似度
recent_contents_key = f"content:recent:{account_id}"
recent_contents = self.redis_client.lrange(recent_contents_key, 0, 9) # 获取最近10条
if recent_contents:
# 构建TF-IDF矩阵
all_contents = [content] + recent_contents
tfidf_matrix = self.vectorizer.fit_transform(all_contents)
# 计算余弦相似度
cosine_similarities = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:]).flatten()
# 检查是否有相似度超过阈值的内容
for similarity in cosine_similarities:
if similarity > BusinessConstraints.CONTENT_SIMILARITY_THRESHOLD:
return False
# 保存当前内容到最近发送列表
self.redis_client.lpush(recent_contents_key, content)
self.redis_client.ltrim(recent_contents_key, 0, 9) # 只保留最近10条
self.redis_client.expire(recent_contents_key, 3600)
# 保存内容哈希
self.redis_client.setex(duplicate_key, 3600, "1")
return True
# 六、消息状态追踪与失败重试机制
消息状态追踪是保证消息可靠性的重要手段。我们在数据库中为每条消息维护完整的状态流转记录,包括待发送、发送中、已发送、已送达、发送失败等状态。当消息发送失败时,系统会根据失败类型自动进行重试,重试次数和间隔时间可以配置。对于多次重试仍然失败的消息,系统会将其标记为最终失败并记录详细的失败原因,方便后续排查和人工处理。
```import pymysql
import json
import time
from datetime import datetime, timedelta
import pytz
class MessageStatusTracker:
def __init__(self):
self.db_connection = pymysql.connect(
host=SystemConfig.DB_HOST,
port=SystemConfig.DB_PORT,
user=SystemConfig.DB_USER,
password=SystemConfig.DB_PASSWORD,
database=SystemConfig.DB_NAME,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
def create_message(self, message_data):
"""创建新消息记录"""
with self.db_connection.cursor() as cursor:
sql = """
INSERT INTO messages (
message_id, task_id, recipient, content, sender_account_id,
status, retry_count, created_at, updated_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(sql, (
message_data['message_id'],
message_data.get('task_id'),
message_data['recipient'],
message_data['content'],
message_data.get('sender_account_id'),
MessageStatus.PENDING.value,
0,
datetime.now(pytz.UTC),
datetime.now(pytz.UTC)
))
self.db_connection.commit()
return message_data['message_id']
def update_message_status(self, message_id, status, sender_account_id=None, error_message=None):
"""更新消息状态"""
with self.db_connection.cursor() as cursor:
updates = []
params = []
updates.append("status = %s")
params.append(status.value)
if sender_account_id:
updates.append("sender_account_id = %s")
params.append(sender_account_id)
if error_message:
updates.append("error_message = %s")
params.append(error_message)
updates.append("updated_at = %s")
params.append(datetime.now(pytz.UTC))
if status == MessageStatus.SENT:
updates.append("sent_at = %s")
params.append(datetime.now(pytz.UTC))
elif status == MessageStatus.DELIVERED:
updates.append("delivered_at = %s")
params.append(datetime.now(pytz.UTC))
sql = f"""
UPDATE messages
SET {', '.join(updates)}
WHERE message_id = %s
"""
params.append(message_id)
cursor.execute(sql, params)
self.db_connection.commit()
def increment_retry_count(self, message_id):
"""增加消息重试次数"""
with self.db_connection.cursor() as cursor:
sql = """
UPDATE messages
SET retry_count = retry_count + 1, updated_at = %s
WHERE message_id = %s
"""
cursor.execute(sql, (datetime.now(pytz.UTC), message_id))
self.db_connection.commit()
# 获取当前重试次数
with self.db_connection.cursor() as cursor:
sql = "SELECT retry_count FROM messages WHERE message_id = %s"
cursor.execute(sql, (message_id,))
result = cursor.fetchone()
return result['retry_count'] if result else 0
def get_failed_messages_for_retry(self):
"""获取需要重试的失败消息"""
with self.db_connection.cursor() as cursor:
sql = """
SELECT message_id, task_id, recipient, content, retry_count, error_message
FROM messages
WHERE status = %s AND retry_count < %s
LIMIT %s
"""
cursor.execute(sql, (
MessageStatus.FAILED.value,
BusinessConstraints.MAX_RETRY_COUNT,
SystemConfig.BATCH_SIZE
))
return cursor.fetchall()
def get_message_status(self, message_id):
"""获取消息状态"""
with self.db_connection.cursor() as cursor:
sql = """
SELECT message_id, task_id, recipient, content, sender_account_id,
status, retry_count, error_message, created_at, sent_at, delivered_at
FROM messages
WHERE message_id = %s
"""
cursor.execute(sql, (message_id,))
return cursor.fetchone()
class RetryManager:
def __init__(self, message_tracker, mq_producer):
self.message_tracker = message_tracker
self.mq_producer = mq_producer
def schedule_retry(self, message_id, error_message):
"""安排消息重试"""
retry_count = self.message_tracker.increment_retry_count(message_id)
if retry_count >= BusinessConstraints.MAX_RETRY_COUNT:
# 达到最大重试次数,标记为最终失败
self.message_tracker.update_message_status(
message_id,
MessageStatus.FAILED,
error_message=f"Max retry count reached: {error_message}"
)
print(f"[{datetime.now(pytz.UTC)}] Message {message_id} reached max retry count, marked as failed")
return False
# 计算下次重试时间
retry_interval = BusinessConstraints.RETRY_INTERVALS[retry_count - 1]
retry_time = datetime.now(pytz.UTC) + timedelta(seconds=retry_interval)
# 更新消息状态为重试中
self.message_tracker.update_message_status(
message_id,
MessageStatus.RETRYING,
error_message=f"Retry {retry_count} scheduled at {retry_time.isoformat()}: {error_message}"
)
print(f"[{datetime.now(pytz.UTC)}] Message {message_id} scheduled for retry {retry_count} at {retry_time.isoformat()}")
return True
def process_retry_queue(self):
"""处理重试队列"""
failed_messages = self.message_tracker.get_failed_messages_for_retry()
if not failed_messages:
return
print(f"[{datetime.now(pytz.UTC)}] Found {len(failed_messages)} messages for retry")
for message in failed_messages:
message_id = message['message_id']
retry_count = message['retry_count']
# 检查是否到了重试时间
with self.message_tracker.db_connection.cursor() as cursor:
sql = "SELECT updated_at FROM messages WHERE message_id = %s"
cursor.execute(sql, (message_id,))
result = cursor.fetchone()
if not result:
continue
updated_at = result['updated_at']
retry_interval = BusinessConstraints.RETRY_INTERVALS[retry_count]
if datetime.now(pytz.UTC) - updated_at < timedelta(seconds=retry_interval):
continue
# 重新发送消息到队列
message_data = {
'message_id': message_id,
'task_id': message['task_id'],
'recipient': message['recipient'],
'content': message['content'],
'is_retry': True,
'retry_count': retry_count
}
if self.mq_producer.send_message(message_data):
# 更新消息状态为待发送
self.message_tracker.update_message_status(message_id, MessageStatus.PENDING)
print(f"[{datetime.now(pytz.UTC)}] Message {message_id} requeued for retry {retry_count + 1}")
七、基于阿里云日志服务的监控与告警系统
为了实时监控系统的运行状态,我们集成了阿里云日志服务SLS。系统将所有关键操作日志和错误日志发送到SLS,通过SLS的查询分析功能实现实时监控和统计分析。我们还配置了多种告警规则,当系统出现异常情况(如账号被封禁、发送成功率过低、队列堆积严重等)时,会自动通过邮件和短信通知运维人员。
```import json
import time
from datetime import datetime
import pytz
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
from aliyunsdksls.request.v20191023 import PutLogsRequest
class SLSLogger:
def init(self):
self.client = AcsClient(
SystemConfig.ALIYUN_ACCESS_KEY,
SystemConfig.ALIYUN_SECRET_KEY,
SystemConfig.ALIYUN_REGION
)
self.project = SystemConfig.SLS_PROJECT
self.logstore = SystemConfig.SLS_LOGSTORE
def _put_logs(self, log_group):
"""发送日志到SLS"""
try:
request = PutLogsRequest.PutLogsRequest()
request.set_Project(self.project)
request.set_Logstore(self.logstore)
request.set_LogGroup(log_group)
response = self.client.do_action_with_exception(request)
return True
except Exception as e:
print(f"[{datetime.now(pytz.UTC)}] Failed to send logs to SLS: {str(e)}")
return False
def log_message_sent(self, message_id, account_id, recipient, success, duration_ms, error_message=None):
"""记录消息发送日志"""
log_contents = {
'type': 'message_sent',
'message_id': message_id,
'account_id': str(account_id),
'recipient': recipient,
'success': str(success).lower(),
'duration_ms': str(duration_ms),
'timestamp': str(int(time.time() * 1000))
}
if error_message:
log_contents['error_message'] = error_message
log_group = {
'logs': [
{
'time': int(time.time()),
'contents': log_contents
}
]
}
self._put_logs(log_group)
def log_account_status_change(self, account_id, old_status, new_status, reason):
"""记录账号状态变更日志"""
log_group = {
'logs': [
{
'time': int(time.time()),
'contents': {
'type': 'account_status_change',
'account_id': str(account_id),
'old_status': old_status,
'new_status': new_status,
'reason': reason,
'timestamp': str(int(time.time() * 1000))
}
}
]
}
self._put_logs(log_group)
def log_system_metrics(self, metrics):
"""记录系统指标日志"""
log_contents = {
'type': 'system_metrics',
'timestamp': str(int(time.time() * 1000))
}
for key, value in metrics.items():
log_contents[key] = str(value)
log_group = {
'logs': [
{
'time': int(time.time()),
'contents': log_contents
}
]
}
self._put_logs(log_group)
def log_error(self, error_type, error_message, stack_trace=None):
"""记录错误日志"""
log_contents = {
'type': 'error',
'error_type': error_type,
'error_message': error_message,
'timestamp': str(int(time.time() * 1000))
}
if stack_trace:
log_contents['stack_trace'] = stack_trace
log_group = {
'logs': [
{
'time': int(time.time()),
'contents': log_contents
}
]
}
self._put_logs(log_group)
class MonitorService:
def init(self, sls_logger, account_pool_manager):
self.sls_logger = sls_logger
self.account_pool_manager = account_pool_manager
self.running = True
def start_monitoring(self):
"""启动监控服务"""
print(f"[{datetime.now(pytz.UTC)}] Monitor service started")
while self.running:
try:
# 收集系统指标
metrics = self._collect_system_metrics()
# 发送指标到SLS
self.sls_logger.log_system_metrics(metrics)
# 检查告警条件
self._check_alerts(metrics)
time.sleep(60) # 每分钟收集一次
except Exception as e:
print(f"[{datetime.now(pytz.UTC)}] Exception in monitor service: {str(e)}")
self.sls_logger.log_error("monitor_exception", str(e))
time.sleep(10)
def _collect_system_metrics(self):
"""收集系统指标"""
metrics = {}
# 账号统计
with self.account_pool_manager.db_connection.cursor() as cursor:
cursor.execute("SELECT COUNT(*) as total FROM imessage_accounts")
metrics['total_accounts'] = cursor.fetchone()['total']
cursor.execute("SELECT COUNT(*) as active FROM imessage_accounts WHERE status = %s", (AccountStatus.ACTIVE.value,))
metrics['active_accounts'] = cursor.fetchone()['active']
cursor.execute("SELECT COUNT(*) as cooldown FROM imessage_accounts WHERE status = %s", (AccountStatus.COOLDOWN.value,))
metrics['cooldown_accounts'] = cursor.fetchone()['cooldown']
cursor.execute("SELECT COUNT(*) as banned FROM imessage_accounts WHERE status = %s", (AccountStatus.BANNED.value,))
metrics['banned_accounts'] = cursor.fetchone()['banned']
# 消息统计
with self.account_pool_manager.db_connection.cursor() as cursor:
today_start = datetime.now(pytz.UTC).replace(hour=0, minute=0, second=0, microsecond=0)
cursor.execute("SELECT COUNT(*) as today_total FROM messages WHERE created_at >= %s", (today_start,))
metrics['today_total_messages'] = cursor.fetchone()['today_total']
cursor.execute("SELECT COUNT(*) as today_sent FROM messages WHERE status = %s AND sent_at >= %s", (MessageStatus.SENT.value, today_start))
metrics['today_sent_messages'] = cursor.fetchone()['today_sent']
cursor.execute("SELECT COUNT(*) as today_failed FROM messages WHERE status = %s AND updated_at >= %s", (MessageStatus.FAILED.value, today_start))
metrics['today_failed_messages'] = cursor.fetchone()['today_failed']
# 计算成功率
if metrics['today_total_messages'] > 0:
metrics['success_rate'] = round(metrics['today_sent_messages'] / metrics['today_total_messages'] * 100, 2)
else:
metrics['success_rate'] = 100.0
# 队列长度(需要从RocketMQ获取,这里简化处理)
metrics['queue_length'] = 0 # 实际应调用RocketMQ API获取
return metrics
def _check_alerts(self, metrics):
"""检查告警条件"""
# 账号封禁告警
if metrics['banned_accounts'] > 0:
self._send_alert(
"账号封禁告警",
f"检测到有 {metrics['banned_accounts']} 个账号被封禁,请及时处理"
)
# 成功率过低告警
if metrics['success_rate'] < 80.0:
self._send_alert(
"发送成功率过低告警",
f"今日发送成功率仅为 {metrics['success_rate']}%,低于80%阈值"
)
# 活跃账号不足告警
if metrics['active_accounts'] < 10:
self._send_alert(
"活跃账号不足告警",
f"当前活跃账号仅为 {metrics['active_accounts']} 个,低于10个阈值"
)
def _send_alert(self, title, content):
"""发送告警通知"""
# 实际应调用阿里云短信或邮件服务发送通知
print(f"[{datetime.now(pytz.UTC)}] ALERT: {title} - {content}")
# 记录告警日志
self.sls_logger.log_error("system_alert", f"{title}: {content}")
def stop(self):
"""停止监控服务"""
self.running = False
print(f"[{datetime.now(pytz.UTC)}] Monitor service stopped")
八、系统性能优化与安全防护措施
为了提高系统的性能和安全性,我们采取了多种优化措施。在性能方面,我们使用连接池管理数据库和Redis连接,减少连接创建和销毁的开销;采用批量处理和异步IO提高并发处理能力;对热点数据进行缓存,减少数据库访问次数。在安全方面,我们对所有敏感信息(如Apple ID密码、设备令牌等)进行加密存储;使用HTTPS协议进行数据传输;实现了严格的身份验证和权限控制;定期进行安全审计和漏洞扫描。
```import pymysql
import redis
import hashlib
import hmac
import base64
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import os
import time
from datetime import datetime
import pytz
# 数据库连接池
class DatabaseConnectionPool:
def __init__(self, max_connections=10):
self.max_connections = max_connections
self.connections = []
self.in_use = set()
def get_connection(self):
"""获取数据库连接"""
# 查找可用连接
for conn in self.connections:
if conn not in self.in_use:
# 检查连接是否有效
try:
conn.ping()
self.in_use.add(conn)
return conn
except:
self.connections.remove(conn)
# 创建新连接
if len(self.connections) < self.max_connections:
conn = pymysql.connect(
host=SystemConfig.DB_HOST,
port=SystemConfig.DB_PORT,
user=SystemConfig.DB_USER,
password=SystemConfig.DB_PASSWORD,
database=SystemConfig.DB_NAME,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor,
autocommit=True
)
self.connections.append(conn)
self.in_use.add(conn)
return conn
# 等待连接释放
time.sleep(0.1)
return self.get_connection()
def release_connection(self, conn):
"""释放数据库连接"""
if conn in self.in_use:
self.in_use.remove(conn)
def close_all(self):
"""关闭所有连接"""
for conn in self.connections:
conn.close()
self.connections.clear()
self.in_use.clear()
# Redis连接池
class RedisConnectionPool:
def __init__(self):
self.pool = redis.ConnectionPool(
host=SystemConfig.REDIS_HOST,
port=SystemConfig.REDIS_PORT,
password=SystemConfig.REDIS_PASSWORD,
db=SystemConfig.REDIS_DB,
decode_responses=True,
max_connections=20
)
def get_client(self):
"""获取Redis客户端"""
return redis.Redis(connection_pool=self.pool)
# 数据加密工具
class EncryptionUtil:
def __init__(self, master_key=None):
if master_key:
self.master_key = master_key
else:
self.master_key = os.getenv("ENCRYPTION_MASTER_KEY", "default_master_key_123456")
# 生成加密密钥
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=b"imessage_system_salt",
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(self.master_key.encode()))
self.cipher = Fernet(key)
def encrypt(self, plaintext):
"""加密数据"""
if not plaintext:
return plaintext
return self.cipher.encrypt(plaintext.encode()).decode()
def decrypt(self, ciphertext):
"""解密数据"""
if not ciphertext:
return ciphertext
try:
return self.cipher.decrypt(ciphertext.encode()).decode()
except:
return ciphertext
# 身份验证中间件
class AuthMiddleware:
def __init__(self, redis_client):
self.redis_client = redis_client
self.token_expiry = 86400 # 24小时
def generate_token(self, user_id):
"""生成访问令牌"""
token = hashlib.sha256(f"{user_id}{time.time()}{os.urandom(16)}".encode()).hexdigest()
self.redis_client.setex(f"auth:token:{token}", self.token_expiry, str(user_id))
return token
def verify_token(self, token):
"""验证访问令牌"""
if not token:
return None
user_id = self.redis_client.get(f"auth:token:{token}")
if user_id:
# 刷新令牌过期时间
self.redis_client.expire(f"auth:token:{token}", self.token_expiry)
return int(user_id)
return None
def invalidate_token(self, token):
"""使令牌失效"""
self.redis_client.delete(f"auth:token:{token}")
# 安全审计日志
class SecurityAuditLogger:
def __init__(self, db_pool):
self.db_pool = db_pool
def log_action(self, user_id, action, ip_address, details=None):
"""记录安全审计日志"""
conn = self.db_pool.get_connection()
try:
with conn.cursor() as cursor:
sql = """
INSERT INTO security_audit_logs (
user_id, action, ip_address, details, created_at
) VALUES (%s, %s, %s, %s, %s)
"""
cursor.execute(sql, (
user_id,
action,
ip_address,
json.dumps(details) if details else None,
datetime.now(pytz.UTC)
))
conn.commit()
finally:
self.db_pool.release_connection(conn)
def get_recent_logs(self, user_id=None, limit=100):
"""获取最近的审计日志"""
conn = self.db_pool.get_connection()
try:
with conn.cursor() as cursor:
if user_id:
sql = """
SELECT * FROM security_audit_logs
WHERE user_id = %s
ORDER BY created_at DESC
LIMIT %s
"""
cursor.execute(sql, (user_id, limit))
else:
sql = """
SELECT * FROM security_audit_logs
ORDER BY created_at DESC
LIMIT %s
"""
cursor.execute(sql, (limit,))
return cursor.fetchall()
finally:
self.db_pool.release_connection(conn)
# 初始化全局实例
db_pool = DatabaseConnectionPool()
redis_pool = RedisConnectionPool()
encryption_util = EncryptionUtil()
auth_middleware = AuthMiddleware(redis_pool.get_client())
security_audit_logger = SecurityAuditLogger(db_pool)
# 使用示例
def example_usage():
# 加密敏感信息
apple_id = "user@example.com"
password = "password123"
encrypted_password = encryption_util.encrypt(password)
print(f"Encrypted password: {encrypted_password}")
print(f"Decrypted password: {encryption_util.decrypt(encrypted_password)}")
# 生成和验证令牌
user_id = 1
token = auth_middleware.generate_token(user_id)
print(f"Generated token: {token}")
print(f"Verified user ID: {auth_middleware.verify_token(token)}")
# 记录审计日志
security_audit_logger.log_action(
user_id=1,
action="login",
ip_address="192.168.1.1",
details={"user_agent": "Mozilla/5.0"}
)
# 获取数据库连接
conn = db_pool.get_connection()
try:
with conn.cursor() as cursor:
cursor.execute("SELECT COUNT(*) as count FROM imessage_accounts")
result = cursor.fetchone()
print(f"Total accounts: {result['count']}")
finally:
db_pool.release_connection(conn)
if __name__ == "__main__":
example_usage()
本文详细介绍了一套基于阿里云生态的跨境电商多账号iMessage群发短信系统架构设计与实现。通过合理的分层架构、多账号池管理、异步消息处理、负载均衡和完善的监控告警机制,系统能够稳定高效地处理大规模的消息发送任务。在实际应用中,我们还需要根据具体的业务需求和苹果的风控政策不断调整和优化系统参数,确保系统的长期稳定运行。同时,我们也要严格遵守相关法律法规和用户隐私保护要求,合法合规地使用iMessage群发短信功能。