五、高并发系统设计模式
5.1 生产者-消费者模式
public class ProducerConsumerPattern {
// 使用BlockingQueue实现
public static class BlockingQueueDemo {
    /** Bounded hand-off queue shared by producers and consumers (capacity 10000). */
    private final BlockingQueue<Order> queue = new LinkedBlockingQueue<>(10000);

    /**
     * Producer: creates orders and enqueues them until the thread is interrupted.
     *
     * <p>{@code createOrder()} is not defined in this block; it is expected to be
     * provided by the enclosing code.
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Order order = createOrder();
                    // Blocks while the queue is full; this is what throttles the producer.
                    queue.put(order);
                    // Alternative with a bounded wait (this is a timed wait, not a
                    // non-blocking call):
                    // queue.offer(order, 100, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
                // put() throws a checked exception that run() cannot propagate.
                // Restore the flag so the owner of the thread can see the interrupt,
                // then let the loop end.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Consumer: takes one order at a time and processes it until the thread is
     * interrupted.
     *
     * <p>{@code processOrder(Order)} is not defined in this block.
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // Blocks while the queue is empty.
                    Order order = queue.take();
                    processOrder(order);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Batch consumer: drains up to {@code batchSize} orders per iteration and hands
     * them to {@code processBatch} in one call.
     *
     * <p>{@code processBatch(List)} is not defined in this block. The same list
     * instance is cleared and reused after every call, so this assumes
     * {@code processBatch} does not keep a reference to it - TODO confirm.
     */
    class BatchConsumer implements Runnable {
        private final int batchSize = 100;
        private final List<Order> buffer = new ArrayList<>(batchSize);

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // drainTo never blocks; it moves whatever is available, up to batchSize.
                    queue.drainTo(buffer, batchSize);
                    if (!buffer.isEmpty()) {
                        processBatch(buffer);
                        buffer.clear();
                    } else {
                        // Nothing queued: back off briefly instead of spinning.
                        Thread.sleep(10);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
// 使用Disruptor(无锁环形缓冲区,高性能)
public static class DisruptorDemo {
    // The Disruptor is built on a ring buffer whose slots are allocated up front,
    // which avoids per-event garbage. The original author notes that a single
    // producer can reach tens of millions of TPS.

    /** Mutable event that lives in a ring buffer slot and is overwritten on reuse. */
    static class OrderEvent {
        private long orderId;
        private long userId;
        private BigDecimal amount;

        /** Overwrites every field so the same instance can carry a new order. */
        void set(long orderId, long userId, BigDecimal amount) {
            this.amount = amount;
            this.userId = userId;
            this.orderId = orderId;
        }
    }

    /** Factory the Disruptor calls to create the event instances for its slots. */
    static class OrderEventFactory implements EventFactory<OrderEvent> {
        @Override
        public OrderEvent newInstance() {
            return new OrderEvent();
        }
    }

    /** Consumer side: forwards each published event to {@code processOrder}. */
    static class OrderEventHandler implements EventHandler<OrderEvent> {
        @Override
        public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
            // processOrder is not defined in this block.
            processOrder(event);
        }
    }

    /**
     * Builds and starts a Disruptor with one handler, then publishes a single
     * sample order event.
     */
    public void start() {
        // Ring buffer capacity; it must be a power of two (2^20 slots here).
        final int ringSize = 1 << 20;
        final OrderEventFactory eventFactory = new OrderEventFactory();

        final Disruptor<OrderEvent> disruptor = new Disruptor<>(
                eventFactory,
                ringSize,
                DaemonThreadFactory.INSTANCE,
                ProducerType.MULTI,          // several producers may publish
                new BusySpinWaitStrategy()   // busy-spin wait strategy (low latency)
        );
        disruptor.handleEventsWith(new OrderEventHandler());
        disruptor.start();

        // publishEvent claims the next sequence, lets the translator fill the slot,
        // and publishes the sequence even if the translator throws.
        final RingBuffer<OrderEvent> ring = disruptor.getRingBuffer();
        ring.publishEvent((slot, seq) -> slot.set(123L, 456L, new BigDecimal("99.99")));
    }
}
}
5.2 请求合并(Request Coalescing)
@Component
public class RequestCoalescingService {
    // Coalesces individual user lookups into batched queries: callers enqueue a
    // request and receive a future; a single scheduler thread periodically drains
    // the queue and answers every queued request from one batch call.
    //
    // NOTE(review): userService is used below but is not declared in this class as
    // shown; presumably it is an injected field - confirm.

