前言
在使用Dubbo的日常开发中,通常我们想调用别人的服务时,我们需要在项目中引入对应的API(SDK)包。如果调用方没有服务方提供的API(SDK)的情况下,那么如何解决呢?
Dubbo Generic
Dubbo的泛化调用的作用是,我们无需依赖相关API(SDK)包,也能调用到该服务。
Dubbo的泛化调用主要用于实现一个通用的远程服务Mock框架,可通过实现GenericService接口处理所有服务请求。比如如下场景:
网关服务:如果要搭建一个网关服务,那么服务网关要作为所有RPC服务的调用端。但是网关本身不应该依赖于服务提供方的接口API(这样会导致每有一个新的服务发布,就需要修改网关的代码以及重新部署),所以需要泛化调用的支持。
测试平台:如果要搭建一个可以测试RPC调用的平台,用户输入分组名、接口、方法名等信息,就可以测试对应的RPC服务。那么由于同样的原因(即会导致每有一个新的服务发布,就需要修改网关的代码以及重新部署),所以平台本身不应该依赖于服务提供方的接口API。所以需要泛化调用的支持。
具体参阅:https://dubbo.apache.org/zh/docs3-v2/java-sdk/advanced-features-and-usage/service/generic-reference/
使用场景
Dubbo的泛化调用相对于需要依赖业务客户端API(SDK)包的正常调用,泛化调用不需要依赖二方包,使用其特定的GenericService接口,传入需要调用的方法名、方法签名和参数值等进行调用服务。 泛化调用常见于一些网关应用,没办法依赖所有服务的二方包,或者一些本地化部署的项目中。
Core
- org.apache.dubbo.config.RegistryConfig:注册中心配置
- org.apache.dubbo.config.ApplicationConfig:应用配置
- org.apache.dubbo.config.ReferenceConfig: 引用的服务配置(类似reference注解或xml配置)
- org.apache.dubbo.rpc.service.GenericService: 实际调用的服务(泛化服务)
Core Method
Demo
这种使用,应该经常遇见,但每次调用都要传一堆参数,使用起来不太方便
public static Object invoke(String interfaceName, String methodName, String address, String group, String[] paramTypes, Object[] params) throws BaseAppException {
ReferenceConfig reference = getReferenceConfig(interfaceName, address, group);// 用org.apache.dubbo.rpc.service.GenericService可以替代所有接口引用
// ReferenceConfig的get方法创建代理对象是一个很重的操作。
// 在泛化调用时如果每次都用新的ReferenceConfig对象的get方法创建代理对象,
// 将浪费大量的cpu时间并可能导致内存泄露(内存泄露问题9.0.8已修复)
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
GenericService genericService = (GenericService) cache.get(reference);
// 基本类型以及Date,List,Map等不需要转换,直接调用
return genericService.$invoke(methodName, paramTypes, params);
}
/**
* 获取引用远程服务,封装了所有与注册中心及服务提供方连接
*/
private static ReferenceConfig getReferenceConfig(String interfaceName, String address, String group) throws BaseAppException {
ReferenceConfig referenceConfig = referenceCache.get(interfaceName);
if (null == referenceConfig) {
referenceConfig = new ReferenceConfig<>();
// dubbo consumer的application配置
referenceConfig.setApplication(application);
// 注册中心信息
referenceConfig.setRegistry(getRegistryConfig(address, group));
// 弱类型接口名
referenceConfig.setInterface(interfaceName);
// 声明为泛化接口
referenceConfig.setGeneric(true);
// 缓存
referenceCache.put(interfaceName, referenceConfig);
}
return referenceConfig;
}
/**
* 获取注册中心配置
*/
private static RegistryConfig getRegistryConfig(String address, String group) throws BaseAppException {
String key = address + "-" + group;
RegistryConfig registryConfig = registryConfigCache.get(key);
if (null == registryConfig) {
registryConfig = new RegistryConfig();
registryConfig.setAddress(address);
registryConfig.setGroup(group);
registryConfigCache.put(key, registryConfig);
}
return registryConfig;
}
GenericSupport
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
/**
*
* @author Duansg
* @date 2022-03-04 13:43:22
*/
@Slf4j
public final class GenericSupport extends AbstractGeneric {
private static GenericSupport generic = null;
/**
* Constructor
*
* @param appName
* @param zkUrl
* @param registryGroup
* @param properties
*/
private GenericSupport(String appName, String zkUrl, String registryGroup, GenericProperties properties) {
super(appName, zkUrl, registryGroup, properties);
}
/**
* 初始化执行方法
*
* @param appName
* 应用名称
* @param zkUrl
* zk地址
* @param registryGroup
* 服务注册分组,跨组的服务不会相互影响,也无法相互调用,适用于环境隔离,没有就不要乱填
*
* @param properties
* 扩展属性
*/
public static void initGeneric(String appName, String zkUrl, String registryGroup, ItemPlatformGenericProperties properties) {
if (ObjectUtils.isEmpty(generic)) {
generic = new GenericSupport(appName, zkUrl, registryGroup, properties);
log.info("GenericSupport init finshed");
}
}
/**
* 泛化调用
*
* @param interfaceName
* 接口类型
* @param funcName
* 执行方法名称
* @param params
* 实际参数
* @return
*/
public static Object invokeParams(String interfaceName, String funcName, Class<?>[] classs, Object... params) {
return generic.defaultInvokeSimple(interfaceName)
.$invoke(
funcName,
Stream.of(classs).map(Class::getName).toArray(String[]::new),
Stream.of(params).toArray(Object[]::new)
);
}
/**
* @param interfaceType
* 接口类型
* @param funcName
* 执行方法名称
* //@param funcIndex 执行方法下标@MethodGeneric
* @param paramType
* 参数类型
* @param params
* 实际参数
* @return
*/
public static Object invokeParams(Class<?> interfaceType, String funcName, Class<?>[] paramType, Object... params) throws Exception {
return generic.defaultInvokeSimple(interfaceType.getName())
.$invoke(
funcName,
Stream.of(paramType).map(Class::getName).toArray(String[]::new),
Stream.of(params).toArray(Object[]::new)
);
}
/**
* 泛化调用
*
* @param interfaceType
* 接口类型
* @param funcName
* 执行方法名称
* @param params
* 实际参数
* @return
*/
@Deprecated
public static Object invokeParams(Class<?> interfaceType, String funcName, Object... params) {
return generic.defaultInvokeSimple(interfaceType.getName())
.$invoke(
funcName,
Stream.of(params).map(t -> t.getClass().getName()).toArray(String[]::new),
Stream.of(params).toArray(Object[]::new)
);
}
public static Object invokeSimple(String interfaceName, String funcName, Class<?> paramType, Object param) {
return generic.defaultInvokeSimple(interfaceName)
.$invoke(funcName, new String[] {paramType.getName()}, new Object[] {param});
}
/**
* 泛化调用
*
* @param interfaceType
* 接口类型
* @param funcName
* 执行方法名称
* @param paramType
* 参数类型
* @param param
* 实际参数
* @return
*/
public static Object invokeSimple(Class<?> interfaceType, String funcName, Class<?> paramType, Object param) {
return generic.defaultInvokeSimple(interfaceType.getName())
.$invoke(
funcName,
new String[] {paramType.getName()},
Stream.of(param).toArray(Object[]::new)
);
}
public static Object invoke(String interfaceName, String funcName, Class<?> paramType, Object param, String group, String veriosn, Integer timeout) {
return generic.defaultInvokeSimple(interfaceName, group, veriosn, timeout)
.$invoke(funcName, new String[] {paramType.getName()}, new Object[] {param});
}
}
AbstractGeneric
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import cn.gov.zcy.itemplatform.common.exception.BaseException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.utils.ReferenceConfigCache;
import org.apache.dubbo.rpc.service.GenericService;
import org.springframework.util.ObjectUtils;
/**
* 泛化调用模板
*
* @author Duansg
* @date 2022-03-04 21:50:21
*/
@Slf4j
public abstract class AbstractGeneric {
/**
* 当前应用的信息
*/
protected ApplicationConfig application;
/**
* 配置项
*/
private GenericProperties properties;
protected static final String ZK_PREFIX = "zookeeper://";
protected static final String DEFAULT_REF_VERSION = "1.0.0";
/**
泛化调用涉及到公网->基础架构->service,耗时先提高一点,具体时间待摸清
*/
protected static final Integer DEFAULT_REF_TIMEOUT = 10000;
protected static final String DEFAULT_REF_GROUP = null;
/**
* 注册中心信息缓存,暂时只有1个,可支持多注册中心的场景
*/
protected static final Map<String, RegistryConfig> registryConfigCache = new ConcurrentHashMap<>();
/**
* 业务方的缓存
*/
protected static final Map<String, ReferenceConfig<GenericService>> referenceCache = new ConcurrentHashMap<>();
/**
* Args Constructor
*
* @param appName
* 应用名称
* @param zkUrl
* zk地址
* @param registryGroup
* 服务注册分组,跨组的服务不会相互影响,也无法相互调用,适用于环境隔离,没有就不要乱填
*/
public AbstractGeneric(String appName, String zkUrl, String registryGroup, GenericProperties properties) {
application = new ApplicationConfig(appName);
application.setRegistry(getRegistryConfig(zkUrl.startsWith(ZK_PREFIX) ? zkUrl : ZK_PREFIX.concat(zkUrl), registryGroup));
this.properties = properties;
}
/**
* 获取注册中心配置
*
* @param address
* zk地址
* @param group
* 服务注册分组,跨组的服务不会相互影响,也无法相互调用,适用于环境隔离,没有就不要乱填
* @return
*/
protected RegistryConfig getRegistryConfig(String address, String group) {
String cacheKey = String.format("%s-%s", address, group);
RegistryConfig registryConfig = registryConfigCache.get(cacheKey);
if (ObjectUtils.isEmpty(registryConfig)) {
registryConfig = new RegistryConfig();
if (StringUtils.isNotBlank(address)) {
registryConfig.setAddress(address);
}
if (StringUtils.isNotBlank(group)) {
registryConfig.setGroup(group);
}
registryConfigCache.put(cacheKey, registryConfig);
}
return registryConfig;
}
/**
* 获取业务方配置
*
* @param interfaceName
* 接口类型
* @param group
* 分组
* @param version
* 版本
* @param timeout
* milliseconds
* @return
*/
protected GenericService getReferenceConfig(String interfaceName, String group, String version, Integer timeout) {
// 获取业务方定义
ReferenceConfig<GenericService> referenceConfig = referenceCache.get(interfaceName);
if (ObjectUtils.isEmpty(referenceConfig)) {
referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(application);
// 声明为泛化接口
referenceConfig.setGeneric(true);
referenceConfig.setVersion(StringUtils.isNotBlank(version) ? version : null);
referenceConfig.setGroup(StringUtils.isNotBlank(group) ? group : null);
referenceConfig.setTimeout(timeout);
referenceConfig.setInterface(interfaceName);
referenceCache.put(interfaceName, referenceConfig);
}
return ReferenceConfigCache.getCache().get(referenceConfig);
}
/**
* 解析方法名称
*
* @param interfaceType
* 接口类型
* @param index
* 执行方法下标
* @return
*/
protected static String getFuncName(Class<?> interfaceType, int index) {
for (int i = 0; i < interfaceType.getMethods().length; i++) {
Method method = interfaceType.getMethods()[i];
MethodGeneric methodGeneric = method.getAnnotation(MethodGeneric.class);
if (ObjectUtils.isEmpty(methodGeneric)) {
continue;
}
if (methodGeneric.index() == index) {
return method.getName();
}
}
throw BaseException.buildGenericException("Method name not found");
}
/**
* @param interfaceName
* @return
*/
protected GenericService defaultInvokeSimple(String interfaceName) {
if (ObjectUtils.isEmpty(properties) && properties.isPrintLog()) {
log.info("ItemPlatformGeneric准备执行,interfaceName:{}", interfaceName);
}
return getReferenceConfig(interfaceName, DEFAULT_REF_GROUP, DEFAULT_REF_VERSION, DEFAULT_REF_TIMEOUT);
}
/**
*
* @param interfaceName
* @param group
* @param version
* @param timeout
* @return
*/
protected GenericService defaultInvokeSimple(String interfaceName, String group, String version, Integer timeout) {
return getReferenceConfig(interfaceName, group, version, timeout);
}
}
EnableAutoConfigGeneric
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
* 泛化调用自动装配配置类
*
* @author Duansg
* @date 2022-03-04 17:18:21
*/
@Configuration
@EnableConfigurationProperties(ItemPlatformGenericProperties.class)
@ConditionalOnProperty(prefix = "item.generic", name = "enabled", havingValue = "true")
public class EnableAutoConfigItemPlatformGeneric {
@Value("${spring.application.name}")
private String appName;
@Value("${spring.dubbo.registry}")
private String zkUrl;
@Value("${dubbo.registry.group:#{null}}")
private String group;
@Autowired
private GenericProperties properties;
@PostConstruct
public void initConfig() {
GenericSupport.initGeneric(appName, zkUrl, group, properties);
}
}
Test Unit
public Response<ItemRes> create(CreateDto createDto) {
Object result = GenericSupport.invokeParams(ItemWriteApi.class, "create", createDto);
return JSONObject.parseObject(JSON.toJSONString(result), new TypeReference<Response<ItemRes>>() {});
}