服务发布和注册是Dubbo非常核心的能力之一,也是Rpc技术非常关键的一步。本文将从源码层面掌握Dubbo的服务发布和注册。
先看下整体步骤
我们接着看下源码是不是按这个流程来处理的。
暴露服务的源码入口
com.alibaba.dubbo.config.spring.ServiceBean#onApplicationEvent spring容器启动完成后触发ContextRefreshedEvent事件 。
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware, ApplicationEventPublisherAware { @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (!isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } //执行发布 export(); } }}
多协议发布
private void doExportUrls() { List<URL> registryURLs = loadRegistries(true); //遍历协议列表,分开发布服务 for (ProtocolConfig protocolConfig : protocols) { String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version); ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass); ApplicationModel.initProviderModel(pathKey, providerModel); doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
组织Url,需要说明的是Dubbo是根据组装的URL来驱动发布注册的。Url信息如下:
public class URL implements Serializable { private static final long serialVersionUID = -1985165475234910535L; private final String protocol; private final String username; private final String password; // by default, host to registry private final String host; // by default, port to registry private final int port; private final String path; private final Map<String, String> parameters;}
构建Invoker\
public class JavassistProxyFactory extends AbstractProxyFactory { @Override @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } @Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }}
发布本地tcp端口
@Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } openServer(url); optimizeSerialization(url); return exporter; }
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header"; @Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } @Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }}
org.apache.dubbo.remoting.transport.netty4#doOpen 开启端口
并绑定请求处理器
@Override protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap(); bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()? int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
注册到注册中心,将url注册到zk上 路径是/dubbo/服务名/providers
@Override public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
看完源码,服务发布和注册流程确实是按如下图,可以debug再验证下。