本文从开发者的角度深入解析了基于netty的通信模块, 并通过简易扩展实现微服务化通信工具雏形, 适合于想要了解netty通信框架的使用案例, 想了解中间件通信模块设计, 以及微服务通信底层架构的同学。希望此文能给大家带来通信模块架构灵感。
概述
网络通信是很常见的需求,
对于传统web网页工具短连接场景,浏览器和服务器交互,常见为浏览器通过http协议请求Tomcat服务器;
对于长连接场景, 比如即时通讯,或中间件等实时性要求高的场景,一般采用tcp协议的长连接进行全双工实时通信;
对于java开发者来说,使用原生socket进行tcp开发,效率是比较低的,稳定性可靠性等也不好保障,一般选择网络通信框架netty加快开发效率。
对于上层应用来说,netty的标准使用方式依然比较繁琐,未能很好的适配一些业务使用场景,比如rocketMq根据netty包装了一层业务框架:通信模块remoting。
该模块可用性高,稳定性好,易扩展,经过了中间件产品长期高并发的质量验证, 值得信任,并广泛用于其他点对点(指定ip)通信场景,如dleger(raft的java实现)。
有相关通信需求的同学也都可以参考该通信模块,相信有很多的灵感,或直接使用该通信模块,带来开发效率的提升。
本文从一个普通java开发者的视角,去解析该通信模块
- 如何用 - 常见使用方式
- 实现原理 - 数据流转链路
- 设计关键点 - 为什么要如此设计
- 模块升级 - 实现简易的微服务化通信工具
本文代码版本:
<parent> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-remoting</artifactId> <version>5.0.1-PREVIEW-SNAPSHOT</version> </parent>
如何用
编写简单易懂的测试demo,实现server client的交互流程。
简单示例 协议code 为写死 0 1 5 9,输入测试信息,输出使用sysout。
▐ 启动server 注册服务监听
import com.alibaba.fastjson.JSON; import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.netty.NettyRemotingServer; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Server { public static void main(String[] args) throws Exception { NettyServerConfig nettyServerConfig = new NettyServerConfig(); // 配置端口 nettyServerConfig.setListenPort(8888); // 配置线程数 netty workGroup 线程池 处理io等低耗时 nettyServerConfig.setServerSelectorThreads(2); // 配置线程数 netty eventGroup 线程池 处理自定义hander/长耗时等 nettyServerConfig.setServerWorkerThreads(8); NettyRemotingServer remotingServer = new NettyRemotingServer(nettyServerConfig, null); // 支持共用或独立的业务处理线程池 ExecutorService poolA = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024)); ExecutorService poolB = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024)); // 业务处理器 NettyRequestProcessor processA = new NettyRequestProcessor() { @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode()); RemotingCommand response = RemotingCommand.createResponseCommand(0, "server"); switch (request.getCode()) { case 0: response.setBody(new String("hello sync 0").getBytes()); case 1: response.setBody(new String("hello sync 1").getBytes()); default: break; } return response; } @Override public boolean rejectRequest() { return false; } }; // 业务处理器 NettyRequestProcessor processB = new NettyRequestProcessor(){ @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode()); RemotingCommand response = RemotingCommand.createResponseCommand(0, "server"); switch (request.getCode()) { case 9: response.setBody(new String("hello sync 9").getBytes()); default: break; } return response; } @Override public boolean rejectRequest() { return false; } }; // 注册 协议 - 对应的处理器, 类似web url 路由到对应的class remotingServer.registerProcessor(0, processA, poolA); remotingServer.registerProcessor(1, processA, poolA); remotingServer.registerProcessor(9, processB, poolB); remotingServer.start(); System.out.println("start ok " + JSON.toJSONString(nettyServerConfig)); System.in.read(); } }
▐ 启动client 发起调用
import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Client { public static void main(String[] args) throws Exception { NettyClientConfig nettyServerConfig = new NettyClientConfig(); // 配置线程数 netty eventGroup 线程池 处理自定义hander/耗时长等 nettyServerConfig.setClientWorkerThreads(8); NettyRemotingClient remotingClient = new NettyRemotingClient(nettyServerConfig, null); // 支持共用或独立的业务处理线程池 ExecutorService poolA = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024)); // 监听服务端发过来的请求 remotingClient.registerProcessor(5, new NettyRequestProcessor() { @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { System.out.println("receive from server : " + request.getCode()); return null; } @Override public boolean rejectRequest() { return false; } }, poolA); remotingClient.start(); // 主动发起远程调用 { // 同步调用 RemotingCommand request = RemotingCommand.createRequestCommand(0, null); request.setRemark("sync"); RemotingCommand response = remotingClient.invokeSync("127.0.0.1:8888", request, 30 * 1000L); System.out.println("call sync ok remark:" + response.getRemark() + " body:" + new String(response.getBody())); } { // 异步调用 RemotingCommand request = RemotingCommand.createRequestCommand(1, null); request.setRemark("async"); remotingClient.invokeAsync("127.0.0.1:8888", request, 30 * 1000L, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { RemotingCommand response = responseFuture.getResponseCommand(); System.out.println("call async ok remark:" + response.getRemark() + " body:" + new String(response.getBody())); } }); } { // 单向调用 RemotingCommand request = RemotingCommand.createRequestCommand(9, null); request.setRemark("oneway"); remotingClient.invokeOneway("127.0.0.1:8888", request, 30 * 1000L); System.out.println("call oneway ok "); } System.in.read(); } }
该点对点调用,是需要手动指定目标服务器的ip和端口的,不同于hsf拥有注册中心进行协调撮合提供目标ip。
▐ 日志输出
Connected to the target VM, address: '127.0.0.1:57381', transport: 'socket' start ok {"listenPort":8888,"serverAsyncSemaphoreValue":64,"serverCallbackExecutorThreads":0,"serverChannelMaxIdleTimeSeconds":120,"serverOnewaySemaphoreValue":256,"serverPooledByteBufAllocatorEnable":true,"serverSelectorThreads":2,"serverSocketRcvBufSize":65535,"serverSocketSndBufSize":65535,"serverWorkerThreads":8,"useEpollNativeSelector":false} received from client, remark:sync, coe:0 received from client, remark:async, coe:1 received from client, remark:oneway, coe:9 Connected to the target VM, address: '127.0.0.1:57385', transport: 'socket' call sync ok remark:server body:hello sync 1 call oneway ok call async ok remark:server body:hello sync 1
实现原理
关于netty如何封装java基础nio socket不做展开。这里分析通信模块是如何封装netty,扩展调用协议规范的部分,重点描述其中关键的设计要点。
▐ server 启动 监听请求
作为服务端,需绑定端口,监听请求,这里采用标准netty服务端模式。
remotingServer.start();
@Override public void start() { ... ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) .addLast(defaultEventExecutorGroup, encoder, new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), connectionManageHandler, serverHandler ); } }); ... ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); ... }
关注涉及几个线程池的地方:
- bossGroup -> eventLoopGroupBoss 固定线程数1
- workerGroup -> eventLoopGroupSelector 若linux采用epoll实现 否则使用nio实现, 线程数可配置
- eventGroup -> defaultEventExecutorGroup 普通实现的 handler 工作线程池, 线程数可配置
另外就是传统艺能:心跳, 解码器 NettyEncoder,编码器 NettyDecoder,连接管理器 connectionManageHandler,和最终的业务处理器 serverHandler
▐ server 注册业务处理器
业务线程池配置
请求协议code关联业务处理器
// 支持共用或独立的业务处理线程池 ExecutorService poolA = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024)); ExecutorService poolB = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024)); // 业务处理器 NettyRequestProcessor processA = new NettyRequestProcessor() { @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode()); RemotingCommand response = RemotingCommand.createResponseCommand(0, "server"); switch (request.getCode()) { case 0: response.setBody(new String("hello sync 0").getBytes()); case 1: response.setBody(new String("hello sync 1").getBytes()); default: break; } return response; } @Override public boolean rejectRequest() { return false; } }; // 业务处理器 NettyRequestProcessor processB = new NettyRequestProcessor(){ @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode()); RemotingCommand response = RemotingCommand.createResponseCommand(0, "server"); switch (request.getCode()) { case 9: response.setBody(new String("hello sync 9").getBytes()); default: break; } return response; } @Override public boolean rejectRequest() { return false; } }; // 注册 协议 - 对应的处理器, 类似web url 路由到对应的class remotingServer.registerProcessor(0, processA, poolA); remotingServer.registerProcessor(1, processA, poolA); remotingServer.registerProcessor(9, processB, poolB);
不同业务独立线程池的必要性在复杂业务场景中,比如商品管理链路,订单交易链路,将所有的请求堆积在一个线程池中,快请求和慢请求公用一个赛道,无法避免资源分配不均问题通信模块设计为可手动配置每个业务的处理线程池
注册路由和线程池关系
@Override public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) { ExecutorService executorThis = executor; if (null == executor) { executorThis = this.publicExecutor; } Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis); this.processorTable.put(requestCode, pair); }
建立 code - processor - pool 的三者映射关系,在后续收到请求后,可查找注册关系进行路由唤起processor
▐ client 启动 发起请求
NettyRemotingClient remotingClient = new NettyRemotingClient(nettyServerConfig, null); remotingClient.start(); // 主动发起远程调用 { // 同步调用 RemotingCommand request = RemotingCommand.createRequestCommand(0, null); request.setRemark("sync"); RemotingCommand response = remotingClient.invokeSync("127.0.0.1:8888", request, 30 * 1000L); System.out.println("call sync ok remark:" + response.getRemark() + " body:" + new String(response.getBody())); } { // 异步调用 RemotingCommand request = RemotingCommand.createRequestCommand(1, null); request.setRemark("async"); remotingClient.invokeAsync("127.0.0.1:8888", request, 30 * 1000L, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { RemotingCommand response = responseFuture.getResponseCommand(); System.out.println("call async ok remark:" + response.getRemark() + " body:" + new String(response.getBody())); } }); } { // 单向调用 RemotingCommand request = RemotingCommand.createRequestCommand(9, null); request.setRemark("oneway"); remotingClient.invokeOneway("127.0.0.1:8888", request, 30 * 1000L); System.out.println("call oneway ok "); }
启动客户端client后,即处于长连接状态,双向通信及时性有保障
三种调用模式作为通信组件,需要适配多种调用场景,同步异步调用已是基本操作,oneway用于不关心是否返回的场景。
试想一下,在全双工双向异步通信的背景下,如何能像http一样实现同步调用,发出一个请求,收到一个请求后怎么跟前面发出的请求关联起来,又如何实现异步等待转为同步响应。
- 同步调用
发起请求
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { // 唯一id final int opaque = request.getOpaque(); ... final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); // 把当前请求记录到待响应table中 this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { //标记为写入成功 responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } // 写入异常结果 并唤起wait的线程 responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); public void putResponse(final RemotingCommand responseCommand) { this.responseCommand = responseCommand; this.countDownLatch.countDown(); } log.warn("send a request command to channel <" + addr + "> failed."); } }); // 同步等待结果 RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException { this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); return this.responseCommand; } ... }
关键设计点:每一个请求request,都分配了一个 client唯一自增的id (request.getOpaque(); requestId.getAndIncrement())。
把id和上下文存储到请求待响应table中:发送请求后(写入channel),线程等待结果响应 responseFuture.waitResponse,利用countDownLatch等待结果。
更多精彩内容,欢迎观看:
详解rocketMq通信模块&升级构想(下):https://developer.aliyun.com/article/1396356