正文
一、思路原理
当启动服务提供者(Netty服务器),将服务注册到zookeeper上,也就是在zk上创建/xiaojie-dubbo/com.xiaojie.api.UserService/providers/xiaojie-dubbo://127.0.0.1:8080方法名称等信息。
服务发现,从zookeeper节点上获取到服务列表[xiaojie://127.0.0.1:8080...,xiaojie://127.0.0.1:8081....],然后通过轮询、权重、一致性hash,随机等负载均衡算法获取服务。
当有服务下线或者宕机之后,通知服务调用者(zk事件通知实现,我没有实现)服务下线或者宕机。
通过反射,获取实现类的方法,调用之后,消费者通过代理模式,代理出来客户端的类,进行方法的调用,然后通过netty中ChannelInboundHandlerAdapter的channelRead()将获取的数据写出去。
利用户Jboss Marshalling解码器MarshallingDecoder、MarshallingEnecoder进行解码。
二、代码
代码结构如下
自定义注解代码
package com.xiaojie.dubbo.annotation; import java.lang.annotation.*; /** * 自定义rpc注解 */ @Documented @Inherited @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface RpcAnnotation { Class value(); }
服务的注册
package com.xiaojie.dubbo.server.register; import org.I0Itec.zkclient.ZkClient; import java.net.URLEncoder; /** * 将服务注册到zk上 */ public class ServiceRegisterImpl implements ServiceRegister{ //定义zk地址 private String zkServer="127.0.0.1:2181"; //超时时间 private Integer timeOut=5000; //zkclient private ZkClient zkClient; public ServiceRegisterImpl() { zkClient = new ZkClient(zkServer,timeOut); } //定义跟节点 private String rootNode="/xiaojie-dubbo"; @Override public void register(String serviceName, String servicePath) { System.out.println(".............."); if (!zkClient.exists(rootNode)){ zkClient.createPersistent(rootNode); //xiaojie-dubbo } //rootNode+"/"+serviceName String serviceNameNode=rootNode+"/"+serviceName; if (!zkClient.exists(serviceNameNode)){ zkClient.createPersistent(serviceNameNode); //xiaojie-dubbo/com.xiaojie.api.UserService } String providerNode=serviceNameNode+"/"+"providers"; if (!zkClient.exists(providerNode)){ zkClient.createPersistent(providerNode);//xiaojie-dubbo/com.xiaojie.api.UserService/providers } //创建我们服务地址 String serviceAddresNodePath = providerNode + "/" + URLEncoder.encode(servicePath); System.out.println("serviceAddresNodePath:" + serviceAddresNodePath); if (zkClient.exists(serviceAddresNodePath)) { zkClient.delete(serviceAddresNodePath); } zkClient.createEphemeral(serviceAddresNodePath);//xiaojie-dubbo/com.xiaojie.api.UserService/providers/getName.... } }
服务绑定
package com.xiaojie.dubbo.server.rpc; import com.xiaojie.dubbo.annotation.RpcAnnotation; import com.xiaojie.dubbo.server.handler.ServerHandler; import com.xiaojie.dubbo.server.marshalling.MarshallingCodeCFactory; import com.xiaojie.dubbo.server.register.ServiceRegister; import com.xiaojie.dubbo.server.register.ServiceRegisterImpl; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.util.HashMap; import java.util.Map; public class NettyServer { /** * 核心过程是将服务绑定到zk * 启动netty服务实现监听 */ private ServiceRegister serviceRegister; //host private String host; //端口号 private Integer port; private Map<String,Object> handlerMap=new HashMap<>(); public NettyServer( String host, Integer port) { this.host = host; this.port = port; serviceRegister = new ServiceRegisterImpl(); } public void bind(Object obj){ //获取类上注解 RpcAnnotation declaredAnnotation = obj.getClass().getDeclaredAnnotation(RpcAnnotation.class); if (declaredAnnotation == null) { throw new RuntimeException("declaredAnnotation is null"); } //获取名称,也就是serviceName String serviceName = declaredAnnotation.value().getName(); String serviceAddr="xiaojie://"+host+":"+port; serviceRegister.register(serviceName,serviceAddr); handlerMap.put(serviceName,obj); } /** * 创建netty服务端,开启监听 */ public void initNetty(){ //创建线程接收请求 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); //创建线程处理请求 NioEventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap=new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); socketChannel.pipeline().addLast(new ServerHandler(handlerMap)); } }); //绑定端口号 try { ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //等待服务器监听端口 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { //关闭 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } //启动 public void start(Object obj){ bind(obj); initNetty(); } }
服务发现
package com.xiaojie.dubbo.server.discover; import org.I0Itec.zkclient.ZkClient; import java.util.List; public class ServiceDiscoverImpl implements ServiceDiscover{ //定义zk地址 private String zkServer="127.0.0.1:2181"; //超时时间 private Integer timeOut=5000; //zkclient private ZkClient zkClient; public ServiceDiscoverImpl() { zkClient = new ZkClient(zkServer,timeOut); } //定义跟节点 private String rootNode="/xiaojie-dubbo"; @Override public List<String> discoverList(String serviceName) { List<String> children = zkClient.getChildren(rootNode + "/" + serviceName + "/" + "providers"); return children; } }
传送Request的实体类
package com.xiaojie.dubbo.server.req; import java.io.Serializable; public class RpcRequest implements Serializable { private static final long SerialVersionUID = 1L; /** * 类的className */ private String className; /** * 方法名称 */ private String methodName; /** * 参数类型 */ Class<?> parameterTypes[]; /** * 参数value */ Object paramsValue[]; public RpcRequest(String className, String methodName, Class<?>[] parameterTypes, Object[] paramsValue) { this.className = className; this.methodName = methodName; this.parameterTypes = parameterTypes; this.paramsValue = paramsValue; } public String getClassName() { return className; } public String getMethodName() { return methodName; } public Class<?>[] getParameterTypes() { return parameterTypes; } public Object[] getParamsValue() { return paramsValue; } public void setClassName(String className) { this.className = className; } public void setMethodName(String methodName) { this.methodName = methodName; } public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; } public void setParamsValue(Object[] paramsValue) { this.paramsValue = paramsValue; } @Override public String toString() { return className + "," + methodName + "," + parameterTypes + paramsValue; } }
通过反射执行方法
package com.xiaojie.dubbo.server.handler; import com.xiaojie.dubbo.server.req.RpcRequest; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.lang.reflect.Method; import java.util.Map; public class ServerHandler extends ChannelInboundHandlerAdapter { private Map<String,Object> handlerMap; public ServerHandler(Map<String, Object> handlerMap) { this.handlerMap = handlerMap; } @Override /** * 服务器端监听客户端的消息 */ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RpcRequest req=(RpcRequest) msg; if (req==null){ throw new RuntimeException("req is null"); } //获取serviceName 接口路径 String serviceName = req.getClassName(); //实现类 Object objectImpl = handlerMap.get(serviceName); if (objectImpl == null) { throw new RuntimeException("service is not exist"); } //反射获取方法 Method method = objectImpl.getClass().getMethod(req.getMethodName(), req.getParameterTypes()); //执行实现类方法 Object result = method.invoke(objectImpl,req.getParamsValue()); //返回结果给客户端 ctx.writeAndFlush(result); } }
客户端通过代理模式执行方法
package com.xiaojie.dubbo.server.proxy; import com.xiaojie.dubbo.server.discover.ServiceDiscover; import com.xiaojie.dubbo.server.discover.ServiceDiscoverImpl; import com.xiaojie.dubbo.server.handler.ClientHandler; import com.xiaojie.dubbo.server.loadBalance.LoadBalance; import com.xiaojie.dubbo.server.loadBalance.LoopBalance; import com.xiaojie.dubbo.server.loadBalance.RandomBalance; import com.xiaojie.dubbo.server.loadBalance.WeightBalance; import com.xiaojie.dubbo.server.marshalling.MarshallingCodeCFactory; import com.xiaojie.dubbo.server.req.RpcRequest; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URLDecoder; import java.util.List; public class RpcClientProxy { public static <T> T create(Class<T> interfaceClass){ return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //使用代理拼接地址 ServiceDiscover serviceDiscover=new ServiceDiscoverImpl(); List<String> strings = serviceDiscover.discoverList(interfaceClass.getName()); LoadBalance loadBalance=new RandomBalance(); String servicePath = URLDecoder.decode((String) loadBalance.select(strings));//[xiaojie-dubbo://192.168.1.110:8080,xiaojie-dubbo://192.168.1.110:8081] String[] split = servicePath.split(":"); String host=split[1].replace("//",""); Integer port= Integer.valueOf(split[2]); //封装具体参数 RpcRequest rpcRequest = new RpcRequest(interfaceClass.getName(), method.getName(), method.getParameterTypes(), args); //启动客户端,发送结果 return sendMsg(host,port,rpcRequest); } }); } /** * 客户端发送消息 * @return */ public static Object sendMsg(String host, Integer port, RpcRequest rpcRequest){ final ClientHandler clientHandler = new ClientHandler(); NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap=new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .remoteAddress( new InetSocketAddress( host,port)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); socketChannel.pipeline().addLast(clientHandler); } }); //发起同步连接 try { ChannelFuture channelFuture = bootstrap.connect().sync(); //客户端发送 channelFuture.channel().writeAndFlush(rpcRequest); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { group.shutdownGracefully(); } return clientHandler.getResponse(); } }
完整代码https://gitee.com/whisperofjune/netty-dubbo-rpc.git
参考:蚂蚁课堂的netty部分