哈喽,大家好,我是强哥。
前些天,一个同事在使用Dubbo的参数回调时,骂骂咧咧的说,Dubbo的这个回调真是奇葩,居然会限制回调次数,自己不得不把callbacks
属性值设置的非常大,但是还是会怕服务运行太久后超过回调次数限制,后续的回调就无法正常执行。
突然被他这么一说,我倒是有点奇怪,正常来说Dubbo一个这么牛逼的框架不应该会有这样的限制才对啊。于是,强哥便开始对这个callbacks
产生了兴趣,官网和百度找了一番也没有找到具体这个属性的详细解释,哈哈,这不就是又到我亲自研究的时候了吗。
Dubbo参数回调
首先讲一下什么时候Dubbo的参数回调吧,简单的说就是:通过参数回调从服务器端调用客户端逻辑。
参数回调方式与调用本地 callback 或 listener 相同,只需要在 Spring 的配置文件中声明哪个参数是 callback 类型即可。Dubbo 将基于长连接生成反向代理,这样就可以从服务器端调用客户端逻辑。
其实就是说,我们的provider在提供接口的时候,可以设置某个参数是让consumer放回调对象的。当consumer调用provider接口时,provider就可以拿到这个回调对象并进行反向调用(provider回调consumer接口)。
具体配置的代码如下:
<dubbo:service interface="org.apache.dubbo.samples.callback.api.CallbackService" ref="callbackService" connections="1" callbacks="1000"> <dubbo:method name="addListener"> <dubbo:argument index="1" callback="true"/> </dubbo:method> </dubbo:service>
其中<dubbo:argument index="1" callback="true"/>
的index就表示接口CallbackService
的第一个参数是用来让consumer传递回调对象的。
callbacks属性
我们从上面的代码中可以看到,在dubbo:service
的配置中,有一个属性是callbacks="1000"
,这个就是上面我同事所谓的“Dubbo对于回调次数的限制”,按他的说法,如果这么配,就相当于回调1000次之后,就会导致Dubbo之后的回调都失败。
这显然不能让强哥相信他理解的正确性,真要是这样,谁还会用Dubbo的回调啊。当然,事实胜于雄辩。直接搞几个测试代码试试就知道了。
测试准备
要说测试,Dubbo这点做得就挺好。在其GitHub仓库的源码目录dubbo-samples
下,有参数回调对应的dubbo-samples-callback
项目,我们直接就拿这个项目来做测试。
项目结构相对比较简单,具体如下:
这里强哥也要说说项目中使用了内嵌式的Zookeeper:EmbeddedZookeeper
,不用单独部署ZK,这点确实方便很多,尤其是写这种小示例的时候,太方便了。
不过,我们需要对Dubbo的QoS功能进行关闭,否则在同一台机子上启动多个Dubbo服务会出现端口已被占用的情况。具体需要在callback-provider.xml
和callback-consumer
两个文件中添加配置:
<dubbo:application name="callback-provider"> <dubbo:parameter key="qos.enable" value="false"/> </dubbo:application>
<dubbo:application name="callback-consumer"> <dubbo:parameter key="qos.enable" value="false"/> </dubbo:application>
Dubbo的QoS功能是干什么的?
QoS,全称为Quality of Service, 是常见于网络设备中的一个术语 ,例如在路由器中,可以通过Qos动态的调整和控制某些端口的权重,从而优先地保障运行在这些端口上的服务质量。在Dubbo中,QoS这个概念被用于动态地对服务进行查询和控制。例如对获取当前提供和消费的所有服务,以及对服务进行动态的上下线,即从注册中心上进行注册和反注册操作。
开启该功能需要占用端口,这里为了简化就不配置多个端口了,直接关了就行。
项目中的provider参数回调配置如下:
<dubbo:service interface="org.apache.dubbo.samples.callback.api.CallbackService" ref="callbackService" connections="1" callbacks="1"> <dubbo:method name="addListener"> <dubbo:argument index="1" callback="true"/> </dubbo:method> </dubbo:service>
注意这里callbacks
参数被我修改成了1,也就是说,如果callbacks
真的是对回调次数的限制,那么,consumer只要回调一次后,下一次就会失败。
测试
改好配置后,直接开测,首先启动provider,provider在启动的时候会创建CallbackServiceImpl
,其代码内容如下:
public class CallbackServiceImpl implements CallbackService { private final Map<String, CallbackListener> listeners = new ConcurrentHashMap<String, CallbackListener>(); public CallbackServiceImpl() { Thread t = new Thread(new Runnable() { public void run() { while (true) { try { for (Map.Entry<String, CallbackListener> entry : listeners.entrySet()) { try { entry.getValue().changed(getChanged(entry.getKey())); } catch (Throwable t) { listeners.remove(entry.getKey()); } } Thread.sleep(5000); // timely trigger change event } catch (Throwable t) { t.printStackTrace(); } } } }); t.setDaemon(true); t.start(); } public void addListener(String key, CallbackListener listener) { listeners.put(key, listener); listener.changed(getChanged(key)); // send notification for change } private String getChanged(String key) { return "Changed: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); } }
整体就是在创建时,启动一个线程死循环每隔5秒调用一下从consumer加入进来的回调对象的changed
方法。我们不会对provider的代码进行修改,所以启动好放着就行。
重点集中在consumer,强哥这里通过几种情况来对consumer进行测试。
情况1
consumer的代码如下:
public class CallbackConsumerBootstrap { public static void main(String[] args) throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/callback-consumer.xml"}); context.start(); CallbackService callbackService = (CallbackService) context.getBean("callbackService"); callbackService.addListener("foo.bar1", new CallbackListener() { public void changed(String msg) { System.out.println("callback1:" + msg); } }); System.in.read(); } }
内容就是获取到provider提供的CallbackService
接口实现,然后调用它的addListener
方法,addListener
方法的第二个参数就是consumer设置的回调对象,示例中使用了匿名内部类new CallbackListener()
。
我们直接启动,控制台输出内容如下:
callback1:Changed: 2022-04-06 20:40:07 callback1:Changed: 2022-04-06 20:40:08 callback1:Changed: 2022-04-06 20:40:13 callback1:Changed: 2022-04-06 20:40:18 callback1:Changed: 2022-04-06 20:40:23 callback1:Changed: 2022-04-06 20:40:28 callback1:Changed: 2022-04-06 20:40:33 callback1:Changed: 2022-04-06 20:40:38 ……
可见,在设置了callbacks="1"
后,回调并没有在第一次:"2022-04-07 14:40:07"执行结束后就失败,直接就可以证明callbacks
并不是用来限制回调次数的。
情况2
callbacks
并不是用来限制回调次数的这点证明之后,那么callbacks
到底表示的是什么呢?
会不会是限制回调连接的个数呢?我们继续操作:情况1的步骤保持不变,我们简单改下CallbackConsumerBootstrap
中callbackService.addListener
的第一个参数key
为foo.banir2
,然后再起一个连接。结果如下,同时第一个consumer也还是正常输出日志。
callback1:Changed: 2022-04-06 20:25:15 callback1:Changed: 2022-04-06 20:25:19 callback1:Changed: 2022-04-06 20:25:24 callback1:Changed: 2022-04-06 20:25:29 callback1:Changed: 2022-04-06 20:25:34 callback1:Changed: 2022-04-06 20:25:39 callback1:Changed: 2022-04-06 20:25:44 ……
连接数2超过了callbacks
的个数限制。可见,**callbacks
也不是限制回调连接的个数**。
情况3
我们对情况1的代码进行修改,在CallbackConsumerBootstrap
中多添加一个回调:
public class CallbackConsumerBootstrap2 { public static void main(String[] args) throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/callback-consumer.xml"}); context.start(); CallbackService callbackService = (CallbackService) context.getBean("callbackService"); //第一个 callbackService.addListener("foo.bar1", new CallbackListener() { public void changed(String msg) { System.out.println("callback1:" + msg); } }); //第二个 callbackService.addListener("foo.bar2", new CallbackListener() { public void changed(String msg) { System.out.println("callback2:" + msg); } }); System.in.read(); } }
关闭旧的consumer,启动当前的CallbackConsumerBootstrap2
,启动后代码就报错了:
Caused by: java.lang.IllegalStateException: interface org.apache.dubbo.samples.callback.api.CallbackListener `s callback instances num exceed providers limit :1 ,current num: 2. The new callback service will not work !!! you can cancle the callback service which exported before. channel :NettyChannel [channel=[id: 0xde91033b, L:/10.0.227.75:63364 - R:/10.0.227.75:20880]] at com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.isInstancesOverLimit(CallbackServiceCodec.java:210) at com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.exportOrunexportCallbackService(CallbackServiceCodec.java:107) at com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.encodeInvocationArgument(CallbackServiceCodec.java:255) at com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.encodeRequestData(DubboCodec.java:180) at com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encodeRequest(ExchangeCodec.java:235) at com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encode(ExchangeCodec.java:72) at com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec.encode(DubboCountCodec.java:38) at com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter$InternalEncoder.encode(NettyCodecAdapter.java:70) at io.netty.handler.codec.MessageToByteEncoder.write(MessageToByteEncoder.java:107) ... 18 more at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:109) at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:244) at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:75) at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52) at com.alibaba.dubbo.common.bytecode.proxy0.addListener(proxy0.java) at org.apache.dubbo.samples.callback.CallbackConsumerBootstrap2.main(CallbackConsumerBootstrap2.java:41)
原因是:callback instances num exceed providers limit :1 ,current num: 2.
。也就是说,回调的实例个数2个超过了阈值1个的限制。
由此可见,**callbacks
就是用来限制同一个客户端(连接)连接的回调实例个数限制。**
这里需要重点说一下是:
- 同一个连接
- 回调实例个数而不是回调个数 怎么证明是实例个数呢?我们接着看。
情况4
还是修改CallbackConsumerBootstrap代码
,我们调用两次addListener
,但是使用同一个回调实例对象:
public class CallbackConsumerBootstrap3 { public static void main(String[] args) throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/callback-consumer.xml"}); context.start(); CallbackService callbackService = (CallbackService) context.getBean("callbackService"); CallbackListener callbackListener = new CallbackListener() { public void changed(String msg) { System.out.println("callback2:" + msg); } }; callbackService.addListener("foo.bar1", callbackListener); callbackService.addListener("foo.bar2", callbackListener); System.in.read(); } }
运行后,输出如下:
callback2:Changed: 2022-04-06 20:39:47 callback2:Changed: 2022-04-06 20:39:47 callback2:Changed: 2022-04-06 20:39:48 callback2:Changed: 2022-04-06 20:39:48 callback2:Changed: 2022-04-06 20:39:53 callback2:Changed: 2022-04-06 20:39:53 callback2:Changed: 2022-04-06 20:39:58 callback2:Changed: 2022-04-06 20:39:58 callback2:Changed: 2022-04-06 20:40:03 callback2:Changed: 2022-04-06 20:40:03 ……
代码没有报错,而且会正常回调两次。由此我们就证明了**callbacks
是针对实例的。**
总结+源码分析
综上所述,我们便从4个代码示例中,层层证明,得到:callbacks
属性是限制同一个连接的回调实例个数的。而并不是限制consumer的次数。
其实,强哥这次没有从源码的角度来进行分析。主要是为了通过不同的情况,来让大家更好地理解callbacks
属性的含义,直接证明,比较一目了然。当然,我们在情况3报错的时候,可以通过错误栈来定位到具体的判断callbacks
属性个数的源码位置:CallbackServiceCodec
的exportOrunexportCallbackService
方法:
if (export) { // one channel can have multiple callback instances, no need to re-export for different instance. if (!channel.hasAttribute(cacheKey)) { if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) { Invoker<?> invoker = proxyFactory.getInvoker(inst, clazz, exporturl); // should destroy resource? Exporter<?> exporter = protocol.export(invoker); // this is used for tracing if instid has published service or not. channel.setAttribute(cacheKey, exporter); logger.info("export a callback service :" + exporturl + ", on " + channel + ", url is: " + url); increaseInstanceCount(channel, countkey); } } }
其中的isInstancesOverLimit
方法内容就是判断的地方:
private static boolean isInstancesOverLimit(Channel channel, URL url, String interfaceClass, int instid, boolean isServer) { Integer count = (Integer) channel.getAttribute(isServer ? getServerSideCountKey(channel, interfaceClass) : getClientSideCountKey(interfaceClass)); int limit = url.getParameter(Constants.CALLBACK_INSTANCES_LIMIT_KEY, Constants.DEFAULT_CALLBACK_INSTANCES); if (count != null && count >= limit) { //client side error throw new IllegalStateException("interface " + interfaceClass + " `s callback instances num exceed providers limit :" + limit + " ,current num: " + (count + 1) + ". The new callback service will not work !!! you can cancle the callback service which exported before. channel :" + channel); } else { return false; } }
count
为服务端模式的时候,是从getServerSideCountKey
中获取的,也就是:
private static String getServerSideCountKey(Channel channel, String interfaceClass) { return Constants.CALLBACK_SERVICE_PROXY_KEY + "." + System.identityHashCode(channel) + "." + interfaceClass + ".COUNT"; }
其中:
System.identityHashCode(channel)
代表着一个连接interfaceClass
就表一个实例 这个的COUNT
就是我们说的:每个客户端的一个接口的回调服务实例的个数。
再与callbacks
个数比较,来判断是否超过了限制,超过则抛出情况3的异常:
if (count != null && count >= limit) { //client side error throw new IllegalStateException("interface " + interfaceClass + " `s callback instances num exceed providers limit :" + limit + " ,current num: " + (count + 1) + ". The new callback service will not work !!! you can cancle the callback service which exported before. channel :" + channel); } else { return false; }
由此,源码也证明结束。
一个小小的属性,也真是要费这么大篇幅来说明。不过强哥也是想让大家能够更好地理解。至少走这么一遍,可以让大家对这个Dubbo的回调机制有个更深入的认识。
小伙伴们如果也想自己试试的话,可以关注「强哥叨逼叨」回复「dubbo回调」获取强哥配置好的测试代码哦。