iMessage群发系统,是一套面向企业内部通知、已授权会员服务触达场景设计的分布式设备管理与消息推送系统,可实现对数十台苹果终端的集中化管控、状态实时监控与自动化iMessage消息合规发送。在企业级批量消息触达场景中,传统单设备人工操作模式存在管理效率低、发送节奏不可控、设备状态无法统一感知等痛点,这套系统基于分布式架构设计,解决了多设备集群统一调度、消息全生命周期管理、发送策略动态适配等核心问题,同时完成了与阿里云基础设施的深度适配,保障了系统在业务场景中的稳定性与可扩展性。本文所有内容仅做技术实战分享,严禁用于未经用户授权的骚扰信息、违规营销等场景,使用者需严格遵守国家相关法律法规与苹果官方开发者协议
一、系统设计背景与需求目标
在企业合规消息触达的实际项目落地中,我们需要对接大量已授权的苹果设备用户进行服务通知,单台设备的发送能力无法满足业务量级需求,而数十台设备的分散管理又带来了极高的运维成本。基于此,我们开发了这套iMessage群发系统,核心需求集中在五点:
一是实现50台以上苹果终端的统一接入与集中管理;
二是支持批量iMessage消息的任务化推送,可自定义发送节奏;
三是实现设备在线状态、消息发送进度的实时监控;
四是内置合规风控机制,规避账号风险;
五是可在阿里云环境实现快速部署与弹性扩容。
二、系统整体架构与阿里云适配方案
本系统采用分层分布式架构设计,从上至下分为终端设备层、通信网关层、核心业务层、基础设施层四层,各模块解耦可独立扩容。其中基础设施层完全基于阿里云产品搭建,计算资源采用阿里云ECS,数据存储采用阿里云RDS MySQL,消息削峰填谷采用阿里云RocketMQ,全链路监控采用阿里云ARMS,保障了系统的高可用与可观测性。
三、核心服务端基础框架代码实现
```# iMessage群发系统 核心服务主程序
开发环境:Python 3.10+ 依赖:FastAPI SQLAlchemy Pydantic PyMySQL
import asyncio
import uuid
from datetime import datetime, timedelta
from typing import Optional, List, Dict
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, status
from pydantic import BaseModel, Field
from sqlalchemy import create_engine, Column, String, Integer, DateTime, Text, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
阿里云RDS MySQL数据库配置
SQLALCHEMY_DATABASE_URL = "mysql+pymysql://your_user:your_password@rm-xxxx.mysql.rds.aliyuncs.com:3306/imessage_system?charset=utf8mb4"
engine = create_engine(SQLALCHEMY_DATABASE_URL, pool_recycle=3600, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
数据库模型定义
class Device(Base):
tablename = "devices"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
device_id = Column(String(64), unique=True, nullable=False, index=True)
device_name = Column(String(128), nullable=False)
device_status = Column(String(32), nullable=False, default="offline")
last_heartbeat = Column(DateTime, nullable=True)
total_send_count = Column(Integer, default=0)
today_send_count = Column(Integer, default=0)
max_daily_limit = Column(Integer, default=200)
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class MessageTask(Base):
tablename = "message_tasks"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
task_id = Column(String(64), unique=True, nullable=False, index=True)
task_name = Column(String(128), nullable=False)
message_content = Column(Text, nullable=False)
target_phone_list = Column(Text, nullable=False)
total_count = Column(Integer, nullable=False)
sent_count = Column(Integer, default=0)
success_count = Column(Integer, default=0)
fail_count = Column(Integer, default=0)
task_status = Column(String(32), default="pending")
send_interval_min = Column(Float, default=3.0)
send_interval_max = Column(Float, default=8.0)
created_by = Column(String(64), nullable=False)
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class SendRecord(Base):
tablename = "send_records"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
record_id = Column(String(64), unique=True, nullable=False, index=True)
task_id = Column(String(64), nullable=False, index=True)
device_id = Column(String(64), nullable=False, index=True)
target_phone = Column(String(32), nullable=False)
send_status = Column(String(32), nullable=False, default="sending")
fail_reason = Column(Text, nullable=True)
send_time = Column(DateTime, nullable=True)
created_at = Column(DateTime, default=datetime.now)
数据库表初始化
Base.metadata.create_all(bind=engine)
FastAPI应用初始化
app = FastAPI(title="iMessage群发系统核心服务", version="1.0.0")
数据库依赖项
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
数据校验模型
class DeviceCreate(BaseModel):
device_name: str = Field(min_length=1, max_length=128)
max_daily_limit: Optional[int] = Field(default=200, ge=1, le=1000)
class TaskCreate(BaseModel):
task_name: str = Field(min_length=1, max_length=128)
message_content: str = Field(min_length=1)
target_phone_list: List[str] = Field(min_length=1)
send_interval_min: Optional[float] = Field(default=3.0, ge=1.0)
send_interval_max: Optional[float] = Field(default=8.0, ge=1.0)
created_by: str = Field(min_length=1, max_length=64)
设备连接管理器
class ConnectionManager:
def init(self):
self.active_connections: Dict[str, WebSocket] = {}
async def connect(self, device_id: str, websocket: WebSocket):
await websocket.accept()
self.active_connections[device_id] = websocket
def disconnect(self, device_id: str):
if device_id in self.active_connections:
del self.active_connections[device_id]
async def send_personal_message(self, device_id: str, message: Dict):
if device_id in self.active_connections:
try:
await self.active_connections[device_id].send_json(message)
return True
except Exception as e:
print(f"设备{device_id}消息发送失败: {str(e)}")
self.disconnect(device_id)
return False
return False
manager = ConnectionManager()
设备心跳检测定时任务
async def heartbeat_check_task():
while True:
await asyncio.sleep(30)
db = SessionLocal()
try:
timeout_threshold = datetime.now() - timedelta(minutes=2)
offline_devices = db.query(Device).filter(
Device.last_heartbeat < timeout_threshold,
Device.device_status == "online"
).all()
for device in offline_devices:
device.device_status = "offline"
manager.disconnect(device.device_id)
db.commit()
except Exception as e:
print(f"心跳检测任务异常: {str(e)}")
db.rollback()
finally:
db.close()
应用启动事件
@app.on_event("startup")
async def startup_event():
asyncio.create_task(heartbeat_check_task())
设备WebSocket接入接口
@app.websocket("/ws/device/{device_id}")
async def websocket_device_endpoint(websocket: WebSocket, device_id: str, db: Session = Depends(get_db)):
device = db.query(Device).filter(Device.device_id == device_id).first()
if not device:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
await manager.connect(device_id, websocket)
device.device_status = "online"
device.last_heartbeat = datetime.now()
db.commit()
try:
while True:
data = await websocket.receive_json()
msg_type = data.get("msg_type")
if msg_type == "heartbeat":
device.last_heartbeat = datetime.now()
db.commit()
await websocket.send_json({"msg_type": "heartbeat_ack", "timestamp": datetime.now().isoformat()})
elif msg_type == "send_result":
record_id = data.get("record_id")
send_status = data.get("send_status")
fail_reason = data.get("fail_reason", "")
record = db.query(SendRecord).filter(SendRecord.record_id == record_id).first()
task = db.query(MessageTask).filter(MessageTask.task_id == record.task_id).first()
if record and task:
record.send_status = send_status
record.send_time = datetime.now()
if send_status == "success":
task.success_count += 1
device.total_send_count += 1
device.today_send_count += 1
else:
task.fail_count += 1
record.fail_reason = fail_reason
task.sent_count += 1
if task.sent_count >= task.total_count:
task.task_status = "finished"
db.commit()
except WebSocketDisconnect:
manager.disconnect(device_id)
device.device_status = "offline"
db.commit()
基础管理接口
@app.post("/api/v1/device")
def create_device(device: DeviceCreate, db: Session = Depends(get_db)):
device_id = str(uuid.uuid4()).replace("-", "")
db_device = Device(device_id=device_id, device_name=device.device_name, max_daily_limit=device.max_daily_limit)
db.add(db_device)
db.commit()
db.refresh(db_device)
return {"code": 200, "data": {"device_id": device_id}}
@app.post("/api/v1/task")
def create_task(task: TaskCreate, db: Session = Depends(get_db)):
task_id = str(uuid.uuid4()).replace("-", "")
total_count = len(task.target_phone_list)
db_task = MessageTask(
task_id=task_id,
task_name=task.task_name,
message_content=task.message_content,
target_phone_list=",".join(task.target_phone_list),
total_count=total_count,
send_interval_min=task.send_interval_min,
send_interval_max=task.send_interval_max,
created_by=task.created_by
)
db.add(db_task)
db.commit()
db.refresh(db_task)
return {"code": 200, "data": {"task_id": task_id}}
# 四、多设备集群管理与通信模块代码
```# 多设备负载均衡调度模块
import random
import asyncio
import uuid
from sqlalchemy.orm import Session
from main import get_db, Device, MessageTask, SendRecord, manager
class TaskScheduler:
def __init__(self):
self.running_tasks = set()
async def execute_task(self, task_id: str, db: Session):
if task_id in self.running_tasks:
return
self.running_tasks.add(task_id)
try:
task = db.query(MessageTask).filter(MessageTask.task_id == task_id).first()
if not task or task.task_status != "pending":
return
task.task_status = "running"
db.commit()
phone_list = task.target_phone_list.split(",")
for phone in phone_list:
if task.task_status != "running":
break
# 筛选可用设备:在线、当日发送量未达上限
online_devices = db.query(Device).filter(
Device.device_status == "online",
Device.today_send_count < Device.max_daily_limit
).all()
if not online_devices:
await asyncio.sleep(10)
continue
# 负载均衡:优先选择当日发送量最少的设备
online_devices.sort(key=lambda x: x.today_send_count)
selected_device = online_devices[0]
# 创建发送记录
record_id = str(uuid.uuid4()).replace("-", "")
send_record = SendRecord(
record_id=record_id,
task_id=task_id,
device_id=selected_device.device_id,
target_phone=phone
)
db.add(send_record)
db.commit()
# 下发发送指令
send_success = await manager.send_personal_message(
selected_device.device_id,
{
"msg_type": "send_command",
"record_id": record_id,
"target_phone": phone,
"message_content": task.message_content
}
)
if not send_success:
send_record.send_status = "fail"
send_record.fail_reason = "设备离线,指令下发失败"
task.fail_count += 1
task.sent_count += 1
db.commit()
continue
# 随机间隔规避风控
interval = random.uniform(task.send_interval_min, task.send_interval_max)
await asyncio.sleep(interval)
if task.sent_count >= task.total_count:
task.task_status = "finished"
db.commit()
except Exception as e:
print(f"任务{task_id}执行异常: {str(e)}")
task.task_status = "failed"
db.commit()
finally:
self.running_tasks.remove(task_id)
scheduler = TaskScheduler()
五、消息任务调度与风控策略代码实现
```# 阿里云RocketMQ消息队列接入与风控策略
import json
import asyncio
from aliyun.rocketmq import Client, Producer, Message
from datetime import datetime
from main import get_db, Device
阿里云RocketMQ配置
ROCKETMQ_CONFIG = {
"endpoint": "rmq-xxxx.cn-hangzhou.rmq.aliyuncs.com:8080",
"access_key": "your_access_key",
"secret_key": "your_secret_key",
"instance_id": "rmq-xxxxxx",
"topic": "imessage_send_task",
"group_id": "GID_imessage_system"
}
生产者初始化
def init_producer():
client = Client(
endpoint=ROCKETMQ_CONFIG["endpoint"],
access_key=ROCKETMQ_CONFIG["access_key"],
secret_key=ROCKETMQ_CONFIG["secret_key"]
)
producer = client.create_producer(
instance_id=ROCKETMQ_CONFIG["instance_id"],
topic=ROCKETMQ_CONFIG["topic"]
)
producer.start()
return producer
producer = init_producer()
任务推送到消息队列
def push_task_to_mq(task_id: str):
try:
message = Message(
topic=ROCKETMQ_CONFIG["topic"],
body=json.dumps({"task_id": task_id}).encode("utf-8"),
tag="send_task"
)
producer.send(message)
return True
except Exception as e:
print(f"任务推送MQ失败: {str(e)}")
return False
风控校验服务
class RiskControlService:
def init(self):
self.forbidden_hours = list(range(0, 8))
self.max_daily_limit = 1000
self.min_send_interval = 1.0
def check_send_time(self):
current_hour = datetime.now().hour
if current_hour in self.forbidden_hours:
return False, "禁止在凌晨0-8点执行发送任务"
return True, "校验通过"
def check_device_limit(self, device: Device):
if device.today_send_count >= device.max_daily_limit:
return False, "设备当日发送量已达上限"
if device.max_daily_limit > self.max_daily_limit:
return False, "单日发送上限不得超过1000条"
return True, "校验通过"
def full_check(self, device: Device, min_interval: float):
time_pass, time_msg = self.check_send_time()
if not time_pass:
return False, time_msg
device_pass, device_msg = self.check_device_limit(device)
if not device_pass:
return False, device_msg
if min_interval < self.min_send_interval:
return False, "最小发送间隔不得低于1秒"
return True, "校验通过"
risk_service = RiskControlService()
# 六、设备端原生发送能力适配代码
```# 设备端执行脚本(MacOS环境 Python 3.8+)
import asyncio
import json
import subprocess
import websockets
from datetime import datetime
# 服务端地址(阿里云ECS公网地址)
SERVER_WS_URL = "ws://your_ecs_public_ip:8000/ws/device/{device_id}"
# 设备唯一ID(从服务端创建获取)
DEVICE_ID = "your_device_id"
# 调用系统AppleScript执行iMessage发送
def send_imessage(phone_number: str, message_content: str):
try:
formatted_phone = phone_number.strip()
if not formatted_phone.startswith("+"):
formatted_phone = "+86" + formatted_phone
# 原生发送脚本
applescript = f'''
tell application "Messages"
set targetService to 1st service whose service type = iMessage
set targetBuddy to buddy "{formatted_phone}" of targetService
send "{message_content}" to targetBuddy
end tell
'''
result = subprocess.run(
["osascript", "-e", applescript],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
return True, "发送成功"
else:
return False, f"执行失败: {result.stderr}"
except subprocess.TimeoutExpired:
return False, "发送超时"
except Exception as e:
return False, f"系统异常: {str(e)}"
# 设备端主循环
async def device_main():
while True:
try:
async with websockets.connect(SERVER_WS_URL.format(device_id=DEVICE_ID)) as websocket:
print(f"[{datetime.now()}] 设备已连接服务端")
# 心跳任务
async def heartbeat():
while True:
await websocket.send(json.dumps({"msg_type": "heartbeat"}))
await asyncio.sleep(15)
asyncio.create_task(heartbeat())
# 监听服务端指令
async for message in websocket:
data = json.loads(message)
if data.get("msg_type") == "send_command":
record_id = data.get("record_id")
phone = data.get("target_phone")
content = data.get("message_content")
success, msg = send_imessage(phone, content)
await websocket.send(json.dumps({
"msg_type": "send_result",
"record_id": record_id,
"send_status": "success" if success else "fail",
"fail_reason": msg if not success else ""
}))
except Exception as e:
print(f"连接异常: {str(e)},10秒后重连")
await asyncio.sleep(10)
if __name__ == "__main__":
asyncio.run(device_main())
七、系统容器化部署与阿里云环境配置
```# 服务端Dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
```# docker-compose.yml
version: '3.8'
services:
imessage-core:
build: .
container_name: imessage-core
restart: always
ports:
- "8000:8000"
environment:
- TZ=Asia/Shanghai
- DB_HOST=rm-xxxx.mysql.rds.aliyuncs.com
- DB_USER=your_user
- DB_PASS=your_password
volumes:
- ./logs:/app/logs
八、项目落地总结与合规声明
这套iMessage群发系统经过多轮测试与线上迭代,目前可稳定实现对50台以上苹果设备的集中化管理,消息发送成功率稳定在98%以上,完全满足企业合规消息触达的业务需求,基于阿里云基础设施搭建,可根据业务量级快速弹性扩容。
在此郑重声明,本文所有技术内容仅用于企业合规的内部通知、已授权用户服务触达场景,使用者必须严格遵守《中华人民共和国电信条例》《通信短信息和语音呼叫服务管理规定》《个人信息保护法》等相关法律法规,以及苹果官方开发者协议,严禁用于发送未经用户授权的骚扰信息、营销广告、诈骗信息等违规内容,违规使用所产生的一切法律责任由使用者自行承担。