motan服务端

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 服务端的处理也有套路,不管上层怎么玩,最后还是得通过反射方法对象,再调用invoke()可知序列图,可以把服务端分成两部分1. NettyServer前面的算部分,搭基础制造Exporter对象2. nettyserver 后面的算计,找到方法,调用,通过返回

前引

服务端的处理也有套路,不管上层怎么玩,最后还是得通过反射方法对象,再调用invoke()

image.png

可知序列图,可以把服务端分成两部分

  1. NettyServer前面的算部分,搭基础制造Exporter对象
  2. nettyserver 后面的算计,找到方法,调用,通过返回

制造出口商对象

结合弹簧

其实在《motan客户端》时有提过,但没有深研;spring扩展自定义xml是一个老的技术了spring扩展xml文档

spring通过XML解析程序将其解析为DOM树,通过NamespaceHandler指定的Namespace的BeanDefinitionParser将其转换成BeanDefinition的功能。再通过Spring自身的对应对BeanDefinition实例化对象。

在此期间,春暖花开的资料:

  1. META-INF/spring.handlers

指定NamespaceHandler(实现org.springframework.beans.factory.xml.NamespaceHandler)接口,或使用org.springframework.beans.factory.xml.NamespaceHandlerSupport的子类。

  1. META-INF/spring.schemas

在解析XML文件时将XSD功能控制器到本地文件,不用在解析XML文件时需要上网下载XSD文件。通过实况org.xml.sax.EntityResolver接口来实现。

配置处理程序

解析完xml后,服务器通过ServiceConfigBean来监听spring容器加载完成。

@Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!getExported().get()) {
            export();
        }
    }

调用export(),构建Exporter;

public synchronized void export() {
        if (exported.get()) {
            LoggerUtil.warn(String.format("%s has already been expoted, so ignore the export request!", interfaceClass.getName()));
            return;
        }
        checkInterfaceAndMethods(interfaceClass, methods);
        List<URL> registryUrls = loadRegistryUrls();
        if (registryUrls == null || registryUrls.size() == 0) {
            throw new IllegalStateException("Should set registry config for service:" + interfaceClass.getName());
        }
        Map<String, Integer> protocolPorts = getProtocolAndPort();
        for (ProtocolConfig protocolConfig : protocols) {
            Integer port = protocolPorts.get(protocolConfig.getId());
            if (port == null) {
                throw new MotanServiceException(String.format("Unknow port in service:%s, protocol:%s", interfaceClass.getName(),
                        protocolConfig.getId()));
            }
            doExport(protocolConfig, port, registryUrls);
        }
        afterExport();
    }

这个主要方法就做两件事

  1. loadRegistryUrls(),根据
    ,生成URL对象。URL也收录了整个框架的内核,一个包含了配置中的所有内容的url。就像一个领域对象一样。

如果使用的是zookeeper,对就的URL就是:zookeeper://127.0.0.1:2181/com.weibo.api.motan.registry.RegistryService?group=default_rpc

这是注册中心的网址

而服务URL是参数为embed为key包含在里面,里面包含了的服务参数

motan://127.0.0.1:8002/com.share.rpc.service.DEMOService?module=match-rpc&loadbalance=activeWeight&nodeType=service&accessLog=true&minWorkerThread=2&protocol=motan&isDefault=true&maxWorkerThread=10&refreshTimestamp=1486448597095&id=com.weibo.api.motan。 config.springsupport.ServiceConfigBean&export=protocolMatch:8002&requestTimeout=60000&group=match-rpc&

做出口()
ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);
 exporters.add(configHandler.export(interfaceClass, ref, urls));

到这里就是委托给ConfigHandler处理了。

实现类:SimpleConfigHandler

这个类里面有两个重要的方法:

@Override
    public <T> T refer(Class<T> interfaceClass, List<Cluster<T>> clusters, String proxyType) {
        ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(proxyType);
        return proxyFactory.getProxy(interfaceClass, new RefererInvocationHandler<T>(interfaceClass, clusters));
    }
    @Override
    public <T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registryUrls) {
        String serviceStr = StringTools.urlDecode(registryUrls.get(0).getParameter(URLParamType.embed.getName()));
        URL serviceUrl = URL.valueOf(serviceStr);
        // export service
        // 利用protocol decorator来增加filter特性
        String protocolName = serviceUrl.getParameter(URLParamType.protocol.getName(), URLParamType.protocol.getValue());
        Protocol protocol = new ProtocolFilterDecorator(ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(protocolName));
        Provider<T> provider = new DefaultProvider<T>(ref, serviceUrl, interfaceClass);
        Exporter<T> exporter = protocol.export(provider, serviceUrl);
        // register service
        register(registryUrls, serviceUrl);
        return exporter;
    }

refer()就是RefererConfig配置完后调用的方法,就看到了客户端的内核类ReferInvocationHandler

export()就是服务端使用的方法了。

构造提供者

public interface Provider<T> extends Caller<T> {
    Class<T> getInterface();
}

其实就是外面服务类的代理类,这里面就一个类的引用。方法在初始化的时候,把所有的全部服务缓存

private void initMethodMap(Class<T> clz) {
        Method[] methods = clz.getMethods();
        for (Method method : methods) {
            String methodDesc = ReflectUtil.getMethodDesc(method);
            methodMap.put(methodDesc, method);
        }
    }

协议的导出就是启动一个netty server,提供前端服务。

注册 注册服务

按注册后,,,,,,,,,,,,,,,,,,,注册

