一、引导
1.1 什么是引导
引导一个应用程序是指对它进行配置,并使它运行起来的过程。引导可以简单的认为是将分散的了 ChannelPipeline、ChannelHandler 和 EventLoop组合起来,成为一个完成应用程序的模块。
1.2 Bootstrap 类
引导类的层次结构包括一个抽象的父类和两个具体的引导子类。
相对于将具体的引导类分别看作用于服务器和客户端的引导来说,它们的本意是用来支撑不同的应用程序的功能。
服务器致力于使用一个父 Channel 来接受来自客户端的连接,并创建子Channel 以用于它们之间的通信;
而客户端将最可能只需要一个单独的、没有父 Channel 的 Channel 来用于所有的网络交互。
如 UDP,因为它们并不是每个连接都需要一个单独的 Channel。
我们在前面的几篇中学习的几个 Netty 组件都参与了引导的过程,而且其中一些在客户端和服务器都有用到。两种应用程序类型之间通用的引导步骤由 AbstractBootstrap 处理,而特定于客户端或者服务器的引导步骤则分别由 Bootstrap 或 ServerBootstrap 处理。
AbstractBootstrap类申明:
public abstract class AbstractBootstrap
<B extends AbstractBootstrap<B,C>,C extends Channel>
在这个签名中,子类型 B 是其父类型的一个类型参数,因此可以返回到运行时实例的引用以支持方法的链式调用(也就是所谓的流式语法)。
它的子类有两种申明方式,分别为:
public class Bootstrap
extends AbstractBootstrap<Bootstrap,Channel>
public class ServerBootstrap
extends AbstractBootstrap<ServerBootstrap,ServerChannel>
是不是有些眼熟?我们在第二篇博文中使用过他们构建过客户端和服务端。
1.3 引导客户端和无连接协议
Bootstrap 类被用于客户端或者使用了无连接协议的应用程序中,它的大部分方法都继承自 AbstractBootstrap 类。
名 称 | 描 述 |
---|---|
Bootstrap group(EventLoopGroup) | 设置用于处理 Channel 所有事件的 EventLoopGroup |
Bootstrap channel(Class<? extends C>) Bootstrap channelFactory(ChannelFactory<? extends C>) | channel()方法指定了Channel的实现类。如果该实现类没提供默认的构造函数 ,可以通过调用channelFactory()方法来指定一个工厂类,它将会被bind()方法调用 |
Bootstrap localAddress(SocketAddress) | 指定 Channel 应该绑定到的本地地址。如果没有指定,则将由操作系统创建一个随机的地址。或者,也可以通过bind()或者 connect()方法指定 localAddress |
Bootstrap option(ChannelOption option,T value) | 设置 ChannelOption,其将被应用到每个新创建的Channel 的 ChannelConfig。这些选项将会通过bind()或者 connect()方法设置到 Channel,不管哪个先被调用。这个方法在 Channel 已经被创建后再调用将不会有任何的效果。支持的 ChannelOption 取决于使用的 Channel 类型。 |
Bootstrap attr(Attribute key, T value) | 指定新创建的 Channel 的属性值。这些属性值是通过bind()或者 connect()方法设置到 Channel 的,具体取决于谁最先被调用。这个方法在 Channel 被创建后将不会有任何的效果。 |
Bootstrap handler(ChannelHandler) | 设置将被添加到 ChannelPipeline 以接收事件通知的ChannelHandler |
Bootstrap clone() | 创建一个当前 Bootstrap 的克隆,其具有和原始的Bootstrap 相同的设置信息 |
Bootstrap remoteAddress(SocketAddress) | 设置远程地址。或者,也可以通过 connect()方法来指定它 |
ChannelFuture connect() | 连接到远程节点并返回一个 ChannelFuture,其将 会在连接操作完成后接收到通知 |
ChannelFuture bind() | 绑定 Channel 并返回一个 ChannelFuture,其将会在绑定操作完成后接收到通知,在那之后必须调用 Channel.connect()方法来建立连接 |
这玩意太多了,建议可以收藏本文,用的时候翻出来看一看即可。
1.4 引导客户端
Bootstrap 类负责为客户端和使用无连接协议的应用程序创建 Channel。
它的引导过程可以参见下面的图示:
我们来一段引导了一个使用 NIO TCP 传输的客户端代码看看:
package com.example.netty.bootstrap.niotcp;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
/**
* @author lhd
* @date 2023/05/24 15:19
* @notes 引导了NIO 的 Netty 客户端代码
*/
public class client {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
//创建一个Bootstrap类的实例以创建和连接新的客户端Channe
Bootstrap bootstrap = new Bootstrap();
//设置 EventLoopGroup,提供用于处理 Channel事件的 EventLoop
bootstrap.group(group)
//指定channel实现
.channel(NioSocketChannel.class)
//设置用于 Channel 事件和数据的ChannelInboundHandle
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(
ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf) throws Exception {
System.out.println("Received data");
}
} );
//链接到远程主机
ChannelFuture future = bootstrap.connect(
new InetSocketAddress("www.manning.com", 80));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture)
throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Connection established");
} else {
System.err.println("Connection attempt failed");
channelFuture.cause().printStackTrace();
}
}
} );
}
}
1.5 Channel 和 EventLoopGroup 的兼容性
Channel 和 EventLoopGroup 都有相关的 EventLoopGroup 和Channel 实现。它们是相互兼容的。
相互兼容的 EventLoopGroup 和 Channel:
channel
├───nio
│ NioEventLoopGroup
├───oio
│ OioEventLoopGroup
└───socket
├───nio
│ NioDatagramChannel
│ NioServerSocketChannel
│ NioSocketChannel
└───oio
OioDatagramChannel
OioServerSocketChannel
OioSocketChannel
二、引导服务器
我们将从 ServerBootstrap API 的概要视图开始我们对服务器引导过程的概述。
2.1 ServerBootstrap 类
和Bootstrap 类一样, ServerBootstrap 类也有属于它的类方法。
名 称 | 描 述 |
---|---|
group | 设置 ServerBootstrap 要用的 EventLoopGroup。这个 EventLoopGroup将用于 ServerChannel 和被接受的子 Channel 的 I/O 处理 |
channel | 设置将要被实例化的 ServerChannel 类 |
channelFactory | 如果不能通过默认的构造函数 ①创建Channel,那么可以提供一个ChannelFactory |
localAddress | 指定 ServerChannel 应该绑定到的本地地址。如果没有指定,则将由操作系统使用一个随机地址。或者,可以通过 bind()方法来指定该 localAddress |
option | 指定要应用到新创建的 ServerChannel 的 ChannelConfig 的 ChannelOption。这些选项将会通过 bind()方法设置到 Channel。在 bind()方法被调用之后,设置或者改变 ChannelOption 都不会有任何的效果。所支持的 ChannelOption 取决于所使用的 Channel 类型。 |
childOption | 指定当子 Channel 被接受时,应用到子 Channel 的 ChannelConfig 的ChannelOption。所支持的 ChannelOption 取决于所使用的 Channel 的类型。 |
attr | 指定 ServerChannel 上的属性,属性将会通过 bind()方法设置给 Channel。在调用 bind()方法之后改变它们将不会有任何的效果 |
childAttr | 将属性设置给已经被接受的子 Channel。接下来的调用将不会有任何的效果handler 设置被添加到ServerChannel 的ChannelPipeline中ChannelHandler。 |
childHandler | 设置将被添加到已被接受的子 Channel 的 ChannelPipeline 中的 ChannelHandler。handler()方法和 childHandler()方法之间的区别是:前者所添加的 ChannelHandler 由接受子 Channel 的 ServerChannel 处理,而childHandler()方法所添加的 ChannelHandler 将由已被接受的子 Channel处理,其代表一个绑定到远程节点的套接字 |
clone | 克隆一个设置和原始的 ServerBootstrap 相同的 ServerBootstrap |
bind | 绑定 ServerChannel 并且返回一个 ChannelFuture,其将会在绑定操作完成后收到通知(带着成功或者失败的结果) |
那服务器是咋引导的呢?我们继续往下看。
2.2 引导服务器
上面的表中列出了一些客户端Bootstrap 类没有的方法,像:childHandler()、 childAttr()和childOption()。这些调用支持特别用于服务器应用程序的操作。具体来说,ServerChannel 的实现负责创建子Channel,这些子 Channel 代表了已被接受的连接。因此,负责引导 ServerChannel 的 ServerBootstrap提供了这些方法,以简化将设置应用到已被接受的子 Channel 的 ChannelConfig 的任务。
ServerBootstrap 在 bind()方法被调用时创建了一个 ServerChannel,并且该 ServerChannel 管理了多个子 Channel的过程是咋样的呢?
看这张图:
用代码表示它的引导过程应该是这样的:
NioEventLoopGroup group = new NioEventLoopGroup();
//创建ServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
//设置channel
.channel(NioServerSocketChannel.class)
//设置用于处理已被接受的子Channel的I/O及数据的 ChannelInboundHandler
.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
ByteBuf byteBuf) throws Exception {
System.out.println("Received data");
}
} );
//通过配置好的ServerBootstrap的实例绑定该Channel
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture)
throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Server bound");
} else {
System.err.println("Bound attempt failed");
channelFuture.cause().printStackTrace();
}
}
} );
三、从 Channel 引导客户端
假设你的服务器正在处理一个客户端的请求,这个请求需要它充当第三方系统的客户端。当一个应用程序(如一个代理服务器)必须要和组织现有的系统(如 Web 服务或者数据库)集成时,就可能发生需要从已经被接受的子 Channel 中引导一个客户端 Channel。如果我们按照上面的方式去创建客户端,这会产生额外的线程,以及在已被接受的子 Channel 和客户端 Channel 之间交换数据时不可避免的上下文切换。
为了避免这种情况,我们可以将已被接受的子 Channel 的 EventLoop 传递给 Bootstrap的 group()方法来共享该 EventLoop。因为分配给 EventLoop 的所有 Channel 都使用同一个线程,所以这避免了额外的线程创建,以及前面所提到的相关的上下文切换。
实现 EventLoop 共享涉及通过调用 group()方法来设置 EventLoop,如代码:
//创建 ServerBootstrap 以创建ServerSocketChannel,并绑定它
ServerBootstrap bootstrap = new ServerBootstrap();
//设置 EventLoopGroup,其将提供用以处理 Channel 事件的 EventLoop
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
//指定要使用的Channel 实现
.channel(NioServerSocketChannel.class)
//设置用于处理已被接受的子 Channel 的 I/O 和数据的ChannelInboundHandler
.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
ChannelFuture connectFuture;
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
Bootstrap bootstrap = new Bootstrap();
//指定 Channel的实现
bootstrap.channel(NioSocketChannel.class).handler(
//为入站 I/O 设置ChannelInboundHandler
new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in)
throws Exception {
System.out.println("Received data");
}
} );
//使用与分配给已被接受的子channel相同的EventLoop
bootstrap.group(ctx.channel().eventLoop());
//创建一个 Bootstrap类的实例以连接到远程主机
connectFuture = bootstrap.connect(
new InetSocketAddress("www.manning.com", 80));
}
@Override
protected void channelRead0(
ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf) throws Exception {
//当连接完成时,执行一些数据操作(如代理)
if (connectFuture.isDone()) {
// do something with the data
}
}
} );
//通过配置好的ServerBootstrap绑定该 ServerSocketChannel
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Server bound");
} else {
System.err.println("Bind attempt failed");
channelFuture.cause().printStackTrace();
}
}
} );
这段代码表达了一个核心,就是尽可能地重用 EventLoop,以减少线程创建所带来的开销。但共享EventLoop就意味着共享线程。所以我们特别需要注意的是不能将有状态的数据带入(上一篇有提到,感兴趣可以返回去看看)。
四、在引导过程中添加多个 ChannelHandler
在所有我们展示过的代码示例中,我们都在引导的过程中调用了 handler()或者 childHandler()方法来添加单个的 ChannelHandler。这对于简单的应用程序来说可能已经足够,但是它不能满足更加复杂的需求。例如,一个必须要支持多种协议的应用程序将会有很多的ChannelHandler,而不会是一个庞大而又笨重的类。
我们可以可以根据需要,通过在 ChannelPipeline 中将它们链接在一起来部署尽可能多的ChannelHandler。但是,如果在引导的过程中你只能设置一个 ChannelHandler,那么你应该怎么做到这一点呢?
解决方式来了:Netty 提供了一个特殊的 ChannelInboundHandlerAdapter 子类:
public abstract class ChannelInitializer<C extends Channel>
extends ChannelInboundHandlerAdapter
它定义了下面的方法:
protected abstract void initChannel(C ch) throws Exception;
那这个方法该如何使用呢?
这个方法提供了一种将多个 ChannelHandler 添加到一个 ChannelPipeline 中的简便方法。
只需要简单地向 Bootstrap 或 ServerBootstrap 的实例提供ChannelInitializer 实现即可,并且一旦 Channel 被注册到了它的 EventLoop 之后,就会调用你的initChannel()版本。在该方法返回之后,ChannelInitializer 的实例将会从 ChannelPipeline 中移除它自己。
下面看一下它的代码示例:
//创建 ServerBootstrap 以创建和绑定新的 Channel
ServerBootstrap bootstrap = new ServerBootstrap();
//设置 EventLoopGroup,其将提供用以处理 Channel 事件的 EventLoop
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
//指定 Channel 的实现
.channel(NioServerSocketChannel.class)
//注册一个 ChannelInitializerImpl 的实例来设置 ChannelPipeline
.childHandler(new ChannelInitializerImpl());
//绑定到地址
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.sync();
final class ChannelInitializerImpl extends ChannelInitializer<Channel> {
//用以设置 ChannelPipeline 的自定义ChannelInitializerImpl 实现
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//将所需的ChannelHandler添加到ChannelPipeline
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
}
}
五、使用 Netty 的 ChannelOption 和属性
在每个 Channel 创建时都手动配置它可能会变得相当繁琐。可以使用 option()方法来将 ChannelOption 应用到引导。我们所提供的值将会被自动应用到引导所创建的所有 Channel。可用的 ChannelOption 包括了底层连接的详细信息,如keep-alive 或者超时属性以及缓冲区设置。
如何使用 ChannelOption 来配置 Channel:
//创建一个 AttributeKey以标识该属性
final AttributeKey<Integer> id = new AttributeKey<Integer>("ID");
//创建一个 Bootstrap 类的实例以创建客户端 Channel 并连接它们
Bootstrap bootstrap = new Bootstrap();
//设置 EventLoopGroup,其提供了用以处理 Channel事件的 EventLoop
bootstrap.group(new NioEventLoopGroup())
//指定Channel的实现
.channel(NioSocketChannel.class)
//设置用以处理 Channel 的I/O 以及数据的 ChannelInboundHandler
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
public void channelRegistered(ChannelHandlerContext ctx)throws Exception {
//使用 AttributeKey 检索属性以及它的值
Integer idValue = ctx.channel().attr(id).get();
// do something with the idValue
}
@Override
protected void channelRead0(
ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf) throws Exception {
System.out.println("Received data");
}
}
);
bootstrap.option(ChannelOption.SO_KEEPALIVE,true)
//设置 ChannelOption,其将在 connect()或者bind()方法被调用时被设置到已经创建的Channel 上
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
//存储该id 属性
bootstrap.attr(id, 123456);
//使用配置好的 Bootstrap实例连接到远程主机
ChannelFuture future = bootstrap.connect(
new InetSocketAddress("www.manning.com", 80));
future.syncUninterruptibly();
六、引导 DatagramChannel
前面的引导代码示例使用的都是基于 TCP 协议的 SocketChannel,但是 Bootstrap 类也可以被用于无连接的协议。为此,Netty 提供了各种 DatagramChannel 的实现。唯一区别就是,不再调用 connect()方法,而是只调用 bind()方法。
使用 Bootstrap 和 DatagramChannel:
//创建一个 Bootstrap 的实例以创建和绑定新的数据报 Channel
Bootstrap bootstrap = new Bootstrap();
//设置 EventLoopGroup,其提供了用以处理 Channel 事件的 EventLoop
bootstrap.group(new OioEventLoopGroup()).channel(
//设置用以处理 Channel 的I/O 以及数据的 ChannelInboundHandlerOioDatagramChannel.class).handler(
new SimpleChannelInboundHandler<DatagramPacket>(){
@Override
public void channelRead0(ChannelHandlerContext ctx,
DatagramPacket msg) throws Exception {
// Do something with the packet
}
}
);
//调用 bind()方法,因为该协议是无连接的
ChannelFuture future = bootstrap.bind(new InetSocketAddress(0));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture)
throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Channel bound");
} else {
System.err.println("Bind attempt failed");
channelFuture.cause().printStackTrace();
}
}
});
七、关闭
引导使你的应用程序启动并且运行起来,但是迟早你都需要优雅地将它关闭。当然,你也可以让 JVM 在退出时处理好一切,但是这不符合优雅的定义,优雅是指干净地释放资源。
我们需要关闭 EventLoopGroup,它将处理任何挂起的事件和任务,并且随后释放所有活动的线程。这就是调用EventLoopGroup.shutdownGracefully()方法的作用。
这个方法调用将会返回一个 Future,这个 Future 将在关闭完成时接收到通知。需要注意的是,shutdownGracefully()方法也是一个异步的操作,所以你需要阻塞等待直到它完成,或者向所返回的 Future 注册一个监听器以在关闭完成时获得通知。
优雅关闭:
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class);
...
//shutdownGracefully()方法将释放所有的资源,并且关闭所有的当前正在使用中的 Channe
Future<?> future = group.shutdownGracefully();
// block until the group has shutdown
future.syncUninterruptibly();
或者,你也可以在调用EventLoopGroup.shutdownGracefully()方法之前,显式地在所有活动的 Channel 上调用 Channel.close()方法。但是在任何情况下,都请记得关闭EventLoopGroup 本身。