本文主要探讨了消息总线支持Thrift RPC的实现过程。鉴于RabbitMQ官方的Java Client提供了基于RabbitMQ的JSON-RPC,消息总线也顺道提供了JSON-RPC的API。然后也尝试了为消息总线增加对Thrift-RPC的扩展支持,希望此举能让消息总线同时为SOA提供基础设施。
Thrift简介
Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目。Thrift通过一个中间语言(IDL, 接口定义语言)来定义RPC的接口和数据类型,然后通过一个编译器生成不同语言的代码(目前支持C++,Java, Python,PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk和OCaml),并由生成的代码负责RPC协议层和传输层的实现。
初衷
做这件事情的初衷是RabbitMQ可以用于模拟request/response这样的通信模型,而这个通信模型就是通常C/S以及B/S架构的通信模型。并且因为RPC的流行,其官方java client已经提供了对基于JSON(文本协议)的RPC的实现。而Thrift本身是个RPC框架,提供跨语言、多协议、多传输通信机制的实现。如果能将两者衔接起来,消息总线对RPC的支持无疑更加完善。
思路
Thrift的实现是基于类似TCP/IP的多层协议栈模型。它的特点是对等通信,逻辑分离,分层解耦。如下图:
在协议层,目前Thrift支持众多的协议,这些协议大致分为两类:
- 二进制协议
- 文本协议(以XML、JSON为代表)
现状
实现
客户端实现
Thrift默认提供了很多客户端通信技术的实现,而这些实现类都必须实现TTransport这一基接口,该接口的类图如下:
public void flush() throws TTransportException { byte[] data = this.reqMsgStream.toByteArray(); this.reqMsgStream.reset(); byte[] responseData = new byte[0]; try { responseData = this.client.primitiveRequest(secret, target, data, token, timeout); } catch (Exception e) { ExceptionHelper.logException(logger, e, "[flush]"); } this.respMsgStream = new ByteArrayInputStream(responseData); }
可以看到它先从输出流中获取请求数据,然后调用总线的消息发送请求,并阻塞等待响应数据,最后基于响应数据构建输入流。
public void testThriftRpc() throws Exception { TTransport transport = new TAMQPClientTransport(this.client, "kliwhiduhaiucvarkjajksdbfkjabw", "emapDemoRpcResponse", "klasehnfkljashdnflhkjahwlekdjf", 10000); transport.open(); TProtocol protocol = new TJSONProtocol(transport); CalcService.Client client = new CalcService.Client(protocol); int result = client.calcSum(); logger.info(result); transport.close(); }
大致来看层次还是比较清晰的,构建的流程是从低层向上层的构建方式:
服务器端实现
MessagebusSinglePool singlePool = new MessagebusSinglePool(host, port);; Messagebus client = singlePool.getResource(); //server code WrappedRpcServer rpcServer = null; try { TProcessor processor = new CalcService.Processor(new CalcServiceImpl()); TProtocolFactory inProtocolFactory = new TJSONProtocol.Factory(); TProtocolFactory outProtocolFactory = new TJSONProtocol.Factory(); rpcServer = client.buildRpcServer("mshdfjbqwejhfgasdfbjqkygaksdfa", new ThriftMessageHandler(processor, inProtocolFactory, outProtocolFactory)); rpcServer.mainLoop(); } finally { rpcServer.close(); singlePool.returnResource(client); singlePool.destroy(); }
它直接替换了Thrift提供的Server实现(跟客户端通信一样thrift同样提供了多个Server的实现),而是采用了基于RabbitMQ构建的一个RPCServer,然后启动一个event loop将其block住,等待客户端的请求,并进行处理。
消息总线提供了一个API:buildRpcServer,它构建出一个封装后的WrappedRpcServer。该WrappedRpcServer封装了上文提到的RabbitMQ java client自带的RpcServer。为什么要封装?主要的原因还是信息隐藏。其实,如果只接入RabbitMQ,不封装是没有问题的。但现在的目的是接入消息总线,而消息总线在RabbitMQjava client之上又封装了一层,屏蔽了一些不必要的设置,而这些设置恰好又是构建这个RpcServer类的实例所必备的参数(比如 queue name,channel等)。
因此这里我们基于组合的方式将RpcServer从WrappedRpcServer类的构造器注入进来,使得WrappedRpcServer成为RpcServer的代理,WrappedRpcServer的构造器访问标识符被设置为private,这是因为我们在消息总线内部构建了RpcServer的实例,然后通过反射机制来构造WrappedRpcServer的实例。看代码:public WrappedRpcServer buildRpcServer(String secret, final IRpcMessageProcessor rpcMsgProcessor) { Node source = this.getContext().getConfigManager().getNodeView(secret).getCurrentQueue(); try { RpcServer aServer = new RpcServer(this.getContext().getChannel(), source.getValue()) { @Override public byte[] handleCall(QueueingConsumer.Delivery request, AMQP.BasicProperties replyProperties) { return rpcMsgProcessor.onRpcMessage(request.getBody()); } }; Constructor<WrappedRpcServer> rpcServerConstructor = WrappedRpcServer.class.getDeclaredConstructor(RpcServer.class); rpcServerConstructor.setAccessible(true); WrappedRpcServer wrappedRpcServer = rpcServerConstructor.newInstance(aServer); rpcServerConstructor.setAccessible(false); return wrappedRpcServer;
buildRpcServerAPI需要的参数除了有一个用于自身标识的secret,还有一个rpc消息处理器接口:IRpcMessageProcessor。它的定义如下:
public interface IRpcMessageProcessor { public byte[] onRpcMessage(byte[] in); }
它的主要作用是给Thrift RPC(或后续可能接入的其他RPC框架)提供一个跟消息总线衔接的入口。它拥有一个输入作为参数,每个RPC Server在其内部处理请求并将结构作为输出参数返回。可以看到,这个接口没有对第三方产生任何依赖,而且输入参数跟输出参数都是byte[],因此它可以适配任何类似Thrift这样的RPC框架(当然前提是这些RPC框架都能像thrift具有这么好的扩展性)。
public byte[] onRpcMessage(byte[] inMsg) { InputStream in = new ByteArrayInputStream(inMsg); OutputStream out = new ByteArrayOutputStream(); TTransport transport = new TIOStreamTransport(in, out); TProtocol inProtocol = inProtocolFactory.getProtocol(transport); TProtocol outProtocol = outProtocolFactory.getProtocol(transport); try { processor.process(inProtocol, outProtocol); return ((ByteArrayOutputStream) out).toByteArray(); } catch (TException e) { ExceptionHelper.logException(logger, e, "onRpcMessage"); throw new RuntimeException(e); } finally { transport.close(); } }
首先它将方法参数作为输入,基于该输入构建一个输入流,然后构造一个空的输出流。通过输入、输出流可以构建出服务端的TTransport对象(它是thrift通信传输的基础也是,thrift协议栈的最底层)。接下来构造用于对输入解码以及对输出编码的协议处理对象。然后传入thrift的核心处理对象:processor进行处理,产生的原始输出作为方法返回。
RpcServer aServer = new RpcServer(this.getContext().getChannel(), source.getValue()) { @Override public byte[] handleCall(QueueingConsumer.Delivery request, AMQP.BasicProperties replyProperties) { return rpcMsgProcessor.onRpcMessage(request.getBody()); } };
我们在构建RpcServer实例的时候,覆盖了消息的处理方式,使其触发对IRpcMessageProcessor的onRpcMessage方法的调用。该方法的调用就衔接起了thrift的代码逻辑,如果这里的RPC框架不是thrift,那么它也可以用于衔接其他的RPC框架。