macOS原生iMessage群发系统:单个ID循环发送技术详解

简介: 群发系统作为自动化信息传递的重要工具,在企业内部通知、客户服务提醒等合规场景中有着广泛应用。本文将基于macOS原生技术栈,实现一套轻量化的iMessage群发系统,重点讲解单个Apple ID循环群发的核心技术实现,以及如何将系统部署到阿里云平台进行稳定运行。所有代码均基于Python 3.10+和AppleScript原生接口开发,无第三方商业SDK依赖,适合开发者进行二次开发和个性化功能扩展。在实际使用过程中,请务必遵守苹果公司的服务条款和相关法律法规,仅用于合法合规的信息传递场景。

iMessage群发系统:单个ID循环群发技术实现与阿里云部署

群发系统作为自动化信息传递的重要工具,在企业内部通知、客户服务提醒等合规场景中有着广泛应用。本文将基于macOS原生技术栈,实现一套轻量化的iMessage群发系统,重点讲解单个Apple ID循环群发的核心技术实现,以及如何将系统部署到阿里云平台进行稳定运行。所有代码均基于Python 3.10+和AppleScript原生接口开发,无第三方商业SDK依赖,适合开发者进行二次开发和个性化功能扩展。在实际使用过程中,请务必遵守苹果公司的服务条款和相关法律法规,仅用于合法合规的信息传递场景。

一、系统整体设计思路与技术选型

本系统采用模块化设计思想,将整个群发流程拆分为消息发送、任务调度、异常处理、日志记录和配置管理五个独立模块。这种设计不仅提高了代码的可维护性,也便于后续功能扩展。技术选型方面,我们选择Python作为主要开发语言,因为它拥有丰富的标准库和第三方库,能够快速实现各种功能需求。同时,利用macOS系统自带的AppleScript脚本语言与Messages应用进行交互,实现iMessage消息的发送功能。这种原生接口调用方式相比第三方API更加稳定可靠,且不会引入额外的安全风险。
系统的核心工作流程如下:首先从配置文件中读取接收者列表和消息内容,然后通过单个Apple ID按照预设的时间间隔循环发送消息。在发送过程中,系统会实时监控发送状态,对发送失败的消息进行自动重试,并将所有操作记录到日志文件中。当所有消息发送完成后,系统会生成发送报告,统计成功和失败的数量。

二、核心依赖库安装与环境配置

在开始编写代码之前,我们需要先配置好开发环境。本系统运行在macOS 12.0及以上版本,需要安装Python 3.10或更高版本。以下是详细的环境配置步骤和代码实现。
```# 环境检查与依赖安装脚本
import subprocess
import sys
import platform
import os

def check_python_version():
"""检查Python版本是否符合要求"""
required_version = (3, 10)
current_version = sys.version_info

if current_version < required_version:
    print(f"错误:Python版本过低。需要Python {required_version[0]}.{required_version[1]} 或更高版本")
    print(f"当前版本:Python {current_version[0]}.{current_version[1]}.{current_version[2]}")
    return False
print(f"Python版本检查通过:{sys.version}")
return True

def check_macos_version():
"""检查macOS版本是否符合要求"""
if platform.system() != 'Darwin':
print("错误:本系统只能运行在macOS平台上")
return False

macos_version = platform.mac_ver()[0]
version_parts = list(map(int, macos_version.split('.')))

if version_parts[0] < 12:
    print(f"错误:macOS版本过低。需要macOS 12.0或更高版本")
    print(f"当前版本:macOS {macos_version}")
    return False
print(f"macOS版本检查通过:macOS {macos_version}")
return True

def install_required_packages():
"""安装所需的Python依赖包"""
required_packages = [
'pyyaml==6.0.1',
'schedule==1.2.1',
'python-dotenv==1.0.0',
'colorlog==6.7.0'
]

print("开始安装依赖包...")
for package in required_packages:
    try:
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
        print(f"成功安装:{package}")
    except subprocess.CalledProcessError as e:
        print(f"安装失败:{package},错误代码:{e.returncode}")
        return False
print("所有依赖包安装完成")
return True

def create_config_files():
"""创建必要的配置文件和目录"""
directories = ['logs', 'config', 'data']

for directory in directories:
    if not os.path.exists(directory):
        os.makedirs(directory)
        print(f"创建目录:{directory}")
    else:
        print(f"目录已存在:{directory}")

# 创建默认配置文件
config_content = """# iMessage群发系统配置文件

发送间隔(秒)

send_interval: 5

最大重试次数

max_retries: 3

重试间隔(秒)

retry_interval: 10

日志级别:DEBUG, INFO, WARNING, ERROR, CRITICAL

log_level: INFO

日志保留天数

log_retention_days: 7
"""

config_path = 'config/config.yaml'
if not os.path.exists(config_path):
    with open(config_path, 'w', encoding='utf-8') as f:
        f.write(config_content)
    print(f"创建默认配置文件:{config_path}")
else:
    print(f"配置文件已存在:{config_path}")

# 创建示例接收者文件
recipients_content = """# 接收者列表,每行一个号码或邮箱

格式:电话号码(带国家代码)或Apple ID邮箱

+8613800138000
+8613900139000
example@icloud.com
"""

recipients_path = 'data/recipients.txt'
if not os.path.exists(recipients_path):
    with open(recipients_path, 'w', encoding='utf-8') as f:
        f.write(recipients_content)
    print(f"创建示例接收者文件:{recipients_path}")
else:
    print(f"接收者文件已存在:{recipients_path}")

# 创建示例消息文件
message_content = """这是一条测试消息。

请不要回复此消息。
"""

message_path = 'data/message.txt'
if not os.path.exists(message_path):
    with open(message_path, 'w', encoding='utf-8') as f:
        f.write(message_content)
    print(f"创建示例消息文件:{message_path}")
else:
    print(f"消息文件已存在:{message_path}")

def main():
"""主函数"""
print("=" 60)
print("iMessage群发系统环境配置工具")
print("="
60)

if not check_python_version():
    sys.exit(1)

if not check_macos_version():
    sys.exit(1)

if not install_required_packages():
    sys.exit(1)

create_config_files()

print("\n" + "=" * 60)
print("环境配置完成!")
print("请编辑data/recipients.txt文件添加接收者列表")
print("请编辑data/message.txt文件修改要发送的消息内容")
print("请编辑config/config.yaml文件调整系统参数")
print("=" * 60)

if name == "main":
main()

将以上代码保存为setup_env.py,在终端中运行python setup_env.py即可完成环境配置。脚本会自动检查Python和macOS版本,安装所需的依赖包,并创建必要的配置文件和目录结构。
# 三、AppleScript消息发送核心实现
iMessage消息发送的核心是通过AppleScript脚本调用macOS系统的Messages应用。AppleScript是苹果公司开发的一种脚本语言,可以直接控制macOS系统中的各种应用程序。以下是实现iMessage消息发送的核心代码。
```# imessage_sender.py
import subprocess
import logging
import time
import os
from typing import Optional

