想法的来源
学了netty框架以及看了一下一小部分的netty框架的源码,听说dubbo是基于netty框架的一个优秀的落地实现,所以看了一小部分dubbo的源码,感觉学习netty总要有一个方式证明自己曾经学过,所以写下这一篇小笔记,写给自己看。
源码下载
https://github.com/cbeann/NettyRpcDemooo
前提
Zookeeper知识点
Netty知识点
Spring的生命周期
自定义starter
动手实践
注意
下面只贴了部分代码和写这个小demo的想法,全部代码在 https://github.com/cbeann/NettyRpcDemooo 中
目标
1)zk做为注册中心
2)服务提供者提供集群方法,服务消费者轮询调用
3)服务提供者上线和下线服务消费者可以感知
4)整合自定义starter
5)没有使用序列化,使用json字符串进行接口调用
RPC程序需要的配置参数
服务提供者
注册中心ZK的IP
注册中心ZK的端口
服务提供者的名称
服务提供者的端口(NettyServer的端口,不是SpringBoot的端口)
服务消费者
注册中心ZK的IP
注册中心ZK的端口
服务消费者的名称
zk中服务提供者的存储目录结构
如果要满足集群版本的服务提供者存储,zk的存储设计也应该好好的想一想,有一点可以明确的创建的节点有临时节点,这样服务消费者才能通过某种机制监听到服务提供者上下线并进行业务逻辑操作。
本文中的存储结构如图所示
/rpc为项目根目录(持久化节点)
/rpc/provider下存储的是服务提供者集群(持久化节点)
/rpc/provider/XXX存储的是服务提供者名称(持久化节点)
/rpc/provider/XXX/IP:端口存储的提供者名称为XXX的实例(临时节点)
服务提供者思路
服务提供者的思路其实比服务消费者简单多了,其实只需要解决把ioc容器放入到自定义的SimpleChannelInboundHandler中,然后读取json字符串获取class、方法和参数,然后在ioc容器中获取类并反射调用方法返回结果即可。
在配置文件中拿到服务提供者NettyServer中ServerBootstrap的端口,然后启动ServerBootstrap,并向zk暴露自己的服务(添加临时节点信息),服务提供者就完成了。
- 1)在配置文件中拿到ServerBootstrap的端口,调用下面的构造器
public NettyServer(int port) { this.port = port; }
- 2)通过ApplicationContextAware,保存ApplicationContext在ServerHander中
public class NettyServer implements ApplicationContextAware{ private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } } /* socketChannel.pipeline().addLast(new RcpServerHandler(applicationContext)); */
- 3)最后通过Bean的生命周期,创建Bean完毕后调用@PostConstruct的方法
/** 启动Netty服务器 */ @PostConstruct public void postConstruct() { // 向外暴露端口,zk添加服务信息 doExport(); // 异步开启netty服务器 new Thread( () -> { this.bind();//ServerBootstrap.bind(ip) }) .start(); }
- 4)此时Netty服务器已经启动并且保存ApplicationContext在ServerHander中
部分代码如下所示:
package com.rpc.server; import com.rpc.properties.RpcProperties; import com.rpc.properties.RpcServerProperties; import com.rpc.zk.ZKServer; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import javax.annotation.PostConstruct; import java.net.InetAddress; /** * @author chaird * @create 2021-02-07 2:32 */ public class NettyServer implements ApplicationContextAware { private Integer port; private ApplicationContext applicationContext; EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); public NettyServer(int port) { this.port = port; } /** * 开启NettyServer的方法 */ public void bind() { try { b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // TimeClientHandler是自己定义的方法 socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new RcpServerHandler(applicationContext)); } }); System.out.println("服务端 Netty服务器启动成功:" + port); // 绑定端口 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 优雅关闭,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } /** 启动Netty服务器 */ @PostConstruct public void postConstruct() { // 向外暴露端口 doExport(); // 异步开启netty服务器 new Thread( () -> { this.bind(); }) .start(); } /** 服务暴露(其实就是把服务信息保存到Zookeeper上) */ private void doExport() { ZKServer zkServer = applicationContext.getBean(ZKServer.class); RpcServerProperties rpcServerProperties = applicationContext.getBean(RpcServerProperties.class); RpcProperties rpcProperties = applicationContext.getBean(RpcProperties.class); // providerGroupDir = /rpc/provider/myProviderName String providerGroupDir = rpcProperties.getPath() + rpcProperties.getProviderPath(); providerGroupDir = providerGroupDir + "/" + rpcServerProperties.getProviderName(); try { // 创建服务名目录(用于集群) zkServer.createPathPermanent(providerGroupDir, ""); } catch (Exception e) { e.printStackTrace(); } try { String providerAddress = InetAddress.getLocalHost().getHostAddress(); String providerInstance = providerAddress + ":" + rpcServerProperties.getProviderPort(); // key(path) = /rpc/provider/myProviderName/127.0.0.1:8080 value:127.0.0.1:8080 zkServer.createPathTemp(providerGroupDir + "/" + providerInstance, providerInstance); } catch (Exception e) { e.printStackTrace(); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
服务消费者思路和难点
服务消费者则从构思遇到的难点进行拆解,服务消费者比服务提供者难多了!!!
服务消费者中怎么存储与多个服务提供者的关系?
客户端比如果你看过部分Netty的源码,那你比较容易下面的设计思路。没看过也没关系,反正我写的也是个demo,也很容易理解。
首先从细粒度开始,客户端要和每一个服务提供者建立连接,即和每一个ServerBootstrap建立连接,所以会有如下的自定义实体类(ip,port)
//Netty连接服务提供者的自定义客户端 public class NettyClient { public NettyClient(String ip, Integer port) { this.ip = ip; this.port = port; } private String ip; private Integer port; EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); //... }
- 如果我们设计的是一个服务提供者,那上面就没有问题了,如果要实现服务提供者的集群方案,那么就需要在上面的基础上再包装一层,定义为服务提供者组(集群),如下所示。
//服务提供者组(集群) public class NettyClientGroup { /** 下一个下标 */ private AtomicInteger index = new AtomicInteger(0); /** 服务提供者名称 */ private String providerName; /** 服务提供者列表 */ List<NettyClient> providerList = new ArrayList<>(); /** key:服务提供者ip:端口 value:NettyClient */ Map<String, NettyClient> providerMap = new HashMap<>(); }
服务提供者的集群实现了,我可以从NettyClientGroup中选择NettyClient,但是服务提供者不是一种类型的,比如有订单服务提供者、用户服务提供者等,所以外面还要包装一层,NettyClientBootStarp,用于保存服务提供者组名称和服务提供者组的关系,如下所示
//服务消费者启动器 public class NettyClientBootStarp implements ApplicationContextAware { /**key:服务提供者组名称,服务提供者组*/ Map<String, NettyClientGroup> providers = new HashMap<>(); List<NettyClient> providerList = new ArrayList<>(); public NettyClientBootStarp() {} }
- 此时,存储关系已经很明显了,下面用实例展示
服务消费者中怎么感知服务提供者上下线以及感知到如何处理?
- 首先要监听到zk服务提供者节点的上下和下线,如下所示,其实我们是可以监听到的
//注册默认的watcher zk = new ZooKeeper(url, 5000, watcher); String listenProviderPath = path + providerPath; // 给某节点添加watcher zk.getChildren(listenProviderPath, true);
- 监听到需要做什么业务逻辑呢?
就是把某个上线或者下线的服务提供者从NettyClientBootStarp 中移除,但是此处我用了一个比较极端的操作,即重新链接所有的服务提供者,这样就可以添加或者删除掉变动的提供者
public class RpcServiceChangeWatcher implements Watcher, ApplicationContextAware { private RpcProperties rpcProperties; private NettyClientBootStarp nettyClientBootStarp; @Override public void process(WatchedEvent event) { System.out.println(event); // 实际业务 try { nettyClientBootStarp.refreshProviders(); } catch (Exception e) { e.printStackTrace(); } // 重新监听 String providersPath = rpcProperties.getPath() + rpcProperties.getProviderPath(); try { zkServer.getZk().getChildren(providersPath, true); } catch (Exception e) { e.printStackTrace(); } } }
怎么实现服务消费者调用正常的controller就能通过NettyClient发送数据到NettyServer?
如下所示,我在SpringBoot的客户端中的HelloController中放入StudentService(接口,非实现类),怎么就能发送数据呢????
@RestController public class HelloController { @RpcService("provider01") // 自定义注解,其中value为服务提供者名称,类似OpenFeign的使用 private StudentService studentService; @GetMapping("/index/{id}") public Object hello(@PathVariable Integer id) { String res = studentService.getId(id); return res; }
答案:代理、反射、自定义注解
- 首先自定义注解,其中value为服务提供者名称
/** * 该注解用于注入远程服务 */ @Target(ElementType.FIELD) // 方法注解 @Retention(RetentionPolicy.RUNTIME) // 运行时注解 public @interface RpcService { String value();//服务提供者名称 }
然后自定义BeanPostProcessor(Bean的生命周期知识点),目的是包装Bean,大体的逻辑为判断类里是否有自定义注解@RpcService,并获取注解的value,即需要调用的远程服务的名称;遍历获取标注该注解的属性,生成包含属性为(NettyClientBootStarp )的代理对象,然后注入到对象的该属性中,这样从逻辑上解决了 空指针异常
public class RcpServiceInjectBeanPostProcessor implements InstantiationAwareBeanPostProcessor, ApplicationContextAware { private ApplicationContext context; @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { // 判断类里是否有@RpcService注解 Class<?> clazz = context.getType(beanName); if (Objects.isNull(clazz)) { return bean; } Field[] declaredFields = clazz.getDeclaredFields(); for (Field field : declaredFields) { // 找出标记了InjectService注解的属性 RpcService injectService = field.getAnnotation(RpcService.class); if (injectService == null) { continue; } // 获取服务名称 String providerName = injectService.value(); // 获取接口Class Class<?> fieldClass = field.getType(); // 获取nettyClient NettyClientBootStarp nettyClientBootStarp = context.getBean(NettyClientBootStarp.class); RpcFactoryProxy rpcFactoryProxy = new RpcFactoryProxy(fieldClass, providerName, nettyClientBootStarp); Object proxy = rpcFactoryProxy.getProxy(); Object object = bean; field.setAccessible(true); try { // 请开始你的表演 field.set(object, proxy); } catch (IllegalAccessException e) { e.printStackTrace(); } } return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { return bean; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.context = applicationContext; } public RcpServiceInjectBeanPostProcessor() { System.out.println("-----RcpServiceInjectBeanPostProcessor-----------"); } }
通过反射发送数据。
(1)invoke方法可以获取到调用的接口名称、方法名称和参数
(2)代理对象构造的时候传入了需要调用的服务名称和NettyClientBootStarp
(3)在invoke中把接口名称、方法名称和参数通过Netty发送给服务提供者是可以实现的
public class RpcFactoryProxy<T> implements InvocationHandler { private Class<T> proxyInterface; // 这里可以维护一个缓存,存这个接口的方法抽象的对象 private NettyClientBootStarp nettyClientBootStarp; private String serviceName; public RpcFactoryProxy( Class<T> proxyInterface, String serviceName, NettyClientBootStarp nettyClient) { this.serviceName = serviceName; this.proxyInterface = proxyInterface; this.nettyClientBootStarp = nettyClient; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { System.out.println("invoke"); Map<String, NettyClientGroup> providers = nettyClientBootStarp.getProviders(); NettyClientGroup nettyClientGroup = providers.get(serviceName); if (null == nettyClientGroup) { RpcResponse response = RpcResponse.NO_SERVICE(); return response.getReturnValue(); } NettyClient nettyClient = nettyClientGroup.next(); if (null == nettyClient) { RpcResponse response = RpcResponse.NO_SERVICE(); return response.getReturnValue(); } RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setRequestId(UUID.randomUUID().toString().substring(0, 8)); // 设置服务名称 rpcRequest.setServiceName(serviceName); // 设置是哪个类 rpcRequest.setClazzName(proxyInterface.getName()); // 设置哪个方法 rpcRequest.setMethodName(method.getName()); // 设置参数类型 Class<?>[] parameterTypes = method.getParameterTypes(); String[] parameterTypeString = Class2String.class2String(parameterTypes); rpcRequest.setParameterTypeStrings(parameterTypeString); // 设置参数 rpcRequest.setParameters(args); // 发送消息 RpcResponse response = nettyClient.sendMessage(rpcRequest); if (response == null) { response = RpcResponse.TIME_OUT(rpcRequest.getRequestId()); } return response.getReturnValue(); } public T getProxy() { return (T) Proxy.newProxyInstance(proxyInterface.getClassLoader(), new Class[] {proxyInterface}, this); } }
Netty客户端的SimpleChannelInboundHandler都是打印数据,我怎么实现要调用后返回数据而不是打印数据呢?
首先自定义实现Future的实现类RpcFuture,注意这里面有一个CountDownLatch(1)
public class RpcFuture<T> implements Future<T> { private T response; /** * 因为请求和响应是一一对应的,所以这里是1 */ private CountDownLatch countDownLatch = new CountDownLatch(1); /** * 获取响应,直到有结果才返回 * @return * @throws InterruptedException * @throws ExecutionException */ @Override public T get() throws InterruptedException, ExecutionException { countDownLatch.await(); return response; } @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (countDownLatch.await(timeout,unit)){ return response; } return null; } public void setResponse(T response) { this.response = response; countDownLatch.countDown(); } }
当发送数据的时候创建一个RcpFuture,然后把该RcpFuture存在一个地方FuturePool,此时调用RcpFuture.get方法是阻塞的(阻塞原因CountDownLatch ),如下所示
public RpcResponse sendMessage(RpcRequest msg) { // 存起来 RpcFuture<RpcResponse> future = new RpcFuture<>(); FuturePool.put(msg.getRequestId(), future); RpcResponse rpcResponse = null; try { String s = JSONUtil.toJsonStr(msg); f.channel().writeAndFlush(s); rpcResponse = future.get(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); } finally { FuturePool.remove(msg.getRequestId()); } return rpcResponse; }
那么在打印的地方通过key获取RpcFuture ,然后把结果通过RpcFuture.setResponse方法设计进去,如下所示,上面就能返回结果了。
public class RpcClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { String resp = msg; System.out.println("resp:"+resp); RpcResponse rpcResponse = JSONUtil.toBean(resp, RpcResponse.class); RpcFuture future = FuturePool.get(rpcResponse.getRequestId()); future.setResponse(rpcResponse); }
总结
1)一个小案例的实现要整合多个知识点,并且写了这篇晦涩难懂的文章,还是花了点时间的。
2)dubbo中使用的FactoryBean实现的远程调用,可以看看人家的思路。
3)学习到了很多东西,比如我现在才知道zk的监听机制只监听一次。
4)需要用到的知识点太多了,自定义注解,BeanPostProcessor,反射,Future等,是个学习Netty不错的案例。
5)新的一年,希望大家步步高升,一年比一年好。