iMessage群发系统:单个ID循环群发技术实现与阿里云部署
群发系统作为自动化信息传递的重要工具,在企业内部通知、客户服务提醒等合规场景中有着广泛应用。本文将基于macOS原生技术栈,实现一套轻量化的iMessage群发系统,重点讲解单个Apple ID循环群发的核心技术实现,以及如何将系统部署到阿里云平台进行稳定运行。所有代码均基于Python 3.10+和AppleScript原生接口开发,无第三方商业SDK依赖,适合开发者进行二次开发和个性化功能扩展。在实际使用过程中,请务必遵守苹果公司的服务条款和相关法律法规,仅用于合法合规的信息传递场景。
一、系统整体设计思路与技术选型
本系统采用模块化设计思想,将整个群发流程拆分为消息发送、任务调度、异常处理、日志记录和配置管理五个独立模块。这种设计不仅提高了代码的可维护性,也便于后续功能扩展。技术选型方面,我们选择Python作为主要开发语言,因为它拥有丰富的标准库和第三方库,能够快速实现各种功能需求。同时,利用macOS系统自带的AppleScript脚本语言与Messages应用进行交互,实现iMessage消息的发送功能。这种原生接口调用方式相比第三方API更加稳定可靠,且不会引入额外的安全风险。
系统的核心工作流程如下:首先从配置文件中读取接收者列表和消息内容,然后通过单个Apple ID按照预设的时间间隔循环发送消息。在发送过程中,系统会实时监控发送状态,对发送失败的消息进行自动重试,并将所有操作记录到日志文件中。当所有消息发送完成后,系统会生成发送报告,统计成功和失败的数量。
二、核心依赖库安装与环境配置
在开始编写代码之前,我们需要先配置好开发环境。本系统运行在macOS 12.0及以上版本,需要安装Python 3.10或更高版本。以下是详细的环境配置步骤和代码实现。
```# 环境检查与依赖安装脚本
import subprocess
import sys
import platform
import os
def check_python_version():
"""检查Python版本是否符合要求"""
required_version = (3, 10)
current_version = sys.version_info
if current_version < required_version:
print(f"错误:Python版本过低。需要Python {required_version[0]}.{required_version[1]} 或更高版本")
print(f"当前版本:Python {current_version[0]}.{current_version[1]}.{current_version[2]}")
return False
print(f"Python版本检查通过:{sys.version}")
return True
def check_macos_version():
"""检查macOS版本是否符合要求"""
if platform.system() != 'Darwin':
print("错误:本系统只能运行在macOS平台上")
return False
macos_version = platform.mac_ver()[0]
version_parts = list(map(int, macos_version.split('.')))
if version_parts[0] < 12:
print(f"错误:macOS版本过低。需要macOS 12.0或更高版本")
print(f"当前版本:macOS {macos_version}")
return False
print(f"macOS版本检查通过:macOS {macos_version}")
return True
def install_required_packages():
"""安装所需的Python依赖包"""
required_packages = [
'pyyaml==6.0.1',
'schedule==1.2.1',
'python-dotenv==1.0.0',
'colorlog==6.7.0'
]
print("开始安装依赖包...")
for package in required_packages:
try:
subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
print(f"成功安装:{package}")
except subprocess.CalledProcessError as e:
print(f"安装失败:{package},错误代码:{e.returncode}")
return False
print("所有依赖包安装完成")
return True
def create_config_files():
"""创建必要的配置文件和目录"""
directories = ['logs', 'config', 'data']
for directory in directories:
if not os.path.exists(directory):
os.makedirs(directory)
print(f"创建目录:{directory}")
else:
print(f"目录已存在:{directory}")
# 创建默认配置文件
config_content = """# iMessage群发系统配置文件
发送间隔(秒)
send_interval: 5
最大重试次数
max_retries: 3
重试间隔(秒)
retry_interval: 10
日志级别:DEBUG, INFO, WARNING, ERROR, CRITICAL
log_level: INFO
日志保留天数
log_retention_days: 7
"""
config_path = 'config/config.yaml'
if not os.path.exists(config_path):
with open(config_path, 'w', encoding='utf-8') as f:
f.write(config_content)
print(f"创建默认配置文件:{config_path}")
else:
print(f"配置文件已存在:{config_path}")
# 创建示例接收者文件
recipients_content = """# 接收者列表,每行一个号码或邮箱
格式:电话号码(带国家代码)或Apple ID邮箱
+8613800138000
+8613900139000
example@icloud.com
"""
recipients_path = 'data/recipients.txt'
if not os.path.exists(recipients_path):
with open(recipients_path, 'w', encoding='utf-8') as f:
f.write(recipients_content)
print(f"创建示例接收者文件:{recipients_path}")
else:
print(f"接收者文件已存在:{recipients_path}")
# 创建示例消息文件
message_content = """这是一条测试消息。
请不要回复此消息。
"""
message_path = 'data/message.txt'
if not os.path.exists(message_path):
with open(message_path, 'w', encoding='utf-8') as f:
f.write(message_content)
print(f"创建示例消息文件:{message_path}")
else:
print(f"消息文件已存在:{message_path}")
def main():
"""主函数"""
print("=" 60)
print("iMessage群发系统环境配置工具")
print("=" 60)
if not check_python_version():
sys.exit(1)
if not check_macos_version():
sys.exit(1)
if not install_required_packages():
sys.exit(1)
create_config_files()
print("\n" + "=" * 60)
print("环境配置完成!")
print("请编辑data/recipients.txt文件添加接收者列表")
print("请编辑data/message.txt文件修改要发送的消息内容")
print("请编辑config/config.yaml文件调整系统参数")
print("=" * 60)
if name == "main":
main()
将以上代码保存为setup_env.py,在终端中运行python setup_env.py即可完成环境配置。脚本会自动检查Python和macOS版本,安装所需的依赖包,并创建必要的配置文件和目录结构。
# 三、AppleScript消息发送核心实现
iMessage消息发送的核心是通过AppleScript脚本调用macOS系统的Messages应用。AppleScript是苹果公司开发的一种脚本语言,可以直接控制macOS系统中的各种应用程序。以下是实现iMessage消息发送的核心代码。
```# imessage_sender.py
import subprocess
import logging
import time
import os
from typing import Optional
logger = logging.getLogger(__name__)
class IMessagesSender:
"""iMessage消息发送器类"""
def __init__(self):
"""初始化发送器"""
self.applescript_path = 'scripts/send_message.scpt'
self._create_applescript_file()
def _create_applescript_file(self):
"""创建AppleScript脚本文件"""
scripts_dir = os.path.dirname(self.applescript_path)
if not os.path.exists(scripts_dir):
os.makedirs(scripts_dir)
applescript_content = """on run {targetBuddyPhone, targetMessage}
tell application "Messages"
set targetService to 1st service whose service type = iMessage
set targetBuddy to buddy targetBuddyPhone of targetService
send targetMessage to targetBuddy
end tell
end run
"""
with open(self.applescript_path, 'w', encoding='utf-8') as f:
f.write(applescript_content)
logger.debug(f"AppleScript脚本文件已创建:{self.applescript_path}")
def send_message(self, recipient: str, message: str) -> bool:
"""
发送iMessage消息
Args:
recipient: 接收者的电话号码(带国家代码)或Apple ID邮箱
message: 要发送的消息内容
Returns:
发送成功返回True,失败返回False
"""
if not recipient or not message:
logger.error("接收者或消息内容不能为空")
return False
try:
# 构建AppleScript命令
cmd = [
'osascript',
self.applescript_path,
recipient,
message
]
# 执行命令
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30
)
# 检查执行结果
if result.returncode == 0:
logger.info(f"消息发送成功:{recipient}")
return True
else:
logger.error(f"消息发送失败:{recipient},错误信息:{result.stderr}")
return False
except subprocess.TimeoutExpired:
logger.error(f"消息发送超时:{recipient}")
return False
except Exception as e:
logger.error(f"发送消息时发生异常:{recipient},异常信息:{str(e)}", exc_info=True)
return False
def send_message_with_retry(self, recipient: str, message: str, max_retries: int = 3, retry_interval: int = 10) -> bool:
"""
发送消息并在失败时自动重试
Args:
recipient: 接收者
message: 消息内容
max_retries: 最大重试次数
retry_interval: 重试间隔(秒)
Returns:
最终发送成功返回True,否则返回False
"""
for attempt in range(max_retries + 1):
if attempt > 0:
logger.info(f"第{attempt}次重试发送消息:{recipient}")
time.sleep(retry_interval)
if self.send_message(recipient, message):
return True
logger.error(f"消息发送最终失败:{recipient},已重试{max_retries}次")
return False
def test_connection(self) -> bool:
"""
测试Messages应用是否可以正常访问
Returns:
测试通过返回True,失败返回False
"""
try:
# 简单测试:检查Messages应用是否正在运行
cmd = [
'osascript',
'-e',
'tell application "System Events" to (name of processes) contains "Messages"'
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0 and result.stdout.strip() == 'true':
logger.info("Messages应用正在运行")
return True
else:
# 尝试启动Messages应用
logger.info("Messages应用未运行,尝试启动...")
start_cmd = [
'osascript',
'-e',
'tell application "Messages" to activate'
]
subprocess.run(start_cmd, check=True, timeout=30)
time.sleep(5) # 等待应用启动完成
# 再次检查
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0 and result.stdout.strip() == 'true':
logger.info("Messages应用启动成功")
return True
else:
logger.error("无法启动Messages应用")
return False
except Exception as e:
logger.error(f"测试连接时发生异常:{str(e)}", exc_info=True)
return False
这个IMessagesSender类封装了所有与iMessage消息发送相关的功能。它会自动创建必要的AppleScript脚本文件,并提供了发送消息、带重试的发送消息以及测试连接等方法。在发送消息时,系统会调用osascript命令执行AppleScript脚本,从而控制Messages应用发送iMessage消息。
四、单个ID循环群发逻辑设计
单个ID循环群发是本系统的核心功能。其基本思路是从接收者列表中依次取出每个接收者,使用同一个Apple ID按照预设的时间间隔发送消息。这种方式简单可靠,适合小规模的群发场景。以下是循环群发逻辑的实现代码。
```# mass_sender.py
import logging
import time
import yaml
import os
from typing import List, Tuple
from datetime import datetime
from imessage_sender import IMessagesSender
logger = logging.getLogger(name)
class MassSender:
"""单个ID循环群发器类"""
def __init__(self, config_path: str = 'config/config.yaml'):
"""
初始化群发器
Args:
config_path: 配置文件路径
"""
self.config = self._load_config(config_path)
self.sender = IMessagesSender()
self.send_interval = self.config.get('send_interval', 5)
self.max_retries = self.config.get('max_retries', 3)
self.retry_interval = self.config.get('retry_interval', 10)
# 统计信息
self.total_recipients = 0
self.sent_count = 0
self.failed_count = 0
self.start_time = None
self.end_time = None
def _load_config(self, config_path: str) -> dict:
"""
加载配置文件
Args:
config_path: 配置文件路径
Returns:
配置字典
"""
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
logger.info(f"配置文件加载成功:{config_path}")
return config or {}
except Exception as e:
logger.error(f"加载配置文件失败:{str(e)}", exc_info=True)
return {}
def load_recipients(self, file_path: str) -> List[str]:
"""
从文件加载接收者列表
Args:
file_path: 接收者文件路径
Returns:
接收者列表
"""
recipients = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
# 跳过空行和注释行
if line and not line.startswith('#'):
recipients.append(line)
logger.info(f"从文件加载了{len(recipients)}个接收者:{file_path}")
return recipients
except Exception as e:
logger.error(f"加载接收者列表失败:{str(e)}", exc_info=True)
return []
def load_message(self, file_path: str) -> str:
"""
从文件加载消息内容
Args:
file_path: 消息文件路径
Returns:
消息内容
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
message = f.read()
logger.info(f"消息内容加载成功:{file_path}")
return message
except Exception as e:
logger.error(f"加载消息内容失败:{str(e)}", exc_info=True)
return ""
def send_to_all(self, recipients: List[str], message: str) -> Tuple[int, int]:
"""
向所有接收者发送消息
Args:
recipients: 接收者列表
message: 消息内容
Returns:
元组(成功发送数量, 失败数量)
"""
if not recipients:
logger.error("接收者列表为空")
return (0, 0)
if not message:
logger.error("消息内容为空")
return (0, 0)
# 初始化统计信息
self.total_recipients = len(recipients)
self.sent_count = 0
self.failed_count = 0
self.start_time = datetime.now()
logger.info("=" * 60)
logger.info(f"开始群发消息")
logger.info(f"总接收者数量:{self.total_recipients}")
logger.info(f"发送间隔:{self.send_interval}秒")
logger.info(f"最大重试次数:{self.max_retries}次")
logger.info("=" * 60)
# 测试连接
if not self.sender.test_connection():
logger.error("无法连接到Messages应用,群发任务终止")
return (0, self.total_recipients)
# 循环发送消息
for index, recipient in enumerate(recipients, 1):
logger.info(f"正在发送第{index}/{self.total_recipients}条消息:{recipient}")
if self.sender.send_message_with_retry(
recipient,
message,
self.max_retries,
self.retry_interval
):
self.sent_count += 1
else:
self.failed_count += 1
# 如果不是最后一个接收者,等待发送间隔
if index < self.total_recipients:
logger.debug(f"等待{self.send_interval}秒后发送下一条消息")
time.sleep(self.send_interval)
# 记录结束时间
self.end_time = datetime.now()
# 生成发送报告
self._generate_report()
return (self.sent_count, self.failed_count)
def _generate_report(self):
"""生成发送报告"""
if self.start_time is None or self.end_time is None:
return
duration = self.end_time - self.start_time
duration_seconds = duration.total_seconds()
logger.info("=" * 60)
logger.info("群发任务完成")
logger.info(f"开始时间:{self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"结束时间:{self.end_time.strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"总耗时:{duration_seconds:.2f}秒")
logger.info(f"总接收者数量:{self.total_recipients}")
logger.info(f"成功发送:{self.sent_count}条")
logger.info(f"发送失败:{self.failed_count}条")
logger.info(f"成功率:{(self.sent_count/self.total_recipients)*100:.2f}%" if self.total_recipients > 0 else "成功率:0%")
logger.info("=" * 60)
# 保存报告到文件
report_dir = 'reports'
if not os.path.exists(report_dir):
os.makedirs(report_dir)
report_filename = f"report_{self.start_time.strftime('%Y%m%d_%H%M%S')}.txt"
report_path = os.path.join(report_dir, report_filename)
report_content = f"""iMessage群发任务报告
==============================
开始时间:{self.start_time.strftime('%Y-%m-%d %H:%M:%S')}
结束时间:{self.end_time.strftime('%Y-%m-%d %H:%M:%S')}
总耗时:{duration_seconds:.2f}秒
总接收者数量:{self.total_recipients}
成功发送:{self.sent_count}条
发送失败:{self.failed_count}条
成功率:{(self.sent_count/self.total_recipients)*100:.2f}%
"""
with open(report_path, 'w', encoding='utf-8') as f:
f.write(report_content)
logger.info(f"发送报告已保存到:{report_path}")
def main():
"""主函数"""
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/mass_sender.log', encoding='utf-8'),
logging.StreamHandler()
]
)
# 创建群发器实例
mass_sender = MassSender()
# 加载接收者列表和消息内容
recipients = mass_sender.load_recipients('data/recipients.txt')
message = mass_sender.load_message('data/message.txt')
# 开始群发
if recipients and message:
mass_sender.send_to_all(recipients, message)
else:
logger.error("接收者列表或消息内容为空,无法进行群发")
if name == "main":
main()
这个MassSender类实现了单个ID循环群发的核心逻辑。它从配置文件中读取发送间隔、最大重试次数等参数,从文件中加载接收者列表和消息内容,然后依次向每个接收者发送消息。在发送过程中,系统会实时记录发送状态,并在任务完成后生成详细的发送报告。
# 五、异常处理与重试机制
在实际运行过程中,群发系统可能会遇到各种异常情况,如网络中断、Messages应用崩溃、接收者号码无效等。为了提高系统的稳定性和可靠性,我们需要实现完善的异常处理和重试机制。以下是相关的代码实现。
```# exception_handler.py
import logging
import time
import traceback
from functools import wraps
from typing import Callable, Any, Type, Tuple
logger = logging.getLogger(__name__)
def retry_on_exception(
max_retries: int = 3,
retry_interval: int = 5,
exceptions: Tuple[Type[Exception], ...] = (Exception,),
backoff_factor: float = 1.0
):
"""
重试装饰器,在函数抛出指定异常时自动重试
Args:
max_retries: 最大重试次数
retry_interval: 初始重试间隔(秒)
exceptions: 需要重试的异常类型元组
backoff_factor: 退避因子,每次重试间隔会乘以这个因子
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_retries:
wait_time = retry_interval * (backoff_factor ** attempt)
logger.warning(
f"函数{func.__name__}执行失败,{wait_time:.2f}秒后进行第{attempt+1}次重试。"
f"异常信息:{str(e)}"
)
time.sleep(wait_time)
else:
logger.error(
f"函数{func.__name__}执行最终失败,已重试{max_retries}次。"
f"异常信息:{str(e)}",
exc_info=True
)
# 如果所有重试都失败,抛出最后一个异常
raise last_exception
return wrapper
return decorator
class ExceptionHandler:
"""全局异常处理器类"""
@staticmethod
def handle_exception(e: Exception, context: str = "") -> None:
"""
处理异常
Args:
e: 异常对象
context: 异常发生的上下文信息
"""
error_message = f"发生异常:{str(e)}"
if context:
error_message = f"在{context}时{error_message}"
logger.error(error_message, exc_info=True)
# 记录详细的堆栈信息
stack_trace = traceback.format_exc()
logger.debug(f"详细堆栈信息:\n{stack_trace}")
@staticmethod
def safe_execute(func: Callable, *args, **kwargs) -> Tuple[bool, Any]:
"""
安全执行函数,捕获所有异常
Args:
func: 要执行的函数
*args: 函数参数
**kwargs: 函数关键字参数
Returns:
元组(是否成功, 函数返回值或异常对象)
"""
try:
result = func(*args, **kwargs)
return (True, result)
except Exception as e:
ExceptionHandler.handle_exception(e, f"执行函数{func.__name__}")
return (False, e)
# 使用示例
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
# 示例1:使用重试装饰器
@retry_on_exception(max_retries=3, retry_interval=2, exceptions=(ValueError,))
def risky_function():
import random
if random.random() < 0.7:
raise ValueError("随机错误")
return "成功"
try:
result = risky_function()
print(f"函数执行结果:{result}")
except Exception as e:
print(f"函数最终失败:{e}")
# 示例2:使用安全执行函数
def another_risky_function():
raise RuntimeError("运行时错误")
success, result = ExceptionHandler.safe_execute(another_risky_function)
if success:
print(f"函数执行成功:{result}")
else:
print(f"函数执行失败:{result}")
这个异常处理模块提供了两种异常处理方式:重试装饰器和安全执行函数。重试装饰器可以在函数抛出指定异常时自动重试,适用于可能出现临时故障的操作。安全执行函数可以捕获函数执行过程中的所有异常,避免程序崩溃。
六、速率控制与风控规避策略
苹果公司对iMessage的发送频率有严格的限制,如果发送速度过快,很容易触发风控机制,导致Apple ID被限制甚至封禁。因此,我们需要实现合理的速率控制和风控规避策略。以下是相关的代码实现。
```# rate_controller.py
import time
import logging
import random
from datetime import datetime, timedelta
from typing import Dict, List
logger = logging.getLogger(name)
class RateController:
"""速率控制器类"""
def __init__(
self,
min_interval: float = 5.0,
max_interval: float = 10.0,
hourly_limit: int = 100,
daily_limit: int = 500,
enable_randomization: bool = True
):
"""
初始化速率控制器
Args:
min_interval: 最小发送间隔(秒)
max_interval: 最大发送间隔(秒)
hourly_limit: 每小时发送限制
daily_limit: 每日发送限制
enable_randomization: 是否启用随机间隔
"""
self.min_interval = min_interval
self.max_interval = max_interval
self.hourly_limit = hourly_limit
self.daily_limit = daily_limit
self.enable_randomization = enable_randomization
# 发送记录
self.send_history: List[datetime] = []
def _cleanup_old_records(self):
"""清理超过24小时的发送记录"""
now = datetime.now()
cutoff_time = now - timedelta(hours=24)
self.send_history = [t for t in self.send_history if t > cutoff_time]
def get_hourly_count(self) -> int:
"""获取过去一小时内的发送数量"""
self._cleanup_old_records()
now = datetime.now()
cutoff_time = now - timedelta(hours=1)
return sum(1 for t in self.send_history if t > cutoff_time)
def get_daily_count(self) -> int:
"""获取过去24小时内的发送数量"""
self._cleanup_old_records()
return len(self.send_history)
def get_next_interval(self) -> float:
"""
获取下一次发送的间隔时间
Returns:
间隔时间(秒)
"""
if self.enable_randomization:
interval = random.uniform(self.min_interval, self.max_interval)
else:
interval = (self.min_interval + self.max_interval) / 2
logger.debug(f"计算得到的发送间隔:{interval:.2f}秒")
return interval
def wait_for_next_send(self) -> None:
"""等待到下一次可以发送消息的时间"""
# 检查每小时限制
hourly_count = self.get_hourly_count()
if hourly_count >= self.hourly_limit:
now = datetime.now()
# 计算需要等待到下一个小时
next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
wait_time = (next_hour - now).total_seconds()
logger.warning(
f"已达到每小时发送限制({self.hourly_limit}条),"
f"将等待{wait_time/60:.2f}分钟到下一个小时"
)
time.sleep(wait_time)
# 检查每日限制
daily_count = self.get_daily_count()
if daily_count >= self.daily_limit:
now = datetime.now()
# 计算需要等待到第二天
next_day = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
wait_time = (next_day - now).total_seconds()
logger.warning(
f"已达到每日发送限制({self.daily_limit}条),"
f"将等待{wait_time/3600:.2f}小时到第二天"
)
time.sleep(wait_time)
# 等待基本间隔
interval = self.get_next_interval()
logger.debug(f"等待{interval:.2f}秒后发送下一条消息")
time.sleep(interval)
def record_send(self) -> None:
"""记录一次发送操作"""
self.send_history.append(datetime.now())
logger.debug(f"记录发送操作,当前总发送记录数:{len(self.send_history)}")
def get_status(self) -> Dict:
"""获取当前速率控制状态"""
return {
'hourly_count': self.get_hourly_count(),
'hourly_limit': self.hourly_limit,
'daily_count': self.get_daily_count(),
'daily_limit': self.daily_limit,
'min_interval': self.min_interval,
'max_interval': self.max_interval,
'enable_randomization': self.enable_randomization
}
使用示例
if name == "main":
logging.basicConfig(level=logging.INFO)
# 创建速率控制器
rate_controller = RateController(
min_interval=3.0,
max_interval=8.0,
hourly_limit=50,
daily_limit=200
)
# 模拟发送10条消息
for i in range(10):
print(f"发送第{i+1}条消息")
rate_controller.record_send()
if i < 9:
rate_controller.wait_for_next_send()
# 打印状态
status = rate_controller.get_status()
print("\n速率控制状态:")
for key, value in status.items():
print(f"{key}: {value}")
这个RateController类实现了完善的速率控制功能。它支持设置最小和最大发送间隔、每小时发送限制和每日发送限制。同时,它还支持随机间隔功能,可以使发送行为更加自然,降低被风控系统检测到的风险。在每次发送消息前,系统会检查是否达到了发送限制,如果达到了限制,会自动等待到限制解除后再继续发送。
# 七、阿里云部署与定时任务配置
为了实现群发系统的稳定运行和自动化执行,我们可以将系统部署到阿里云服务器上,并配置定时任务。以下是阿里云部署和定时任务配置的相关代码和说明。
```# deploy_aliyun.py
import os
import subprocess
import logging
from typing import List
logger = logging.getLogger(__name__)
class AliyunDeployer:
"""阿里云部署器类"""
def __init__(self, project_name: str = 'imessage-mass-sender'):
"""
初始化部署器
Args:
project_name: 项目名称
"""
self.project_name = project_name
self.remote_user = 'root'
self.remote_host = '' # 替换为你的阿里云服务器IP
self.remote_path = f'/opt/{project_name}'
def _run_command(self, cmd: List[str], cwd: str = None) -> bool:
"""
运行本地命令
Args:
cmd: 命令列表
cwd: 工作目录
Returns:
命令执行成功返回True,失败返回False
"""
try:
logger.info(f"执行命令:{' '.join(cmd)}")
result = subprocess.run(
cmd,
cwd=cwd,
capture_output=True,
text=True,
check=True
)
logger.info(f"命令输出:{result.stdout}")
return True
except subprocess.CalledProcessError as e:
logger.error(f"命令执行失败:{e.stderr}")
return False
def _run_remote_command(self, cmd: str) -> bool:
"""
运行远程命令
Args:
cmd: 要执行的命令
Returns:
命令执行成功返回True,失败返回False
"""
if not self.remote_host:
logger.error("未设置远程服务器地址")
return False
ssh_cmd = [
'ssh',
f'{self.remote_user}@{self.remote_host}',
cmd
]
return self._run_command(ssh_cmd)
def upload_files(self) -> bool:
"""
上传项目文件到远程服务器
Returns:
上传成功返回True,失败返回False
"""
if not self.remote_host:
logger.error("未设置远程服务器地址")
return False
# 创建远程目录
if not self._run_remote_command(f'mkdir -p {self.remote_path}'):
logger.error("创建远程目录失败")
return False
# 上传文件
scp_cmd = [
'scp',
'-r',
'*.py',
'config/',
'data/',
'scripts/',
f'{self.remote_user}@{self.remote_host}:{self.remote_path}/'
]
if not self._run_command(scp_cmd):
logger.error("上传文件失败")
return False
logger.info("文件上传成功")
return True
def install_dependencies(self) -> bool:
"""
在远程服务器上安装依赖
Returns:
安装成功返回True,失败返回False
"""
commands = [
'apt update',
'apt install -y python3 python3-pip python3-venv',
f'cd {self.remote_path} && python3 -m venv venv',
f'cd {self.remote_path} && source venv/bin/activate && pip install -r requirements.txt'
]
for cmd in commands:
if not self._run_remote_command(cmd):
logger.error(f"执行命令失败:{cmd}")
return False
logger.info("依赖安装成功")
return True
def create_systemd_service(self) -> bool:
"""
创建systemd服务文件
Returns:
创建成功返回True,失败返回False
"""
service_content = f"""[Unit]
Description=iMessage Mass Sender Service
After=network.target
[Service]
Type=simple
User={self.remote_user}
WorkingDirectory={self.remote_path}
ExecStart={self.remote_path}/venv/bin/python {self.remote_path}/mass_sender.py
Restart=on-failure
RestartSec=5
StandardOutput=journal+console
StandardError=journal+console
[Install]
WantedBy=multi-user.target
"""
# 将服务文件写入远程服务器
cmd = f"cat > /etc/systemd/system/{self.project_name}.service << 'EOF'\n{service_content}\nEOF"
if not self._run_remote_command(cmd):
logger.error("创建systemd服务文件失败")
return False
# 重新加载systemd配置
if not self._run_remote_command('systemctl daemon-reload'):
logger.error("重新加载systemd配置失败")
return False
logger.info("systemd服务创建成功")
return True
def configure_cron_job(self, schedule: str = '0 9 * * *') -> bool:
"""
配置定时任务
Args:
schedule: cron表达式,默认每天早上9点执行
Returns:
配置成功返回True,失败返回False
"""
# 创建cron任务
cron_cmd = f'(crontab -l 2>/dev/null | grep -v "{self.project_name}"; echo "{schedule} cd {self.remote_path} && {self.remote_path}/venv/bin/python {self.remote_path}/mass_sender.py >> {self.remote_path}/logs/cron.log 2>&1") | crontab -'
if not self._run_remote_command(cron_cmd):
logger.error("配置定时任务失败")
return False
logger.info(f"定时任务配置成功,执行时间:{schedule}")
return True
def deploy(self) -> bool:
"""
执行完整部署流程
Returns:
部署成功返回True,失败返回False
"""
logger.info("开始部署iMessage群发系统到阿里云服务器")
steps = [
("上传文件", self.upload_files),
("安装依赖", self.install_dependencies),
("创建systemd服务", self.create_systemd_service),
("配置定时任务", self.configure_cron_job)
]
for step_name, step_func in steps:
logger.info(f"执行步骤:{step_name}")
if not step_func():
logger.error(f"部署失败:{step_name}")
return False
logger.info("部署完成!")
logger.info(f"使用以下命令启动服务:systemctl start {self.project_name}")
logger.info(f"使用以下命令查看服务状态:systemctl status {self.project_name}")
logger.info(f"使用以下命令查看日志:journalctl -u {self.project_name} -f")
return True
def main():
"""主函数"""
logging.basicConfig(level=logging.INFO)
deployer = AliyunDeployer()
# 设置远程服务器地址
deployer.remote_host = 'your_server_ip' # 替换为你的阿里云服务器IP
deployer.remote_user = 'root' # 替换为你的服务器用户名
deployer.deploy()
if __name__ == "__main__":
main()
这个AliyunDeployer类提供了将群发系统部署到阿里云服务器的完整功能。它可以自动上传项目文件、安装依赖、创建systemd服务和配置定时任务。在使用时,只需要设置好远程服务器的IP地址和用户名,然后运行deploy()方法即可完成部署。
八、系统运行监控与日志管理
为了确保群发系统的稳定运行,我们需要实现完善的运行监控和日志管理功能。以下是相关的代码实现。
```# logger_config.py
import logging
import os
from datetime import datetime
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
import colorlog
def setup_logger(
name: str = 'imessage_mass_sender',
log_dir: str = 'logs',
log_level: int = logging.INFO,
max_bytes: int = 10 1024 1024, # 10MB
backup_count: int = 5,
retention_days: int = 7
) -> logging.Logger:
"""
配置日志系统
Args:
name: 日志器名称
log_dir: 日志目录
log_level: 日志级别
max_bytes: 单个日志文件最大大小
backup_count: 保留的备份文件数量
retention_days: 日志保留天数
Returns:
配置好的日志器
"""
# 创建日志目录
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 创建日志器
logger = logging.getLogger(name)
logger.setLevel(log_level)
logger.propagate = False
# 清除已有的处理器
for handler in logger.handlers[:]:
logger.removeHandler(handler)
# 创建格式器
file_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_formatter = colorlog.ColoredFormatter(
'%(log_color)s%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red,bg_white',
}
)
# 创建文件处理器(按大小轮转)
current_date = datetime.now().strftime('%Y%m%d')
file_handler = RotatingFileHandler(
os.path.join(log_dir, f'{name}_{current_date}.log'),
maxBytes=max_bytes,
backupCount=backup_count,
encoding='utf-8'
)
file_handler.setFormatter(file_formatter)
file_handler.setLevel(log_level)
# 创建错误文件处理器(只记录错误及以上级别)
error_file_handler = RotatingFileHandler(
os.path.join(log_dir, f'{name}_error_{current_date}.log'),
maxBytes=max_bytes,
backupCount=backup_count,
encoding='utf-8'
)
error_file_handler.setFormatter(file_formatter)
error_file_handler.setLevel(logging.ERROR)
# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(console_formatter)
console_handler.setLevel(log_level)
# 添加处理器到日志器
logger.addHandler(file_handler)
logger.addHandler(error_file_handler)
logger.addHandler(console_handler)
# 清理过期日志
cleanup_old_logs(log_dir, retention_days)
return logger
def cleanup_old_logs(log_dir: str, retention_days: int) -> None:
"""
清理过期的日志文件
Args:
log_dir: 日志目录
retention_days: 日志保留天数
"""
if not os.path.exists(log_dir):
return
now = datetime.now()
cutoff_time = now - datetime.timedelta(days=retention_days)
for filename in os.listdir(log_dir):
file_path = os.path.join(log_dir, filename)
if not os.path.isfile(file_path):
continue
# 只处理.log文件
if not filename.endswith('.log'):
continue
# 获取文件修改时间
mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
if mtime < cutoff_time:
try:
os.remove(file_path)
logging.info(f"删除过期日志文件:{file_path}")
except Exception as e:
logging.error(f"删除过期日志文件失败:{file_path},错误:{str(e)}")
class SystemMonitor:
"""系统监控器类"""
def __init__(self, logger: logging.Logger):
"""
初始化监控器
Args:
logger: 日志器实例
"""
self.logger = logger
self.start_time = datetime.now()
def get_system_info(self) -> dict:
"""
获取系统信息
Returns:
系统信息字典
"""
import platform
import psutil
try:
# 获取CPU信息
cpu_percent = psutil.cpu_percent(interval=1)
cpu_count = psutil.cpu_count()
# 获取内存信息
memory = psutil.virtual_memory()
memory_total = memory.total / (1024 ** 3) # GB
memory_used = memory.used / (1024 ** 3) # GB
memory_percent = memory.percent
# 获取磁盘信息
disk = psutil.disk_usage('/')
disk_total = disk.total / (1024 ** 3) # GB
disk_used = disk.used / (1024 ** 3) # GB
disk_percent = disk.percent
# 获取运行时间
uptime = datetime.now() - self.start_time
return {
'system': platform.system(),
'node': platform.node(),
'release': platform.release(),
'version': platform.version(),
'machine': platform.machine(),
'cpu_percent': cpu_percent,
'cpu_count': cpu_count,
'memory_total_gb': round(memory_total, 2),
'memory_used_gb': round(memory_used, 2),
'memory_percent': memory_percent,
'disk_total_gb': round(disk_total, 2),
'disk_used_gb': round(disk_used, 2),
'disk_percent': disk_percent,
'uptime_seconds': uptime.total_seconds(),
'uptime_str': str(uptime).split('.')[0]
}
except Exception as e:
self.logger.error(f"获取系统信息失败:{str(e)}")
return {}
def log_system_status(self) -> None:
"""记录系统状态到日志"""
system_info = self.get_system_info()
if not system_info:
return
self.logger.info("=" * 60)
self.logger.info("系统状态信息")
self.logger.info("=" * 60)
self.logger.info(f"系统:{system_info.get('system')} {system_info.get('release')}")
self.logger.info(f"主机名:{system_info.get('node')}")
self.logger.info(f"CPU使用率:{system_info.get('cpu_percent')}% ({system_info.get('cpu_count')}核)")
self.logger.info(f"内存使用:{system_info.get('memory_used_gb')}GB/{system_info.get('memory_total_gb')}GB ({system_info.get('memory_percent')}%)")
self.logger.info(f"磁盘使用:{system_info.get('disk_used_gb')}GB/{system_info.get('disk_total_gb')}GB ({system_info.get('disk_percent')}%)")
self.logger.info(f"运行时间:{system_info.get('uptime_str')}")
self.logger.info("=" * 60)
使用示例
if name == "main":
# 配置日志
logger = setup_logger(log_level=logging.DEBUG)
# 创建系统监控器
monitor = SystemMonitor(logger)
# 记录系统状态
monitor.log_system_status()
# 测试日志
logger.debug("这是一条调试信息")
logger.info("这是一条普通信息")
logger.warning("这是一条警告信息")
logger.error("这是一条错误信息")
logger.critical("这是一条严重错误信息")
```
这个日志配置模块提供了完善的日志管理功能。它支持按大小轮转日志文件、按日期命名日志文件、自动清理过期日志等功能。同时,它还提供了彩色控制台输出,使日志信息更加易读。系统监控器类可以获取系统的CPU、内存、磁盘等使用情况,并将这些信息记录到日志中,帮助开发者了解系统的运行状态。
通过以上八个模块的实现,我们完成了一套完整的iMessage群发系统。该系统支持单个Apple ID循环群发消息,具有完善的异常处理、速率控制、日志管理和阿里云部署功能。在实际使用过程中,请务必遵守相关法律法规和苹果公司的服务条款,仅用于合法合规的信息传递场景。