程序员必备的十大技能(进阶版)之网络与高并发原理(五)

简介: 教程来源 https://tmywi.cn/ 本节系统讲解高并发场景下的操作系统与JVM深度调优:涵盖Linux网络内核参数(BBR拥塞控制、缓冲区、连接队列)、进程资源限制、JVM G1GC优化及性能压测与瓶颈定位方法,并以百万级WebSocket推送系统为实战案例,体现无锁化、异步非阻塞等高性能设计原则。

六、操作系统内核参数调优

6.1 Linux网络参数优化

#!/bin/bash
# /etc/sysctl.conf 高并发服务器配置

# 文件句柄限制
fs.file-max = 1000000
fs.nr_open = 1000000

# 网络核心参数
net.core.somaxconn = 32768              # listen队列最大长度
net.core.netdev_max_backlog = 5000      # 网卡队列长度
net.core.rmem_default = 87380           # 默认接收缓冲区
net.core.wmem_default = 87380           # 默认发送缓冲区
net.core.rmem_max = 134217728           # 最大接收缓冲区(128MB)
net.core.wmem_max = 134217728           # 最大发送缓冲区(128MB)

# TCP参数
net.ipv4.tcp_max_syn_backlog = 8192     # SYN队列长度
net.ipv4.tcp_syncookies = 1             # SYN Flood防护
net.ipv4.tcp_tw_reuse = 1               # TIME_WAIT复用
net.ipv4.tcp_fin_timeout = 30           # FIN_WAIT2超时
net.ipv4.tcp_keepalive_time = 600       # KeepAlive探测间隔
net.ipv4.tcp_keepalive_intvl = 30       # 探测重试间隔
net.ipv4.tcp_keepalive_probes = 3       # 探测次数
net.ipv4.tcp_slow_start_after_idle = 0  # 禁用空闲后慢启动

# 拥塞控制
net.ipv4.tcp_congestion_control = bbr
net.core.default_qdisc = fq

# 端口范围
net.ipv4.ip_local_port_range = 1024 65535

# 内存参数
net.ipv4.tcp_mem = 786432 1048576 1572864    # 4GB内存配置
net.ipv4.tcp_rmem = 4096 87380 134217728     # 读缓冲区
net.ipv4.tcp_wmem = 4096 65536 134217728     # 写缓冲区

6.2 用户进程限制

# /etc/security/limits.conf
* soft nofile 1000000
* hard nofile 1000000
* soft nproc 100000
* hard nproc 100000

# systemd服务限制(如果使用systemd)
[Service]
LimitNOFILE=1000000
LimitNPROC=100000

6.3 JVM参数调优

# 高并发服务JVM参数示例
# -Xms8G -Xmx8G                     # 堆内存8GB
# -Xmn3G                            # 新生代3GB
# -XX:MetaspaceSize=512M            # 元空间初始大小
# -XX:MaxMetaspaceSize=512M
# -XX:+UseG1GC                      # G1垃圾回收器
# -XX:MaxGCPauseMillis=100          # 目标GC暂停时间
# -XX:G1HeapRegionSize=16M          # G1 Region大小
# -XX:ParallelGCThreads=16          # GC并行线程数
# -XX:ConcGCThreads=4               # 并发GC线程数
# -XX:+UseStringDeduplication       # 字符串去重
# -XX:+DisableExplicitGC            # 禁用显式GC
# -XX:+HeapDumpOnOutOfMemoryError   # OOM时dump堆
# -XX:HeapDumpPath=/logs/heap.hprof

# 网络相关
# -Djava.net.preferIPv4Stack=true
# -Djava.net.preferIPv6Addresses=false

# 高性能模式
# -XX:+UseBiasedLocking             # 偏向锁
# -XX:+UseCompressedOops            # 压缩指针
# -XX:+AlwaysPreTouch               # 启动时预分配内存
# -XX:+UseNUMA                      # NUMA感知
# -XX:AutoBoxCacheMax=20000         # 自动装箱缓存

七、高并发性能测试

7.1 压测工具使用

// JMeter脚本生成(或使用wrk、ab、siege)
// wrk -t16 -c2000 -d30s --latency http://localhost:8080/api

// 自定义压测代码
public class LoadTest {

    public static void main(String[] args) throws InterruptedException {
        int threadCount = 100;
        int requestPerThread = 10000;
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicLong totalTime = new AtomicLong();
        AtomicLong successCount = new AtomicLong();
        AtomicLong failCount = new AtomicLong();

        // 预热
        for (int i = 0; i < 100; i++) {
            sendRequest();
        }

        long startTime = System.currentTimeMillis();

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                for (int j = 0; j < requestPerThread; j++) {
                    long requestStart = System.nanoTime();
                    boolean success = sendRequest();
                    long requestTime = System.nanoTime() - requestStart;
                    totalTime.addAndGet(requestTime);

                    if (success) {
                        successCount.incrementAndGet();
                    } else {
                        failCount.incrementAndGet();
                    }
                }
                latch.countDown();
            }).start();
        }

        latch.await();
        long totalDuration = System.currentTimeMillis() - startTime;

        // 输出结果
        long totalRequests = successCount.get() + failCount.get();
        double avgLatency = totalTime.get() / 1_000_000.0 / totalRequests;
        double qps = totalRequests * 1000.0 / totalDuration;

        System.out.printf("Total Requests: %d\n", totalRequests);
        System.out.printf("Success: %d, Fail: %d\n", successCount.get(), failCount.get());
        System.out.printf("QPS: %.2f\n", qps);
        System.out.printf("Avg Latency: %.2f ms\n", avgLatency);
        System.out.printf("Total Duration: %d ms\n", totalDuration);
    }
}

