详解rocketMq通信模块&升级构想(上)

简介: 详解rocketMq通信模块&升级构想(上)



本文从开发者的角度深入解析了基于netty的通信模块, 并通过简易扩展实现微服务化通信工具雏形, 适合于想要了解netty通信框架的使用案例, 想了解中间件通信模块设计, 以及微服务通信底层架构的同学。希望此文能给大家带来通信模块架构灵感。



概述


网络通信是很常见的需求,

对于传统web网页工具短连接场景,浏览器和服务器交互,常见为浏览器通过http协议请求Tomcat服务器;

对于长连接场景, 比如即时通讯,或中间件等实时性要求高的场景,一般采用tcp协议的长连接进行全双工实时通信;

对于java开发者来说,使用原生socket进行tcp开发,效率是比较低的,稳定性可靠性等也不好保障,一般选择网络通信框架netty加快开发效率。


对于上层应用来说,netty的标准使用方式依然比较繁琐,未能很好的适配一些业务使用场景,比如rocketMq根据netty包装了一层业务框架:通信模块remoting。

该模块可用性高,稳定性好,易扩展,经过了中间件产品长期高并发的质量验证, 值得信任,并广泛用于其他点对点(指定ip)通信场景,如dleger(raft的java实现)。

有相关通信需求的同学也都可以参考该通信模块,相信有很多的灵感,或直接使用该通信模块,带来开发效率的提升。


本文从一个普通java开发者的视角,去解析该通信模块

  1. 如何用 - 常见使用方式
  2. 实现原理 - 数据流转链路
  3. 设计关键点 - 为什么要如此设计
  4. 模块升级 - 实现简易的微服务化通信工具


本文代码版本:

<parent>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-remoting</artifactId>
  <version>5.0.1-PREVIEW-SNAPSHOT</version>
</parent>

如何用


编写简单易懂的测试demo,实现server client的交互流程。

简单示例 协议code 为写死 0 1 5 9,输入测试信息,输出使用sysout。


 启动server 注册服务监听

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Server {
    public static void main(String[] args) throws Exception {
        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // 配置端口
        nettyServerConfig.setListenPort(8888);
        // 配置线程数 netty workGroup 线程池 处理io等低耗时
        nettyServerConfig.setServerSelectorThreads(2);
        // 配置线程数 netty eventGroup 线程池 处理自定义hander/长耗时等
        nettyServerConfig.setServerWorkerThreads(8);
        NettyRemotingServer remotingServer = new NettyRemotingServer(nettyServerConfig, null);
        // 支持共用或独立的业务处理线程池
        ExecutorService poolA = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
        ExecutorService poolB = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
        // 业务处理器
        NettyRequestProcessor processA = new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode());
                RemotingCommand response = RemotingCommand.createResponseCommand(0, "server");
                switch (request.getCode()) {
                    case 0:
                        response.setBody(new String("hello sync 0").getBytes());
                    case 1:
                        response.setBody(new String("hello sync 1").getBytes());
                    default:
                        break;
                }
                return response;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        };
        // 业务处理器
        NettyRequestProcessor processB = new NettyRequestProcessor(){
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode());
                RemotingCommand response = RemotingCommand.createResponseCommand(0, "server");
                switch (request.getCode()) {
                    case 9:
                        response.setBody(new String("hello sync 9").getBytes());
                    default:
                        break;
                }
                return response;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        };
        // 注册 协议 - 对应的处理器, 类似web url 路由到对应的class
        remotingServer.registerProcessor(0, processA, poolA);
        remotingServer.registerProcessor(1, processA, poolA);
        remotingServer.registerProcessor(9, processB, poolB);
        remotingServer.start();
        System.out.println("start ok " + JSON.toJSONString(nettyServerConfig));
        System.in.read();
    }
}

 启动client 发起调用

