程序员必备的十大技能(进阶版)之网络与高并发原理(五)

简介: 教程来源 https://tmywi.cn/ 本节系统讲解高并发场景下的操作系统与JVM深度调优:涵盖Linux网络内核参数(BBR拥塞控制、缓冲区、连接队列)、进程资源限制、JVM G1GC优化及性能压测与瓶颈定位方法,并以百万级WebSocket推送系统为实战案例,体现无锁化、异步非阻塞等高性能设计原则。

六、操作系统内核参数调优

6.1 Linux网络参数优化

#!/bin/bash
# /etc/sysctl.conf 高并发服务器配置

# 文件句柄限制
fs.file-max = 1000000
fs.nr_open = 1000000

# 网络核心参数
net.core.somaxconn = 32768              # listen队列最大长度
net.core.netdev_max_backlog = 5000      # 网卡队列长度
net.core.rmem_default = 87380           # 默认接收缓冲区
net.core.wmem_default = 87380           # 默认发送缓冲区
net.core.rmem_max = 134217728           # 最大接收缓冲区(128MB)
net.core.wmem_max = 134217728           # 最大发送缓冲区(128MB)

# TCP参数
net.ipv4.tcp_max_syn_backlog = 8192     # SYN队列长度
net.ipv4.tcp_syncookies = 1             # SYN Flood防护
net.ipv4.tcp_tw_reuse = 1               # TIME_WAIT复用
net.ipv4.tcp_fin_timeout = 30           # FIN_WAIT2超时
net.ipv4.tcp_keepalive_time = 600       # KeepAlive探测间隔
net.ipv4.tcp_keepalive_intvl = 30       # 探测重试间隔
net.ipv4.tcp_keepalive_probes = 3       # 探测次数
net.ipv4.tcp_slow_start_after_idle = 0  # 禁用空闲后慢启动

# 拥塞控制
net.ipv4.tcp_congestion_control = bbr
net.core.default_qdisc = fq

# 端口范围
net.ipv4.ip_local_port_range = 1024 65535

# 内存参数
net.ipv4.tcp_mem = 786432 1048576 1572864    # 4GB内存配置
net.ipv4.tcp_rmem = 4096 87380 134217728     # 读缓冲区
net.ipv4.tcp_wmem = 4096 65536 134217728     # 写缓冲区

6.2 用户进程限制

# /etc/security/limits.conf
* soft nofile 1000000
* hard nofile 1000000
* soft nproc 100000
* hard nproc 100000

# systemd服务限制(如果使用systemd)
[Service]
LimitNOFILE=1000000
LimitNPROC=100000

6.3 JVM参数调优

# 高并发服务JVM参数示例
# -Xms8G -Xmx8G                     # 堆内存8GB
# -Xmn3G                            # 新生代3GB
# -XX:MetaspaceSize=512M            # 元空间初始大小
# -XX:MaxMetaspaceSize=512M
# -XX:+UseG1GC                      # G1垃圾回收器
# -XX:MaxGCPauseMillis=100          # 目标GC暂停时间
# -XX:G1HeapRegionSize=16M          # G1 Region大小
# -XX:ParallelGCThreads=16          # GC并行线程数
# -XX:ConcGCThreads=4               # 并发GC线程数
# -XX:+UseStringDeduplication       # 字符串去重
# -XX:+DisableExplicitGC            # 禁用显式GC
# -XX:+HeapDumpOnOutOfMemoryError   # OOM时dump堆
# -XX:HeapDumpPath=/logs/heap.hprof

# 网络相关
# -Djava.net.preferIPv4Stack=true
# -Djava.net.preferIPv6Addresses=false

# 高性能模式
# -XX:+UseBiasedLocking             # 偏向锁
# -XX:+UseCompressedOops            # 压缩指针
# -XX:+AlwaysPreTouch               # 启动时预分配内存
# -XX:+UseNUMA                      # NUMA感知
# -XX:AutoBoxCacheMax=20000         # 自动装箱缓存

七、高并发性能测试

7.1 压测工具使用

// JMeter脚本生成(或使用wrk、ab、siege)
// wrk -t16 -c2000 -d30s --latency http://localhost:8080/api

// 自定义压测代码
public class LoadTest {