    /** Upper bound on the number of requests answered by one batch query. */
    private static final int MAX_BATCH_SIZE = 100;
    /** Initial delay and period of the flush task, in milliseconds. */
    private static final long FLUSH_INTERVAL_MILLIS = 10;

    private final BlockingQueue<RequestPromise> queue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** A queued lookup: the requested id and the future handed back to the caller. */
    static class RequestPromise {
        final Long userId;
        final CompletableFuture<UserInfo> future;

        RequestPromise(Long userId, CompletableFuture<UserInfo> future) {
            this.userId = userId;
            this.future = future;
        }
    }

    /** Starts the periodic flush (first run after 10 ms, then every 10 ms). */
    @PostConstruct
    public void init() {
        scheduler.scheduleAtFixedRate(
                this::processBatch, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Queues a lookup for {@code userId}.
     *
     * @param userId id of the user to load
     * @return a future completed with the user, or completed exceptionally with
     *         {@code UserNotFoundException} when the batch result has no entry for
     *         the id, or with the lookup's exception when the batch query fails
     */
    public CompletableFuture<UserInfo> getUserInfo(Long userId) {
        CompletableFuture<UserInfo> future = new CompletableFuture<>();
        queue.offer(new RequestPromise(userId, future));
        return future;
    }

    /**
     * Flush task: drains the queue in chunks of at most {@link #MAX_BATCH_SIZE}
     * until it is empty, so a burst larger than one chunk is not left waiting for
     * later ticks.
     */
    private void processBatch() {
        List<RequestPromise> batch = new ArrayList<>(MAX_BATCH_SIZE);
        while (queue.drainTo(batch, MAX_BATCH_SIZE) > 0) {
            completeBatch(batch);
            batch.clear();
        }
    }

    /** Resolves every promise in {@code batch} from a single batch lookup. */
    private void completeBatch(List<RequestPromise> batch) {
        try {
            // Distinct ids only: duplicate requests share one lookup.
            Set<Long> userIds = batch.stream()
                    .map(rp -> rp.userId)
                    .collect(Collectors.toSet());
            Map<Long, UserInfo> userMap = userService.batchGetUsers(userIds);
            for (RequestPromise rp : batch) {
                UserInfo user = userMap.get(rp.userId);
                if (user != null) {
                    rp.future.complete(user);
                } else {
                    rp.future.completeExceptionally(new UserNotFoundException());
                }
            }
        } catch (RuntimeException e) {
            // A failure must not escape: scheduleAtFixedRate cancels all later runs
            // once a run throws, and the callers in this batch would wait forever.
            // completeExceptionally is a no-op for futures that already completed.
            for (RequestPromise rp : batch) {
                rp.future.completeExceptionally(e);
            }
        }
    }
}
5.3 背压(Backpressure)处理
public class BackpressureHandling {
// 使用RxJava实现背压
/**
 * Demonstrates RxJava backpressure: emits the integers 1..1,000,000 through a
 * bounded buffer and consumes them on the computation scheduler.
 */
public void rxJavaBackpressure() {
    Flowable<Integer> numbers = Flowable.range(1, 1_000_000);

    // Strategy used here: buffer up to 10000 items.
    // Alternatives kept from the original for reference:
    //   .onBackpressureDrop()    - drop items
    //   .onBackpressureLatest()  - keep only the latest item
    Flowable<Integer> buffered = numbers.onBackpressureBuffer(10000);

    buffered
            .observeOn(Schedulers.computation())
            .subscribe(
                    item -> processSlowly(item),
                    failure -> log.error("Error", failure),
                    () -> log.info("Complete"));
}
// 自定义背压实现(速率限制)
/**
 * Hand-rolled backpressure: runs submitted tasks asynchronously while a semaphore
 * caps how many of them execute at the same time.
 */
public static class RateLimitingProcessor {
    /** Permits available to concurrently running tasks. */
    private final Semaphore semaphore;

    /**
     * @param maxConcurrent maximum number of tasks allowed to run at once
     * @throws IllegalArgumentException if {@code maxConcurrent} is less than 1;
     *         with no permits every submitted task would block forever
     */
    public RateLimitingProcessor(int maxConcurrent) {
        if (maxConcurrent < 1) {
            throw new IllegalArgumentException("maxConcurrent must be >= 1, got " + maxConcurrent);
        }
        this.semaphore = new Semaphore(maxConcurrent);
    }

    /**
     * Schedules {@code task} on the default asynchronous executor of
     * {@link CompletableFuture#runAsync(Runnable)}. The worker thread blocks in
     * {@code acquire()} until a permit is free, so waiting tasks occupy threads of
     * that executor.
     *
     * @param task the work to run once a permit is available
     * @param <T>  result type of the task
     * @return a future completed with the task's result, or completed
     *         exceptionally with whatever the task threw (including {@link Error}s)
     *         or with the {@link InterruptedException} raised while waiting
     */
    public <T> CompletableFuture<T> process(Supplier<T> task) {
        CompletableFuture<T> future = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            try {
                semaphore.acquire(); // backpressure: wait here until a permit is free
                try {
                    future.complete(task.get());
                } finally {
                    semaphore.release();
                }
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
                Thread.currentThread().interrupt();
            } catch (Throwable t) {
                // Throwable rather than Exception: an Error thrown by the task would
                // otherwise leave the returned future incomplete forever.
                future.completeExceptionally(t);
            }
        });
        return future;
    }

    /**
     * Sliding-window limiter: admits at most {@code maxRequests} calls within any
     * window of {@code windowMillis} milliseconds. All state is guarded by the
     * instance monitor.
     */
    public static class SlidingWindowRateLimiter {
        private final int maxRequests;
        /** Window length converted to nanoseconds for use with System.nanoTime(). */
        private final long windowNanos;
        // Admission times of the requests currently inside the window, oldest first.
        // tryAcquire is synchronized, so a plain deque is enough and size() is O(1).
        // Fully qualified so that the block needs no additional import.
        private final Queue<Long> timestamps = new java.util.ArrayDeque<>();

        /**
         * @param maxRequests  maximum number of admitted requests per window
         * @param windowMillis window length in milliseconds
         */
        public SlidingWindowRateLimiter(int maxRequests, long windowMillis) {
            this.maxRequests = maxRequests;
            this.windowNanos = TimeUnit.MILLISECONDS.toNanos(windowMillis);
        }

        /**
         * Tries to admit one request without blocking.
         *
         * @return {@code true} if the request fits in the current window and was
         *         recorded, {@code false} if the window is already full
         */
        public synchronized boolean tryAcquire() {
            // nanoTime is monotonic, so wall-clock adjustments cannot stretch or
            // shrink the window; values are only ever compared by subtraction.
            long now = System.nanoTime();
            // Evict entries that have fallen out of the window.
            Long oldest;
            while ((oldest = timestamps.peek()) != null && now - oldest > windowNanos) {
                timestamps.poll();
            }
            if (timestamps.size() < maxRequests) {
                timestamps.offer(now);
                return true;
            }
            return false;
        }
    }
}
}
5.4 线程封闭与ThreadLocal
public class ThreadLocalUsage {
// 1. 数据库连接管理
/**
 * Binds one database connection to each thread.
 *
 * <p>{@code createConnection()} is not defined in this block. The cleanup path
 * assumes {@code Connection} exposes {@code close()} (as {@code java.sql.Connection}
 * does) - confirm against the actual type.
 */
public class ConnectionManager {
    // No initialValue(): the connection is created lazily in getConnection() so
    // that removeConnection() can look at the current value without opening one.
    private static final ThreadLocal<Connection> connectionHolder = new ThreadLocal<>();

