⚡ 构建真正的高性能即时通讯服务:基于 Netty 集群的架构设计与实现

简介: 本文介绍了如何基于 Netty 构建分布式即时通讯集群。随着用户量增长,单体架构面临性能瓶颈,文章对比了三种集群方案:Nginx 负载均衡、注册中心服务发现与基于 ZooKeeper 的消息路由架构。最终选择第三种方案,通过 ZooKeeper 实现服务注册发现与消息路由,并结合 RabbitMQ 支持跨服务器消息广播。文中还详细讲解了 ZooKeeper 搭建、Netty 集群改造、动态端口分配、服务注册、负载均衡及消息广播的实现,构建了一个高可用、可水平扩展的即时通讯系统。

引子

在前面的文章中,我们基于 Netty 构建了一套单体架构的即时通讯服务。虽然单体架构在开发初期简单高效,但随着用户量的增长和业务规模的扩大,其局限性逐渐显现。当面对高并发场景时,单体 Netty 服务很容易触及性能天花板,导致消息推送延迟、连接频繁断开等问题。

1.png

然而,如果只是简单地复制多个 Netty 实例进行水平扩展:

2.png

这种方案会带来新的问题:客户端连接分散在不同服务器上,消息无法跨服务器传递。比如用户 A 和用户 B 连接到不同的 Netty 实例,他们之间的消息将无法送达。

三种构建 Netty 集群方案

没有什么问题是加一层解决不了的,下面为大家介绍三种常见的 Netty 集群构建方案。

方案一:基于 Nginx 的负载均衡架构

通过 Nginx 作为负载均衡器,将用户请求分发到不同的 Netty 服务器上。每个用户固定连接到某一台服务器,保证了点对点通信的简单性。

3.png

这个方案架构简单,部署方便,Nginx 也是成熟稳定的中间件。但它无法实现跨服务器消息传递,用户只能与同服务器的其他用户通信,无法实现真正的集群化。

方案二:基于注册中心的服务发现架构

引入 Nacos 作为服务注册中心,Netty 服务器启动时向 Nacos 注册。客户端通过 Gateway 获取可用服务器列表,实现动态服务发现。

4.png

这个方案支持服务动态上下线,负载均衡策略灵活,服务监控管理方便。但它需要引入阿里系的相关中间件,如果原架构中未使用,突然引入大量中间件,让架构复杂度增加,而且阿里系在Java这块儿通常都是引一个,你得用一套它的方案才能用好。

方案三:基于消息路由的分布式架构

通过 ZooKeeper 记录用户连接信息,Controller 服务查询用户位置并路由消息到正确的 Netty 实例,实现跨服务器通信。

5.png

这个方案完美解决跨服务器消息传递,而且支持无限水平扩展,而且只需要引入一个中间件 ZooKeeper,只需要把消息通过相应的 Controller 转发下即可。

通过对三种方案优劣的综合考量,我们最终选择方案三。它真正实现了 Netty 服务的集群化,解决了跨服务器消息传递这一核心问题,而且架构也相对简单,是构建 Netty 集群的最佳选择。

实现消息广播

在选定了基于消息路由的分布式架构后,我们面临的下一个挑战是如何实现高效的消息广播。当用户发送消息时,不仅需要推送给特定的接收者,还可能需要广播给群组成员或者进行系统通知。这就需要一个可靠的消息分发机制。

6.png

在前面的文章中,我们已经引入了 RabbitMQ 来处理离线消息的持久化存储。既然已有成熟的消息中间件,我们可以充分利用其发布/订阅模式来实现广播功能。

ZooKeeper 环境搭建

既然我们选择了方案三(基于消息路由的分布式架构),那么在项目改造前,首先需要搭建 ZooKeeper 环境。ZooKeeper 将作为我们的分布式协调服务,记录用户的连接信息,帮助 Controller 服务准确定位用户所在的 Netty 实例。

在我们的即时通讯架构中,ZooKeeper 将承担以下职责:

  • 存储用户与 Netty 服务器的映射关系
  • 监控 Netty 服务器的健康状态
  • 提供服务发现功能

安装 ZooKeeper 的方式有很多,大家也可以选择自己喜欢的方式,我这里就只演示下如何通过 docker 安装。

1. 拉取镜像

首先拉取 ZooKeeper 镜像,大家在安装时先去 docker hub 上查看最新版本,我写这篇文章时最新版本是 3.9.3:

docker pull zookeeper:3.9.3

2.创建挂载目录

为了数据持久化和方便配置管理,我们需要创建本地挂载目录:

7.png

3. 启动容器

使用以下命令启动 ZooKeeper 容器,大家在使用时需要注意下操作系统和挂载目录的路径,根据自己的实际情况修改:

docker run --name zookeeper \
-p 2181:2181 \
--restart always \
-v D:\devolop\zookeeper\data:/data \
-v D:\devolop\zookeeper\conf:/conf \
-v D:\devolop\zookeeper\logs:/datalog \
-d zookeeper:3.9.3

4.补充配置文件

容器启动后,查看 conf 目录,会发现自动生成了 zoo.cfg 配置文件:

8.png

但是还缺少日志配置文件 logback.xml。我们需要从 Apache ZooKeeper 官网下载对应版本的安装包来获取完整的配置文件。

访问 https://zookeeper.apache.org/releases.html,下载 3.9.3 版本的二进制包,解压后将 conf/logback.xml 文件复制到本地挂载的 conf 目录中:

9.png

10.png

5.验证安装

接着重启容器后,进入容器并检查 ZooKeeper 状态:

/apache-zookeeper-3.9.3-bin/bin/zkServer.sh status

如果看到以下输出,说明 ZooKeeper 已成功启动:

11.png

Netty 集群改造实战

搭建好 ZooKeeper 环境后,我们开始对原有的单体 Netty 服务进行集群化改造。改造的核心思路是:

  1. 让每个 Netty 实例启动时自动注册到 ZooKeeper

  2. 实现基于最少连接数的负载均衡策略

  3. 通过 RabbitMQ 实现跨服务器的消息广播

  4. 处理用户上下线时的在线人数同步

Spring Boot 集成 ZooKeeper

1. 引入依赖

首先在项目中引入 Apache Curator 依赖,它是 ZooKeeper 的客户端,提供了更友好的 API:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.9.3</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.8.0</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.8.0</version>
</dependency>

2. 配置 ZooKeeper 连接

application.yml 中添加 ZooKeeper 配置:

# zookeeper配置
zookeeper:
  curator:
    host: 127.0.0.1:2181
    connectionTimeoutMs: 30000
    sessionTimeoutMs: 3000
    sleepMsBetweenRetry: 2000
    maxRetries: 3
    namespace: wechat

3.创建 Curator 配置类

这里添加了一个监听器对 Redis 残留端口的处理,如果不加处理,一直累加下去肯定会加到上限的。

12.png

@Slf4j
@Component
@Data
@ConfigurationProperties(prefix = "zookeeper.curator")
public class CuratorConfig {
   

    private String host;
    private Integer connectionTimeout;
    private Integer sessionTimeout;
    private Integer sleepMsBetweenRetry;
    private Integer maxRetries;
    private String namespace;

    @Autowired
    private RedisOperator redisOperator;

    public static final String PATH = "/server-list";

    @Bean("curatorClient")
    public CuratorFramework curatorClient() {
   
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(
                sleepMsBetweenRetry,
                maxRetries);

        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(host)
                .connectionTimeoutMs(connectionTimeout)
                .sessionTimeoutMs(sessionTimeout)
                .retryPolicy(retryPolicy)
                .namespace(namespace)
                .build();

        client.start();

        // 添加节点监听器
        addWatcher(PATH, client);

        return client;
    }

    /**
     * 注册节点的事件监听
     */
    public void addWatcher(String path, CuratorFramework client) {
   
        CuratorCache curatorCache = CuratorCache.build(client, path);
        curatorCache.listenable().addListener(((type, oldData, data) -> {
   
            switch (type.name()) {
   
                case "NODE_DELETED":
                    log.info("节点删除事件");
                    // 清理 Redis 中的端口缓存
                    NettyServerNode oldNode = JsonUtils.jsonToPojo(
                        new String(oldData.getData()), 
                        NettyServerNode.class
                    );
                    String oldPort = oldNode.getPort() + "";
                    redisOperator.hdel("netty_port", oldPort);
                    break;
                default:
                    break;
            }
        }));
    }
}

改造 Netty 服务启动流程

1. 动态端口分配