    public static void main(String[] args) throws InterruptedException {
        int threadCount = 100;
        int requestPerThread = 10000;
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicLong totalTime = new AtomicLong();
        AtomicLong successCount = new AtomicLong();
        AtomicLong failCount = new AtomicLong();

        // 预热
        for (int i = 0; i < 100; i++) {
            sendRequest();
        }

        long startTime = System.currentTimeMillis();

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                for (int j = 0; j < requestPerThread; j++) {
                    long requestStart = System.nanoTime();
                    boolean success = sendRequest();
                    long requestTime = System.nanoTime() - requestStart;
                    totalTime.addAndGet(requestTime);

                    if (success) {
                        successCount.incrementAndGet();
                    } else {
                        failCount.incrementAndGet();
                    }
                }
                latch.countDown();
            }).start();
        }

        latch.await();
        long totalDuration = System.currentTimeMillis() - startTime;

        // 输出结果
        long totalRequests = successCount.get() + failCount.get();
        double avgLatency = totalTime.get() / 1_000_000.0 / totalRequests;
        double qps = totalRequests * 1000.0 / totalDuration;

        System.out.printf("Total Requests: %d\n", totalRequests);
        System.out.printf("Success: %d, Fail: %d\n", successCount.get(), failCount.get());
        System.out.printf("QPS: %.2f\n", qps);
        System.out.printf("Avg Latency: %.2f ms\n", avgLatency);
        System.out.printf("Total Duration: %d ms\n", totalDuration);
    }
}

7.2 性能瓶颈定位

# 1. CPU分析
top -H -p <pid>                     # 查看线程CPU使用率
perf top -p <pid>                   # 查看热点函数
perf record -g -p <pid> -- sleep 30 # 采样30秒
perf report                         # 生成报告

# 2. 内存分析
jmap -heap <pid>                    # 查看堆内存分布
jmap -histo:live <pid> | head -20   # 查看存活对象
jstat -gcutil <pid> 1000            # GC统计

# 3. 线程分析
jstack <pid> > thread_dump.txt      # 线程dump
# 统计线程状态
cat thread_dump.txt | grep "java.lang.Thread.State" | sort | uniq -c

# 4. 网络分析
netstat -antp | grep <port> | wc -l # 连接数统计
ss -tni | grep -E "rtt|cwnd"        # TCP详细信息

# 5. 火焰图
# 使用async-profiler
./profiler.sh start <pid>
./profiler.sh stop <pid> -o flamegraph > flamegraph.svg

八、实战案例:百万级长连接推送系统

8.1 架构设计

// 基于Netty的WebSocket推送服务
@SpringBootApplication
public class PushServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(PushServerApplication.class, args);
    }

    @Bean
    public NettyWebSocketServer nettyWebSocketServer() {
        return new NettyWebSocketServer(8080);
    }
}

public class NettyWebSocketServer {

    private final int port;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    // 连接管理(本地缓存 + Redis集群)
    private final ConnectionManager connectionManager;

    public NettyWebSocketServer(int port) {
        this.port = port;
        this.connectionManager = new ConnectionManager();
    }

    public void start() {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup(0); // 自动设置(CPU核心数 * 2)

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 32768)
                .option(ChannelOption.SO_REUSEADDR, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.SO_RCVBUF, 65536)
                .childOption(ChannelOption.SO_SNDBUF, 65536)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        // WebSocket编解码器
                        pipeline.addLast(new HttpServerCodec());
                        pipeline.addLast(new HttpObjectAggregator(65536));
                        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536));
                        pipeline.addLast(new WebSocketFrameHandler(connectionManager));

                        // 心跳检测
                        pipeline.addLast(new IdleStateHandler(60, 0, 0));
                        pipeline.addLast(new HeartbeatHandler());
                    }
                });

            ChannelFuture future = bootstrap.bind(port).sync();
            log.info("Push server started on port {}", port);
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            log.error("Server interrupted", e);
        } finally {
            shutdown();
        }
    }

    public void shutdown() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

    // 连接管理器(支持亿级连接,使用分片)
    static class ConnectionManager {
        // 使用ConcurrentHashMap分片减少竞争
        private final ConcurrentHashMap<String, Channel>[] segments;
        private final int segmentCount = 128;

        @SuppressWarnings("unchecked")
        ConnectionManager() {
            segments = new ConcurrentHashMap[segmentCount];
            for (int i = 0; i < segmentCount; i++) {
                segments[i] = new ConcurrentHashMap<>();
            }
        }

        private int getSegmentIndex(String userId) {
            return Math.abs(userId.hashCode() % segmentCount);
        }

        public void add(String userId, Channel channel) {
            segments[getSegmentIndex(userId)].put(userId, channel);
        }

        public Channel get(String userId) {
            return segments[getSegmentIndex(userId)].get(userId);
        }

        public void remove(String userId) {
            segments[getSegmentIndex(userId)].remove(userId);
        }

        // 广播消息(高效遍历)
        public void broadcast(Object message) {
            for (ConcurrentHashMap<String, Channel> segment : segments) {
                for (Channel channel : segment.values()) {
                    if (channel.isActive()) {
                        channel.writeAndFlush(message);
                    }
                }
            }
        }
    }
}

