你该不会也觉得Dubbo参数回调中callbacks属性是用来限制回调次数的吧?

简介: 前些天,一个同事在使用Dubbo的参数回调时,骂骂咧咧的说,Dubbo的这个回调真是奇葩,居然会限制回调次数,自己不得不把callbacks属性值设置的非常大,但是还是会怕服务运行太久后超过回调次数限制,后续的回调就无法正常执行。

哈喽,大家好,我是强哥。


前些天,一个同事在使用Dubbo的参数回调时,骂骂咧咧的说,Dubbo的这个回调真是奇葩,居然会限制回调次数,自己不得不把callbacks属性值设置的非常大,但是还是会怕服务运行太久后超过回调次数限制,后续的回调就无法正常执行。


突然被他这么一说,我倒是有点奇怪,正常来说Dubbo一个这么牛逼的框架不应该会有这样的限制才对啊。于是,强哥便开始对这个callbacks产生了兴趣,官网和百度找了一番也没有找到具体这个属性的详细解释,哈哈,这不就是又到我亲自研究的时候了吗。


Dubbo参数回调


首先讲一下什么时候Dubbo的参数回调吧,简单的说就是:通过参数回调从服务器端调用客户端逻辑。


参数回调方式与调用本地 callback 或 listener 相同,只需要在 Spring 的配置文件中声明哪个参数是 callback 类型即可。Dubbo 将基于长连接生成反向代理,这样就可以从服务器端调用客户端逻辑。


其实就是说,我们的provider在提供接口的时候,可以设置某个参数是让consumer放回调对象的。当consumer调用provider接口时,provider就可以拿到这个回调对象并进行反向调用(provider回调consumer接口)。


具体配置的代码如下:


<dubbo:service interface="org.apache.dubbo.samples.callback.api.CallbackService" ref="callbackService"
               connections="1" callbacks="1000">
    <dubbo:method name="addListener">
        <dubbo:argument index="1" callback="true"/>
    </dubbo:method>
</dubbo:service>


其中<dubbo:argument index="1" callback="true"/>的index就表示接口CallbackService的第一个参数是用来让consumer传递回调对象的。


callbacks属性


我们从上面的代码中可以看到,在dubbo:service的配置中,有一个属性是callbacks="1000",这个就是上面我同事所谓的“Dubbo对于回调次数的限制”,按他的说法,如果这么配,就相当于回调1000次之后,就会导致Dubbo之后的回调都失败。


这显然不能让强哥相信他理解的正确性,真要是这样,谁还会用Dubbo的回调啊。当然,事实胜于雄辩。直接搞几个测试代码试试就知道了。


测试准备


要说测试,Dubbo这点做得就挺好。在其GitHub仓库的源码目录dubbo-samples下,有参数回调对应的dubbo-samples-callback项目,我们直接就拿这个项目来做测试。


项目结构相对比较简单,具体如下:


38.png


这里强哥也要说说项目中使用了内嵌式的ZookeeperEmbeddedZookeeper,不用单独部署ZK,这点确实方便很多,尤其是写这种小示例的时候,太方便了。


不过,我们需要对Dubbo的QoS功能进行关闭,否则在同一台机子上启动多个Dubbo服务会出现端口已被占用的情况。具体需要在callback-provider.xmlcallback-consumer两个文件中添加配置:


<dubbo:application name="callback-provider">
    <dubbo:parameter key="qos.enable" value="false"/>
</dubbo:application>


<dubbo:application name="callback-consumer">
    <dubbo:parameter key="qos.enable" value="false"/>
</dubbo:application>


Dubbo的QoS功能是干什么的?


QoS,全称为Quality of Service, 是常见于网络设备中的一个术语 ,例如在路由器中,可以通过Qos动态的调整和控制某些端口的权重,从而优先保障运行在这些端口上的服务质量。在Dubbo中,QoS这个概念被用于动态对服务进行查询和控制。例如对获取当前提供和消费的所有服务,以及对服务进行动态的上下线,即从注册中心上进行注册和反注册操作。


开启该功能需要占用端口,这里为了简化就不配置多个端口了,直接关了就行。


