首先看来自官方文档
这里的Invoker是Provider的一个可调用Service的抽象,Invoker封装了Provider地址及Service接口信息。
Directory代表多个Invoker,可以把它看成List<Invoker>,但与List不同的是,它的值可能是动态变化的,比如注册中心推送变更。
Cluster将Directory中的多个Invoker伪装成一个Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个。
Router负责从多个Invoker中按路由规则选出子集,比如读写分离,应用隔离等。
LoadBalance负责从多个Invoker中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选。
以前自己看到上面的文字,理解不太深。随着对分布式系统的深入接触,以及DUBBO源码的研究,才有了更精确的理解。总结一句话:阅读技术类文档时,对一些语言的理解,受到阅读者自身情况的限制,往往理解有深有浅。闲话少说。本文,就是针对图中的组件,一个个进行剖析。
1.Invoker
Invoker 是Provider的一个可调用Service的抽象,Invoker封装了Provider地址及Service接口信息。
1.1 API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public
interface
Invoker<T>
extends
Node {
/**
* get service interface.
* @return service interface.
*/
Class<T> getInterface();
/**
* invoke.
* @param invocation
* @return result
* @throws RpcException
*/
Result invoke(Invocation invocation)
throws
RpcException;
}
|
1.2 层次树结构
分析可知:主要有2个分支
1.AbstractInvoker:主要具体远程实现,和RPC 协议有关,属于dubbo-rpc-api范畴。
2.AbstractClusterInvoker:主要逻辑包括Invoker的选择,和高可用有关,属于dubbo-cluster范畴。
1.3 AbstractInvoker VS AbstractClusterInvoker
1.3.1 类图比较
通过下图可以很容易发现:AbstractClusterInvoker比较AbstractInvoker,多了些select,doselect,reselect方法。这些方法的作用也可以猜到。
1.3.2 以两个实现类对比
选择AbstractInvoker的子类DubboInvoker,
和AbstractClusterInvoker的子类FailoverClusterInvoker为代表。
其中FailoverClusterInvoker是dubbo集群容错默认方案,Dubbo协议为默认协议。
先看下AbstractInvoker分支
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
public
abstract
class
AbstractInvoker<T>
implements
Invoker<T> {
public
Result invoke(Invocation inv)
throws
RpcException {
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(
this
);
Map<String, String> context = RpcContext.getContext().getAttachments();
try
{
return
doInvoke(invocation);
}
catch
(InvocationTargetException e) {
// biz exception
}
catch
(RpcException e) {
}
catch
(Throwable e) {
}
}
protected
abstract
Result doInvoke(Invocation invocation)
throws
Throwable;
..
}
public
class
DubboInvoker<T>
extends
AbstractInvoker<T> {
@Override
protected
Result doInvoke(
final
Invocation invocation)
throws
Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final
String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
logger.debug(
"doInvoke start -----------------------------"
);
ExchangeClient currentClient;
if
(clients.length ==
1
) {
currentClient = clients[
0
];
}
else
{
currentClient = clients[index.getAndIncrement() % clients.length];
}
try
{
boolean
isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean
isOneway = RpcUtils.isOneway(getUrl(), invocation);
int
timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if
(isOneway) {
boolean
isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY,
false
);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(
null
);
return
new
RpcResult();
}
else
if
(isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(
new
FutureAdapter<Object>(future));
return
new
RpcResult();
}
else
{
RpcContext.getContext().setFuture(
null
);
return
(Result) currentClient.request(inv, timeout).get();
}
}
catch
(TimeoutException e) {
throw
new
RpcException(RpcException.TIMEOUT_EXCEPTION,
"Invoke remote method timeout. method: "
+ invocation.getMethodName() +
", provider: "
+ getUrl() +
", cause: "
+ e.getMessage(), e);
}
catch
(RemotingException e) {
throw
new
RpcException(RpcException.NETWORK_EXCEPTION,
"Failed to invoke remote method: "
+ invocation.getMethodName() +
", provider: "
+ getUrl() +
", cause: "
+ e.getMessage(), e);
}
finally
{
logger.debug(
"doInvoke end-----------------------------"
);
}
}
...
}
|
这里可以很清晰地看到,远程调用分三种情况:
1. 不需要返回值的调用(所谓oneWay)
2. 异步(async)
3. 同步
对于第一种情况,客户端只管发请求就完了,不考虑返回结果。
对于第二种情况,客户端除了发请求,还需要将结果塞到一个ThreadLocal变量中,以便于客户端get返回值
对于第三种情况,客户端除了发请求,还会同步等待返回结果
再看下AbstractClusterInvoker分支
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
public
abstract
class
AbstractClusterInvoker<T>
implements
Invoker<T> {
public
Result invoke(
final
Invocation invocation)
throws
RpcException {
checkWheatherDestoried();
LoadBalance loadbalance;
List<Invoker<T>> invokers = list(invocation);
if
(invokers !=
null
&& invokers.size() >
0
) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.
class
).getExtension(invokers.get(
0
).getUrl()
.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
}
else
{
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.
class
).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return
doInvoke(invocation, invokers, loadbalance);
}
protected
List<Invoker<T>> list(Invocation invocation)
throws
RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
return
invokers;
}
protected
abstract
Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
LoadBalance loadbalance)
throws
RpcException;
..
}
public
class
FailoverClusterInvoker<T>
extends
AbstractClusterInvoker<T> {
public
Result doInvoke(Invocation invocation,
final
List<Invoker<T>> invokers, LoadBalance loadbalance)
throws
RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
int
len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) +
1
;
if
(len <=
0
) {
len =
1
;
}
// retry loop.
RpcException le =
null
;
// last exception.
List<Invoker<T>> invoked =
new
ArrayList<Invoker<T>>(copyinvokers.size());
// invoked invokers.
Set<String> providers =
new
HashSet<String>(len);
for
(
int
i =
0
; i < len; i++) {
//重试时,进行重新选择,避免重试时invoker列表已发生变化.
//注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
if
(i >
0
) {
checkWheatherDestoried();
copyinvokers = list(invocation);
//重新检查一下
checkInvokers(copyinvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List)invoked);
try
{
Result result = invoker.invoke(invocation);
return
result;
}
catch
(RpcException e) {
if
(e.isBiz()) {
// biz exception.
throw
e;
}
le = e;
}
catch
(Throwable e) {
le =
new
RpcException(e.getMessage(), e);
}
finally
{
providers.add(invoker.getUrl().getAddress());
}
}
}
...
}
|
总结下AbstractClusterInvoker和AbstractClusterInvoker都需要实现Invoker接口,两者都声明了doInvoke抽象方法。但方法定义有区别。
1
2
3
4
|
protected
abstract
Result doInvoke(Invocation invocation)
throws
Throwable;
protected
abstract
Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
LoadBalance loadbalance)
throws
RpcException;
|
AbstractClusterInvoker 需要一个Invoker列表,他们来自Directory,LoadBalance 可以可以理解为负载均衡策略。
两者的联系:AbstractClusterInvoker 比AbstractInvoker多了一些选择和负载均衡部分,到最后还是会调用AbstractInvoker分支,负责具体RPC调用工作。
1.4 常见容错方案对比
Feature |
优点 |
缺点 |
实现类 |
Failover Cluster |
失败自动切换,当出现失败,重试其它服务器,通常用于读操作(推荐使用) |
重试会带来更长延迟 |
FailoverClusterInvoker |
Failfast Cluster |
快速失败,只发起一次调用,失败立即报错,通常用于非幂等性的写操作 |
如果有机器正在重启,可能会出现调用失败 |
FailfastClusterInvoker |
Failsafe Cluster |
失败安全,出现异常时,直接忽略,通常用于写入审计日志等操作 |
调用信息丢失 |
FailsafeClusterInvoker |
Failback Cluster |
失败自动恢复,后台记录失败请求,定时重发,通常用于消息通知操作 |
不可靠,重启丢失 |
FailbackClusterInvoker |
Forking Cluster |
并行调用多个服务器,只要一个成功即返回,通常用于实时性要求较高的读操作 |
需要浪费更多服务资源 |
ForkingClusterInvoker |
Broadcast Cluster |
广播调用所有提供者,逐个调用,任意一台报错则报错,通常用于更新提供方本地状态 |
速度慢,任意一台报错则报错 |
BroadcastClusterInvoker |
2. 总结
*ClusterInvoker 分别使用Router,和Directory获取Invoker列表。
*ClusterInvoker然后再Invoker列表中,借助LoadBalance提供的负载均衡策略,返回一个可用的Invoker.
*Directory代表多个Invoker,可以理解为Invoker的逻辑集合,负责Invoker的下线,上线。
Router和Directory两者都对我提供Invoker列表。通过提供的API,很容易找出区别来。
1
2
3
4
|
//Directory
List<Invoker<T>> list(Invocation invocation)
throws
RpcException;
//Route
<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
throws
RpcException;
|
Directory:返回的Invoker列表,代表当前正常对外提供服务的Invoker列表。重点在维护可用节点列表。
Route:返回的Invoker列表,是在现有可用的Invoker列表里,按照规则进行再筛选,重点在规则匹配,过滤等。