logger = logging.getLogger(__name__)

class IMessagesSender:
    """iMessage消息发送器类"""

    def __init__(self):
        """初始化发送器"""
        self.applescript_path = 'scripts/send_message.scpt'
        self._create_applescript_file()

    def _create_applescript_file(self):
        """创建AppleScript脚本文件"""
        scripts_dir = os.path.dirname(self.applescript_path)
        if not os.path.exists(scripts_dir):
            os.makedirs(scripts_dir)

        applescript_content = """on run {targetBuddyPhone, targetMessage}
    tell application "Messages"
        set targetService to 1st service whose service type = iMessage
        set targetBuddy to buddy targetBuddyPhone of targetService
        send targetMessage to targetBuddy
    end tell
end run
"""

        with open(self.applescript_path, 'w', encoding='utf-8') as f:
            f.write(applescript_content)

        logger.debug(f"AppleScript脚本文件已创建:{self.applescript_path}")

    def send_message(self, recipient: str, message: str) -> bool:
        """
        发送iMessage消息

        Args:
            recipient: 接收者的电话号码(带国家代码)或Apple ID邮箱
            message: 要发送的消息内容

        Returns:
            发送成功返回True,失败返回False
        """
        if not recipient or not message:
            logger.error("接收者或消息内容不能为空")
            return False

        try:
            # 构建AppleScript命令
            cmd = [
                'osascript',
                self.applescript_path,
                recipient,
                message
            ]

            # 执行命令
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=30
            )

            # 检查执行结果
            if result.returncode == 0:
                logger.info(f"消息发送成功:{recipient}")
                return True
            else:
                logger.error(f"消息发送失败:{recipient},错误信息:{result.stderr}")
                return False

        except subprocess.TimeoutExpired:
            logger.error(f"消息发送超时:{recipient}")
            return False
        except Exception as e:
            logger.error(f"发送消息时发生异常:{recipient},异常信息:{str(e)}", exc_info=True)
            return False

    def send_message_with_retry(self, recipient: str, message: str, max_retries: int = 3, retry_interval: int = 10) -> bool:
        """
        发送消息并在失败时自动重试

        Args:
            recipient: 接收者
            message: 消息内容
            max_retries: 最大重试次数
            retry_interval: 重试间隔(秒)

        Returns:
            最终发送成功返回True,否则返回False
        """
        for attempt in range(max_retries + 1):
            if attempt > 0:
                logger.info(f"第{attempt}次重试发送消息:{recipient}")
                time.sleep(retry_interval)

            if self.send_message(recipient, message):
                return True

        logger.error(f"消息发送最终失败:{recipient},已重试{max_retries}次")
        return False

    def test_connection(self) -> bool:
        """
        测试Messages应用是否可以正常访问

        Returns:
            测试通过返回True,失败返回False
        """
        try:
            # 简单测试:检查Messages应用是否正在运行
            cmd = [
                'osascript',
                '-e',
                'tell application "System Events" to (name of processes) contains "Messages"'
            ]

            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=10
            )

            if result.returncode == 0 and result.stdout.strip() == 'true':
                logger.info("Messages应用正在运行")
                return True
            else:
                # 尝试启动Messages应用
                logger.info("Messages应用未运行,尝试启动...")
                start_cmd = [
                    'osascript',
                    '-e',
                    'tell application "Messages" to activate'
                ]

                subprocess.run(start_cmd, check=True, timeout=30)
                time.sleep(5)  # 等待应用启动完成

                # 再次检查
                result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
                if result.returncode == 0 and result.stdout.strip() == 'true':
                    logger.info("Messages应用启动成功")
                    return True
                else:
                    logger.error("无法启动Messages应用")
                    return False

        except Exception as e:
            logger.error(f"测试连接时发生异常:{str(e)}", exc_info=True)
            return False

