dubbo的spi机制分析和实战案例(下)

简介: dubbo的spi机制分析和实战案例(下)

这个时候我们只能通过spi机制来自定义一套负载均衡策略进行实现了:


package com.sise.dubbo.config.loadBalanceSpi;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.cluster.LoadBalance;
import java.time.LocalTime;
import java.util.List;
import java.util.Random;
/**
 *
 * @author idea
 * @data 2019/5/18
 */
public class MyLoadBalance implements LoadBalance {
    private final String A_MACHINE_HOST_PORT = "192.168.43.191:20880";
    private final String B_MACHINE_HOST_PORT = "192.168.43.191:20880";
    @Override
    public <T> Invoker<T> select(List<Invoker<T>> list, URL url, Invocation invocation) throws RpcException {
        System.out.println("执行自定义的负载均衡算法");
        //模拟场景
        System.out.println(url);
        int currentHour = LocalTime.now().getHour();
        if (currentHour >= 9 && currentHour <= 18) {
            System.out.println("请求A机器");
            findInvokerInList(list, A_MACHINE_HOST_PORT);
        } else if (currentHour >= 18 && currentHour <= 23) {
            System.out.println("请求B机器");
            findInvokerInList(list, B_MACHINE_HOST_PORT);
        }
        int randIndex = new Random().nextInt(list.size());
        return list.get(randIndex);
    }
    /**
     * 从服务列表里面进行dubbo服务地址匹配
     *
     * @param list
     * @param matchKey
     * @param <T>
     * @return
     */
    private <T> Invoker findInvokerInList(List<Invoker<T>> list, String matchKey) {
        for (Invoker tInvoker : list) {
            String addr = tInvoker.getUrl().getHost() + tInvoker.getUrl().getPort();
            if (matchKey.equals(addr)) {
                return tInvoker;
            }
        }
        return null;
    }
}


然后在META-INF/dubbo文件夹底下配置一份纯文本的配置文件,文件命名为:


com.alibaba.dubbo.rpc.cluster.LoadBalance


(ps:不同版本的dubbo,LoadBalance的包名可能不同)

image.png


在这份文件里面写入这么一行内容(有点key,value的味道)


mylb=com.sise.dubbo.config.loadBalanceSpi.MyLoadBalance


在consumer端的配置文件中写入以下内容,这里的loadbalance需要和配置文件里的mylb一致。


<dubbo:reference interface="com.sise.dubbo.api.UserRpcService" id="userRpcService" loadbalance="mylb" />


然后我们可以启动多台provider,用consumer去调用这些服务进行测试,通过调整机器的时间点,控制台就会打印出不同的属性信息


请求B机器
执行自定义的负载均衡算法
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?anyhost=true&application=consumer&check=false&dubbo=2.5.3&interface=com.sise.dubbo.api.UserRpcService&loadbalance=mylb&methods=findByUsername,findAll,printStr&pid=12460&printStr.async=true&service.filter=MyFilter&side=consumer&timestamp=1558143174084&weight=1600
请求A机器
执行自定义的负载均衡算法
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?anyhost=true&application=consumer&check=false&dubbo=2.5.3&interface=com.sise.dubbo.api.UserRpcService&loadbalance=mylb&methods=findByUsername,findAll,printStr&pid=12460&printStr.async=true&service.filter=MyFilter&side=consumer&timestamp=1558143174084&weight=1600


通过上述的这种思路,我们借助dubbo的spi机制来加载满足自己特殊业务的负载均衡器,使得该框架的灵活性更高,扩展性更强。


自定义的dubbo过滤器


基于spi的扩展机制,dubbo里面还提供了对于filter类型的自定义拓展。开发者可以自定义一套filter来进行对于请求的功能拦截和校验,这个有点类似于springmvc里面的filter过滤器,通过特定的过滤器拦截数据之后,可以结合特殊的业务场景来做一些控制性的功能。


如何建立自己的filter过滤器?


首先我们需要在provider模块那定义一个filter类:


package com.sise.dubbo.config.filterSpi;
import com.alibaba.dubbo.rpc.*;
/**
 * @author idea
 * @date 2019/5/17
 */
public class MyFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        System.out.println("this is before");
        Result result = invoker.invoke(invocation);
        System.out.println("this is after");
        return result;
    }
}


