提示:阅读本文前最好先阅读:
- 《Spark2.1.0之内置RPC框架》
- 《spark2.1.0之源码分析——RPC配置TransportConf》
- 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
TransportServer是RPC框架的服务端,可提供高效的、低级别的流服务。在说明《Spark2.1.0之内置RPC框架》一文所展示的图1中的记号②时提到过TransportContext的createServer方法用于创建TransportServer,其实现见代码清单1。
代码清单1 创建RPC服务端
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
return new TransportServer(this, null, port, rpcHandler, bootstraps);
}
public TransportServer createServer(
String host, int port, List<TransportServerBootstrap> bootstraps) {
return new TransportServer(this, host, port, rpcHandler, bootstraps);
}
public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
return createServer(0, bootstraps);
}
public TransportServer createServer() {
return createServer(0, Lists.<TransportServerBootstrap>newArrayList());
}
代码清单1中列出了四个名为createServer的重载方法,但是它们最终调用了TransportServer的构造器(见代码清单2)来创建TransportServer实例。
代码清单2 TransportServer的构造器
public TransportServer(
TransportContext context,
String hostToBind,
int portToBind,
RpcHandler appRpcHandler,
List<TransportServerBootstrap> bootstraps) {
this.context = context;
this.conf = context.getConf();
this.appRpcHandler = appRpcHandler;
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
try {
init(hostToBind, portToBind);
} catch (RuntimeException e) {
JavaUtils.closeQuietly(this);
throw e;
}
}
TransportServer的构造器中的各个变量分别为:
- context:即参数传递的TransportContext的引用;
- conf:即TransportConf,这里通过调用TransportContext的getConf获取;
- appRpcHandler:即RPC请求处理器RpcHandler;
- bootstraps:即参数传递的TransportServerBootstrap列表;
TransportServer的构造器(见代码清单2)中调用了init方法,init方法用于对TransportServer进行初始化,见代码清单3。
代码清单3 TransportServer的初始化
private void init(String hostToBind, int portToBind) {
// 根据Netty的API文档,Netty服务端需同时创建bossGroup和workerGroup
IOMode ioMode = IOMode.valueOf(conf.ioMode());
EventLoopGroup bossGroup =
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
EventLoopGroup workerGroup = bossGroup;
// 创建一个汇集ByteBuf但对本地线程缓存禁用的分配器
PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
// 创建Netty的服务端根引导程序并对其进行配置
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, allocator)
.childOption(ChannelOption.ALLOCATOR, allocator);
if (conf.backLog() > 0) {
bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
}
if (conf.receiveBuf() > 0) {
bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
}
if (conf.sendBuf() > 0) {
bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
}
// 为根引导程序设置管道初始化回调函数
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
RpcHandler rpcHandler = appRpcHandler;
for (TransportServerBootstrap bootstrap : bootstraps) {
rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
}
context.initializePipeline(ch, rpcHandler);
}
});
// 给根引导程序绑定Socket的监听端口
InetSocketAddress address = hostToBind == null ?
new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
channelFuture = bootstrap.bind(address);
channelFuture.syncUninterruptibly();
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port: {}", port);
}
代码清单3中TransportServer初始化的步骤如下:
- 创建bossGroup和workerGroup(根据Netty的API文档,Netty服务端需同时创建bossGroup和workerGroup。);
创建一个汇集ByteBuf但对本地线程缓存禁用的分配器;
调用Netty的API创建Netty的服务端根引导程序并对其进行配置;
为根引导程序设置管道初始化回调函数,此回调函数首先设置TransportServerBootstrap到根引导程序中,然后调用TransportContext的initializePipeline方法初始化Channel的pipeline;
给根引导程序绑定Socket的监听端口,最后返回监听的端口。
提示:代码清单3中使用了NettyUtils工具类的很多方法,在《附录G Netty与NettyUtils》中有对它们的详细介绍。EventLoopGroup、PooledByteBufAllocator、ServerBootstrap都是Netty提供的API,对于它们的更多介绍,请访问http://netty.io/。
关于《Spark内核设计的艺术 架构设计与实现》
经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:
纸质版售卖链接如下: