背景
随着移动互联网的发展,短信验证码已成为用户身份验证的重要手段。对于企业级应用,构建一个高可用、高并发、安全可靠的短信验证码服务至关重要。本文将从架构设计角度,分享企业级短信验证码服务的技术实现方案。
架构设计原则
1. 高可用性
- 多通道冗余:接入多个短信服务提供商
- 故障自动切换:通道故障时自动切换到备用通道
- 服务降级:在异常情况下提供降级服务
2. 高并发处理
- 异步处理:短信发送采用异步方式
- 消息队列:使用队列削峰填谷
- 连接池管理:复用HTTP连接,减少开销
3. 安全可控
- 多层防刷:IP、用户、设备多维度限流
- 内容审核:模板化管理,防止内容滥用
- 数据加密:敏感信息加密存储
系统架构设计
整体架构图
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ 客户端 │────│ API Gateway │────│ 短信服务集群 │
└─────────────┘ └──────────────┘ └─────────────┘
│ │
┌──────────────┐ ┌─────────────┐
│ 限流组件 │ │ 消息队列 │
└──────────────┘ └─────────────┘
│ │
┌──────────────┐ ┌─────────────┐
│ 缓存层 │ │ 通道管理 │
└──────────────┘ └─────────────┘
│ │
┌──────────────┐ ┌─────────────┐
│ 数据库 │ │ 监控告警 │
└──────────────┘ └─────────────┘
核心模块设计
1. 通道管理模块
from abc import ABC, abstractmethod
from enum import Enum
import logging
class ChannelStatus(Enum):
ACTIVE = "active"
INACTIVE = "inactive"
FAILED = "failed"
class SMSChannel(ABC):
def __init__(self, channel_id, priority, config):
self.channel_id = channel_id
self.priority = priority
self.config = config
self.status = ChannelStatus.ACTIVE
self.failure_count = 0
self.last_failure_time = None
@abstractmethod
async def send_sms(self, phone, content):
pass
def mark_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= 3:
self.status = ChannelStatus.FAILED
def reset_failure(self):
self.failure_count = 0
self.status = ChannelStatus.ACTIVE
class ChannelManager:
def __init__(self):
self.channels = []
self.current_channel_index = 0
def add_channel(self, channel):
self.channels.append(channel)
# 按优先级排序
self.channels.sort(key=lambda x: x.priority)
def get_available_channel(self):
"""获取可用通道,实现负载均衡和故障切换"""
active_channels = [ch for ch in self.channels if ch.status == ChannelStatus.ACTIVE]
if not active_channels:
# 尝试恢复失败的通道
self._recover_failed_channels()
active_channels = [ch for ch in self.channels if ch.status == ChannelStatus.ACTIVE]
if active_channels:
# 轮询选择通道
channel = active_channels[self.current_channel_index % len(active_channels)]
self.current_channel_index += 1
return channel
return None
def _recover_failed_channels(self):
"""尝试恢复失败的通道"""
current_time = time.time()
for channel in self.channels:
if (channel.status == ChannelStatus.FAILED and
channel.last_failure_time and
current_time - channel.last_failure_time > 300): # 5分钟后重试
channel.reset_failure()
2. 限流控制模块
import redis
import time
from typing import Dict, Optional
class RateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def check_limit(self, key: str, limit: int, window: int) -> Dict[str, any]:
"""
滑动窗口限流算法
:param key: 限流key
:param limit: 限制次数
:param window: 时间窗口(秒)
"""
now = time.time()
pipeline = self.redis.pipeline()
# 清理过期记录
pipeline.zremrangebyscore(key, 0, now - window)
# 添加当前请求
pipeline.zadd(key, {
str(now): now})
# 获取当前计数
pipeline.zcard(key)
# 设置过期时间
pipeline.expire(key, window)
results = pipeline.execute()
current_count = results[2]
if current_count <= limit:
return {
"allowed": True, "remaining": limit - current_count}
else:
return {
"allowed": False, "remaining": 0, "retry_after": window}
class SMSRateLimiter:
def __init__(self, redis_client):
self.limiter = RateLimiter(redis_client)
def check_phone_limit(self, phone: str) -> bool:
"""手机号限流:1分钟1次,1小时5次,1天10次"""
checks = [
(f"sms:phone:{phone}:1m", 1, 60),
(f"sms:phone:{phone}:1h", 5, 3600),
(f"sms:phone:{phone}:1d", 10, 86400)
]
for key, limit, window in checks:
result = self.limiter.check_limit(key, limit, window)
if not result["allowed"]:
return False
return True
def check_ip_limit(self, ip: str) -> bool:
"""IP限流:1分钟10次,1小时100次"""
checks = [
(f"sms:ip:{ip}:1m", 10, 60),
(f"sms:ip:{ip}:1h", 100, 3600)
]
for key, limit, window in checks:
result = self.limiter.check_limit(key, limit, window)
if not result["allowed"]:
return False
return True
3. 异步处理模块
import asyncio
import aioredis
from celery import Celery
import json
# Celery配置
celery_app = Celery('sms_service', broker='redis://localhost:6379/0')
@celery_app.task(bind=True, max_retries=3)
def send_sms_task(self, sms_data):
"""异步发送短信任务"""
try:
channel_manager = ChannelManager()
channel = channel_manager.get_available_channel()
if not channel:
raise Exception("No available SMS channel")
result = channel.send_sms(sms_data['phone'], sms_data['content'])
# 记录发送结果
log_sms_result(sms_data['request_id'], result)
return result
except Exception as exc:
# 重试机制
if self.request.retries < self.max_retries:
raise self.retry(countdown=60 * (2 ** self.request.retries))
else:
# 重试失败,记录错误日志
log_sms_error(sms_data['request_id'], str(exc))
raise
# 具体通道实现示例
class SpugSMSChannel(SMSChannel):
"""Spug推送平台通道实现"""
async def send_sms(self, phone, content):
url = f"https://push.spug.cc/send/{self.config['template_id']}"
payload = {
'name': self.config.get('app_name', '验证码'),
'code': content,
'targets': phone
}
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, json=payload, timeout=10) as response:
result = await response.json()
if result.get('error') == 0:
return {
"success": True, "message_id": result.get('data', {
}).get('id')}
else:
self.mark_failure()
return {
"success": False, "error": result.get('message')}
except Exception as e:
self.mark_failure()
return {
"success": False, "error": str(e)}
class AsyncSMSService:
def __init__(self, redis_url="redis://localhost:6379"):
self.redis_url = redis_url
async def send_sms_async(self, phone, content, request_id):
"""异步发送短信"""
sms_data = {
'phone': phone,
'content': content,
'request_id': request_id,
'timestamp': time.time()
}
# 提交到队列
task = send_sms_task.delay(sms_data)
# 返回任务ID,用于查询状态
return {
'task_id': task.id,
'status': 'pending',
'request_id': request_id
}
async def get_send_status(self, task_id):
"""查询发送状态"""
task = send_sms_task.AsyncResult(task_id)
return {
'task_id': task_id,
'status': task.status,
'result': task.result if task.ready() else None
}
4. 验证码管理模块
import hashlib
import secrets
from typing import Optional
class VerificationCodeManager:
def __init__(self, redis_client, encrypt_key):
self.redis = redis_client
self.encrypt_key = encrypt_key
def generate_code(self, length=6) -> str:
"""生成安全的随机验证码"""
return ''.join([str(secrets.randbelow(10)) for _ in range(length)])
def _encrypt_phone(self, phone: str) -> str:
"""手机号加密存储"""
return hashlib.sha256(f"{phone}{self.encrypt_key}".encode()).hexdigest()
async def store_code(self, phone: str, code: str, expire_time: int = 300):
"""存储验证码"""
encrypted_phone = self._encrypt_phone(phone)
key = f"verification_code:{encrypted_phone}"
# 存储验证码和尝试次数
code_data = {
'code': code,
'attempts': 0,
'created_at': time.time()
}
await self.redis.setex(key, expire_time, json.dumps(code_data))
async def verify_code(self, phone: str, input_code: str) -> Dict[str, any]:
"""验证验证码"""
encrypted_phone = self._encrypt_phone(phone)
key = f"verification_code:{encrypted_phone}"
code_data_str = await self.redis.get(key)
if not code_data_str:
return {
"success": False, "message": "验证码已过期或不存在"}
code_data = json.loads(code_data_str)
# 检查尝试次数
if code_data['attempts'] >= 3:
await self.redis.delete(key)
return {
"success": False, "message": "验证次数过多,请重新获取"}
# 验证码校验
if code_data['code'] == input_code:
await self.redis.delete(key)
return {
"success": True, "message": "验证成功"}
else:
# 增加尝试次数
code_data['attempts'] += 1
ttl = await self.redis.ttl(key)
await self.redis.setex(key, ttl, json.dumps(code_data))
return {
"success": False,
"message": f"验证码错误,还可尝试{3 - code_data['attempts']}次"
}
监控与运维
1. 监控指标设计
from prometheus_client import Counter, Histogram, Gauge
import time
# 定义监控指标
sms_requests_total = Counter('sms_requests_total', 'Total SMS requests', ['channel', 'status'])
sms_duration = Histogram('sms_duration_seconds', 'SMS send duration')
sms_queue_size = Gauge('sms_queue_size', 'SMS queue size')
channel_status = Gauge('sms_channel_status', 'SMS channel status', ['channel'])
class SMSMonitoring:
@staticmethod
def record_request(channel_id: str, status: str, duration: float):
"""记录请求指标"""
sms_requests_total.labels(channel=channel_id, status=status).inc()
sms_duration.observe(duration)
@staticmethod
def update_queue_size(size: int):
"""更新队列大小"""
sms_queue_size.set(size)
@staticmethod
def update_channel_status(channel_id: str, is_active: bool):
"""更新通道状态"""
channel_status.labels(channel=channel_id).set(1 if is_active else 0)
2. 告警规则
# Prometheus告警规则
groups:
- name: sms_service
rules:
- alert: SMSChannelDown
expr: sms_channel_status == 0
for: 5m
labels:
severity: critical
annotations:
summary: "SMS channel {
{ $labels.channel }} is down"
- alert: SMSHighErrorRate
expr: rate(sms_requests_total{
status="error"}[5m]) / rate(sms_requests_total[5m]) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "SMS error rate is high ({
{ $value }})"
- alert: SMSQueueTooLarge
expr: sms_queue_size > 1000
for: 2m
labels:
severity: warning
annotations:
summary: "SMS queue is too large ({
{ $value }})"
部署与扩展
1. 容器化部署
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "app:app"]
2. Kubernetes部署配置
apiVersion: apps/v1
kind: Deployment
metadata:
name: sms-service
spec:
replicas: 3
selector:
matchLabels:
app: sms-service
template:
metadata:
labels:
app: sms-service
spec:
containers:
- name: sms-service
image: sms-service:latest
ports:
- containerPort: 8000
env:
- name: REDIS_URL
value: "redis://redis-service:6379"
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
name: sms-service
spec:
selector:
app: sms-service
ports:
- port: 80
targetPort: 8000
type: LoadBalancer
性能优化
1. 数据库优化
-- 短信发送记录表设计
CREATE TABLE sms_records (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
request_id VARCHAR(64) NOT NULL UNIQUE,
phone_hash VARCHAR(64) NOT NULL,
channel_id VARCHAR(32) NOT NULL,
status ENUM('pending', 'success', 'failed') NOT NULL,
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_phone_hash_created (phone_hash, created_at),
INDEX idx_channel_status (channel_id, status),
INDEX idx_created_at (created_at)
);
-- 分区表设计(按月分区)
ALTER TABLE sms_records
PARTITION BY RANGE (YEAR(created_at)*100 + MONTH(created_at)) (
PARTITION p202401 VALUES LESS THAN (202402),
PARTITION p202402 VALUES LESS THAN (202403),
-- 继续添加分区...
);
2. 缓存策略
import asyncio
from typing import Dict, Optional
class SMSCache:
def __init__(self, redis_client):
self.redis = redis_client
self.local_cache = {
}
self.cache_ttl = 300 # 5分钟
async def get_template(self, template_id: str) -> Optional[str]:
"""获取短信模板,多级缓存"""
# 先查本地缓存
if template_id in self.local_cache:
template_data = self.local_cache[template_id]
if time.time() - template_data['timestamp'] < self.cache_ttl:
return template_data['content']
# 再查Redis缓存
cache_key = f"sms_template:{template_id}"
template_content = await self.redis.get(cache_key)
if template_content:
# 更新本地缓存
self.local_cache[template_id] = {
'content': template_content,
'timestamp': time.time()
}
return template_content
# 最后查数据库
template_content = await self._load_template_from_db(template_id)
if template_content:
# 写入多级缓存
await self.redis.setex(cache_key, self.cache_ttl, template_content)
self.local_cache[template_id] = {
'content': template_content,
'timestamp': time.time()
}
return template_content
多通道配置实践
在实际生产环境中,建议配置多个短信通道以提高系统可用性:
# 通道配置示例
def setup_sms_channels():
manager = ChannelManager()
# 主通道:传统企业级服务
primary_channel = EnterpriseSMSChannel(
channel_id="primary",
priority=1,
config={
"api_key": "xxx", "secret": "xxx"}
)
# 备用通道:轻量化服务(如Spug等支持个人认证的平台)
backup_channel = SpugSMSChannel(
channel_id="backup",
priority=2,
config={
"template_id": "A27Lxxxx", "app_name": "MyApp"}
)
# 应急通道:其他可用服务
emergency_channel = OtherSMSChannel(
channel_id="emergency",
priority=3,
config={
"endpoint": "xxx", "token": "xxx"}
)
manager.add_channel(primary_channel)
manager.add_channel(backup_channel)
manager.add_channel(emergency_channel)
return manager
成本效益分析
不同类型短信服务的成本对比:
服务类型 | 初始成本 | 单条费用 | 认证要求 | 适用场景 |
---|---|---|---|---|
传统企业服务 | 高 | 0.03-0.08元 | 企业资质 | 大型企业应用 |
轻量化平台 | 低 | 0.05-0.10元 | 个人可用 | 中小型项目 |
聚合服务 | 中 | 0.04-0.06元 | 企业资质 | 中等规模应用 |
注:Spug等轻量化平台在成本和易用性之间取得了较好平衡
总结
本文从企业级应用的角度,详细介绍了短信验证码服务的架构设计和技术实现。通过多通道冗余、异步处理、多层防刷等技术手段,构建了一个高可用、高并发、安全可靠的短信服务系统。
在实际应用中,需要根据业务规模和技术栈选择合适的实现方案,重点关注系统的稳定性、安全性和可扩展性。同时,完善的监控告警机制对于保障服务质量至关重要。