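Introduction
This article analyzes the ServiceImpl -> Invoker -> Exporter layer of Dubbo service publishing and invocation, and aims to explain how a ServiceImpl is converted into an Exporter.
The Netty forwarding logic is complex in its own right, so it is covered in separate articles. Here we focus only on how objects are converted during service publishing, plus part of the invocation path.
The core process to analyze is shown in the figure below; it consists of two parts, publishing and invocation.
From service to invoker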
public class ServiceConfig<T> extends AbstractServiceConfig {
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// non-essential code omitted
String scope = url.getParameter(SCOPE_KEY);
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
// non-essential code omitted
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
}
this.urls.add(url);
}
}
- The invoker is created by Invoker<?> invoker = PROXY_FACTORY.getInvoker();
- The exporter is created by Exporter<?> exporter = protocol.export(wrapperInvoker);
- These two steps, invoker creation and exporter creation, are what we care about.
private static final ProxyFactory PROXY_FACTORY =
ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0,
java.lang.Class arg1, org.apache.dubbo.common.URL arg2)
throws org.apache.dubbo.rpc.RpcException {
if (arg2 == null) {
throw new IllegalArgumentException("url == null");
}
org.apache.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if (extName == null) {
throw new IllegalStateException(
"Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" +
url.toString() + ") use keys([proxy])");
}
// Returns the StubProxyFactoryWrapper registered for "javassist".
// StubProxyFactoryWrapper wraps a JavassistProxyFactory object.
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)
ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class)
.getExtension(extName);
// Calls StubProxyFactoryWrapper.getInvoker()
return extension.getInvoker(arg0, arg1, arg2);
}
}
- PROXY_FACTORY actually refers to a ProxyFactory$Adaptive object.
- PROXY_FACTORY.getInvoker() therefore calls ProxyFactory$Adaptive.getInvoker().
- ProxyFactory$Adaptive.getInvoker() calls ExtensionLoader.getExtensionLoader().getExtension("javassist") to obtain the extension dynamically.
- The extension returned for javassist is a StubProxyFactoryWrapper, which wraps a JavassistProxyFactory object.
- extension.getInvoker() runs StubProxyFactoryWrapper.getInvoker().
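The adaptive lookup described above can be sketched as follows. This is a simplified illustration, not Dubbo's generated code: `SimpleProxyFactory` and `AdaptiveProxyFactorySketch` are hypothetical names, and a plain `Map` stands in for the URL parameters and for the ExtensionLoader registry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the ProxyFactory extension point.
interface SimpleProxyFactory {
    String name();
}

// Hypothetical adaptive dispatcher: chooses an extension by a URL parameter.
class AdaptiveProxyFactorySketch {
    // Stands in for the extensions that ExtensionLoader would discover.
    private final Map<String, SimpleProxyFactory> extensions = new HashMap<>();

    AdaptiveProxyFactorySketch() {
        extensions.put("javassist", () -> "javassist");
        extensions.put("jdk", () -> "jdk");
    }

    SimpleProxyFactory select(Map<String, String> urlParameters) {
        // Same idea as url.getParameter("proxy", "javassist"): use the default when absent.
        String extName = urlParameters.getOrDefault("proxy", "javassist");
        SimpleProxyFactory extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName);
        }
        return extension;
    }
}
```

The point of the indirection is that the caller holds a single adaptive object, while the concrete implementation is decided per call from the URL.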
public class StubProxyFactoryWrapper implements ProxyFactory {
// Actually a JavassistProxyFactory object.
private final ProxyFactory proxyFactory;
private Protocol protocol;
// The ProxyFactory argument is a JavassistProxyFactory object.
public StubProxyFactoryWrapper(ProxyFactory proxyFactory) {
this.proxyFactory = proxyFactory;
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
// Delegates to JavassistProxyFactory.getInvoker()
return proxyFactory.getInvoker(proxy, type, url);
}
private <T> Exporter<T> export(T instance, Class<T> type, URL url) {
return protocol.export(proxyFactory.getInvoker(instance, type, url));
}
}
- StubProxyFactoryWrapper wraps a JavassistProxyFactory object.
- The method that actually runs is JavassistProxyFactory.getInvoker().
- Next, look at JavassistProxyFactory.getInvoker().
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
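- The core steps of JavassistProxyFactory.getInvoker() are creating a Wrapper object and returning an Invoker that is an anonymous AbstractProxyInvoker subclass.
- The AbstractProxyInvoker adds a layer around the wrapper: its doInvoke() calls wrapper.invokeMethod().
- The Wrapper class is generated dynamically, so next we look at the generated Wrapper code.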
package org.apache.dubbo.common.bytecode;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import org.apache.dubbo.common.bytecode.ClassGenerator;
import org.apache.dubbo.common.bytecode.NoSuchMethodException;
import org.apache.dubbo.common.bytecode.NoSuchPropertyException;
import org.apache.dubbo.common.bytecode.Wrapper;
import org.apache.dubbo.demo.provider.DemoServiceImpl;
public class Wrapper1 extends Wrapper implements ClassGenerator.DC {
// The core logic is the call into the real ServiceImpl
public Object invokeMethod(Object object, String string,
Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
DemoServiceImpl demoServiceImpl;
try {
demoServiceImpl = (DemoServiceImpl)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
// Invoke the real ServiceImpl method
if ("sayHello".equals(string) && arrclass.length == 1) {
return demoServiceImpl.sayHello((String)arrobject[0]);
}
}
catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer()
.append("Not found method \"").append(string)
.append("\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());
}
}
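- The complete Wrapper code can be found in the article "Dubbo之ProxyFactory解析" (an analysis of ProxyFactory); only the core invokeMethod() is shown here.
- invokeMethod() calls the ServiceImpl method directly, sayHello() in this example.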
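To make the relationship between the invoker, the wrapper, and the service concrete, here is a hand-written sketch of what the generated Wrapper does. All three class names (`GreetingServiceImpl`, `GreetingWrapper`, `GreetingInvoker`) are hypothetical stand-ins; the real Wrapper is generated at runtime and AbstractProxyInvoker has a richer contract.

```java
// Hypothetical service, standing in for DemoServiceImpl.
class GreetingServiceImpl {
    public String sayHello(String name) {
        return "Hello " + name;
    }
}

// Hand-written equivalent of a generated Wrapper:
// dispatches by method name with a direct call, no reflection.
class GreetingWrapper {
    public Object invokeMethod(Object target, String methodName,
                               Class<?>[] parameterTypes, Object[] arguments) {
        GreetingServiceImpl service = (GreetingServiceImpl) target;
        if ("sayHello".equals(methodName) && parameterTypes.length == 1) {
            return service.sayHello((String) arguments[0]);
        }
        throw new IllegalArgumentException("Not found method \"" + methodName + "\"");
    }
}

// Simplified invoker: holds the service instance and delegates to the wrapper.
class GreetingInvoker {
    private final GreetingServiceImpl target;
    private final GreetingWrapper wrapper = new GreetingWrapper();

    GreetingInvoker(GreetingServiceImpl target) {
        this.target = target;
    }

    public Object invoke(String methodName, Class<?>[] parameterTypes, Object[] arguments) {
        return wrapper.invokeMethod(target, methodName, parameterTypes, arguments);
    }
}
```

Note that in this sketch, as in the getInvoker() code above, it is the invoker that holds the service instance; the wrapper only receives it as an argument on each call.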
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
return getDefaultExtension();
}
final Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
// Key point: if wrapper classes exist, wrap the instance in each of them
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}
- This part explains why getExtension(), as called inside ProxyFactory$Adaptive, returns a StubProxyFactoryWrapper object.
- getExtension() => createExtension() checks whether any wrapper classes exist and, if so, wraps the JavassistProxyFactory in them.
- For JavassistProxyFactory, StubProxyFactoryWrapper is that wrapper class.
- getExtension() therefore returns a StubProxyFactoryWrapper object that wraps the JavassistProxyFactory object.
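The wrapping loop in createExtension() can be illustrated with a small standalone sketch. The names `Greeter`, `PlainGreeter`, `LoggingGreeterWrapper`, and `WrapperSketch` are hypothetical; the only part taken from the code above is the idea of instantiating each wrapper class through a constructor that accepts the extension interface.

```java
import java.util.List;

// Hypothetical extension interface.
interface Greeter {
    String greet(String name);
}

// Plain extension implementation, playing the role of JavassistProxyFactory.
class PlainGreeter implements Greeter {
    public String greet(String name) {
        return "hello " + name;
    }
}

// Wrapper class, playing the role of StubProxyFactoryWrapper:
// it has a public constructor that takes the extension interface.
class LoggingGreeterWrapper implements Greeter {
    private final Greeter delegate;

    public LoggingGreeterWrapper(Greeter delegate) {
        this.delegate = delegate;
    }

    public String greet(String name) {
        return "[wrapped] " + delegate.greet(name);
    }
}

class WrapperSketch {
    static Greeter createExtension(Class<? extends Greeter> implClass,
                                   List<Class<? extends Greeter>> wrapperClasses) {
        try {
            Greeter instance = implClass.getDeclaredConstructor().newInstance();
            // Each wrapper receives the current instance and becomes the new instance.
            for (Class<? extends Greeter> wrapperClass : wrapperClasses) {
                instance = wrapperClass.getConstructor(Greeter.class).newInstance(instance);
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Extension could not be instantiated", e);
        }
    }
}
```

Callers ask for the plain implementation by name but receive the outermost wrapper, which is why getExtension("javassist") yields a StubProxyFactoryWrapper.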
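Summary of invoker creation
- ProxyFactory implements the whole ServiceImpl -> Wrapper -> AbstractProxyInvoker chain.
- The generated ProxyFactory$Adaptive object encapsulates how the concrete ProxyFactory is obtained.
- ExtensionLoader.getExtension() for ProxyFactory returns a StubProxyFactoryWrapper.
- StubProxyFactoryWrapper wraps a JavassistProxyFactory object.
- JavassistProxyFactory creates the AbstractProxyInvoker object.
- The AbstractProxyInvoker holds the ServiceImpl object and delegates each call to the Wrapper object.
- The Wrapper object calls the matching ServiceImpl method directly.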
From invoker to exporter
- The protocol object in protocol.export() is a Protocol$Adaptive object.
private static final Protocol protocol =
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
// For now, look only at export().
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0)
throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException(
"org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException(
"org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException(
"Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url ("
+ url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader
.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
- The core of Protocol$Adaptive.export() is obtaining the extension object through ExtensionLoader.getExtensionLoader().getExtension(extName).
- Under the dubbo protocol, the extension object is DubboProtocol.
- Next, look at DubboProtocol.export().
public class DubboProtocol extends AbstractProtocol {
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// Core logic: the key under which the invoker (the real executor) is stored
String key = serviceKey(url);
// Create the exporter and store it in exporterMap.
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
}
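- The key point is the key generated for the invoker: key = serviceKey(url).
- The invoker is wrapped in a DubboExporter, which is stored in exporterMap.
- A service call eventually looks up the DubboExporter in exporterMap by this key and then executes its invoker.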
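The export side can be reduced to the following sketch. `SimpleExporter` and `SimpleProtocol` are hypothetical simplifications; in particular, the `unexport()` method that removes the entry from the shared map is my reading of why the map is passed into the exporter's constructor, and should be treated as an assumption rather than a statement about DubboExporter.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified exporter: remembers its invoker, its key, and the shared map.
class SimpleExporter {
    private final Object invoker;
    private final String key;
    private final Map<String, SimpleExporter> exporterMap;

    SimpleExporter(Object invoker, String key, Map<String, SimpleExporter> exporterMap) {
        this.invoker = invoker;
        this.key = key;
        this.exporterMap = exporterMap;
    }

    Object getInvoker() {
        return invoker;
    }

    // Assumption: holding the map lets the exporter remove its own entry.
    void unexport() {
        exporterMap.remove(key);
    }
}

// Hypothetical, simplified protocol: export() registers the exporter under its key.
class SimpleProtocol {
    final Map<String, SimpleExporter> exporterMap = new ConcurrentHashMap<>();

    SimpleExporter export(String key, Object invoker) {
        SimpleExporter exporter = new SimpleExporter(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
        return exporter;
    }
}
```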
public static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
StringBuilder buf = new StringBuilder();
if (StringUtils.isNotEmpty(serviceGroup)) {
buf.append(serviceGroup);
buf.append("/");
}
buf.append(serviceName);
if (serviceVersion != null && serviceVersion.length() > 0 && !"0.0.0".equals(serviceVersion)) {
buf.append(":");
buf.append(serviceVersion);
}
buf.append(":");
buf.append(port);
return buf.toString();
}
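- The serviceKey has the form serviceGroup/serviceName:serviceVersion:port. The group prefix is omitted when the group is empty, and the version is omitted when it is empty or 0.0.0. Because group and version are part of the key, interfaces in different groups and with different versions can coexist.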
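A few sample keys make the format easier to see. The method below is a standalone copy of the logic shown above, with the StringUtils call replaced by plain checks so that it has no Dubbo dependency; the class name `ServiceKeyDemo` and the sample service name and groups are illustrative.

```java
class ServiceKeyDemo {
    static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
        StringBuilder buf = new StringBuilder();
        if (serviceGroup != null && !serviceGroup.isEmpty()) {
            buf.append(serviceGroup).append("/");
        }
        buf.append(serviceName);
        if (serviceVersion != null && !serviceVersion.isEmpty() && !"0.0.0".equals(serviceVersion)) {
            buf.append(":").append(serviceVersion);
        }
        buf.append(":").append(port);
        return buf.toString();
    }

    public static void main(String[] args) {
        // No group, no version: demo.DemoService:20880
        System.out.println(serviceKey(20880, "demo.DemoService", null, null));
        // Group and version: g1/demo.DemoService:1.0.0:20880
        System.out.println(serviceKey(20880, "demo.DemoService", "1.0.0", "g1"));
        // Version 0.0.0 is treated as "no version": g2/demo.DemoService:20880
        System.out.println(serviceKey(20880, "demo.DemoService", "0.0.0", "g2"));
    }
}
```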
public class DubboProtocol extends AbstractProtocol {
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
}
public class DubboProtocol extends AbstractProtocol {
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// For now, pay attention to requestHandler
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
}
- While creating the server, this code calls Exchangers.bind(url, requestHandler).
- The object to focus on is requestHandler, in particular how it finds the invoker through getInvoker().
public class DubboProtocol extends AbstractProtocol {
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
int port = channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(PATH_KEY);
// if it's callback service on client side
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(STUB_EVENT_KEY));
if (isStubServiceInvoke) {
port = channel.getRemoteAddress().getPort();
}
//callback
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path += "." + inv.getAttachments().get(CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
// Build the serviceKey and use it to fetch the exporter from exporterMap.
String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException();
}
return exporter.getInvoker();
}
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress()
+ " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
// Look up the invoker
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.completionFuture().thenApply(Function.identity());
}
};
}
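- The core is getInvoker(): it builds the serviceKey, fetches the exporter from exporterMap by that key, and returns the exporter's invoker for execution.
- exporterMap holds all exporters. An exporter is added at publish time and looked up at call time, which is how publishing and invocation are tied together.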
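The publish-then-dispatch round trip can be condensed into a short sketch. Everything here is a hypothetical simplification: `DispatchSketch` is not a Dubbo class, a `Function<String, String>` stands in for the invoker, the exporter layer is omitted, and the key uses only path and port rather than the full group/version form.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class DispatchSketch {
    // key -> invoker; stands in for exporterMap (exporter layer omitted for brevity).
    private final Map<String, Function<String, String>> exporterMap = new HashMap<>();

    // Simplified key: path and port only.
    static String key(String path, int port) {
        return path + ":" + port;
    }

    // Publish side: register the invoker under its key.
    void export(String path, int port, Function<String, String> invoker) {
        exporterMap.put(key(path, port), invoker);
    }

    // Call side: rebuild the key from the request, find the invoker, and run it.
    String reply(String path, int port, String argument) {
        Function<String, String> invoker = exporterMap.get(key(path, port));
        if (invoker == null) {
            throw new IllegalStateException("Not found exported service: " + key(path, port));
        }
        return invoker.apply(argument);
    }
}
```

Both sides must derive the same key from their own inputs (the URL at publish time, the channel and invocation attachments at call time); a mismatch in group, version, or port shows up as a missing exporter.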