项目中的provider参数回调配置如下:


<dubbo:service interface="org.apache.dubbo.samples.callback.api.CallbackService" ref="callbackService"
               connections="1" callbacks="1">
    <dubbo:method name="addListener">
        <dubbo:argument index="1" callback="true"/>
    </dubbo:method>
</dubbo:service>


注意这里callbacks参数被我修改成了1,也就是说,如果callbacks真的是对回调次数的限制,那么,consumer只要回调一次后,下一次就会失败。


测试


改好配置后,直接开测,首先启动provider,provider在启动的时候会创建CallbackServiceImpl,其代码内容如下:


public class CallbackServiceImpl implements CallbackService {
    private final Map<String, CallbackListener> listeners = new ConcurrentHashMap<String, CallbackListener>();
    public CallbackServiceImpl() {
        Thread t = new Thread(new Runnable() {
            public void run() {
                while (true) {
                    try {
                        for (Map.Entry<String, CallbackListener> entry : listeners.entrySet()) {
                            try {
                                entry.getValue().changed(getChanged(entry.getKey()));
                            } catch (Throwable t) {
                                listeners.remove(entry.getKey());
                            }
                        }
                        Thread.sleep(5000); // timely trigger change event
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                }
            }
        });
        t.setDaemon(true);
        t.start();
    }
    public void addListener(String key, CallbackListener listener) {
        listeners.put(key, listener);
        listener.changed(getChanged(key)); // send notification for change
    }
    private String getChanged(String key) {
        return "Changed: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    }
}


整体就是在创建时,启动一个线程死循环每隔5秒调用一下从consumer加入进来的回调对象的changed方法。我们不会对provider的代码进行修改,所以启动好放着就行。

重点集中在consumer,强哥这里通过几种情况来对consumer进行测试。


情况1


consumer的代码如下:


public class CallbackConsumerBootstrap {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/callback-consumer.xml"});
        context.start();
        CallbackService callbackService = (CallbackService) context.getBean("callbackService");
        callbackService.addListener("foo.bar1", new CallbackListener() {
            public void changed(String msg) {
                System.out.println("callback1:" + msg);
            }
        });
        System.in.read();
    }
}


内容就是获取到provider提供的CallbackService接口实现,然后调用它的addListener方法,addListener方法的第二个参数就是consumer设置的回调对象,示例中使用了匿名内部类new CallbackListener()


我们直接启动,控制台输出内容如下:


callback1:Changed: 2022-04-06 20:40:07
callback1:Changed: 2022-04-06 20:40:08
callback1:Changed: 2022-04-06 20:40:13
callback1:Changed: 2022-04-06 20:40:18
callback1:Changed: 2022-04-06 20:40:23
callback1:Changed: 2022-04-06 20:40:28
callback1:Changed: 2022-04-06 20:40:33
callback1:Changed: 2022-04-06 20:40:38
……


可见,在设置了callbacks="1"后,回调并没有在第一次:"2022-04-07 14:40:07"执行结束后就失败,直接就可以证明callbacks并不是用来限制回调次数的


情况2


callbacks并不是用来限制回调次数的这点证明之后,那么callbacks到底表示的是什么呢?


会不会是限制回调连接的个数呢?我们继续操作:情况1的步骤保持不变,我们简单改下CallbackConsumerBootstrapcallbackService.addListener的第一个参数keyfoo.banir2,然后再起一个连接。结果如下,同时第一个consumer也还是正常输出日志。


callback1:Changed: 2022-04-06 20:25:15
callback1:Changed: 2022-04-06 20:25:19
callback1:Changed: 2022-04-06 20:25:24
callback1:Changed: 2022-04-06 20:25:29
callback1:Changed: 2022-04-06 20:25:34
callback1:Changed: 2022-04-06 20:25:39
callback1:Changed: 2022-04-06 20:25:44
……


连接数2超过了callbacks的个数限制。可见,**callbacks也不是限制回调连接的个数**。


情况3


我们对情况1的代码进行修改,在CallbackConsumerBootstrap中多添加一个回调:


public class CallbackConsumerBootstrap2 {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/callback-consumer.xml"});
        context.start();
        CallbackService callbackService = (CallbackService) context.getBean("callbackService");
        //第一个
        callbackService.addListener("foo.bar1", new CallbackListener() {
            public void changed(String msg) {
                System.out.println("callback1:" + msg);
            }
        });
        //第二个
        callbackService.addListener("foo.bar2", new CallbackListener() {
            public void changed(String msg) {
                System.out.println("callback2:" + msg);
            }
        });
        System.in.read();
    }
}