// WebSocket处理器
class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private final ConnectionManager connectionManager;
    private String userId;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("New connection: {}", ctx.channel().remoteAddress());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
        if (frame instanceof TextWebSocketFrame) {
            String text = ((TextWebSocketFrame) frame).text();
            handleMessage(ctx, text);
        }
    }

    private void handleMessage(ChannelHandlerContext ctx, String message) {
        // 解析注册消息
        JsonObject json = JsonParser.parseString(message).getAsJsonObject();
        String type = json.get("type").getAsString();

        if ("register".equals(type)) {
            userId = json.get("userId").getAsString();
            connectionManager.add(userId, ctx.channel());
            log.info("User registered: {}", userId);
        } else if ("ping".equals(type)) {
            // 心跳响应
            ctx.writeAndFlush(new TextWebSocketFrame("{\"type\":\"pong\"}"));
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if (userId != null) {
            connectionManager.remove(userId);
            log.info("User disconnected: {}", userId);
        }
    }
}

// 推送服务
@RestController
public class PushController {

    @Autowired
    private ConnectionManager connectionManager;

    @PostMapping("/push/{userId}")
    public ResponseEntity<String> pushToUser(@PathVariable String userId, @RequestBody PushRequest request) {
        Channel channel = connectionManager.get(userId);
        if (channel != null && channel.isActive()) {
            String message = new TextWebSocketFrame(JSON.toJSONString(request)).toString();
            channel.writeAndFlush(message);
            return ResponseEntity.ok("success");
        }
        return ResponseEntity.status(404).body("user offline");
    }

    @PostMapping("/push/batch")
    public ResponseEntity<String> pushBatch(@RequestBody BatchPushRequest request) {
        List<String> userIds = request.getUserIds();
        String message = JSON.toJSONString(request.getMessage());

        // 批量推送(使用CompletableFuture并行)
        List<CompletableFuture<Void>> futures = userIds.stream()
            .map(userId -> CompletableFuture.runAsync(() -> {
                Channel channel = connectionManager.get(userId);
                if (channel != null && channel.isActive()) {
                    channel.writeAndFlush(new TextWebSocketFrame(message));
                }
            }))
            .collect(Collectors.toList());

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .join();

        return ResponseEntity.ok("success");
    }
}

高性能设计原则

/*
 * 1. 无锁化:使用CAS代替锁,减少上下文切换
 * 2. 缓存友好:数据结构对齐,避免伪共享
 * 3. 异步非阻塞:IO密集型任务使用异步
 * 4. 批量处理:合并请求,减少系统调用
 * 5. 内存池:复用对象,减少GC
 * 6. 零拷贝:sendfile、mmap
 * 7. 预先计算:缓存热点数据
 * 8. 懒加载:延迟初始化
 * 9. 短路原则:快速失败
 * 10. 降级熔断:保护核心服务
 */

来源:
https://yyvgt.cn/