这个IMessagesSender类封装了所有与iMessage消息发送相关的功能。它会自动创建必要的AppleScript脚本文件,并提供了发送消息、带重试的发送消息以及测试连接等方法。在发送消息时,系统会调用osascript命令执行AppleScript脚本,从而控制Messages应用发送iMessage消息。

四、单个ID循环群发逻辑设计

单个ID循环群发是本系统的核心功能。其基本思路是从接收者列表中依次取出每个接收者,使用同一个Apple ID按照预设的时间间隔发送消息。这种方式简单可靠,适合小规模的群发场景。以下是循环群发逻辑的实现代码。
```# mass_sender.py
import logging
import time
import yaml
import os
from typing import List, Tuple
from datetime import datetime
from imessage_sender import IMessagesSender

logger = logging.getLogger(name)

class MassSender:
"""单个ID循环群发器类"""

def __init__(self, config_path: str = 'config/config.yaml'):
    """
    初始化群发器

    Args:
        config_path: 配置文件路径
    """
    self.config = self._load_config(config_path)
    self.sender = IMessagesSender()
    self.send_interval = self.config.get('send_interval', 5)
    self.max_retries = self.config.get('max_retries', 3)
    self.retry_interval = self.config.get('retry_interval', 10)

    # 统计信息
    self.total_recipients = 0
    self.sent_count = 0
    self.failed_count = 0
    self.start_time = None
    self.end_time = None

def _load_config(self, config_path: str) -> dict:
    """
    加载配置文件

    Args:
        config_path: 配置文件路径

    Returns:
        配置字典
    """
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
        logger.info(f"配置文件加载成功:{config_path}")
        return config or {}
    except Exception as e:
        logger.error(f"加载配置文件失败:{str(e)}", exc_info=True)
        return {}

def load_recipients(self, file_path: str) -> List[str]:
    """
    从文件加载接收者列表

    Args:
        file_path: 接收者文件路径

    Returns:
        接收者列表
    """
    recipients = []

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                # 跳过空行和注释行
                if line and not line.startswith('#'):
                    recipients.append(line)

        logger.info(f"从文件加载了{len(recipients)}个接收者:{file_path}")
        return recipients
    except Exception as e:
        logger.error(f"加载接收者列表失败:{str(e)}", exc_info=True)
        return []

def load_message(self, file_path: str) -> str:
    """
    从文件加载消息内容

    Args:
        file_path: 消息文件路径

    Returns:
        消息内容
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            message = f.read()
        logger.info(f"消息内容加载成功:{file_path}")
        return message
    except Exception as e:
        logger.error(f"加载消息内容失败:{str(e)}", exc_info=True)
        return ""

def send_to_all(self, recipients: List[str], message: str) -> Tuple[int, int]:
    """
    向所有接收者发送消息

    Args:
        recipients: 接收者列表
        message: 消息内容

    Returns:
        元组(成功发送数量, 失败数量)
    """
    if not recipients:
        logger.error("接收者列表为空")
        return (0, 0)

    if not message:
        logger.error("消息内容为空")
        return (0, 0)

    # 初始化统计信息
    self.total_recipients = len(recipients)
    self.sent_count = 0
    self.failed_count = 0
    self.start_time = datetime.now()

    logger.info("=" * 60)
    logger.info(f"开始群发消息")
    logger.info(f"总接收者数量:{self.total_recipients}")
    logger.info(f"发送间隔:{self.send_interval}秒")
    logger.info(f"最大重试次数:{self.max_retries}次")
    logger.info("=" * 60)

    # 测试连接
    if not self.sender.test_connection():
        logger.error("无法连接到Messages应用,群发任务终止")
        return (0, self.total_recipients)

    # 循环发送消息
    for index, recipient in enumerate(recipients, 1):
        logger.info(f"正在发送第{index}/{self.total_recipients}条消息:{recipient}")

        if self.sender.send_message_with_retry(
            recipient, 
            message, 
            self.max_retries, 
            self.retry_interval
        ):
            self.sent_count += 1
        else:
            self.failed_count += 1

        # 如果不是最后一个接收者,等待发送间隔
        if index < self.total_recipients:
            logger.debug(f"等待{self.send_interval}秒后发送下一条消息")
            time.sleep(self.send_interval)

    # 记录结束时间
    self.end_time = datetime.now()

    # 生成发送报告
    self._generate_report()

    return (self.sent_count, self.failed_count)

def _generate_report(self):
    """生成发送报告"""
    if self.start_time is None or self.end_time is None:
        return

    duration = self.end_time - self.start_time
    duration_seconds = duration.total_seconds()

    logger.info("=" * 60)
    logger.info("群发任务完成")
    logger.info(f"开始时间:{self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info(f"结束时间:{self.end_time.strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info(f"总耗时:{duration_seconds:.2f}秒")
    logger.info(f"总接收者数量:{self.total_recipients}")
    logger.info(f"成功发送:{self.sent_count}条")
    logger.info(f"发送失败:{self.failed_count}条")
    logger.info(f"成功率:{(self.sent_count/self.total_recipients)*100:.2f}%" if self.total_recipients > 0 else "成功率:0%")
    logger.info("=" * 60)

    # 保存报告到文件
    report_dir = 'reports'
    if not os.path.exists(report_dir):
        os.makedirs(report_dir)

    report_filename = f"report_{self.start_time.strftime('%Y%m%d_%H%M%S')}.txt"
    report_path = os.path.join(report_dir, report_filename)

    report_content = f"""iMessage群发任务报告

==============================
开始时间:{self.start_time.strftime('%Y-%m-%d %H:%M:%S')}
结束时间:{self.end_time.strftime('%Y-%m-%d %H:%M:%S')}
总耗时:{duration_seconds:.2f}秒
总接收者数量:{self.total_recipients}
成功发送:{self.sent_count}条
发送失败:{self.failed_count}条

成功率:{(self.sent_count/self.total_recipients)*100:.2f}%

"""

    with open(report_path, 'w', encoding='utf-8') as f:
        f.write(report_content)

    logger.info(f"发送报告已保存到:{report_path}")

def main():
"""主函数"""

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/mass_sender.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)

# 创建群发器实例
mass_sender = MassSender()

# 加载接收者列表和消息内容
recipients = mass_sender.load_recipients('data/recipients.txt')
message = mass_sender.load_message('data/message.txt')

# 开始群发
if recipients and message:
    mass_sender.send_to_all(recipients, message)
else:
    logger.error("接收者列表或消息内容为空,无法进行群发")

if name == "main":
main()

这个MassSender类实现了单个ID循环群发的核心逻辑。它从配置文件中读取发送间隔、最大重试次数等参数,从文件中加载接收者列表和消息内容,然后依次向每个接收者发送消息。在发送过程中,系统会实时记录发送状态,并在任务完成后生成详细的发送报告。
# 五、异常处理与重试机制
在实际运行过程中,群发系统可能会遇到各种异常情况,如网络中断、Messages应用崩溃、接收者号码无效等。为了提高系统的稳定性和可靠性,我们需要实现完善的异常处理和重试机制。以下是相关的代码实现。
```# exception_handler.py
import logging
import time
import traceback
from functools import wraps
from typing import Callable, Any, Type, Tuple

logger = logging.getLogger(__name__)

def retry_on_exception(
    max_retries: int = 3,
    retry_interval: int = 5,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
    backoff_factor: float = 1.0
):
    """
    重试装饰器,在函数抛出指定异常时自动重试

    Args:
        max_retries: 最大重试次数
        retry_interval: 初始重试间隔(秒)
        exceptions: 需要重试的异常类型元组
        backoff_factor: 退避因子,每次重试间隔会乘以这个因子
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_retries:
                        wait_time = retry_interval * (backoff_factor ** attempt)
                        logger.warning(
                            f"函数{func.__name__}执行失败,{wait_time:.2f}秒后进行第{attempt+1}次重试。"
                            f"异常信息:{str(e)}"
                        )
                        time.sleep(wait_time)
                    else:
                        logger.error(
                            f"函数{func.__name__}执行最终失败,已重试{max_retries}次。"
                            f"异常信息:{str(e)}",
                            exc_info=True
                        )

            # 如果所有重试都失败,抛出最后一个异常
            raise last_exception

        return wrapper
    return decorator

class ExceptionHandler:
    """全局异常处理器类"""

    @staticmethod
    def handle_exception(e: Exception, context: str = "") -> None:
        """
        处理异常

        Args:
            e: 异常对象
            context: 异常发生的上下文信息
        """
        error_message = f"发生异常:{str(e)}"
        if context:
            error_message = f"在{context}时{error_message}"

        logger.error(error_message, exc_info=True)

        # 记录详细的堆栈信息
        stack_trace = traceback.format_exc()
        logger.debug(f"详细堆栈信息:\n{stack_trace}")

    @staticmethod
    def safe_execute(func: Callable, *args, **kwargs) -> Tuple[bool, Any]:
        """
        安全执行函数,捕获所有异常

        Args:
            func: 要执行的函数
            *args: 函数参数
            **kwargs: 函数关键字参数

        Returns:
            元组(是否成功, 函数返回值或异常对象)
        """
        try:
            result = func(*args, **kwargs)
            return (True, result)
        except Exception as e:
            ExceptionHandler.handle_exception(e, f"执行函数{func.__name__}")
            return (False, e)

# 使用示例
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)

    # 示例1:使用重试装饰器
    @retry_on_exception(max_retries=3, retry_interval=2, exceptions=(ValueError,))
    def risky_function():
        import random
        if random.random() < 0.7:
            raise ValueError("随机错误")
        return "成功"

    try:
        result = risky_function()
        print(f"函数执行结果:{result}")
    except Exception as e:
        print(f"函数最终失败:{e}")

    # 示例2:使用安全执行函数
    def another_risky_function():
        raise RuntimeError("运行时错误")

    success, result = ExceptionHandler.safe_execute(another_risky_function)
    if success:
        print(f"函数执行成功:{result}")
    else:
        print(f"函数执行失败:{result}")

这个异常处理模块提供了两种异常处理方式:重试装饰器和安全执行函数。重试装饰器可以在函数抛出指定异常时自动重试,适用于可能出现临时故障的操作。安全执行函数可以捕获函数执行过程中的所有异常,避免程序崩溃。

六、速率控制与风控规避策略

苹果公司对iMessage的发送频率有严格的限制,如果发送速度过快,很容易触发风控机制,导致Apple ID被限制甚至封禁。因此,我们需要实现合理的速率控制和风控规避策略。以下是相关的代码实现。
```# rate_controller.py
import time
import logging
import random
from datetime import datetime, timedelta
from typing import Dict, List

logger = logging.getLogger(name)

class RateController:
"""速率控制器类"""

def __init__(
    self,
    min_interval: float = 5.0,
    max_interval: float = 10.0,
    hourly_limit: int = 100,
    daily_limit: int = 500,
    enable_randomization: bool = True
):
    """
    初始化速率控制器

    Args:
        min_interval: 最小发送间隔(秒)
        max_interval: 最大发送间隔(秒)
        hourly_limit: 每小时发送限制
        daily_limit: 每日发送限制
        enable_randomization: 是否启用随机间隔
    """
    self.min_interval = min_interval
    self.max_interval = max_interval
    self.hourly_limit = hourly_limit
    self.daily_limit = daily_limit
    self.enable_randomization = enable_randomization

    # 发送记录
    self.send_history: List[datetime] = []

def _cleanup_old_records(self):
    """清理超过24小时的发送记录"""
    now = datetime.now()
    cutoff_time = now - timedelta(hours=24)
    self.send_history = [t for t in self.send_history if t > cutoff_time]

def get_hourly_count(self) -> int:
    """获取过去一小时内的发送数量"""
    self._cleanup_old_records()
    now = datetime.now()
    cutoff_time = now - timedelta(hours=1)
    return sum(1 for t in self.send_history if t > cutoff_time)

def get_daily_count(self) -> int:
    """获取过去24小时内的发送数量"""
    self._cleanup_old_records()
    return len(self.send_history)

def get_next_interval(self) -> float:
    """
    获取下一次发送的间隔时间

    Returns:
        间隔时间(秒)
    """
    if self.enable_randomization:
        interval = random.uniform(self.min_interval, self.max_interval)
    else:
        interval = (self.min_interval + self.max_interval) / 2

    logger.debug(f"计算得到的发送间隔:{interval:.2f}秒")
    return interval

def wait_for_next_send(self) -> None:
    """等待到下一次可以发送消息的时间"""
    # 检查每小时限制
    hourly_count = self.get_hourly_count()
    if hourly_count >= self.hourly_limit:
        now = datetime.now()
        # 计算需要等待到下一个小时
        next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
        wait_time = (next_hour - now).total_seconds()

        logger.warning(
            f"已达到每小时发送限制({self.hourly_limit}条),"
            f"将等待{wait_time/60:.2f}分钟到下一个小时"
        )
        time.sleep(wait_time)

    # 检查每日限制
    daily_count = self.get_daily_count()
    if daily_count >= self.daily_limit:
        now = datetime.now()
        # 计算需要等待到第二天
        next_day = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
        wait_time = (next_day - now).total_seconds()

        logger.warning(
            f"已达到每日发送限制({self.daily_limit}条),"
            f"将等待{wait_time/3600:.2f}小时到第二天"
        )
        time.sleep(wait_time)

    # 等待基本间隔
    interval = self.get_next_interval()
    logger.debug(f"等待{interval:.2f}秒后发送下一条消息")
    time.sleep(interval)

def record_send(self) -> None:
    """记录一次发送操作"""
    self.send_history.append(datetime.now())
    logger.debug(f"记录发送操作,当前总发送记录数:{len(self.send_history)}")

def get_status(self) -> Dict:
    """获取当前速率控制状态"""
    return {
        'hourly_count': self.get_hourly_count(),
        'hourly_limit': self.hourly_limit,
        'daily_count': self.get_daily_count(),
        'daily_limit': self.daily_limit,
        'min_interval': self.min_interval,
        'max_interval': self.max_interval,
        'enable_randomization': self.enable_randomization
    }

使用示例

if name == "main":
logging.basicConfig(level=logging.INFO)

# 创建速率控制器
rate_controller = RateController(
    min_interval=3.0,
    max_interval=8.0,
    hourly_limit=50,
    daily_limit=200
)

# 模拟发送10条消息
for i in range(10):
    print(f"发送第{i+1}条消息")
    rate_controller.record_send()

    if i < 9:
        rate_controller.wait_for_next_send()

# 打印状态
status = rate_controller.get_status()
print("\n速率控制状态:")
for key, value in status.items():
    print(f"{key}: {value}")
这个RateController类实现了完善的速率控制功能。它支持设置最小和最大发送间隔、每小时发送限制和每日发送限制。同时,它还支持随机间隔功能,可以使发送行为更加自然,降低被风控系统检测到的风险。在每次发送消息前,系统会检查是否达到了发送限制,如果达到了限制,会自动等待到限制解除后再继续发送。
# 七、阿里云部署与定时任务配置
为了实现群发系统的稳定运行和自动化执行,我们可以将系统部署到阿里云服务器上,并配置定时任务。以下是阿里云部署和定时任务配置的相关代码和说明。
```# deploy_aliyun.py
import os
import subprocess
import logging
from typing import List

logger = logging.getLogger(__name__)

class AliyunDeployer:
    """阿里云部署器类"""

    def __init__(self, project_name: str = 'imessage-mass-sender'):
        """
        初始化部署器

        Args:
            project_name: 项目名称
        """
        self.project_name = project_name
        self.remote_user = 'root'
        self.remote_host = ''  # 替换为你的阿里云服务器IP
        self.remote_path = f'/opt/{project_name}'

    def _run_command(self, cmd: List[str], cwd: str = None) -> bool:
        """
        运行本地命令

        Args:
            cmd: 命令列表
            cwd: 工作目录

        Returns:
            命令执行成功返回True,失败返回False
        """
        try:
            logger.info(f"执行命令:{' '.join(cmd)}")
            result = subprocess.run(
                cmd,
                cwd=cwd,
                capture_output=True,
                text=True,
                check=True
            )
            logger.info(f"命令输出:{result.stdout}")
            return True
        except subprocess.CalledProcessError as e:
            logger.error(f"命令执行失败:{e.stderr}")
            return False

    def _run_remote_command(self, cmd: str) -> bool:
        """
        运行远程命令

        Args:
            cmd: 要执行的命令

        Returns:
            命令执行成功返回True,失败返回False
        """
        if not self.remote_host:
            logger.error("未设置远程服务器地址")
            return False

        ssh_cmd = [
            'ssh',
            f'{self.remote_user}@{self.remote_host}',
            cmd
        ]

        return self._run_command(ssh_cmd)

    def upload_files(self) -> bool:
        """
        上传项目文件到远程服务器

        Returns:
            上传成功返回True,失败返回False
        """
        if not self.remote_host:
            logger.error("未设置远程服务器地址")
            return False

        # 创建远程目录
        if not self._run_remote_command(f'mkdir -p {self.remote_path}'):
            logger.error("创建远程目录失败")
            return False

        # 上传文件
        scp_cmd = [
            'scp',
            '-r',
            '*.py',
            'config/',
            'data/',
            'scripts/',
            f'{self.remote_user}@{self.remote_host}:{self.remote_path}/'
        ]

        if not self._run_command(scp_cmd):
            logger.error("上传文件失败")
            return False

        logger.info("文件上传成功")
        return True

    def install_dependencies(self) -> bool:
        """
        在远程服务器上安装依赖

        Returns:
            安装成功返回True,失败返回False
        """
        commands = [
            'apt update',
            'apt install -y python3 python3-pip python3-venv',
            f'cd {self.remote_path} && python3 -m venv venv',
            f'cd {self.remote_path} && source venv/bin/activate && pip install -r requirements.txt'
        ]

        for cmd in commands:
            if not self._run_remote_command(cmd):
                logger.error(f"执行命令失败:{cmd}")
                return False

        logger.info("依赖安装成功")
        return True

    def create_systemd_service(self) -> bool:
        """
        创建systemd服务文件

        Returns:
            创建成功返回True,失败返回False
        """
        service_content = f"""[Unit]