import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Client {
    public static void main(String[] args) throws Exception {
        NettyClientConfig nettyServerConfig = new NettyClientConfig();
        // 配置线程数 netty eventGroup 线程池 处理自定义hander/耗时长等
        nettyServerConfig.setClientWorkerThreads(8);
        NettyRemotingClient remotingClient = new NettyRemotingClient(nettyServerConfig, null);
        // 支持共用或独立的业务处理线程池
        ExecutorService poolA = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
        // 监听服务端发过来的请求
        remotingClient.registerProcessor(5, new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("receive from server : " + request.getCode());
                return null;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        }, poolA);
        remotingClient.start();
        // 主动发起远程调用 
        {
            // 同步调用
            RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
            request.setRemark("sync");
            RemotingCommand response = remotingClient.invokeSync("127.0.0.1:8888", request, 30 * 1000L);
            System.out.println("call sync ok remark:" + response.getRemark() + " body:" + new String(response.getBody()));
        }
        {
            // 异步调用
            RemotingCommand request = RemotingCommand.createRequestCommand(1, null);
            request.setRemark("async");
            remotingClient.invokeAsync("127.0.0.1:8888", request, 30 * 1000L, new InvokeCallback() {
                @Override
                public void operationComplete(ResponseFuture responseFuture) {
                    RemotingCommand response = responseFuture.getResponseCommand();
                    System.out.println("call async ok remark:" + response.getRemark() + " body:" + new String(response.getBody()));
                }
            });
        }
        {
            // 单向调用
            RemotingCommand request = RemotingCommand.createRequestCommand(9, null);
            request.setRemark("oneway");
            remotingClient.invokeOneway("127.0.0.1:8888", request, 30 * 1000L);
            System.out.println("call oneway ok ");
        }
        System.in.read();
    }
}

该点对点调用,是需要手动指定目标服务器的ip和端口的,不同于hsf拥有注册中心进行协调撮合提供目标ip。

 日志输出

Connected to the target VM, address: '127.0.0.1:57381', transport: 'socket'
start ok {"listenPort":8888,"serverAsyncSemaphoreValue":64,"serverCallbackExecutorThreads":0,"serverChannelMaxIdleTimeSeconds":120,"serverOnewaySemaphoreValue":256,"serverPooledByteBufAllocatorEnable":true,"serverSelectorThreads":2,"serverSocketRcvBufSize":65535,"serverSocketSndBufSize":65535,"serverWorkerThreads":8,"useEpollNativeSelector":false}
received from client, remark:sync, coe:0
received from client, remark:async, coe:1
received from client, remark:oneway, coe:9
Connected to the target VM, address: '127.0.0.1:57385', transport: 'socket'
call sync ok remark:server body:hello sync 1
call oneway ok 
call async ok remark:server body:hello sync 1

实现原理


关于netty如何封装java基础nio socket不做展开。这里分析通信模块是如何封装netty,扩展调用协议规范的部分,重点描述其中关键的设计要点。

▐  server 启动 监听请求


作为服务端,需绑定端口,监听请求,这里采用标准netty服务端模式。

remotingServer.start();
@Override
    public void start() {
        ...
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
    encoder,
    new NettyDecoder(),
    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
    connectionManageHandler,
    serverHandler
);
                    }
                });
        ...
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        ...
    }

关注涉及几个线程池的地方:

  1. bossGroup -> eventLoopGroupBoss 固定线程数1
  2. workerGroup -> eventLoopGroupSelector 若linux采用epoll实现 否则使用nio实现, 线程数可配置
  3. eventGroup -> defaultEventExecutorGroup 普通实现的 handler 工作线程池, 线程数可配置


另外就是传统艺能:心跳, 解码器 NettyEncoder,编码器 NettyDecoder,连接管理器 connectionManageHandler,和最终的业务处理器 serverHandler


▐  server 注册业务处理器


业务线程池配置

请求协议code关联业务处理器

// 支持共用或独立的业务处理线程池
        ExecutorService poolA = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
        ExecutorService poolB = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
        // 业务处理器
        NettyRequestProcessor processA = new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode());
                RemotingCommand response = RemotingCommand.createResponseCommand(0, "server");
                switch (request.getCode()) {
                    case 0:
                        response.setBody(new String("hello sync 0").getBytes());
                    case 1:
                        response.setBody(new String("hello sync 1").getBytes());
                    default:
                        break;
                }
                return response;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        };
        // 业务处理器
        NettyRequestProcessor processB = new NettyRequestProcessor(){
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode());
                RemotingCommand response = RemotingCommand.createResponseCommand(0, "server");
                switch (request.getCode()) {
                    case 9:
                        response.setBody(new String("hello sync 9").getBytes());
                    default:
                        break;
                }
                return response;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        };
        // 注册 协议 - 对应的处理器, 类似web url 路由到对应的class
        remotingServer.registerProcessor(0, processA, poolA);
        remotingServer.registerProcessor(1, processA, poolA);
        remotingServer.registerProcessor(9, processB, poolB);

不同业务独立线程池的必要性在复杂业务场景中,比如商品管理链路,订单交易链路,将所有的请求堆积在一个线程池中,快请求和慢请求公用一个赛道,无法避免资源分配不均问题通信模块设计为可手动配置每个业务的处理线程池