然后在META-INF/dubbo文件夹底下去创建相应的配置文件:(这个项目里面我还加入了其他的spi配置,不过对于过滤器配置没有影响)



image.png


配置里面需要加入下边的内容:


MyFilter=com.sise.dubbo.config.filterSpi.MyFilter


对于过滤器的xml配置只需要在相应的provider的xml中加入


<dubbo:provider filter="MyFilter"></dubbo:provider>


如果只是想对某个服务进行过滤操作的话,可以这么配置:


<!-- 需要暴露的服务接口 -->
    <dubbo:service interface="com.sise.dubbo.api.UserRpcService" ref="userRpcService"  filter="MyFilter"/>


通常我们可以基于自定义的filter来实现一些服务调度的权限校验,调度次数统计等功能,但是注意在拦截请求的时候对于性能方面的把控,有时候也可以加入一些特殊ip的拦截校验功能,主要还是需要结合特殊的业务场景来实现。


dubbo本身的可扩展性极强,阿里巴巴团队在官方文档上边给出了十多种常用的spi扩展配置方式,这里主要只展示了两种常见的spi扩展,剩余的可以自行前往官网去查看文档讲解。



dubbo的spi加载原理


拿dubbo的spi来说,它在运行的时候会通过一个叫做ExtensionLoader的加载器来进行dubbo的扩展点加载。


我们可以进入ExtensionLoader这个类里面先进行初步的阅览:


image.png


这里面包含写明了dubbo在使用spi机制加载配置文件的基本目录,这里的internal目录我个人理解为dubbo内置服务的配置地址。


核心的加载逻辑图如下所示:


image.png


通过getExtension函数来加载类:


image.png


这里面有用到了加锁双重判断,主要是初始化加载之后,这些扩展类信息会被放入到一个ConcurrentMap<string, holder<="" object="" style="font-size: inherit; color: inherit; line-height: inherit;">> cachedInstances 里面。


进入createExtension函数里面,我们会看到以下内容:


image.png



这段代码的核心操作在于getExtensionClasses函数,再进入该函数中阅读源码:

会发现又是一次双重判断加锁的加载


image.png



这里面的loadExtensionClasses函数是加载扩展配置类信息的作用,进去之后进行源码阅读会发现:


image.png


loadFile函数对dubbo配置里面的 META-INF/services/META-INF/dubbo/META-INF/dubbo/internal/目录都进行了类的加载。这一点相比于jdk自带的spi加载所支持的目录要多。


再点进去loadFile源码里面,核心的类加载功能就会展示出来了:



