说明:
- 图片来自dubbo剖析:一 服务发布
- ServiceBean作为provider的发布入口
- ServiceConfig负责创建exporter对象
- RegistryProtocol负责注册URL到zookeeper
- DubboProtocol负责bind端口启动监听
发布过程 - 源码走读
provider的发布入口
说明:
- ServiceBean初始化后的afterPropertiesSet方法负责执行服务导出。
public class ServiceBean<T> extends ServiceConfig<T> implements
InitializingBean, DisposableBean, ApplicationContextAware,
ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
public void afterPropertiesSet() throws Exception {
// 解析各种配置信息,省略相关代码
if (!isDelay()) {
export();
}
}
}
exporter的创建过程
说明:
- doExportUrls负责遍历所有注册中心执行服务发布。
- doExportUrlsFor1Protocol内部通过proxyFactory.getInvoker()创建invoker。
- doExportUrlsFor1Protocol内部通过protocol.export()创建exporter。
- protocol.export()进入RegistryProtocol的处理逻辑。
public class ServiceConfig<T> extends AbstractServiceConfig {
public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
protected synchronized void doExport() {
// 省略相关代码,服务发布走的流程路径之一
doExportUrls();
}
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
// 遍历所有注册中心
for (ProtocolConfig protocolConfig : protocols) {
// 服务发布走的流程路径之一
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String scope = url.getParameter(Constants.SCOPE_KEY);
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
// 走远程export逻辑
if (registryURLs != null && registryURLs.size() > 0) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// proxyFactory负责创建invoker对象并且包装成wrapperInvoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 走RegistryProtocol的export操作
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// 走本地export逻辑
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
}
export的过程
说明:
- doLocalExport()内部执行protocol.export()进入DubboProtocol的处理逻辑。
- register()负责注册导出服务到zookeeper节点,进入ZookeeperRegistry的处理逻辑。
- registry.subscribe()负责订阅,具体订阅内容暂时没理解。
public class RegistryProtocol implements Protocol {
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 这里走服务export的核心流程,会走到dubbo协议的发布流程
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
boolean register = registedProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
if (register) {
// 注册到zookeeper的注册中心
register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// protocol.export()执行导出exporter,是DobboProtocol,通过ExporterChangeableWrapper包装
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
public void register(URL registryUrl, URL registedProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
}
bind端口启动监听
说明:
- DubboProtocol通过exporterMap保存导出的exporter对象。
- DubboProtocol通过openServer()方法bind端口启动监听。
public class DubboProtocol extends AbstractProtocol {
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// 核心在于打开socket连接
openServer(url);
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
}
注册发布的服务到zookeeper中心
说明:
- ZookeeperRegistry的doRegister负责创建zookeeper节点保存发布URL信息。
public abstract class FailbackRegistry extends AbstractRegistry {
public void register(URL url) {
if (destroyed.get()){
return;
}
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// 注册zookeeper节点上
doRegister(url);
} catch (Exception e) {
// 省略相关代码
}
}
protected abstract void doRegister(URL url);
}
public class ZookeeperRegistry extends FailbackRegistry {
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}