注册路由和线程池关系

 @Override
    public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
        ExecutorService executorThis = executor;
        if (null == executor) {
            executorThis = this.publicExecutor;
        }
        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
        this.processorTable.put(requestCode, pair);
    }

建立 code - processor - pool 的三者映射关系,在后续收到请求后,可查找注册关系进行路由唤起processor

▐  client 启动 发起请求

        NettyRemotingClient remotingClient = new NettyRemotingClient(nettyServerConfig, null);
        remotingClient.start();
// 主动发起远程调用
        {
            // 同步调用
            RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
            request.setRemark("sync");
            RemotingCommand response = remotingClient.invokeSync("127.0.0.1:8888", request, 30 * 1000L);
            System.out.println("call sync ok remark:" + response.getRemark() + " body:" + new String(response.getBody()));
        }
        {
            // 异步调用
            RemotingCommand request = RemotingCommand.createRequestCommand(1, null);
            request.setRemark("async");
            remotingClient.invokeAsync("127.0.0.1:8888", request, 30 * 1000L, new InvokeCallback() {
                @Override
                public void operationComplete(ResponseFuture responseFuture) {
                    RemotingCommand response = responseFuture.getResponseCommand();
                    System.out.println("call async ok remark:" + response.getRemark() + " body:" + new String(response.getBody()));
                }
            });
        }
        {
            // 单向调用
            RemotingCommand request = RemotingCommand.createRequestCommand(9, null);
            request.setRemark("oneway");
            remotingClient.invokeOneway("127.0.0.1:8888", request, 30 * 1000L);
            System.out.println("call oneway ok ");
        }

启动客户端client后,即处于长连接状态,双向通信及时性有保障

三种调用模式作为通信组件,需要适配多种调用场景,同步异步调用已是基本操作,oneway用于不关心是否返回的场景。

试想一下,在全双工双向异步通信的背景下,如何能像http一样实现同步调用,发出一个请求,收到一个请求后怎么跟前面发出的请求关联起来,又如何实现异步等待转为同步响应。


  • 同步调用


发起请求

 public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        // 唯一id
        final int opaque = request.getOpaque(); 
    ...
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        // 把当前请求记录到待响应table中
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    //标记为写入成功
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }
                // 写入异常结果 并唤起wait的线程
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                public void putResponse(final RemotingCommand responseCommand) {
                    this.responseCommand = responseCommand;
                    this.countDownLatch.countDown();
                }
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        // 同步等待结果
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
            this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return this.responseCommand;
        }
      ...
    }

关键设计点:每一个请求request,都分配了一个 client唯一自增的id (request.getOpaque(); requestId.getAndIncrement())。


把id和上下文存储到请求待响应table中:发送请求后(写入channel),线程等待结果响应 responseFuture.waitResponse,利用countDownLatch等待结果。


更多精彩内容,欢迎观看:

详解rocketMq通信模块&升级构想(下):https://developer.aliyun.com/article/1396356

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 人工智能 Apache
Apache RocketMQ 中文社区全新升级!
RocketMQ 中文社区升级发布只是起点,我们将持续优化体验细节,推出更多功能和服务,更重要的是提供更多全面、深度、高质量的内容。
601 19
|
6月前
|
消息中间件 安全 API
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(1)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
312 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(1)
|
6月前
|
消息中间件 安全 Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(4)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
193 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(4)
|
2月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
6月前
|
消息中间件 安全 Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(2)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
261 0
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(2)
|
3月前
|
消息中间件 Kafka 数据安全/隐私保护
RabbitMQ异步通信详解
RabbitMQ异步通信详解
109 16
|
4月前
|
消息中间件 存储 Cloud Native
RocketMQ从4.9.7 升级到5.3.0有什么变化?
【8月更文挑战第25天】RocketMQ从4.9.7 升级到5.3.0有什么变化?
296 4
|
5月前
|
消息中间件 安全 API
Apache RocketMQ ACL 2.0 全新升级
RocketMQ 作为一款流行的分布式消息中间件,被广泛应用于各种大型分布式系统和微服务中,承担着异步通信、系统解耦、削峰填谷和消息通知等重要的角色。随着技术的演进和业务规模的扩大,安全相关的挑战日益突出,消息系统的访问控制也变得尤为重要。然而,RocketMQ 现有的 ACL 1.0 版本已经无法满足未来的发展。因此,我们推出了 RocketMQ ACL 2.0 升级版,进一步提升 RocketMQ 数据的安全性。本文将介绍 RocketMQ ACL 2.0 的新特性、工作原理,以及相关的配置和实践。
13665 9
|
4月前
|
消息中间件 人工智能 监控