程序员必备的十大技能(进阶版)之网络与高并发原理(四)

简介: 教程来源 https://bncne.cn/ 本节详解高并发四大核心设计模式:①生产者-消费者(基于BlockingQueue与高性能Disruptor);②请求合并(批量处理降低IO压力);③背压控制(RxJava与信号量/滑动窗口限流);④线程封闭(ThreadLocal安全复用资源)。兼顾性能、可靠性与可维护性。

五、高并发系统设计模式

5.1 生产者-消费者模式

public class ProducerConsumerPattern {

    // 使用BlockingQueue实现
    public static class BlockingQueueDemo {
        private final BlockingQueue<Order> queue = new LinkedBlockingQueue<>(10000);

        // 生产者
        class Producer implements Runnable {
            @Override
            public void run() {
                while (true) {
                    Order order = createOrder();
                    // 阻塞直到有空位
                    queue.put(order);
                    // 非阻塞:queue.offer(order, 100, TimeUnit.MILLISECONDS);
                }
            }
        }

        // 消费者
        class Consumer implements Runnable {
            @Override
            public void run() {
                while (true) {
                    Order order = queue.take();  // 阻塞直到有数据
                    processOrder(order);
                }
            }
        }

        // 批量消费(提高吞吐量)
        class BatchConsumer implements Runnable {
            private final List<Order> buffer = new ArrayList<>();
            private final int batchSize = 100;

            @Override
            public void run() {
                while (true) {
                    // 使用drainTo批量获取
                    queue.drainTo(buffer, batchSize);
                    if (!buffer.isEmpty()) {
                        processBatch(buffer);
                        buffer.clear();
                    } else {
                        Thread.sleep(10);
                    }
                }
            }
        }
    }

    // 使用Disruptor(无锁环形缓冲区,高性能)
    public static class DisruptorDemo {
        // Disruptor基于RingBuffer,预分配内存,避免GC
        // 单个生产者可达千万级TPS

        // 定义事件
        static class OrderEvent {
            private long orderId;
            private long userId;
            private BigDecimal amount;
            // 对象复用(避免GC)

            void set(long orderId, long userId, BigDecimal amount) {
                this.orderId = orderId;
                this.userId = userId;
                this.amount = amount;
            }
        }

        // 事件工厂(预分配)
        static class OrderEventFactory implements EventFactory<OrderEvent> {
            @Override
            public OrderEvent newInstance() {
                return new OrderEvent();
            }
        }

        // 事件处理器
        static class OrderEventHandler implements EventHandler<OrderEvent> {
            @Override
            public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
                // 处理订单
                processOrder(event);
            }
        }

        public void start() {
            // RingBuffer大小(必须是2的幂)
            int bufferSize = 1024 * 1024;

            Disruptor<OrderEvent> disruptor = new Disruptor<>(
                new OrderEventFactory(),
                bufferSize,
                DaemonThreadFactory.INSTANCE,
                ProducerType.MULTI,      // 多生产者
                new BusySpinWaitStrategy() // 忙等待策略(低延迟)
            );

            disruptor.handleEventsWith(new OrderEventHandler());
            disruptor.start();

            RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();

            // 发布事件
            long sequence = ringBuffer.next();
            try {
                OrderEvent event = ringBuffer.get(sequence);
                event.set(123L, 456L, new BigDecimal("99.99"));
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }
}

5.2 请求合并(Request Coalescing)

@Component
public class RequestCoalescingService {

    // 将多个相同请求合并为一个批处理请求
    private final BlockingQueue<RequestPromise> queue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    static class RequestPromise {
        final Long userId;
        final CompletableFuture<UserInfo> future;

        RequestPromise(Long userId, CompletableFuture<UserInfo> future) {
            this.userId = userId;
            this.future = future;
        }
    }

    @PostConstruct
    public void init() {
        // 定时批量处理(每10ms或积累到100个)
        scheduler.scheduleAtFixedRate(this::processBatch, 10, 10, TimeUnit.MILLISECONDS);
    }