7.2 性能瓶颈定位

# 1. CPU分析
top -H -p <pid>                     # 查看线程CPU使用率
perf top -p <pid>                   # 查看热点函数
perf record -g -p <pid> -- sleep 30 # 采样30秒
perf report                         # 生成报告

# 2. 内存分析
jmap -heap <pid>                    # 查看堆内存分布
jmap -histo:live <pid> | head -20   # 查看存活对象
jstat -gcutil <pid> 1000            # GC统计

# 3. 线程分析
jstack <pid> > thread_dump.txt      # 线程dump
# 统计线程状态
cat thread_dump.txt | grep "java.lang.Thread.State" | sort | uniq -c

# 4. 网络分析
netstat -antp | grep <port> | wc -l # 连接数统计
ss -tni | grep -E "rtt|cwnd"        # TCP详细信息

# 5. 火焰图
# 使用async-profiler
./profiler.sh start <pid>
./profiler.sh stop <pid> -o flamegraph > flamegraph.svg

八、实战案例:百万级长连接推送系统

8.1 架构设计

// 基于Netty的WebSocket推送服务
@SpringBootApplication
public class PushServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(PushServerApplication.class, args);
    }

    @Bean
    public NettyWebSocketServer nettyWebSocketServer() {
        return new NettyWebSocketServer(8080);
    }
}

public class NettyWebSocketServer {

    private final int port;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    // 连接管理(本地缓存 + Redis集群)
    private final ConnectionManager connectionManager;

    public NettyWebSocketServer(int port) {
        this.port = port;
        this.connectionManager = new ConnectionManager();
    }

    public void start() {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup(0); // 自动设置(CPU核心数 * 2)

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 32768)
                .option(ChannelOption.SO_REUSEADDR, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.SO_RCVBUF, 65536)
                .childOption(ChannelOption.SO_SNDBUF, 65536)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        // WebSocket编解码器
                        pipeline.addLast(new HttpServerCodec());
                        pipeline.addLast(new HttpObjectAggregator(65536));
                        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536));
                        pipeline.addLast(new WebSocketFrameHandler(connectionManager));

                        // 心跳检测
                        pipeline.addLast(new IdleStateHandler(60, 0, 0));
                        pipeline.addLast(new HeartbeatHandler());
                    }
                });

            ChannelFuture future = bootstrap.bind(port).sync();
            log.info("Push server started on port {}", port);
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            log.error("Server interrupted", e);
        } finally {
            shutdown();
        }
    }

    public void shutdown() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

    // 连接管理器(支持亿级连接,使用分片)
    static class ConnectionManager {
        // 使用ConcurrentHashMap分片减少竞争
        private final ConcurrentHashMap<String, Channel>[] segments;
        private final int segmentCount = 128;

        @SuppressWarnings("unchecked")
        ConnectionManager() {
            segments = new ConcurrentHashMap[segmentCount];
            for (int i = 0; i < segmentCount; i++) {
                segments[i] = new ConcurrentHashMap<>();
            }
        }

        private int getSegmentIndex(String userId) {
            return Math.abs(userId.hashCode() % segmentCount);
        }

        public void add(String userId, Channel channel) {
            segments[getSegmentIndex(userId)].put(userId, channel);
        }

        public Channel get(String userId) {
            return segments[getSegmentIndex(userId)].get(userId);
        }

        public void remove(String userId) {
            segments[getSegmentIndex(userId)].remove(userId);
        }

        // 广播消息(高效遍历)
        public void broadcast(Object message) {
            for (ConcurrentHashMap<String, Channel> segment : segments) {
                for (Channel channel : segment.values()) {
                    if (channel.isActive()) {
                        channel.writeAndFlush(message);
                    }
                }
            }
        }
    }
}

// WebSocket处理器
class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private final ConnectionManager connectionManager;
    private String userId;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("New connection: {}", ctx.channel().remoteAddress());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
        if (frame instanceof TextWebSocketFrame) {
            String text = ((TextWebSocketFrame) frame).text();
            handleMessage(ctx, text);
        }
    }

    private void handleMessage(ChannelHandlerContext ctx, String message) {
        // 解析注册消息
        JsonObject json = JsonParser.parseString(message).getAsJsonObject();
        String type = json.get("type").getAsString();

        if ("register".equals(type)) {
            userId = json.get("userId").getAsString();
            connectionManager.add(userId, ctx.channel());
            log.info("User registered: {}", userId);
        } else if ("ping".equals(type)) {
            // 心跳响应
            ctx.writeAndFlush(new TextWebSocketFrame("{\"type\":\"pong\"}"));
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if (userId != null) {
            connectionManager.remove(userId);
            log.info("User disconnected: {}", userId);
        }
    }
}