相关文章
|
1月前
|
SQL 缓存 druid
一次 OOM 线上排查实录
老项目线上 OOM 踩坑实录!Druid 连接池 SQL 缓存泄漏 + 业务 SQL 拼接双重叠加导致内存溢出,通过堆 dump 定位问题,优化 Druid 配置 + 批量插入预防 OOM。
235 2
|
5天前
|
SQL 网络协议 NoSQL
软件开发新手入门五大核心技能之计算机基础常识(五)
教程来源 http://vbzcj.cn/ 本章系统讲解网络与数据库核心知识:涵盖OSI/TCP/IP模型、IP/端口、TCP三次握手/四次挥手、HTTP协议、DNS解析;以及SQL基础、索引优化、ACID事务、NoSQL(如Redis)等,理论结合Python实战示例。
|
5天前
|
机器学习/深度学习 人工智能 PyTorch
PyTorch深度学习实战 | 人工智能项目从训练到部署
本项目基于LSTM模型对污水处理厂总曝气量(旧区+新区)进行时序预测。通过数据清洗、Min-Max归一化、滑动窗口构造(12小时输入→预测未来1小时),构建并训练轻量级LSTM模型,支持API部署与实时调用,已实现端到端预测流程及模型保存。
180 6
|
5天前
|
存储 数据可视化 程序员
程序员进阶工程师必备的十大技能之业务深度理解与建模能力(三)
教程来源 https://qcycj.cn/ 本文介绍业务建模的三大工程实践:①建模文档化——用代码、注释与状态图清晰表达订单等核心概念与规则;②模型验证——通过单元测试保障业务规则正确性;③数据库映射——借助Repository模式实现领域模型与持久化的解耦。强调以业务为中心建模,让代码成为业务的精准数字映射。
|
30天前
|
Linux 程序员 网络安全
初级程序员必备的十大技能之基础 Linux 命令(一)
教程来源 https://qcycj.cn/ 本文系统讲解程序员必备的Linux核心命令,涵盖文件操作、文本处理、权限管理、进程与网络工具等,结合原理、参数详解及实战案例,助你高效部署、排查与运维——无论用Windows还是macOS,Linux都是程序员不可或缺的“第二操作系统”。
|
5天前
|
Linux KVM 虚拟化
虚拟机使用教程大全(二)
教程来源 https://oplhc.cn/ 虚拟机网络模式深度解析:NAT(共享上网)、桥接(独立局域网身份)、仅主机(宿主隔离通信)、内部网络(纯虚拟机互通)及VMware自定义模式,涵盖原理、配置、端口转发与高级应用,助你精准构建安全高效实验环境。
|
5天前
|
Ubuntu Linux KVM
虚拟机使用教程大全(一)
教程来源 https://tmywi.cn/ 虚拟机技术已成IT从业者必备技能。本文聚焦“使用”而非搭建,涵盖资源调优、网络配置、快照备份、性能监控与故障排查等30+实用章节,兼容VMware/VirtualBox/KVM,含100+命令示例与实战技巧。
|
5天前
|
存储 Linux KVM
虚拟机搭建教程(三)
教程来源 https://bncne.cn/ Windows 11虚拟机安装需注意:启用vTPM与Secure Boot、分配≥4GB内存/64GB磁盘、选NAT联网;遇限制可执行OOBE\BYPASSNRO跳过;常见问题含虚拟化未开、无网络、卡顿等,对应BIOS设置、关Hyper-V、装VMware Tools即可解决。
|
5天前
|
设计模式 安全 Java
程序员必备的十大技能(进阶版)之网络与高并发原理(四)
教程来源 https://bncne.cn/ 本节详解高并发四大核心设计模式:①生产者-消费者(基于BlockingQueue与高性能Disruptor);②请求合并(批量处理降低IO压力);③背压控制(RxJava与信号量/滑动窗口限流);④线程封闭(ThreadLocal安全复用资源)。兼顾性能、可靠性与可维护性。
|
5天前
|
Arthas 运维 Java
程序员必备的十大技能(进阶版)之性能调优与故障排查(二)
教程来源 qfcrz.cn 本节系统梳理Java内存问题排查全流程:涵盖JVM内存结构(堆、元空间、直接内存等)、四大典型泄漏场景(静态集合、ThreadLocal、监听器、动态代理)、jstat/jmap/jcmd/Arthas等实战工具用法、MAT深度分析技巧(Dominator Tree、OQL查询),以及GC调优策略与I/O问题定位方法。