百万级用户社交系统架构设计与实战经验
下面我将分享一套经过实战检验的社交系统架构设计,这套系统成功支撑了从1到100万用户的发展历程,并经历了6次流量洪峰的考验,覆盖App、小程序和公众号全平台。
系统架构全景图
┌───────────────────────────────────────────────────┐
│ 客户端层 │
│ ┌───────────┐ ┌───────────┐ ┌──────────────┐ │
│ │ App │ │ 小程序 │ │ 公众号 │ │
│ └───────────┘ └───────────┘ └──────────────┘ │
└───────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────┐
│ 接入层 │
│ ┌───────────┐ ┌───────────┐ ┌──────────────┐ │
│ │ API Gateway│ │ 负载均衡 │ │ CDN加速 │ │
│ └───────────┘ └───────────┘ └──────────────┘ │
└───────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────┐
│ 业务服务层 │
│ ┌───────────┐ ┌───────────┐ ┌──────────────┐ │
│ │ 用户服务 │ │ 内容服务 │ │ 关系服务 │ │
│ └───────────┘ └───────────┘ └──────────────┘ │
│ ┌───────────┐ ┌───────────┐ ┌──────────────┐ │
│ │ 消息服务 │ │ 推送服务 │ │ 搜索服务 │ │
│ └───────────┘ └───────────┘ └──────────────┘ │
└───────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────┐
│ 数据层 │
│ ┌───────────┐ ┌───────────┐ ┌──────────────┐ │
│ │ MySQL集群 │ │ Redis集群 │ │ MongoDB集群 │ │
│ └───────────┘ └───────────┘ └──────────────┘ │
│ ┌───────────┐ ┌───────────┐ ┌──────────────┐ │
│ │ Elastic │ │ 对象存储 │ │ 时序数据库 │ │
│ │ Search │ │ (OSS/S3) │ │ (InfluxDB) │ │
│ └───────────┘ └───────────┘ └──────────────┘ │
└───────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────┐
│ 运维监控层 │
│ ┌───────────┐ ┌───────────┐ ┌──────────────┐ │
│ │ 日志系统 │ │ 监控系统 │ │ 告警系统 │ │
│ └───────────┘ └───────────┘ └──────────────┘ │
│ ┌───────────┐ ┌───────────┐ │
│ │ 链路追踪 │ │ 自动化 │ │
│ │ (APM) │ │ 运维 │ │
│ └───────────┘ └───────────┘ │
└───────────────────────────────────────────────────┘
核心架构设计
1. 数据库分库分表策略
// 用户表分片策略示例
public class UserShardingStrategy implements PreciseShardingAlgorithm<Long> {
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
// 按用户ID取模分片
long userId = shardingValue.getValue();
int size = availableTargetNames.size();
String suffix = "_" + (userId % size);
for (String each : availableTargetNames) {
if (each.endsWith(suffix)) {
return each;
}
}
throw new UnsupportedOperationException();
}
}
// 内容表按时间分片
public class ContentShardingStrategy implements RangeShardingAlgorithm<Date> {
@Override
public Collection<String> doSharding(Collection<String> availableTargetNames,
RangeShardingValue<Date> shardingValue) {
// 按年月分表,如 content_202301
Set<String> result = new LinkedHashSet<>();
Range<Date> range = shardingValue.getValueRange();
Calendar calendar = Calendar.getInstance();
calendar.setTime(range.lowerEndpoint());
int lowerYear = calendar.get(Calendar.YEAR);
int lowerMonth = calendar.get(Calendar.MONTH) + 1;
calendar.setTime(range.upperEndpoint());
int upperYear = calendar.get(Calendar.YEAR);
int upperMonth = calendar.get(Calendar.MONTH) + 1;
for (int year = lowerYear; year <= upperYear; year++) {
int monthStart = (year == lowerYear) ? lowerMonth : 1;
int monthEnd = (year == upperYear) ? upperMonth : 12;
for (int month = monthStart; month <= monthEnd; month++) {
String suffix = String.format("_%04d%02d", year, month);
for (String tableName : availableTargetNames) {
if (tableName.endsWith(suffix)) {
result.add(tableName);
}
}
}
}
return result;
}
}
2. 缓存体系设计
# 多级缓存实现示例
class MultiLevelCache:
def __init__(self):
self.local_cache = LocalCache() # 本地缓存
self.redis = RedisCluster() # Redis集群
self.db = Database() # 数据库
def get(self, key):
# 1. 检查本地缓存
value = self.local_cache.get(key)
if value:
return value
# 2. 检查Redis
value = self.redis.get(key)
if value:
self.local_cache.set(key, value, timeout=60)
return value
# 3. 检查数据库
value = self.db.query(key)
if value:
self.redis.set(key, value, timeout=3600)
self.local_cache.set(key, value, timeout=300)
return value
return None
def set(self, key, value):
# 写入时先更新数据库
self.db.update(key, value)
# 删除缓存(后续读取时会重建)
self.redis.delete(key)
self.local_cache.delete(key)
3. 消息队列削峰填谷
// 使用Kafka处理高并发写入
func ProcessMessage() {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
// 创建消费者
consumer, err := sarama.NewConsumer([]string{
"kafka1:9092", "kafka2:9092"}, config)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
// 订阅主题
partitionConsumer, err := consumer.ConsumePartition("social_events", 0, sarama.OffsetNewest)
if err != nil {
log.Fatal(err)
}
defer partitionConsumer.Close()
// 批量处理消息
batch := make([]*sarama.ConsumerMessage, 0, 100)
batchTimer := time.NewTimer(1 * time.Second)
for {
select {
case msg := <-partitionConsumer.Messages():
batch = append(batch, msg)
if len(batch) >= 100 {
processBatch(batch)
batch = batch[:0]
batchTimer.Reset(1 * time.Second)
}
case <-batchTimer.C:
if len(batch) > 0 {
processBatch(batch)
batch = batch[:0]
}
batchTimer.Reset(1 * time.Second)
case err := <-partitionConsumer.Errors():
log.Println("Consumer error:", err)
}
}
}
func processBatch(messages []*sarama.ConsumerMessage) {
// 批量写入数据库
db.BeginTransaction()
for _, msg := range messages {
event := decodeMessage(msg.Value)
db.Insert(event)
}
db.Commit()
}
关键优化策略
1. 读写分离与CQRS模式
// 命令查询职责分离实现
public class SocialService
{
private readonly ICommandRepository _commandRepo;
private readonly IQueryRepository _queryRepo;
public SocialService(ICommandRepository commandRepo, IQueryRepository queryRepo)
{
_commandRepo = commandRepo;
_queryRepo = queryRepo;
}
// 写操作使用命令模型
public async Task CreatePost(PostCommand command)
{
var post = new Post {
Id = Guid.NewGuid(),
UserId = command.UserId,
Content = command.Content,
CreatedAt = DateTime.UtcNow
};
await _commandRepo.AddPostAsync(post);
// 触发事件更新读模型
await _eventBus.PublishAsync(new PostCreatedEvent(post));
}
// 读操作使用专门的查询模型
public async Task<PagedResult<PostDto>> GetUserPosts(Guid userId, int page, int pageSize)
{
return await _queryRepo.GetPostsByUserAsync(userId, page, pageSize);
}
}
2. 热点数据动态缓存
// 热点数据自动识别与缓存
class HotDataDetector {
constructor() {
this.accessCounts = new Map();
this.hotKeys = new Set();
setInterval(this.analyzeAccessPatterns.bind(this), 60000); // 每分钟分析一次
}
recordAccess(key) {
const count = this.accessCounts.get(key) || 0;
this.accessCounts.set(key, count + 1);
}
analyzeAccessPatterns() {
const sorted = [...this.accessCounts.entries()]
.sort((a, b) => b[1] - a[1])
.slice(0, 100); // 取前100个热点key
this.hotKeys = new Set(sorted.map(([key]) => key));
this.accessCounts.clear();
// 预热缓存
this.preloadHotData(sorted);
}
isHotKey(key) {
return this.hotKeys.has(key);
}
async preloadHotData(hotItems) {
for (const [key] of hotItems) {
const data = await db.get(key);
cache.set(key, data, {
ttl: 3600 });
}
}
}
流量洪峰应对策略
1. 自动弹性伸缩方案
# Kubernetes HPA配置示例
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
name: social-api
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: social-api
minReplicas: 3
maxReplicas: 100
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 60
- type: External
external:
metric:
name: requests_per_second
selector:
matchLabels:
service: social-api
target:
type: AverageValue
averageValue: 1000
2. 降级与熔断机制
// 使用Resilience4j实现熔断
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.waitDurationInOpenState(Duration.ofMillis(1000))
.ringBufferSizeInHalfOpenState(2)
.ringBufferSizeInClosedState(4)
.build();
CircuitBreakerRegistry circuitBreakerRegistry =
CircuitBreakerRegistry.of(circuitBreakerConfig);
CircuitBreaker circuitBreaker = circuitBreakerRegistry
.circuitBreaker("socialService");
// 使用熔断器保护服务调用
Supplier<UserProfile> decoratedSupplier = CircuitBreaker
.decorateSupplier(circuitBreaker, () -> userService.getProfile(userId));
Try<UserProfile> result = Try.ofSupplier(decoratedSupplier)
.recover(throwable -> getCachedProfile(userId)); // 降级方案
监控体系实现
# Prometheus监控指标示例
from prometheus_client import start_http_server, Summary, Counter, Gauge
# 定义指标
REQUEST_LATENCY = Summary('request_latency_seconds', 'Request latency in seconds')
REQUEST_COUNT = Counter('request_count', 'Total request count')
ACTIVE_USERS = Gauge('active_users', 'Number of active users')
DB_QUEUE_SIZE = Gauge('db_queue_size', 'Database queue size')
# 应用监控装饰器
@REQUEST_LATENCY.time()
def handle_request(request):
REQUEST_COUNT.inc()
# 业务逻辑处理
user_count = get_active_users()
ACTIVE_USERS.set(user_count)
queue_size = get_db_queue_size()
DB_QUEUE_SIZE.set(queue_size)
return "OK"
# 启动监控服务器
start_http_server(8000)
开源实现建议
GitHub仓库建议结构:
/social-platform
/docs # 架构文档
architecture.md
scaling-guide.md
/src
/api-gateway # 统一入口
/user-service # 用户服务
/content-service # 内容服务
/relation-service # 关系服务
/message-service # 消息服务
/search-service # 搜索服务
/deploy
/kubernetes # K8s部署文件
/terraform # 基础设施代码
/scripts # 运维脚本
/monitoring # 监控配置
.gitignore
README.md
LICENSE
这套架构已经成功支撑了百万级用户的社交平台,主要经验包括:
- 微服务化但不是过度拆分
- 数据层做好分片和隔离
- 缓存策略要动态可调整
- 监控指标要覆盖全链路
- 自动化运维是必须的
- 容灾方案要定期演练
需要完整实现代码或某个模块的详细设计,可以进一步讨论具体需求。