关闭旧的consumer,启动当前的CallbackConsumerBootstrap2,启动后代码就报错了:


Caused by: java.lang.IllegalStateException: interface org.apache.dubbo.samples.callback.api.CallbackListener `s callback instances num exceed providers limit :1 ,current num: 2. The new callback service will not work !!! you can cancle the callback service which exported before. channel :NettyChannel [channel=[id: 0xde91033b, L:/10.0.227.75:63364 - R:/10.0.227.75:20880]]
 at com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.isInstancesOverLimit(CallbackServiceCodec.java:210)
 at com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.exportOrunexportCallbackService(CallbackServiceCodec.java:107)
 at com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.encodeInvocationArgument(CallbackServiceCodec.java:255)
 at com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.encodeRequestData(DubboCodec.java:180)
 at com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encodeRequest(ExchangeCodec.java:235)
 at com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encode(ExchangeCodec.java:72)
 at com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec.encode(DubboCountCodec.java:38)
 at com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter$InternalEncoder.encode(NettyCodecAdapter.java:70)
 at io.netty.handler.codec.MessageToByteEncoder.write(MessageToByteEncoder.java:107)
 ... 18 more
 at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:109)
 at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:244)
 at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:75)
 at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)
 at com.alibaba.dubbo.common.bytecode.proxy0.addListener(proxy0.java)
 at org.apache.dubbo.samples.callback.CallbackConsumerBootstrap2.main(CallbackConsumerBootstrap2.java:41)


原因是:callback instances num exceed providers limit :1 ,current num: 2.。也就是说,回调的实例个数2个超过了阈值1个的限制。


由此可见,**callbacks就是用来限制同一个客户端(连接)连接的回调实例个数限制。**


这里需要重点说一下是:


  • 同一个连接
  • 回调实例个数而不是回调个数 怎么证明是实例个数呢?我们接着看。


情况4


还是修改CallbackConsumerBootstrap代码,我们调用两次addListener,但是使用同一个回调实例对象:


public class CallbackConsumerBootstrap3 {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/callback-consumer.xml"});
        context.start();
        CallbackService callbackService = (CallbackService) context.getBean("callbackService");
        CallbackListener callbackListener = new CallbackListener() {
            public void changed(String msg) {
                System.out.println("callback2:" + msg);
            }
        };
        callbackService.addListener("foo.bar1", callbackListener);
        callbackService.addListener("foo.bar2", callbackListener);
        System.in.read();
    }
}


运行后,输出如下:


callback2:Changed: 2022-04-06 20:39:47
callback2:Changed: 2022-04-06 20:39:47
callback2:Changed: 2022-04-06 20:39:48
callback2:Changed: 2022-04-06 20:39:48
callback2:Changed: 2022-04-06 20:39:53
callback2:Changed: 2022-04-06 20:39:53
callback2:Changed: 2022-04-06 20:39:58
callback2:Changed: 2022-04-06 20:39:58
callback2:Changed: 2022-04-06 20:40:03
callback2:Changed: 2022-04-06 20:40:03
……


代码没有报错,而且会正常回调两次。由此我们就证明了**callbacks是针对实例的。**


总结+源码分析


综上所述,我们便从4个代码示例中,层层证明,得到:callbacks属性是限制同一个连接的回调实例个数的。而并不是限制consumer的次数。


其实,强哥这次没有从源码的角度来进行分析。主要是为了通过不同的情况,来让大家更好地理解callbacks属性的含义,直接证明,比较一目了然。当然,我们在情况3报错的时候,可以通过错误栈来定位到具体的判断callbacks属性个数的源码位置:CallbackServiceCodecexportOrunexportCallbackService方法:


if (export) {
      // one channel can have multiple callback instances, no need to re-export for different instance.
      if (!channel.hasAttribute(cacheKey)) {
          if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) {
              Invoker<?> invoker = proxyFactory.getInvoker(inst, clazz, exporturl);
              // should destroy resource?
              Exporter<?> exporter = protocol.export(invoker);
              // this is used for tracing if instid has published service or not.
              channel.setAttribute(cacheKey, exporter);
              logger.info("export a callback service :" + exporturl + ", on " + channel + ", url is: " + url);
              increaseInstanceCount(channel, countkey);
          }
      }
  }


其中的isInstancesOverLimit方法内容就是判断的地方:


private static boolean isInstancesOverLimit(Channel channel, URL url, String interfaceClass, int instid, boolean isServer) {
    Integer count = (Integer) channel.getAttribute(isServer ? getServerSideCountKey(channel, interfaceClass) : getClientSideCountKey(interfaceClass));
    int limit = url.getParameter(Constants.CALLBACK_INSTANCES_LIMIT_KEY, Constants.DEFAULT_CALLBACK_INSTANCES);
    if (count != null && count >= limit) {
        //client side error
        throw new IllegalStateException("interface " + interfaceClass + " `s callback instances num exceed providers limit :" + limit
                + " ,current num: " + (count + 1) + ". The new callback service will not work !!! you can cancle the callback service which exported before. channel :" + channel);
    } else {
        return false;
    }
}