Description=iMessage Mass Sender Service
After=network.target

[Service]
Type=simple
User={self.remote_user}
WorkingDirectory={self.remote_path}
ExecStart={self.remote_path}/venv/bin/python {self.remote_path}/mass_sender.py
Restart=on-failure
RestartSec=5
StandardOutput=journal+console
StandardError=journal+console

[Install]
WantedBy=multi-user.target
"""

        # 将服务文件写入远程服务器
        cmd = f"cat > /etc/systemd/system/{self.project_name}.service << 'EOF'\n{service_content}\nEOF"

        if not self._run_remote_command(cmd):
            logger.error("创建systemd服务文件失败")
            return False

        # 重新加载systemd配置
        if not self._run_remote_command('systemctl daemon-reload'):
            logger.error("重新加载systemd配置失败")
            return False

        logger.info("systemd服务创建成功")
        return True

    def configure_cron_job(self, schedule: str = '0 9 * * *') -> bool:
        """
        配置定时任务

        Args:
            schedule: cron表达式,默认每天早上9点执行

        Returns:
            配置成功返回True,失败返回False
        """
        # 创建cron任务
        cron_cmd = f'(crontab -l 2>/dev/null | grep -v "{self.project_name}"; echo "{schedule} cd {self.remote_path} && {self.remote_path}/venv/bin/python {self.remote_path}/mass_sender.py >> {self.remote_path}/logs/cron.log 2>&1") | crontab -'

        if not self._run_remote_command(cron_cmd):
            logger.error("配置定时任务失败")
            return False

        logger.info(f"定时任务配置成功,执行时间:{schedule}")
        return True

    def deploy(self) -> bool:
        """
        执行完整部署流程

        Returns:
            部署成功返回True,失败返回False
        """
        logger.info("开始部署iMessage群发系统到阿里云服务器")

        steps = [
            ("上传文件", self.upload_files),
            ("安装依赖", self.install_dependencies),
            ("创建systemd服务", self.create_systemd_service),
            ("配置定时任务", self.configure_cron_job)
        ]

        for step_name, step_func in steps:
            logger.info(f"执行步骤:{step_name}")
            if not step_func():
                logger.error(f"部署失败:{step_name}")
                return False

        logger.info("部署完成!")
        logger.info(f"使用以下命令启动服务:systemctl start {self.project_name}")
        logger.info(f"使用以下命令查看服务状态:systemctl status {self.project_name}")
        logger.info(f"使用以下命令查看日志:journalctl -u {self.project_name} -f")

        return True

def main():
    """主函数"""
    logging.basicConfig(level=logging.INFO)

    deployer = AliyunDeployer()
    # 设置远程服务器地址
    deployer.remote_host = 'your_server_ip'  # 替换为你的阿里云服务器IP
    deployer.remote_user = 'root'  # 替换为你的服务器用户名

    deployer.deploy()

if __name__ == "__main__":
    main()

这个AliyunDeployer类提供了将群发系统部署到阿里云服务器的完整功能。它可以自动上传项目文件、安装依赖、创建systemd服务和配置定时任务。在使用时,只需要设置好远程服务器的IP地址和用户名,然后运行deploy()方法即可完成部署。

八、系统运行监控与日志管理

为了确保群发系统的稳定运行,我们需要实现完善的运行监控和日志管理功能。以下是相关的代码实现。
```# logger_config.py
import logging
import os
from datetime import datetime
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
import colorlog

def setup_logger(
name: str = 'imessage_mass_sender',
log_dir: str = 'logs',
log_level: int = logging.INFO,
max_bytes: int = 10 1024 1024, # 10MB
backup_count: int = 5,
retention_days: int = 7
) -> logging.Logger:
"""
配置日志系统