    /**
     * Returns the calling thread's connection, creating it on first use.
     *
     * @return the connection bound to the current thread
     */
    public static Connection getConnection() {
        Connection connection = connectionHolder.get();
        if (connection == null) {
            connection = createConnection();
            connectionHolder.set(connection);
        }
        return connection;
    }

    /**
     * Unbinds and closes the calling thread's connection, if there is one. Call
     * this when a pooled thread finishes its unit of work; otherwise the entry
     * (and the open connection) stays attached to the reused thread.
     *
     * @throws IllegalStateException if closing the connection fails
     */
    public static void removeConnection() {
        Connection connection = connectionHolder.get();
        // Unbind first so the thread never keeps a reference to a connection that
        // failed to close.
        connectionHolder.remove();
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                throw new IllegalStateException("Failed to close thread-bound connection", e);
            }
        }
    }
}
// 2. 用户上下文传递
/**
 * Carries per-request context (the current user) on the executing thread.
 */
public class UserContext {
    private static final ThreadLocal<User> currentUser = new ThreadLocal<>();

    // InheritableThreadLocal: a child thread starts with its parent's value.
    // This block declares the holder but exposes no accessors for it.
    private static final InheritableThreadLocal<RequestId> requestId = new InheritableThreadLocal<>();

    // For hand-off to pooled threads the original author points to Alibaba's
    // TransmittableThreadLocal:
    // TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
    // TtlRunnable.get(originalRunnable) wraps the task