private void register(List<URL> registryUrls, URL serviceUrl) {
        for (URL url : registryUrls) {
            // 根据check参数的设置,register失败可能会抛异常,上层应该知晓
            RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(url.getProtocol());
            if (registryFactory == null) {
                throw new MotanFrameworkException(new MotanErrorMsg(500, MotanErrorMsgConstant.FRAMEWORK_REGISTER_ERROR_CODE,
                        "register error! Could not find extension for registry protocol:" + url.getProtocol()
                                + ", make sure registry module for " + url.getProtocol() + " is in classpath!"));
            }
            Registry registry = registryFactory.getRegistry(url);
            registry.register(serviceUrl);
        }
    }

根据注册协议选择注册中心RegistryFacotry 有:1. 直接 2.local 3.zookeeper

的注册表也是这个

在zookeeperregistry里面,

protected void doRegister(URL url) {
        try {
            serverLock.lock();
            // 防止旧节点未正常注销
            removeNode(url, ZkNodeType.AVAILABLE_SERVER);
            removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
            createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
        } catch (Throwable e) {
            throw new MotanFrameworkException(String.format("Failed to register %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()), e);
        } finally {
            serverLock.unlock();
        }
    }

这里注册后,这个服务还是不可用,需要调用MotanSwitcherUtil.setSwitcherValue(MotanConstants.REGISTRY HEARTBEAT SWITCHER, true);才能可用

网络服务器

的套路通道处理程序,一个不同的处理程序,不同的干,只是包装自定义

final NettyChannelHandler handler = new NettyChannelHandler(NettyServer.this, messageHandler,
                standardThreadExecutor);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            // FrameDecoder非线程安全,每个连接一个 Pipeline
            public ChannelPipeline getPipeline() {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("channel_manage", channelManage);
                pipeline.addLast("decoder", new NettyDecoder(codec, NettyServer.this, maxContentLength));
                pipeline.addLast("encoder", new NettyEncoder(codec, NettyServer.this));
                pipeline.addLast("handler", handler);
                return pipeline;
            }
        });

反倒是MessageHandler:ProviderProtectedMessageRouter从名字看,有保护功能

1) 如果接口只有一个方法,那么直接返回 true2)如果接口有多个方法,那么如果有多个方法超过maxThread /2 && totalCount > (maxThread * 3 / 4),那么返回false;3)如果接口有多个方法(4个),同时总的请求数maxThread * 3 / 4,同时该方法的请求数超过maxThead * 1 / 4,则return false4) 其他场景return true

protected boolean isAllowRequest(int requestCounter, int totalCounter, int maxThread, Request request) {
        if (methodCounter.get() == 1) {
            return true;
        }
        // 该方法第一次请求,直接return true
        if (requestCounter == 1) {
            return true;
        }
        // 不简单判断 requsetCount > (maxThread / 2) ,因为假如有2或者3个method对外提供,
        // 但是只有一个接口很大调用量,而其他接口很空闲,那么这个时候允许单个method的极限到 maxThread * 3 / 4
        if (requestCounter > (maxThread / 2) && totalCounter > (maxThread * 3 / 4)) {
            return false;
        }
        // 如果总体线程数超过 maxThread * 3 / 4个,并且对外的method比较多,那么意味着这个时候整体压力比较大,
        // 那么这个时候如果单method超过 maxThread * 1 / 4,那么reject
        return !(methodCounter.get() >= 4 && totalCounter > (maxThread * 3 / 4) && requestCounter > (maxThread * 1 / 4));
    }

拒绝的结果就是放一个异常:

private Response reject(String method, int requestCounter, int totalCounter, int maxThread) {
        DefaultResponse response = new DefaultResponse();
        MotanServiceException exception =
                new MotanServiceException("ThreadProtectedRequestRouter reject request: request_counter=" + requestCounter
                        + " total_counter=" + totalCounter + " max_thread=" + maxThread, MotanErrorMsgConstant.SERVICE_REJECT);
        exception.setStackTrace(new StackTraceElement[0]);
        response.setException(exception);
        LoggerUtil.error("ThreadProtectedRequestRouter reject request: request_method=" + method + " request_counter=" + requestCounter
                + " =" + totalCounter + " max_thread=" + maxThread);
        return response;
    }

其他

RpcContext

private static final ThreadLocal<RpcContext> localContext = new ThreadLocal<RpcContext>() {
        protected RpcContext initialValue() {
            return new RpcContext();
        }
    };
    public static RpcContext getContext() {
        return localContext.get();
    }

这个ThreadLocal然尽还可以设置默认初始值,以前尽然没用过

总结

服务端相对客户端还是很简单的。

没有哈哈,就是闹网提请求处理完了。


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
1月前
|
Dubbo Java 应用服务中间件
从源码全面解析 dubbo 服务端服务调用的来龙去脉
从源码全面解析 dubbo 服务端服务调用的来龙去脉
|
1月前
|
负载均衡 网络协议 Java
gRPC远程调用协议
gRPC远程调用协议
34 0
|
10月前
|
负载均衡 Dubbo Java
|
消息中间件 缓存 前端开发
Springboot 整合 WebSocket ,使用STOMP协议 ,前后端整合实战 (一)
Springboot 整合 WebSocket ,使用STOMP协议 ,前后端整合实战 (一)
1746 1
Springboot 整合 WebSocket ,使用STOMP协议 ,前后端整合实战 (一)
|
11月前
|
前端开发 安全 Java
SpringBoot + WebSocket+STOMP指定推送消息
本文将简单的描述SpringBoot + WebSocket+STOMP指定推送消息场景,不包含信息安全加密等,请勿用在生产环境。
243 0
|
Dubbo Java 应用服务中间件
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(6)
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(6)
105 2
|
自然语言处理 运维 Dubbo
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(3)
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(3)
102 1
|
Dubbo 应用服务中间件 API
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(9)
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(9)
104 0
|
Dubbo Java 应用服务中间件
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(7)
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(7)
90 0
|
Dubbo Java 应用服务中间件
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(8)
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(8)
89 0