Args:
    name: 日志器名称
    log_dir: 日志目录
    log_level: 日志级别
    max_bytes: 单个日志文件最大大小
    backup_count: 保留的备份文件数量
    retention_days: 日志保留天数

Returns:
    配置好的日志器
"""
# 创建日志目录
if not os.path.exists(log_dir):
    os.makedirs(log_dir)

# 创建日志器
logger = logging.getLogger(name)
logger.setLevel(log_level)
logger.propagate = False

# 清除已有的处理器
for handler in logger.handlers[:]:
    logger.removeHandler(handler)

# 创建格式器
file_formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

console_formatter = colorlog.ColoredFormatter(
    '%(log_color)s%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    log_colors={
        'DEBUG': 'cyan',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'red,bg_white',
    }
)

# 创建文件处理器(按大小轮转)
current_date = datetime.now().strftime('%Y%m%d')
file_handler = RotatingFileHandler(
    os.path.join(log_dir, f'{name}_{current_date}.log'),
    maxBytes=max_bytes,
    backupCount=backup_count,
    encoding='utf-8'
)
file_handler.setFormatter(file_formatter)
file_handler.setLevel(log_level)

# 创建错误文件处理器(只记录错误及以上级别)
error_file_handler = RotatingFileHandler(
    os.path.join(log_dir, f'{name}_error_{current_date}.log'),
    maxBytes=max_bytes,
    backupCount=backup_count,
    encoding='utf-8'
)
error_file_handler.setFormatter(file_formatter)
error_file_handler.setLevel(logging.ERROR)

# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(console_formatter)
console_handler.setLevel(log_level)

# 添加处理器到日志器
logger.addHandler(file_handler)
logger.addHandler(error_file_handler)
logger.addHandler(console_handler)

# 清理过期日志
cleanup_old_logs(log_dir, retention_days)

return logger

def cleanup_old_logs(log_dir: str, retention_days: int) -> None:
"""
清理过期的日志文件

