motan客户端

简介: 早在学习motan框架之前(新浪微博公布RPC的代码,最近在看我司框架,温排版的情况,在浏览器里看会正常显示)

早在学习motan框架之前(新浪微博公布RPC的代码,最近在看我司框架,温排版的情况,在浏览器里看会正常显示)


RPC的复本

用于执行的操作过程又是再次正常使用的事(对象.方法),RPC 的对端也重新正常使用,但有客户端和服务器相同的实现,也就是调用者与执行者者

因为他们不再在同一个进程中,需要通过网络跨JVM实现这一调用过程

在java中的但实现方式:动态+socket通信;这就是如何实现套路,上层代理实现,就是这样,概莫能外

请求过程

motan 的调用实现

先画个简单的顺序图,理清一下调用过程

image.png

motan与spring的结合,后面再写了,spring的扩展也很简单。

基于对RPC复本的认识,可以先找到InvocationHandler的实现类RefererInvocationHandler

这个接口就是一个方法

public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable;

在这个方法里面就是去构造socket传输的请求对象,请求主要就是方法的签名信息与参数,传输到服务器端,执行的实现方法

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if(isLocalMethod(method)){
            if("toString".equals(method.getName())){
                return clustersToString();
            }
            throw new MotanServiceException("can not invoke local method:" + method.getName());
        }
        DefaultRequest request = new DefaultRequest();
        request.setRequestId(RequestIdGenerator.getRequestId());
        request.setArguments(args);
        request.setMethodName(method.getName());
        request.setParamtersDesc(ReflectUtil.getMethodParamDesc(method));
        request.setInterfaceName(clz.getName());
        request.setAttachment(URLParamType.requestIdFromClient.getName(), String.valueOf(RequestIdGenerator.getRequestIdFromClient()));
        // 当 referer配置多个protocol的时候,比如A,B,C,
        // 那么正常情况下只会使用A,如果A被开关降级,那么就会使用B,B也被降级,那么会使用C
        for (Cluster<T> cluster : clusters) {
            String protocolSwitcher = MotanConstants.PROTOCOL_SWITCHER_PREFIX + cluster.getUrl().getProtocol();
            Switcher switcher = switcherService.getSwitcher(protocolSwitcher);
            if (switcher != null && !switcher.isOn()) {
                continue;
            }
            request.setAttachment(URLParamType.version.getName(), cluster.getUrl().getVersion());
            request.setAttachment(URLParamType.clientGroup.getName(), cluster.getUrl().getGroup());
            // 带上client的application和module
            request.setAttachment(URLParamType.application.getName(), ApplicationInfo.getApplication(cluster.getUrl()).getApplication());
            request.setAttachment(URLParamType.module.getName(), ApplicationInfo.getApplication(cluster.getUrl()).getModule());
            Response response = null;
            boolean throwException =
                    Boolean.parseBoolean(cluster.getUrl().getParameter(URLParamType.throwException.getName(),
                            URLParamType.throwException.getValue()));
            try {
                //真正执行
                response = cluster.call(request);
                return response.getValue();
            }

调用分配了Cluster.call,而Cluster给了HAStraty.call,HA策略通过负载均衡选择均衡策略给Referer

集群是什么

在InvocationHandler里面,调用了Cluster的调用,从代码上看,它的本质就是Referer的集合,并且提供了HA服务以及负载平均。而Referer是提供服务的一个抽象

HA与负载平衡

image.png

HA 策略,就提供了一个,

快速失败

fail-fast很简单,失败就抛异常;

故障转移

fail-over相对fail-fast再重试次数,如果失败,就重试一个referer

负载均衡

这倒提供了

循环赛

这个很简单,一个往下询轮就行了,但需要记住上一次的位置

随机的

随时

最小负载

这个motan实现有点英文

推荐人列表例如上几个,,,,每次都从上百个推荐人的位置,或者如果在台上,或者如果出现随机的性能。n下一个性能,然后获取最多不MAX REFERER COUNT的状态是isAvailable的referer进行判断activeCount。

本地优先

本地服务优先获取策略:对referers根据ip顺序查找远程服务存在,多存在多个本地服务,获取Active最小的本地进行服务。当不本地服务,但是,则根据ActivWeight获取远程服务;当任何时候都存在时,所有本地服务都应优先于远程服务,本地RPC服务与远程RPC服务则根据ActiveWeight进行

网络客户端

上层不管怎么选择服务,最后都需要传输层去传输,nettyclient就是传输任务。

在DefaultRpcReferer中创建了一个nettyClient。向server发送远程调用

private Response request(Request request, boolean async) throws TransportException {
        Channel channel = null;
        Response response = null;
        try {
            // return channel or throw exception(timeout or connection_fail)
            channel = borrowObject();
            if (channel == null) {
                LoggerUtil.error("NettyClient borrowObject null: url=" + url.getUri() + " "
                        + MotanFrameworkUtil.toString(request));
                return null;
            }
            // async request
            response = channel.request(request);
            // return channel to pool
            returnObject(channel);

使用了common-pool连接池

在这儿是委托给了nettychannel.request(),nettyclient和nettychannel是什么关系呢?client有server地址,channel就是这个地址连接的通道。在nettychannel中

public Response request(Request request) throws TransportException {
        int timeout = nettyClient.getUrl().getMethodParameter(request.getMethodName(), request.getParamtersDesc(),
                URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
        if (timeout <= 0) {
               throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.",
                       MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
           }
        NettyResponseFuture response = new NettyResponseFuture(request, timeout, this.nettyClient);
        this.nettyClient.registerCallback(request.getRequestId(), response);
        ChannelFuture writeFuture = this.channel.write(request);
        boolean result = writeFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);
        if (result && writeFuture.isSuccess()) {
            response.addListener(new FutureListener() {
                @Override
                public void operationComplete(Future future) throws Exception {
                    if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) {
                        // 成功的调用 
                        nettyClient.resetErrorCount();
                    } else {
                        // 失败的调用 
                        nettyClient.incrErrorCount();
                    }
                }
            });
            return response;
        }

到此,整个请求过程已经完成。

返回处理

调用完成之后,总得得到结果才行

motan返回过程

在nettychannel.request方法中,会返回一个响应,NettyResponseFuture这个类名就说明了一切,使用了Future模式。

在返回响应时,构造真实的响应

private Response asyncResponse(Response response, boolean async) {
        if (async || !(response instanceof NettyResponseFuture)) {
            return response;
        }
        return new DefaultResponse(response);
    }

真实回应里面,使用未来回应去取价值

public DefaultResponse(Response response) {
        this.value = response.getValue();
        this.exception = response.getException();
        this.requestId = response.getRequestId();
        this.processTime = response.getProcessTime();
        this.timeout = response.getTimeout();
    }

在未来的回应里面:

public Object getValue() {
        synchronized (lock) {
            if (!isDoing()) {
                return getValueOrThrowable();
            }
            if (timeout <= 0) {
                try {
                    lock.wait();
                } catch (Exception e) {
                    cancel(new MotanServiceException("NettyResponseFuture getValue InterruptedException : "
                            + MotanFrameworkUtil.toString(request) + " cost="
                            + (System.currentTimeMillis() - createTime), e));
                }
                // don't need to notifylisteners, because onSuccess or
                // onFailure or cancel method already call notifylisteners
                return getValueOrThrowable();
            } else {
                long waitTime = timeout - (System.currentTimeMillis() - createTime);
                if (waitTime > 0) {
                    for (;;) {
                        try {
                            lock.wait(waitTime);
                        } catch (InterruptedException e) {
                        }
                        if (!isDoing()) {
                            break;
                        } else {
                            waitTime = timeout - (System.currentTimeMillis() - createTime);
                            if (waitTime <= 0) {
                                break;
                            }
                        }
                    }
                }
                if (isDoing()) {
                    timeoutSoCancel();
                }
            }
            return getValueOrThrowable();
        }
    }

没有使用java.util.concurrent包中Condition,CountDownLatch之类的工具类,而是使用原始的wait,notify组合

在 NettyClient 中,返回对象,然后对响应进行处理

pipeline.addLast("handler", new NettyChannelHandler(NettyClient.this, new MessageHandler() {
                    @Override
                    public Object handle(Channel channel, Object message) {
                        //得到返回对象
                        Response response = (Response) message;
                        //得到对应request的future
                        NettyResponseFuture responseFuture = NettyClient.this.removeCallback(response.getRequestId());
                        if (responseFuture == null) {
                            LoggerUtil.warn(
                                    "NettyClient has response from server, but resonseFuture not exist,  requestId={}",
                                    response.getRequestId());
                            return null;
                        }
                        if (response.getException() != null) {
                            responseFuture.onFailure(response);
                        } else {
                            responseFuture.onSuccess(response);
                        }
                        return null;
                    }
                }));

responseFuture 成功的方法,主动进行并通知

public void onSuccess(Response response) {
        this.result = response.getValue();
        this.processTime = response.getProcessTime();
        done();
    }
private boolean done() {
        synchronized (lock) {
            if (!isDoing()) {
                return false;
            }
            state = FutureState.DONE;
            lock.notifyAll();
        }
        notifyListeners();
        return true;
    }

总结

到这里,客户端部分已经完成,主要是两方面

  1. 调用请求
  2. 返回处理

还有一些问题:

  1. 客户端如何服务发现的?
  2. 服务降低如何处理的?


目录
相关文章
|
1月前
|
负载均衡 网络协议 Java
gRPC远程调用协议
gRPC远程调用协议
32 0
|
1月前
|
JSON 负载均衡 网络协议
RPC远程调用协议
RPC远程调用协议
59 0
|
10月前
|
负载均衡 Dubbo Java
|
11月前
|
负载均衡 监控 Dubbo
Dubbo协议详解
Dubbo协议详解
895 0
|
12月前
|
网络协议 Dubbo Java
【远程调用框架概述 一】基于HTTP和RPC的远程调用方式
【远程调用框架概述 一】基于HTTP和RPC的远程调用方式
278 0
|
消息中间件 缓存 前端开发
Springboot 整合 WebSocket ,使用STOMP协议 ,前后端整合实战 (一)
Springboot 整合 WebSocket ,使用STOMP协议 ,前后端整合实战 (一)
1730 1
Springboot 整合 WebSocket ,使用STOMP协议 ,前后端整合实战 (一)
|
JSON Dubbo 网络协议
|
XML 缓存 JavaScript
motan服务端
服务端的处理也有套路,不管上层怎么玩,最后还是得通过反射方法对象,再调用invoke() 可知序列图,可以把服务端分成两部分 1. NettyServer前面的算部分,搭基础制造Exporter对象 2. nettyserver 后面的算计,找到方法,调用,通过返回
175 0
motan服务端
|
消息中间件 JSON JavaScript
Springboot 整合Websocket+Stomp协议+RabbitMQ做消息代理 实例教程
Springboot 整合Websocket+Stomp协议+RabbitMQ做消息代理 实例教程
1529 0
Springboot 整合Websocket+Stomp协议+RabbitMQ做消息代理 实例教程
|
消息中间件 Java RocketMQ
RocketMQ源码分析-Rpc通信模块(remoting)二
今天继续RocketMQ-Rpc通信模块(remoting)的源码分析。上一章提到了主要的start()方法执行流程,如果有不清楚的地方可以一起讨论哈,这篇文章会继续解读主要方法,按照惯例先看看NettyRemotingAbstract的类图,看类图知方法。和NettyEventExecutor以及MQ的交互流程。 按照惯例先看看NettyRemotingAbstract的类图,看类图知方法,文中会挑重要方法和主要流程解读。
410 0
RocketMQ源码分析-Rpc通信模块(remoting)二

热门文章

最新文章