count为服务端模式的时候,是从getServerSideCountKey中获取的,也就是:


private static String getServerSideCountKey(Channel channel, String interfaceClass) {
    return Constants.CALLBACK_SERVICE_PROXY_KEY + "." + System.identityHashCode(channel) + "." + interfaceClass + ".COUNT";
}


其中:


  • System.identityHashCode(channel)代表着一个连接
  • interfaceClass就表一个实例 这个的COUNT就是我们说的:每个客户端的一个接口的回调服务实例的个数。


再与callbacks个数比较,来判断是否超过了限制,超过则抛出情况3的异常:


if (count != null && count >= limit) {
    //client side error
    throw new IllegalStateException("interface " + interfaceClass + " `s callback instances num exceed providers limit :" + limit
            + " ,current num: " + (count + 1) + ". The new callback service will not work !!! you can cancle the callback service which exported before. channel :" + channel);
} else {
    return false;
}


由此,源码也证明结束。


一个小小的属性,也真是要费这么大篇幅来说明。不过强哥也是想让大家能够更好地理解。至少走这么一遍,可以让大家对这个Dubbo的回调机制有个更深入的认识。


小伙伴们如果也想自己试试的话,可以关注「强哥叨逼叨」回复「dubbo回调」获取强哥配置好的测试代码哦。

相关文章
|
7月前
|
Dubbo Java 应用服务中间件
微服务框架(十一)Dubbo调用拦截及参数校检扩展
  此系列文章将会描述Java框架Spring Boot、服务治理框架Dubbo、应用容器引擎Docker,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现,然后通过gitlab-CI以持续集成为Docker镜像。   使用Dubbo框架时,面对自身的业务场景,需根据定制的需求编写SPI拓展实现,再根据配置来加载拓展点。
