Netty中首先会经过OP_ACCEPT操作,再经过OP_READ事件,此时的操作是在processSelectionKeys中进行处理的,此时首先select出事件,然后执行处理操作。此时的read方法会会执行先后会执行两个事件,一个是连接事件16和一个读事件1。而在OP_WRITE则是在缓冲区写满的时候,才会去注册,等待通知去写。
一个普通的BIO的操作:
publicclassBIOServer { publicstaticvoidmain(String[] args) throwsException { ServerSocketserverSocket=newServerSocket(6666); ExecutorServiceexecutorService=Executors.newCachedThreadPool(); while (true) { System.out.println("等待客户端连接。。。。"); Socketsocket=serverSocket.accept(); //阻塞executorService.execute(() -> { try { InputStreaminputStream=socket.getInputStream(); //阻塞byte[] bytes=newbyte[1024]; while (true){ intlength=inputStream.read(bytes); if(length==-1){ break; } System.out.println(newString(bytes, 0, length, "UTF-8")); } } catch (Exceptione) { e.printStackTrace(); } }); } } }
从上面我们可以看到首先绑定端口,然后进行连接,然后执行read操作。
NIO操作:首先需要打开selector,同时打开serverSocketChannel,设置成非阻塞方式,此时进行通道获取,然后绑定bind端口,注册感兴趣的事件。接下来是在死循环中,进行轮询获取selectionKeys,然后对selectionKeys中的事件进行处理,处理完,进行移除。可以看到在处理的过程中,会涉及到连接事件和读事件的操作。
publicclassSelectorDemo { /*** 注册事件** @return*/privateSelectorgetSelector() throwsException { //获取selector对象Selectorselector=Selector.open(); ServerSocketChannelserverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); //非阻塞//获取通道并且绑定端⼝ServerSocketsocket=serverSocketChannel.socket(); socket.bind(newInetSocketAddress(6677)); //注册感兴趣的事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); returnselector; } publicvoidlisten() throwsException { Selectorselector=this.getSelector(); while (true) { selector.select(); //该方法会阻塞,直到至少有一个事件的发生Set<SelectionKey>selectionKeys=selector.selectedKeys(); Iterator<SelectionKey>iterator=selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKeyselectionKey=iterator.next(); process(selectionKey, selector); iterator.remove(); } } } privatevoidprocess(SelectionKeykey, Selectorselector) throwsException { if(key.isAcceptable()){ //新连接请求ServerSocketChannelserver= (ServerSocketChannel)key.channel(); SocketChannelchannel=server.accept(); channel.configureBlocking(false); //阻塞channel.register(selector, SelectionKey.OP_READ); }elseif(key.isReadable()){ //读数据SocketChannelchannel= (SocketChannel)key.channel(); ByteBufferbyteBuffer=ByteBuffer.allocate(1024); channel.read(byteBuffer); System.out.println("form 客户端 "+newString(byteBuffer.array(), 0, byteBuffer.position())); } } publicstaticvoidmain(String[] args) throwsException { newSelectorDemo().listen(); } }
而Netty,正是围绕NIO进行优化封装的。
也即Netty中,我们首先会启动服务,此时会将连接事件注册到NioEventLoop中,而这个过程首先是注册0,然后注册16这个事件,也即连接事件,接着注册读事件,如果不能写的时候,注册写事件,等待写通知。
在Dubbo中进行的封装:
dubbo暴露服务的流程如下
在dubbo中,经过invoker操作后,会调用协议进行dubbo协议适配:
Exporter<?>exporter=PROTOCOL.export(wrapperInvoker) <T>Exporter<T>export(Invoker<T>invoker) throwsRpcException; //进行网络通信操作,启动netty服务器public<T>Exporter<T>export(Invoker<T>invoker) throwsRpcException { URLurl=invoker.getUrl(); // export service.Stringkey=serviceKey(url); DubboExporter<T>exporter=newDubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching eventBooleanisStubSupportEvent=url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); BooleanisCallbackservice=url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent&&!isCallbackservice) { StringstubServiceMethods=url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods==null||stubServiceMethods.length() ==0) { if (logger.isWarnEnabled()) { logger.warn(newIllegalStateException("consumer ["+url.getParameter(INTERFACE_KEY) +"], has set stubproxy support event ,but no stub methods founded.")); } } } openServer(url); optimizeSerialization(url); returnexporter; }
openServer(url):
privatevoidopenServer(URLurl) { // find server.Stringkey=url.getAddress(); //client can export a service which's only for server to invokebooleanisServer=url.getParameter(IS_SERVER_KEY, true); if (isServer) { ProtocolServerserver=serverMap.get(key); if (server==null) { synchronized (this) { server=serverMap.get(key); if (server==null) { serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with overrideserver.reset(url); } } }
可以看到这里使用了单例模式。
同时执行bind操作:
privateProtocolServercreateServer(URLurl) { url=URLBuilder.from(url) // send readonly event when server closes, it's enabled by default .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) // enable heartbeat by default .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) .addParameter(CODEC_KEY, DubboCodec.NAME) .build(); Stringstr=url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); if (str!=null&&str.length() >0&&!ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { thrownewRpcException("Unsupported server type: "+str+", url: "+url); } ExchangeServerserver; try { server=Exchangers.bind(url, requestHandler); } catch (RemotingExceptione) { thrownewRpcException("Fail to start server(url: "+url+") "+e.getMessage(), e); } str=url.getParameter(CLIENT_KEY); if (str!=null&&str.length() >0) { Set<String>supportedTypes=ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { thrownewRpcException("Unsupported client type: "+str); } } returnnewDubboProtocolServer(server); }
可以看到绑定操作的实质:初始化和启动Netty服务器
publicRemotingServerbind(URLurl, ChannelHandlerhandler) throwsRemotingException { returnnewNettyServer(url, handler); } /*** Init and start netty server** @throws Throwable*/protectedvoiddoOpen() throwsThrowable { bootstrap=newServerBootstrap(); //boss线程组这里设置为1个bossGroup=NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss"); //worker线程组workerGroup=NettyEventLoopFactory.eventLoopGroup( getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), "NettyServerWorker"); finalNettyServerHandlernettyServerHandler=newNettyServerHandler(getUrl(), this); channels=nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NettyEventLoopFactory.serverSocketChannelClass()) .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(newChannelInitializer<SocketChannel>() { protectedvoidinitChannel(SocketChannelch) throwsException { // FIXME: should we use getTimeout()?intidleTimeout=UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapteradapter=newNettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); if (getUrl().getParameter(SSL_ENABLED_KEY, false)) { ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler)); } ch.pipeline() .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", newIdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); // bindChannelFuturechannelFuture=bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel=channelFuture.channel(); }
同时可以看到dubbo中Netty的设置:将boss线程组设置为1个,避免过多的线程浪费。
禁用Nagle算法=> .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE),来减少网络卡顿。
设置编解码器,采用适配器的方式,适配协议对应的编解码,方便协议适配。
设置空闲处理handler参数:IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit)
中的读闲置时间为0,同时写闲置时间为0,充分保证性能。
ch.pipeline() .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", newIdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); }
设置闲置超时时间:idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3)
publicstaticintgetIdleTimeout(URLurl) { intheartBeat=getHeartbeat(url); // idleTimeout should be at least more than twice heartBeat because possible retries of client.intidleTimeout=url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat*3); if (idleTimeout<heartBeat*2) { thrownewIllegalStateException("idleTimeout < heartbeatInterval * 2"); } returnidleTimeout; }
连接完成之后,不能无所事事,此时应该会执行业务处理。也即此时可以看到上面的NettyServerHandler。因此可以看到dubbo的线程模型:
配置 Dubbo 中的线程模型
如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识,则直接在 IO 线程上处理更快,因为减少了线程池调度。
但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接收其它请求。
如果用 IO 线程处理事件,又在事件处理过程中发起新的 IO 请求,比如在连接事件中发起登录请求,会报“可能引发死锁”异常,但不会真死锁。
因此,需要通过不同的派发策略和不同的线程池配置的组合来应对不同的场景:
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />
Dispatcher
all
所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。direct
所有消息都不派发到线程池,全部在 IO 线程上直接执行。message
只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。execution
只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。connection
在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
ThreadPool
fixed
固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)cached
缓存线程池,空闲一分钟自动删除,需要时重建。limited
可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。eager
优先创建Worker
线程池。在任务数量大于corePoolSize
但是小于maximumPoolSize
时,优先创建Worker
来处理任务。当任务数量大于maximumPoolSize
时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException
。(相比于cached
:cached
在任务数量超过maximumPoolSize
时直接抛出异常而不是将任务放入阻塞队列)
下面来看NettyServerHandler中:执行的操作包括:连接操作connect、断开连接disconnect、received操作、写操作write和发送操作sent、关闭操作
/*** NettyServerHandler.*/netty.channel.ChannelHandler.Sharable .publicclassNettyServerHandlerextendsChannelDuplexHandler { //执行连接操作publicvoidchannelActive(ChannelHandlerContextctx) throwsException { NettyChannelchannel=NettyChannel.getOrAddChannel(ctx.channel(), url, handler); if (channel!=null) { channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel); } handler.connected(channel); } publicvoidchannelInactive(ChannelHandlerContextctx) throwsException { NettyChannelchannel=NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress())); handler.disconnected(channel); } finally { NettyChannel.removeChannel(ctx.channel()); } } publicvoidchannelRead(ChannelHandlerContextctx, Objectmsg) throwsException { NettyChannelchannel=NettyChannel.getOrAddChannel(ctx.channel(), url, handler); handler.received(channel, msg); } publicvoidwrite(ChannelHandlerContextctx, Objectmsg, ChannelPromisepromise) throwsException { super.write(ctx, msg, promise); NettyChannelchannel=NettyChannel.getOrAddChannel(ctx.channel(), url, handler); handler.sent(channel, msg); } publicvoiduserEventTriggered(ChannelHandlerContextctx, Objectevt) throwsException { // server will close channel when server don't receive any heartbeat from client util timeout.if (evtinstanceofIdleStateEvent) { NettyChannelchannel=NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { logger.info("IdleStateEvent triggered, close channel "+channel); channel.close(); } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } super.userEventTriggered(ctx, evt); } publicvoidexceptionCaught(ChannelHandlerContextctx, Throwablecause) throwsException { NettyChannelchannel=NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { handler.caught(channel, cause); } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } publicvoidhandshakeCompleted(HandshakeCompletionEventevt) { // TODO } }
同时NettyServerClient里面也有这几个事件。