// 推送服务
@RestController
public class PushController {

    @Autowired
    private ConnectionManager connectionManager;

    @PostMapping("/push/{userId}")
    public ResponseEntity<String> pushToUser(@PathVariable String userId, @RequestBody PushRequest request) {
        Channel channel = connectionManager.get(userId);
        if (channel != null && channel.isActive()) {
            String message = new TextWebSocketFrame(JSON.toJSONString(request)).toString();
            channel.writeAndFlush(message);
            return ResponseEntity.ok("success");
        }
        return ResponseEntity.status(404).body("user offline");
    }

    @PostMapping("/push/batch")
    public ResponseEntity<String> pushBatch(@RequestBody BatchPushRequest request) {
        List<String> userIds = request.getUserIds();
        String message = JSON.toJSONString(request.getMessage());

        // 批量推送(使用CompletableFuture并行)
        List<CompletableFuture<Void>> futures = userIds.stream()
            .map(userId -> CompletableFuture.runAsync(() -> {
                Channel channel = connectionManager.get(userId);
                if (channel != null && channel.isActive()) {
                    channel.writeAndFlush(new TextWebSocketFrame(message));
                }
            }))
            .collect(Collectors.toList());

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .join();

        return ResponseEntity.ok("success");
    }
}

高性能设计原则

/*
 * 1. 无锁化:使用CAS代替锁,减少上下文切换
 * 2. 缓存友好:数据结构对齐,避免伪共享
 * 3. 异步非阻塞:IO密集型任务使用异步
 * 4. 批量处理:合并请求,减少系统调用
 * 5. 内存池:复用对象,减少GC
 * 6. 零拷贝:sendfile、mmap
 * 7. 预先计算:缓存热点数据
 * 8. 懒加载:延迟初始化
 * 9. 短路原则:快速失败
 * 10. 降级熔断:保护核心服务
 */

来源:
https://yyvgt.cn/

相关文章
|
8天前
|
人工智能 开发工具 iOS开发
Claude Code 新手完全上手指南:安装、国产模型配置与常用命令全解
Claude Code 是一款运行在终端环境中的 AI 编程助手,能够直接在命令行中完成代码生成、项目分析、文件修改、命令执行、Git 管理等开发全流程工作。它最大的特点是**任务驱动、终端原生、轻量高效、多模型兼容**,无需图形界面、不依赖 IDE 插件,能够深度融入开发者日常工作流。
2967 7
|
10天前
|
Shell API 开发工具
Claude Code 快速上手指南(新手友好版)
AI编程工具卷疯啦!Claude Code凭借任务驱动+终端原生的特性,成了开发者的效率搭子。本文从安装、登录、切换国产模型到常用命令,手把手带新手快速上手,全程避坑,30分钟独立用起来。
3068 20
|
23天前
|
人工智能 JSON 供应链
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
LucianaiB分享零成本畅用JVS Claw教程(学生认证享7个月使用权),并开源GeoMind项目——将JVS改造为科研与产业地理情报可视化AI助手,支持飞书文档解析、地理编码与腾讯地图可视化,助力产业关系图谱构建。
23567 15
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
|
4天前
|
人工智能 Linux BI
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
JeecgBoot AI专题研究 一键脚本:Claude Code + JeecgBoot Skills + DeepSeek 全平台接入 一行命令装好 Claude Code + JeecgBoot Skills + DeepSeek 接入,无需翻墙使用 Claude Code,支持 Wind
1953 3
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
|
10天前
|
人工智能 JSON BI
DeepSeek V4-Pro 接入 Claude Code 完全实战:体验、测试与关键避坑指南
Claude Code 作为当前主流的 AI 编程辅助工具,凭借强大的代码理解、工程执行与自动化能力深受开发者喜爱,但原生模型的使用成本相对较高。为了在保持能力的同时进一步降低开销,不少开发者开始寻找兼容度高、价格更友好的替代模型。DeepSeek V4 系列的发布带来了新的选择,该系列包含 V4-Pro 与 V4-Flash 两款模型,并提供了与 Anthropic 完全兼容的 API 接口,理论上只需简单修改配置,即可让 Claude Code 无缝切换为 DeepSeek 引擎。
2460 3
|
8天前
|
人工智能 安全 开发工具
Claude Code 官方工作原理与使用指南
Claude Code 不是传统代码补全工具,而是 Anthropic 推出的终端 AI 代理,具备代理循环、双驱动架构(模型+工具)、全局项目感知、6 种权限模式等核心能力,本文基于官方文档系统解析其工作原理与高效使用技巧。
1339 0
|
8天前
|
存储 Linux iOS开发
【2026最新】MarkText中文版Markdown编辑器使用图解(附安装包)
MarkText是一款免费开源、跨平台的Markdown编辑器,主打所见即所得实时预览,支持Windows/macOS/Linux。内置数学公式、流程图、代码高亮、多主题及PDF/HTML导出,是Typora的轻量免费替代首选。(239字)