Args:
    log_dir: 日志目录
    retention_days: 日志保留天数
"""
if not os.path.exists(log_dir):
    return

now = datetime.now()
cutoff_time = now - datetime.timedelta(days=retention_days)

for filename in os.listdir(log_dir):
    file_path = os.path.join(log_dir, filename)

    if not os.path.isfile(file_path):
        continue

    # 只处理.log文件
    if not filename.endswith('.log'):
        continue

    # 获取文件修改时间
    mtime = datetime.fromtimestamp(os.path.getmtime(file_path))

    if mtime < cutoff_time:
        try:
            os.remove(file_path)
            logging.info(f"删除过期日志文件:{file_path}")
        except Exception as e:
            logging.error(f"删除过期日志文件失败:{file_path},错误:{str(e)}")

class SystemMonitor:
"""系统监控器类"""

def __init__(self, logger: logging.Logger):
    """
    初始化监控器

    Args:
        logger: 日志器实例
    """
    self.logger = logger
    self.start_time = datetime.now()

def get_system_info(self) -> dict:
    """
    获取系统信息

    Returns:
        系统信息字典
    """
    import platform
    import psutil

    try:
        # 获取CPU信息
        cpu_percent = psutil.cpu_percent(interval=1)
        cpu_count = psutil.cpu_count()

        # 获取内存信息
        memory = psutil.virtual_memory()
        memory_total = memory.total / (1024 ** 3)  # GB
        memory_used = memory.used / (1024 ** 3)    # GB
        memory_percent = memory.percent

        # 获取磁盘信息
        disk = psutil.disk_usage('/')
        disk_total = disk.total / (1024 ** 3)    # GB
        disk_used = disk.used / (1024 ** 3)      # GB
        disk_percent = disk.percent

        # 获取运行时间
        uptime = datetime.now() - self.start_time

        return {
            'system': platform.system(),
            'node': platform.node(),
            'release': platform.release(),
            'version': platform.version(),
            'machine': platform.machine(),
            'cpu_percent': cpu_percent,
            'cpu_count': cpu_count,
            'memory_total_gb': round(memory_total, 2),
            'memory_used_gb': round(memory_used, 2),
            'memory_percent': memory_percent,
            'disk_total_gb': round(disk_total, 2),
            'disk_used_gb': round(disk_used, 2),
            'disk_percent': disk_percent,
            'uptime_seconds': uptime.total_seconds(),
            'uptime_str': str(uptime).split('.')[0]
        }
    except Exception as e:
        self.logger.error(f"获取系统信息失败:{str(e)}")
        return {}

def log_system_status(self) -> None:
    """记录系统状态到日志"""
    system_info = self.get_system_info()

    if not system_info:
        return

    self.logger.info("=" * 60)
    self.logger.info("系统状态信息")
    self.logger.info("=" * 60)
    self.logger.info(f"系统:{system_info.get('system')} {system_info.get('release')}")
    self.logger.info(f"主机名:{system_info.get('node')}")
    self.logger.info(f"CPU使用率:{system_info.get('cpu_percent')}% ({system_info.get('cpu_count')}核)")
    self.logger.info(f"内存使用:{system_info.get('memory_used_gb')}GB/{system_info.get('memory_total_gb')}GB ({system_info.get('memory_percent')}%)")
    self.logger.info(f"磁盘使用:{system_info.get('disk_used_gb')}GB/{system_info.get('disk_total_gb')}GB ({system_info.get('disk_percent')}%)")
    self.logger.info(f"运行时间:{system_info.get('uptime_str')}")
    self.logger.info("=" * 60)

使用示例

if name == "main":

# 配置日志
logger = setup_logger(log_level=logging.DEBUG)

# 创建系统监控器
monitor = SystemMonitor(logger)

# 记录系统状态
monitor.log_system_status()

# 测试日志
logger.debug("这是一条调试信息")
logger.info("这是一条普通信息")
logger.warning("这是一条警告信息")
logger.error("这是一条错误信息")
logger.critical("这是一条严重错误信息")

```
这个日志配置模块提供了完善的日志管理功能。它支持按大小轮转日志文件、按日期命名日志文件、自动清理过期日志等功能。同时,它还提供了彩色控制台输出,使日志信息更加易读。系统监控器类可以获取系统的CPU、内存、磁盘等使用情况,并将这些信息记录到日志中,帮助开发者了解系统的运行状态。
通过以上八个模块的实现,我们完成了一套完整的iMessage群发系统。该系统支持单个Apple ID循环群发消息,具有完善的异常处理、速率控制、日志管理和阿里云部署功能。在实际使用过程中,请务必遵守相关法律法规和苹果公司的服务条款,仅用于合法合规的信息传递场景。

相关文章
|
12天前
|
人工智能 JSON 供应链
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
LucianaiB分享零成本畅用JVS Claw教程(学生认证享7个月使用权),并开源GeoMind项目——将JVS改造为科研与产业地理情报可视化AI助手,支持飞书文档解析、地理编码与腾讯地图可视化,助力产业关系图谱构建。
23475 11
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
|
16天前
|
人工智能 缓存 BI
Claude Code + DeepSeek V4-Pro 真实评测:除了贵,没别的毛病
JeecgBoot AI专题研究 把 Claude Code 接入 DeepSeek V4Pro,跑完 Skills —— OA 审批、大屏、报表、部署 5 大实战场景后的真实体验 ![](https://oscimg.oschina.net/oscnet/up608d34aeb6bafc47f
5233 19
Claude Code + DeepSeek V4-Pro 真实评测:除了贵,没别的毛病
|
17天前
|
人工智能 JSON BI
DeepSeek V4 来了!超越 Claude Sonnet 4.5,赶紧对接 Claude Code 体验一把
JeecgBoot AI专题研究 把 Claude Code 接入 DeepSeek V4Pro 的真实体验与避坑记录 本文记录我将 Claude Code 对接 DeepSeek 最新模型(V4Pro)后的真实体验,测试了 Skills 自动化查询和积木报表 AI 建表两个场景——有惊喜,也踩
6250 15
|
6天前
|
人工智能 缓存 Shell
Claude Code 全攻略:命令大全 + 实战工作流(完整版)
Claude Code 是一款运行在终端环境下的 AI 编码助手,能够直接在项目目录中理解代码结构、编辑文件、执行命令、执行开发计划,并支持持久化记忆、上下文压缩、后台任务、多模型切换等专业能力。对于日常开发、项目维护、快速重构、代码审查等场景,它可以大幅减少手动操作、提升编码效率。本文从常用命令、界面模式、核心指令、记忆机制、图片处理、进阶工作流等维度完整说明,帮助开发者快速上手并稳定使用。
1292 2
|
5天前
|
前端开发 API 内存技术
对比claude code等编程cli工具与deepseek v4的适配情况
DeepSeek V4发布后,多家编程工具因未适配其强制要求的`reasoning_content`字段而报错。本文对比Claude Code、GitHub Copilot、Langcli、OpenCode及DeepSeek-TUI等主流工具的兼容性:Claude Code需按官方方式配置;Langcli表现最佳,开箱即用且无报错;Copilot与OpenCode暂未修复问题;DeepSeek-TUI尚处早期阶段。
946 2
对比claude code等编程cli工具与deepseek v4的适配情况
|
1月前
|
人工智能 自然语言处理 安全
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
本文介绍了Claude Code终端AI助手的使用指南,主要内容包括:1)常用命令如版本查看、项目启动和更新;2)三种工作模式切换及界面说明;3)核心功能指令速查表,包含初始化、压缩对话、清除历史等操作;4)详细解析了/init、/help、/clear、/compact、/memory等关键命令的使用场景和语法。文章通过丰富的界面截图和场景示例,帮助开发者快速掌握如何通过命令行和交互界面高效使用Claude Code进行项目开发,特别强调了CLAUDE.md文件作为项目知识库的核心作用。
26190 65
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)