为了支持在同一台服务器上启动多个 Netty 实例,我们需要实现动态端口分配。通过 Redis 记录已使用的端口,每次启动时自动分配一个新端口:

public class ChatServer {
   

    public static final Integer nettyDefaultPort = 875;

    /**
     * 动态获取端口号
     */
    public static Integer selectPort(Integer port) {
   
        String portKey = "netty_port";
        Jedis jedis = JedisPoolUtils.getJedis();
        Map<String, String> portMap = jedis.hgetAll(portKey);

        List<Integer> portList = portMap.entrySet().stream()
                .map(entry -> Integer.valueOf(entry.getKey()))
                .collect(Collectors.toList());

        Integer nettyPort = null;
        if (portList == null || portList.isEmpty()) {
   
            jedis.hset(portKey, port+"", "0");
            nettyPort = port;
        } else {
   
            // 获取最大端口号并加10
            Optional<Integer> maxInteger = portList.stream()
                .max(Integer::compareTo);
            Integer maxPort = maxInteger.get().intValue();
            Integer currentPort = maxPort + 10;
            jedis.hset(portKey, currentPort+"", "0");
            nettyPort = currentPort;
        }
        return nettyPort;
    }
}

2. 向 ZooKeeper 注册服务

创建服务注册工具类:

public class ZookeeperRegister {
   

    /**
     * 注册 Netty 服务到 ZooKeeper
     */
    public static void registerNettyServer(String nodeName, 
                                         String ip, 
                                         Integer port) throws Exception {
   
        CuratorFramework zkClient = CuratorConfig.getClient();
        String path = "/" + nodeName;
        Stat stat = zkClient.checkExists().forPath(path);

        // 创建持久节点
        if (stat == null) {
   
            zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path);
        }

        // 创建临时顺序节点,存储服务器信息
        NettyServerNode serverNode = new NettyServerNode();
        serverNode.setIp(ip);
        serverNode.setPort(port);
        serverNode.setOnlineCounts(0);
        String nodeJson = JsonUtils.objectToJson(serverNode);

        zkClient.create()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(path + "/im-", nodeJson.getBytes());
    }

    /**
     * 获取本机IP地址
     */
    public static String getLocalIp() throws UnknownHostException {
   
        InetAddress address = InetAddress.getLocalHost();
        return address.getHostAddress();
    }

    /**
     * 处理在线人数(加锁保证数据一致性)
     */
    public static void dealOnlineCounts(NettyServerNode serverNode,
                                      Integer counts) throws Exception {
   
        CuratorFramework zkClient = CuratorConfig.getClient();

        // 使用分布式读写锁
        InterProcessReadWriteLock readWriteLock = 
            new InterProcessReadWriteLock(zkClient, "rw-lock");
        readWriteLock.writeLock().acquire();

        try {
   
            String path = "/server-list";
            List<String> list = zkClient.getChildren().forPath(path);

            for (String node : list) {
   
                String nodeValue = new String(
                    zkClient.getData().forPath(path + "/" + node)
                );
                NettyServerNode pendingNode = JsonUtils.jsonToPojo(
                    nodeValue, 
                    NettyServerNode.class
                );

                if (pendingNode.getIp().equals(serverNode.getIp()) && 
                    pendingNode.getPort().intValue() == serverNode.getPort().intValue()) {
   
                    pendingNode.setOnlineCounts(
                        pendingNode.getOnlineCounts() + counts
                    );
                    String nodeJson = JsonUtils.objectToJson(pendingNode);
                    zkClient.setData().forPath(
                        path + "/" + node, 
                        nodeJson.getBytes()
                    );
                }
            }
        } finally {
   
            readWriteLock.writeLock().release();
        }
    }

    public static void incrementOnlineCounts(NettyServerNode serverNode) 
        throws Exception {
   
        dealOnlineCounts(serverNode, 1);
    }

    public static void decrementOnlineCounts(NettyServerNode serverNode) 
        throws Exception {
   
        dealOnlineCounts(serverNode, -1);
    }
}

3.改造 Netty 启动类

public class ChatServer {
   