    // 用户调用接口
    public CompletableFuture<UserInfo> getUserInfo(Long userId) {
        CompletableFuture<UserInfo> future = new CompletableFuture<>();
        queue.offer(new RequestPromise(userId, future));
        return future;
    }

    private void processBatch() {
        List<RequestPromise> batch = new ArrayList<>();
        queue.drainTo(batch, 100);  // 最多100个

        if (batch.isEmpty()) return;

        // 提取所有userId
        Set<Long> userIds = batch.stream()
            .map(rp -> rp.userId)
            .collect(Collectors.toSet());

        // 批量查询数据库(一次查询获取多个用户)
        Map<Long, UserInfo> userMap = userService.batchGetUsers(userIds);

        // 返回结果
        for (RequestPromise rp : batch) {
            UserInfo user = userMap.get(rp.userId);
            if (user != null) {
                rp.future.complete(user);
            } else {
                rp.future.completeExceptionally(new UserNotFoundException());
            }
        }
    }
}

5.3 背压(Backpressure)处理

public class BackpressureHandling {

    // 使用RxJava实现背压
    public void rxJavaBackpressure() {
        Flowable.range(1, 1_000_000)
            .onBackpressureBuffer(10000)           // 缓冲10000个
            // .onBackpressureDrop()               // 丢弃
            // .onBackpressureLatest()              // 只保留最新
            .observeOn(Schedulers.computation())
            .subscribe(
                value -> processSlowly(value),
                error -> log.error("Error", error),
                () -> log.info("Complete")
            );
    }

    // 自定义背压实现(速率限制)
    public static class RateLimitingProcessor {
        private final Semaphore semaphore;  // 信号量控制并发

        public RateLimitingProcessor(int maxConcurrent) {
            this.semaphore = new Semaphore(maxConcurrent);
        }

        public <T> CompletableFuture<T> process(Supplier<T> task) {
            CompletableFuture<T> future = new CompletableFuture<>();

            // 异步处理,等待许可
            CompletableFuture.runAsync(() -> {
                try {
                    semaphore.acquire();  // 背压:获取不到许可时阻塞
                    try {
                        T result = task.get();
                        future.complete(result);
                    } finally {
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    future.completeExceptionally(e);
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    future.completeExceptionally(e);
                }
            });

            return future;
        }

        // 使用滑动窗口控制速率
        public static class SlidingWindowRateLimiter {
            private final int maxRequests;
            private final long windowMillis;
            private final Queue<Long> timestamps = new ConcurrentLinkedQueue<>();

            public SlidingWindowRateLimiter(int maxRequests, long windowMillis) {
                this.maxRequests = maxRequests;
                this.windowMillis = windowMillis;
            }

            public synchronized boolean tryAcquire() {
                long now = System.currentTimeMillis();
                // 清理过期的请求
                while (!timestamps.isEmpty() && now - timestamps.peek() > windowMillis) {
                    timestamps.poll();
                }

                if (timestamps.size() < maxRequests) {
                    timestamps.offer(now);
                    return true;
                }
                return false;
            }
        }
    }
}

5.4 线程封闭与ThreadLocal

public class ThreadLocalUsage {

    // 1. 数据库连接管理
    public class ConnectionManager {
        private static final ThreadLocal<Connection> connectionHolder = new ThreadLocal<>() {
            @Override
            protected Connection initialValue() {
                return createConnection();
            }
        };

        public static Connection getConnection() {
            return connectionHolder.get();
        }

        public static void removeConnection() {
            connectionHolder.remove();  // 防止内存泄漏(线程池场景)
        }
    }

    // 2. 用户上下文传递
    public class UserContext {
        private static final ThreadLocal<User> currentUser = new ThreadLocal<>();

        public static void setCurrentUser(User user) {
            currentUser.set(user);
        }

        public static User getCurrentUser() {
            return currentUser.get();
        }

        // 子线程继承(InheritableThreadLocal)
        private static final InheritableThreadLocal<RequestId> requestId = new InheritableThreadLocal<>();