private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
        String fileName = dir + type.getName();
        try {
            Enumeration<java.net.URL> urls;
            ClassLoader classLoader = findClassLoader();
            if (classLoader != null) {
                urls = classLoader.getResources(fileName);
            } else {
                urls = ClassLoader.getSystemResources(fileName);
            }
            if (urls != null) {
                while (urls.hasMoreElements()) {
                    java.net.URL url = urls.nextElement();
                    try {
                        BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
                        try {
                            String line = null;
                            while ((line = reader.readLine()) != null) {
                                final int ci = line.indexOf('#');
                                if (ci >= 0) line = line.substring(0, ci);
                                line = line.trim();
                                if (line.length() > 0) {
                                    try {
                                        String name = null;
                                        int i = line.indexOf('=');
                                        if (i > 0) {
                                            name = line.substring(0, i).trim();
                                            line = line.substring(i + 1).trim();
                                        }
                                        if (line.length() > 0) {
                                            Class<?> clazz = Class.forName(line, true, classLoader);
                                            if (! type.isAssignableFrom(clazz)) {
                                                throw new IllegalStateException("Error when load extension class(interface: " +
                                                        type + ", class line: " + clazz.getName() + "), class " 
                                                        + clazz.getName() + "is not subtype of interface.");
                                            }
                                            if (clazz.isAnnotationPresent(Adaptive.class)) {
                                                if(cachedAdaptiveClass == null) {
                                                    cachedAdaptiveClass = clazz;
                                                } else if (! cachedAdaptiveClass.equals(clazz)) {
                                                    throw new IllegalStateException("More than 1 adaptive class found: "
                                                            + cachedAdaptiveClass.getClass().getName()
                                                            + ", " + clazz.getClass().getName());
                                                }
                                            } else {
                                                try {
                                                    clazz.getConstructor(type);
                                                    Set<Class<?>> wrappers = cachedWrapperClasses;
                                                    if (wrappers == null) {
                                                        cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                                                        wrappers = cachedWrapperClasses;
                                                    }
                                                    wrappers.add(clazz);
                                                } catch (NoSuchMethodException e) {
                                                    clazz.getConstructor();
                                                    if (name == null || name.length() == 0) {
                                                        name = findAnnotationName(clazz);
                                                        if (name == null || name.length() == 0) {
                                                            if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                                                    && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                                                                name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                                                            } else {
                                                                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                                                            }
                                                        }
                                                    }
                                                    String[] names = NAME_SEPARATOR.split(name);
                                                    if (names != null && names.length > 0) {
                                                        Activate activate = clazz.getAnnotation(Activate.class);
                                                        if (activate != null) {
                                                            cachedActivates.put(names[0], activate);
                                                        }
                                                        for (String n : names) {
                                                            if (! cachedNames.containsKey(clazz)) {
                                                                cachedNames.put(clazz, n);
                                                            }
                                                            Class<?> c = extensionClasses.get(n);
                                                            if (c == null) {
                                                                extensionClasses.put(n, clazz);
                                                            } else if (c != clazz) {
                                                                throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    } catch (Throwable t) {
                                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
                                        exceptions.put(line, e);
                                    }
                                }
                            } // end of while read lines
                        } finally {
                            reader.close();
                        }
                    } catch (Throwable t) {
                        logger.error("Exception when load extension class(interface: " +
                                            type + ", class file: " + url + ") in " + url, t);
                    }
                } // end of while urls
            }
        } catch (Throwable t) {
            logger.error("Exception when load extension class(interface: " +
                    type + ", description file: " + fileName + ").", t);
        }
    }


这段代码由于比较冗长,因为dubbo在进行实际加载的过程中需要考虑很多的因素,主要目的就是实现加载指定目录底下的拓展类并且将其存入一个map中缓存起来。

这段代码我进行了稍微的改写之后,变成了一个比较简单的util类,简化学习和理解的难度


package com.sise.dubbo.spi.myspi;
import com.sise.dubbo.spi.spidemo.UserService;
import com.sise.dubbo.spi.spidemo.UserServiceImpl;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @author idea
 * @date 2019/5/17
 */
public class MySpiUtil {
    /**
    * 这里自定义了加载配置的文件夹
    **/
    private static final String SPI_DIR = "META-INF/idea/";
    private Map<String, Class<?>> classMap = new ConcurrentHashMap<>();
    /**
     * 加载目录底下的文件信息
     *
     * @param clazz
     */
    public void loadDirectory(Class clazz) {
        String fileName = SPI_DIR + clazz.getName();
        ClassLoader classLoader = this.getClass().getClassLoader();
        try {
            Enumeration<URL> resources = classLoader.getResources(fileName);
            if (resources != null) {
                while (resources.hasMoreElements()) {
                    URL url = resources.nextElement();
                    loadResource(classLoader, url);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void loadResource(ClassLoader classLoader, URL url) {
        //读取配置文件里面的内容
        try {
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(url.openStream(), "utf-8"));
            String line;
            while ((line = reader.readLine()) != null) {
                int c = line.indexOf("#");
                //该行内容没有注释
                if (c <= 0) {
                    line = line.trim();
                    if (line.length() > 0) {
                        int splitIndex = line.indexOf("=");
                        String name = line.substring(0, splitIndex).trim();
                        String className = line.substring(splitIndex + 1).trim();
                        classMap.put(name, Class.forName(className, true, classLoader));
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws IllegalAccessException, InstantiationException {
        MySpiUtil mySpiUtil = new MySpiUtil();
        mySpiUtil.loadDirectory(UserService.class);
        UserServiceImpl userService = (UserServiceImpl) mySpiUtil.classMap.get("UserService").newInstance();
        userService.say();
    }
}


相关的待加载服务代码:


image.png


然后根据代码里面的指定目录进行配置文件的放置:


image.png


配置文件也是按照dubbo的spi配置文件的格式来书写:



UserService=com.sise.dubbo.spi.spidemo.UserServiceImpl


运行程序之后,便可加载到相应的类并进行执行:


image.png


spi技术在java中应用场景比较广泛,通常在开发的时候为了实现接口自动寻找实现类的功能,可以通过spi来进行实现,将接口的实现类转移到一份配置文件中来进行控制。jdk自带的spi通常会一次性就将所有类进行实例化比较耗时,而dubbo在加载类的时候直接通过名称来定位具体的类,按实际需要加载,同时支持加载的路径也更加多,相比于传统jdk的spi加载要效率更高。



相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
相关文章
|
负载均衡 监控 Dubbo
Dubbo 原理和机制详解(非常全面)
本文详细解析了 Dubbo 的核心功能、组件、架构设计及调用流程,涵盖远程方法调用、智能容错、负载均衡、服务注册与发现等内容。欢迎留言交流。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
Dubbo 原理和机制详解(非常全面)
|
Dubbo Java 应用服务中间件
微服务框架Dubbo环境部署实战
微服务框架Dubbo环境部署的实战指南,涵盖了Dubbo的概述、服务部署、以及Dubbo web管理页面的部署,旨在指导读者如何搭建和使用Dubbo框架。
1253 17
微服务框架Dubbo环境部署实战
|
Dubbo IDE Java
dubbo学习二:下载Dubbo-Admin管理控制台,并分析在2.6.1及2.6.1以后版本的变化
这篇文章是关于如何下载和部署Dubbo管理控制台(dubbo-admin)的教程,并分析了2.6.1版本及以后版本的变化。
1257 0
dubbo学习二:下载Dubbo-Admin管理控制台,并分析在2.6.1及2.6.1以后版本的变化
|
缓存 负载均衡 Dubbo
Dubbo技术深度解析及其在Java中的实战应用
Dubbo是一款由阿里巴巴开源的高性能、轻量级的Java分布式服务框架,它致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案。
525 6
|
负载均衡 Dubbo Java
Dubbo服务Spi机制和原理
该文章主要介绍了Dubbo中的SPI(Service Provider Interface)机制和原理,包括SPI的基本概念、Dubbo中的SPI分类以及SPI机制的实现细节。
Dubbo服务Spi机制和原理
|
Dubbo Java 应用服务中间件
Spring Boot 调用 Dubbo 接口与编写 Dubbo 接口实战
Spring Boot 调用 Dubbo 接口与编写 Dubbo 接口实战
2045 1
|
Dubbo Java Nacos
【实战攻略】破解Dubbo+Nacos+Spring Boot 3 Native打包后运行异常的终极秘籍——从零开始彻底攻克那些让你头疼不已的技术难题!
【8月更文挑战第15天】Nacos作为微服务注册与配置中心受到欢迎,但使用Dubbo+Nacos+Spring Boot 3进行GraalVM native打包后常遇运行异常。本文剖析此问题及其解决策略:确认GraalVM版本兼容性;配置反射列表以支持必要类和方法;采用静态代理替代动态代理;检查并调整配置文件;禁用不支持的功能;利用日志和GraalVM诊断工具定位问题;根据诊断结果调整GraalVM配置。通过系统排查方法,能有效解决此类问题,确保服务稳定运行。
724 0
|
Dubbo Java 应用服务中间件
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
|
Dubbo Java 应用服务中间件
Spring Cloud Dubbo:微服务通信的高效解决方案
【10月更文挑战第15天】随着信息技术的发展,微服务架构成为企业应用开发的主流。Spring Cloud Dubbo结合了Dubbo的高性能RPC和Spring Cloud的生态系统,提供高效、稳定的微服务通信解决方案。它支持多种通信协议,具备服务注册与发现、负载均衡及容错机制,简化了服务调用的复杂性,使开发者能更专注于业务逻辑的实现。
380 2
|
Dubbo Java 应用服务中间件
💥Spring Cloud Dubbo火爆来袭!微服务通信的终极利器,你知道它有多强大吗?🔥
【8月更文挑战第29天】随着信息技术的发展,微服务架构成为企业应用开发的主流模式,而高效的微服务通信至关重要。Spring Cloud Dubbo通过整合Dubbo与Spring Cloud的优势,提供高性能RPC通信及丰富的生态支持,包括服务注册与发现、负载均衡和容错机制等,简化了服务调用管理并支持多种通信协议,提升了系统的可伸缩性和稳定性,成为微服务通信领域的优选方案。开发者仅需关注业务逻辑,而无需过多关心底层通信细节,使得Spring Cloud Dubbo在未来微服务开发中将更加受到青睐。
277 0
下一篇
开通oss服务