    public static void main(String[] args) throws Exception {
   
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        // 动态分配端口
        Integer nettyPort = selectPort(nettyDefaultPort);

        // 注册到 ZooKeeper
        ZookeeperRegister.registerNettyServer(
            "Netty-Server-List",
            ZookeeperRegister.getLocalIp(), 
            nettyPort
        );

        // 启动 RabbitMQ 监听器
        String queueName = "queue_" + ZookeeperRegister.getLocalIp() 
            + "_" + nettyPort;
        RabbitMQConnectUtils mqConnectUtils = new RabbitMQConnectUtils();
        mqConnectUtils.listen("fanout_exchange", queueName);

        try {
   
            ServerBootstrap server = new ServerBootstrap();
            server.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new WSServerInitializer());

            ChannelFuture channelFuture = server.bind(nettyPort).sync();
            log.info("Netty 服务启动成功,端口:{}", nettyPort);
            channelFuture.channel().closeFuture().sync();
        } finally {
   
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

实现客户端负载均衡

当客户端请求连接时,我们需要从 ZooKeeper 中获取所有可用的 Netty 服务器,并选择连接数最少的服务器:

@RestController
public class NettyController {
   

    @Resource(name = "curatorClient")
    private CuratorFramework zkClient;

    @PostMapping("getNettyOnlineInfo")
    public GraceJSONResult getNettyOnlineInfo() throws Exception {
   
        // 从 ZooKeeper 获取所有 Netty 服务器节点
        String path = "/server-list";
        List<String> list = zkClient.getChildren().forPath(path);

        List<NettyServerNode> serverNodeList = new ArrayList<>();
        for (String node : list) {
   
            String nodeValue = new String(
                zkClient.getData().forPath(path + "/" + node)
            );
            NettyServerNode serverNode = JsonUtils.jsonToPojo(
                nodeValue, 
                NettyServerNode.class
            );
            serverNodeList.add(serverNode);
        }

        // 选择连接数最少的服务器
        Optional<NettyServerNode> minNodeOptional = serverNodeList
                .stream()
                .min(Comparator.comparing(NettyServerNode::getOnlineCounts));

        NettyServerNode minNode = minNodeOptional.get();
        return GraceJSONResult.ok(minNode);
    }
}

跨服务器消息广播

1.RabbitMQ 连接工具类

public class RabbitMQConnectUtils {
   

    private final List<Connection> connections = new ArrayList<>();
    private final int maxConnection = 20;

    // RabbitMQ 连接配置
    private final String host = "127.0.0.1";
    private final int port = 5672;
    private final String username = "guest";
    private final String password = "guest";
    private final String virtualHost = "/";

    private ConnectionFactory factory;

    /**
     * 初始化连接工厂
     */
    private void initFactory() {
   
        if (factory == null) {
   
            factory = new ConnectionFactory();
            factory.setHost(host);
            factory.setPort(port);
            factory.setUsername(username);
            factory.setPassword(password);
            factory.setVirtualHost(virtualHost);
        }
    }

    /**
     * 监听消息队列
     */
    public void listen(String exchangeName, String queueName) throws Exception {
   
        Connection connection = getConnection();
        Channel channel = connection.createChannel();

        // 声明交换机(fanout 模式用于广播)
        channel.exchangeDeclare(
            exchangeName, 
            BuiltinExchangeType.FANOUT, 
            true, 
            false, 
            null
        );

        // 声明队列
        channel.queueDeclare(queueName, true, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(queueName, exchangeName, "");

        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {
   
            @Override
            public void handleDelivery(String consumerTag,
                                     Envelope envelope,
                                     AMQP.BasicProperties properties,
                                     byte[] body) throws IOException {
   
                String msg = new String(body);
                String exchange = envelope.getExchange();

                if (exchange.equalsIgnoreCase(exchangeName)) {
   
                    // 处理接收到的消息
                    DataContent dataContent = JsonUtils.jsonToPojo(
                        msg, 
                        DataContent.class
                    );
                    String senderId = dataContent.getChatMsg().getSenderId();
                    String receiverId = dataContent.getChatMsg().getReceiverId();

                    // 发送给接收者
                    List<Channel> receiverChannels = 
                        UserChannelSession.getMultiChannels(receiverId);
                    UserChannelSession.sendToTarget(receiverChannels, dataContent);

                    // 同步到发送者的其他设备
                    String currentChannelId = dataContent.getExtend();
                    List<Channel> senderChannels = UserChannelSession
                        .getMyOtherChannels(senderId, currentChannelId);
                    UserChannelSession.sendToTarget(senderChannels, dataContent);
                }
            }
        };

        // 开始消费
        channel.basicConsume(queueName, true, consumer);
    }
}

2.消息发布工具类

public class MessagePublisher {
   

    // 定义交换机名称
    public static final String EXCHANGE = "pitayafruits_exchange";
    public static final String FANOUT_EXCHANGE = "fanout_exchange";

    // 定义路由键
    public static final String ROUTING_KEY_SEND = "pitayafruits.wechat.send";

    /**
     * 发送消息到数据库保存
     */
    public static void sendMsgToSave(ChatMsg msg) throws Exception {
   
        RabbitMQConnectUtils connectUtils = new RabbitMQConnectUtils();
        connectUtils.sendMsg(
            JsonUtils.objectToJson(msg),
            EXCHANGE,
            ROUTING_KEY_SEND
        );
    }

    /**
     * 广播消息到所有 Netty 服务器
     */
    public static void sendMsgToNettyServers(String msg) throws Exception {
   
        RabbitMQConnectUtils connectUtils = new RabbitMQConnectUtils();
        connectUtils.sendMsg(msg, FANOUT_EXCHANGE, "");
    }
}

3. 改造消息处理器

ChatHandler 中处理用户连接、消息发送和断线:

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
   

    public static ChannelGroup clients = new DefaultChannelGroup(
        GlobalEventExecutor.INSTANCE
    );

    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                              TextWebSocketFrame msg) throws Exception {
   
        String content = msg.text();
        DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);
        ChatMsg chatMsg = dataContent.getChatMsg();

        Integer msgType = chatMsg.getMsgType();
        Channel currentChannel = ctx.channel();
        String currentChannelId = currentChannel.id().asLongText();
        String senderId = chatMsg.getSenderId();

        if (msgType == MsgTypeEnum.CONNECT_INIT.type) {
   
            // 用户初次连接
            UserChannelSession.putMultiChannels(senderId, currentChannel);
            UserChannelSession.putUserChannelIdRelation(currentChannelId, senderId);

            // 更新在线人数
            NettyServerNode minNode = dataContent.getServerNode();
            ZookeeperRegister.incrementOnlineCounts(minNode);

            // 保存用户与服务器的映射关系到 Redis
            Jedis jedis = JedisPoolUtils.getJedis();
            jedis.set(senderId, JsonUtils.objectToJson(minNode));

        } else if (msgType == MsgTypeEnum.WORDS.type 
                || msgType == MsgTypeEnum.IMAGE.type
                || msgType == MsgTypeEnum.VIDEO.type
                || msgType == MsgTypeEnum.VOICE.type) {
   

            // 生成消息ID
            Snowflake snowflake = new Snowflake(new IdWorkerConfigBean());
            chatMsg.setMsgId(snowflake.nextId());

            // 设置服务器时间
            chatMsg.setChatTime(LocalDateTime.now());

            dataContent.setChatMsg(chatMsg);
            dataContent.setExtend(currentChannelId);

            // 广播消息到所有 Netty 服务器
            MessagePublisher.sendMsgToNettyServers(
                JsonUtils.objectToJson(dataContent)
            );

            // 保存消息到数据库
            MessagePublisher.sendMsgToSave(chatMsg);
        }
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
   
        Channel currentChannel = ctx.channel();
        String userId = UserChannelSession.getUserIdByChannelId(
            currentChannel.id().asLongText()
        );

        // 移除用户会话
        UserChannelSession.removeUserChannels(
            userId, 
            currentChannel.id().asLongText()
        );
        clients.remove(currentChannel);

        // 更新在线人数
        Jedis jedis = JedisPoolUtils.getJedis();
        NettyServerNode serverNode = JsonUtils.jsonToPojo(
            jedis.get(userId), 
            NettyServerNode.class
        );
        ZookeeperRegister.decrementOnlineCounts(serverNode);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
        throws Exception {
   
        // 异常处理逻辑与 handlerRemoved 相同
        handlerRemoved(ctx);
    }
}

小结

通过本文的实践,我们成功将单体 Netty 服务改造成了高可用的分布式集群架构。整个方案利用了 ZooKeeper 的分布式协调能力实现服务注册与发现,通过 RabbitMQ 的广播模式解决了跨服务器消息传递的难题,并使用 Redis 实现了动态端口分配。这套架构已经能够满足大部分即时通讯场景的需求。当然,在超大规模场景下,还可以进一步优化,比如引入更智能的负载均衡策略、实现跨地域部署等。希望本文的实践经验能为大家在构建高性能即时通讯服务时提供参考。

目录
相关文章
|
4月前
|
存储 缓存 安全
某鱼电商接口架构深度剖析:从稳定性到高性能的技术密码
某鱼电商接口架构揭秘:分层解耦、安全加固、性能优化三维设计,实现200ms内响应、故障率低于0.1%。详解三层架构、多引擎存储、异步发布、WebSocket通信与全链路防护,助力开发者突破电商接口“三难”困境。
|
5月前
|
人工智能 监控 测试技术
告别只会写提示词:构建生产级LLM系统的完整架构图​
本文系统梳理了从提示词到生产级LLM产品的八大核心能力:提示词工程、上下文工程、微调、RAG、智能体开发、部署、优化与可观测性,助你构建可落地、可迭代的AI产品体系。
739 51
|
5月前
|
机器学习/深度学习 人工智能 搜索推荐
从零构建短视频推荐系统:双塔算法架构解析与代码实现
短视频推荐看似“读心”,实则依赖双塔推荐系统:用户塔与物品塔分别将行为与内容编码为向量,通过相似度匹配实现精准推送。本文解析其架构原理、技术实现与工程挑战,揭秘抖音等平台如何用AI抓住你的注意力。
1312 7
从零构建短视频推荐系统:双塔算法架构解析与代码实现
|
4月前
|
缓存 运维 监控
Redis 7.0 高性能缓存架构设计与优化
🌟蒋星熠Jaxonic,技术宇宙中的星际旅人。深耕Redis 7.0高性能缓存架构,探索函数化编程、多层缓存、集群优化与分片消息系统,用代码在二进制星河中谱写极客诗篇。
|
5月前
|
存储 监控 NoSQL
Redis高可用架构全解析:从主从复制到集群方案
Redis高可用确保服务持续稳定,避免单点故障导致数据丢失或业务中断。通过主从复制实现数据冗余,哨兵模式支持自动故障转移,Cluster集群则提供分布式数据分片与水平扩展,三者层层递进,保障读写分离、容灾切换与大规模数据存储,构建高性能、高可靠的Redis架构体系。
|
5月前
|
消息中间件 缓存 监控
中间件架构设计与实践:构建高性能分布式系统的核心基石
摘要 本文系统探讨了中间件技术及其在分布式系统中的核心价值。作者首先定义了中间件作为连接系统组件的&quot;神经网络&quot;,强调其在数据传输、系统稳定性和扩展性中的关键作用。随后详细分类了中间件体系,包括通信中间件(如RabbitMQ/Kafka)、数据中间件(如Redis/MyCAT)等类型。文章重点剖析了消息中间件的实现机制,通过Spring Boot代码示例展示了消息生产者的完整实现,涵盖消息ID生成、持久化、批量发送及重试机制等关键技术点。最后,作者指出中间件架构设计对系统性能的决定性影响,
|
5月前
|
SQL 弹性计算 关系型数据库
如何用读写分离构建高效稳定的数据库架构?
在少写多读业务场景中,主实例读请求压力大,影响性能。通过创建只读实例并使用数据库代理实现读写分离,可有效降低主实例负载,提升系统性能与可用性。本文详解配置步骤,助你构建高效稳定的数据库架构。
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13924 1
|
9月前
|
算法 Java 容器
Netty源码—4.客户端接入流程
本文主要介绍了关于Netty客户端连接接入问题整理、Reactor线程模型和服务端启动流程、Netty新连接接入的整体处理逻辑、新连接接入之检测新连接、新连接接入之创建NioSocketChannel、新连接接入之绑定NioEventLoop线程、新连接接入之注册Selector和注册读事件、注册Reactor线程总结、新连接接入总结
|
9月前
|
安全 Java 调度
Netty源码—3.Reactor线程模型二
本文主要介绍了NioEventLoop的执行总体框架、Reactor线程执行一次事件轮询、Reactor线程处理产生IO事件的Channel、Reactor线程处理任务队列之添加任务、Reactor线程处理任务队列之执行任务、NioEventLoop总结。