        // 线程池传递(使用阿里TransmittableThreadLocal)
        // TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
        // TtlRunnable.get(originalRunnable) 包装任务
    }

    // 3. SimpleDateFormat线程安全问题(ThreadLocal包装)
    public class DateUtil {
        private static final ThreadLocal<SimpleDateFormat> dateFormatHolder = ThreadLocal.withInitial(
            () -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        );

        public static String format(Date date) {
            return dateFormatHolder.get().format(date);
        }
    }

    // 4. 性能计数器(每个线程独立计数)
    public class PerThreadCounter {
        private static final ThreadLocal<LongAdder> counter = ThreadLocal.withInitial(LongAdder::new);

        public static void increment() {
            counter.get().increment();
        }

        public static long getAndReset() {
            long value = counter.get().sum();
            counter.get().reset();
            return value;
        }
    }
}

来源:
https://yvyus.cn/

相关文章
|
8天前
|
人工智能 开发工具 iOS开发
Claude Code 新手完全上手指南:安装、国产模型配置与常用命令全解
Claude Code 是一款运行在终端环境中的 AI 编程助手,能够直接在命令行中完成代码生成、项目分析、文件修改、命令执行、Git 管理等开发全流程工作。它最大的特点是**任务驱动、终端原生、轻量高效、多模型兼容**,无需图形界面、不依赖 IDE 插件,能够深度融入开发者日常工作流。
2970 7
|
10天前
|
Shell API 开发工具
Claude Code 快速上手指南(新手友好版)
AI编程工具卷疯啦!Claude Code凭借任务驱动+终端原生的特性,成了开发者的效率搭子。本文从安装、登录、切换国产模型到常用命令,手把手带新手快速上手,全程避坑,30分钟独立用起来。
3071 20
|
23天前
|
人工智能 JSON 供应链
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
LucianaiB分享零成本畅用JVS Claw教程(学生认证享7个月使用权),并开源GeoMind项目——将JVS改造为科研与产业地理情报可视化AI助手,支持飞书文档解析、地理编码与腾讯地图可视化,助力产业关系图谱构建。
23567 15
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
|
4天前
|
人工智能 Linux BI
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
JeecgBoot AI专题研究 一键脚本:Claude Code + JeecgBoot Skills + DeepSeek 全平台接入 一行命令装好 Claude Code + JeecgBoot Skills + DeepSeek 接入,无需翻墙使用 Claude Code,支持 Wind
1956 3
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
|
10天前
|
人工智能 JSON BI
DeepSeek V4-Pro 接入 Claude Code 完全实战:体验、测试与关键避坑指南
Claude Code 作为当前主流的 AI 编程辅助工具,凭借强大的代码理解、工程执行与自动化能力深受开发者喜爱,但原生模型的使用成本相对较高。为了在保持能力的同时进一步降低开销,不少开发者开始寻找兼容度高、价格更友好的替代模型。DeepSeek V4 系列的发布带来了新的选择,该系列包含 V4-Pro 与 V4-Flash 两款模型,并提供了与 Anthropic 完全兼容的 API 接口,理论上只需简单修改配置,即可让 Claude Code 无缝切换为 DeepSeek 引擎。
2464 3
|
8天前
|
人工智能 安全 开发工具
Claude Code 官方工作原理与使用指南
Claude Code 不是传统代码补全工具,而是 Anthropic 推出的终端 AI 代理,具备代理循环、双驱动架构(模型+工具)、全局项目感知、6 种权限模式等核心能力,本文基于官方文档系统解析其工作原理与高效使用技巧。
1342 0
|
8天前
|
存储 Linux iOS开发
【2026最新】MarkText中文版Markdown编辑器使用图解(附安装包)
MarkText是一款免费开源、跨平台的Markdown编辑器,主打所见即所得实时预览,支持Windows/macOS/Linux。内置数学公式、流程图、代码高亮、多主题及PDF/HTML导出,是Typora的轻量免费替代首选。(239字)