    /**
     * Binds {@code user} to the calling thread.
     *
     * @param user the user to expose through {@link #getCurrentUser()}
     */
    public static void setCurrentUser(User user) {
        currentUser.set(user);
    }

    /**
     * @return the user bound to the calling thread, or {@code null} if none is set
     */
    public static User getCurrentUser() {
        return currentUser.get();
    }

    /**
     * Removes every value this class holds for the calling thread. Call it when a
     * unit of work ends (typically in a {@code finally} block): on pooled threads a
     * value that is never removed stays visible to the next task on that thread.
     */
    public static void clear() {
        currentUser.remove();
        requestId.remove();
    }
}
// 3. SimpleDateFormat线程安全问题(ThreadLocal包装)
/**
 * Formats dates as {@code yyyy-MM-dd HH:mm:ss}. SimpleDateFormat is not
 * thread-safe, so every thread gets its own formatter instance.
 */
public class DateUtil {
    private static final ThreadLocal<SimpleDateFormat> dateFormatHolder =
            new ThreadLocal<SimpleDateFormat>() {
                @Override
                protected SimpleDateFormat initialValue() {
                    // Created lazily, once per thread, on that thread's first format() call.
                    return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                }
            };

    /**
     * @param date the instant to format
     * @return {@code date} rendered with the calling thread's formatter
     */
    public static String format(Date date) {
        SimpleDateFormat formatter = dateFormatHolder.get();
        return formatter.format(date);
    }
}
// 4. 性能计数器(每个线程独立计数)
/**
 * Performance counter with one independent count per thread: every thread
 * increments and reads only its own adder.
 */
public class PerThreadCounter {
    private static final ThreadLocal<LongAdder> counter =
            ThreadLocal.withInitial(() -> new LongAdder());

    /** Adds one to the calling thread's count. */
    public static void increment() {
        LongAdder own = counter.get();
        own.increment();
    }

    /**
     * Reads the calling thread's count and sets it back to zero.
     *
     * @return the count accumulated by this thread since its last reset
     */
    public static long getAndReset() {
        // The adder is confined to this thread, so sum-then-reset in one call is
        // the same as reading the sum and resetting afterwards.
        return counter.get().sumThenReset();
    }
}
}