|
7月前
|
Dubbo Java 应用服务中间件
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
|
2月前
|
Dubbo Java 应用服务中间件
Spring Cloud Dubbo:微服务通信的高效解决方案
【10月更文挑战第15天】随着信息技术的发展,微服务架构成为企业应用开发的主流。Spring Cloud Dubbo结合了Dubbo的高性能RPC和Spring Cloud的生态系统,提供高效、稳定的微服务通信解决方案。它支持多种通信协议,具备服务注册与发现、负载均衡及容错机制,简化了服务调用的复杂性,使开发者能更专注于业务逻辑的实现。
75 2
|
4月前
|
Dubbo Java 应用服务中间件
💥Spring Cloud Dubbo火爆来袭!微服务通信的终极利器,你知道它有多强大吗?🔥
【8月更文挑战第29天】随着信息技术的发展,微服务架构成为企业应用开发的主流模式,而高效的微服务通信至关重要。Spring Cloud Dubbo通过整合Dubbo与Spring Cloud的优势,提供高性能RPC通信及丰富的生态支持,包括服务注册与发现、负载均衡和容错机制等,简化了服务调用管理并支持多种通信协议,提升了系统的可伸缩性和稳定性,成为微服务通信领域的优选方案。开发者仅需关注业务逻辑,而无需过多关心底层通信细节,使得Spring Cloud Dubbo在未来微服务开发中将更加受到青睐。
90 0
|
1月前
|
Dubbo Cloud Native 应用服务中间件
阿里云的 Dubbo 和 Nacos 深度整合,提供了高效的服务注册与发现、配置管理等关键功能,简化了微服务治理,提升了系统的灵活性和可靠性。
在云原生时代,微服务架构成为主流。阿里云的 Dubbo 和 Nacos 深度整合,提供了高效的服务注册与发现、配置管理等关键功能,简化了微服务治理,提升了系统的灵活性和可靠性。示例代码展示了如何在项目中实现两者的整合,通过 Nacos 动态调整服务状态和配置,适应多变的业务需求。
45 2
|
2月前
|
Dubbo Java 应用服务中间件
Dubbo学习圣经:从入门到精通 Dubbo3.0 + SpringCloud Alibaba 微服务基础框架
尼恩团队的15大技术圣经,旨在帮助开发者系统化、体系化地掌握核心技术,提升技术实力,从而在面试和工作中脱颖而出。本文介绍了如何使用Dubbo3.0与Spring Cloud Gateway进行整合,解决传统Dubbo架构缺乏HTTP入口的问题,实现高性能的微服务网关。
|
3月前
|
Dubbo 应用服务中间件 Apache
Star 4w+,Apache Dubbo 3.3 全新发布,Triple X 领衔,开启微服务通信新时代
在 Apache Dubbo 突破 4w Star 之际,Apache Dubbo 团队正式宣布,Dubbo 3.3 正式发布!作为全球领先的开源微服务框架,Dubbo 一直致力于为开发者提供高性能、可扩展且灵活的分布式服务解决方案。此次发布的 Dubbo 3.3,通过 Triple X 的全新升级,突破了以往局限,实现了对南北向与东西向流量的全面支持,并提升了对云原生架构的友好性。
156 10
|
7月前
|
Dubbo Java 应用服务中间件
阿里巴巴资深架构师深度解析微服务架构设计之SpringCloud+Dubbo
软件架构是一个包含各种组织的系统组织,这些组件包括Web服务器,应用服务器,数据库,存储,通讯层),它们彼此或和环境存在关系。系统架构的目标是解决利益相关者的关注点。
|
4月前
|
负载均衡 Dubbo 应用服务中间件
框架巨擘:Dubbo如何一统异构微服务江湖,成为开发者的超级武器!
【8月更文挑战第8天】在软件开发中,微服务架构因灵活性和可扩展性备受欢迎。面对异构微服务的挑战,Apache Dubbo作为高性能Java RPC框架脱颖而出。它具备服务注册与发现、负载均衡及容错机制等核心特性,支持多种通信协议和序列化方式,能有效连接不同技术栈的微服务。Dubbo的插件化设计保证了面向未来的扩展性,使其成为构建稳定高效分布式系统的理想选择。
66 5
|
7月前
|
Dubbo Cloud Native 应用服务中间件
【阿里云云原生专栏】云原生环境下的微服务治理:阿里云 Dubbo 与 Nacos 的深度整合
【5月更文挑战第25天】阿里云Dubbo和Nacos提供微服务治理的强大工具,整合后实现灵活高效的治理。Dubbo是高性能RPC框架,Nacos则负责服务发现和配置管理。整合示例显示,通过Nacos注册中心,服务能便捷注册发现,动态管理配置。简化部署,提升适应性,但也需注意服务稳定性和策略规划。这种整合为云原生环境的微服务架构带来强大支持,未来应用前景广阔。
293 2