Netty快速入门RPC项目

简介: Netty快速入门RPC项目

1 Netty所需背景知识

前言:如果你还对jdk的socket还有印象,下面的例子很简单理解的,不过后续我的文章会详细Netty模型各个组件。

Netty快速入门

阻塞IO(BIO)并发不高的原因就是有阻塞的方法,去等待服务方或者等待客户端方发送数据之后进行处理,就是加异步线程也无法解决高并发问题

2 创建MyRPC项目

pom.xml⽂件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.oldlu.myrpc</groupId>
    <artifactId>oldlu-MyRPC</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.50.Final</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3 服务端

3.1 MyRPCServer

package cn.oldlu.myrpc.server;
import cn.oldlu.myrpc.server.handler.MyChannelInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyRPCServer {
    public void start(int port) throws Exception {
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
        EventLoopGroup boss = new NioEventLoopGroup(1);
// ⼯作线程,线程数默认是:cpu*2
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
// 服务器启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker) //设置线程组
                    .channel(NioServerSocketChannel.class) //配置server通道
                    .childHandler(new MyChannelInitializer()); //worker线程的处理器
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("服务器启动完成,端⼝为:" + port);
//等待服务端监听端⼝关闭
            future.channel().closeFuture().sync();
        } finally {
//优雅关闭
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

3.2 MyChannelInitializer

package cn.oldlu.myrpc.server.handler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
/**
 * ChannelHandler的初始化
 */
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
//将业务处理器加⼊到列表中
        ch.pipeline().addLast(new MyChannelHandler());
    }
}

3.3 MyChannelHandler

package cn.oldlu.myrpc.server.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class MyChannelHandler extends ChannelInboundHandlerAdapter {
    /**
     * 获取客户端发来的数据
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws
            Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
        System.out.println("客户端发来数据:" + msgStr);
//向客户端发送数据
        ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
    }
    /**
     * 异常处理
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3.4 测试用例

package cn.oldlu.myrpc;
import cn.oldlu.myrpc.server.MyRPCServer;
import org.junit.Test;
public class TestServer {
    @Test
    public void testServer() throws Exception{
        MyRPCServer myRPCServer = new MyRPCServer();
        myRPCServer.start(5566);
    }
}

3.5 测试

这里会有人疑问你netty为什么会去拿socket测试,可能你没有细看我最开始的文章,socket偏向于底层,而netty是对socket的封装。一般很少直接使用socket来编程,使用框架比较多,而netty就是其中一种框架,广泛应用在各个互联网中间件,大数据,游戏,IM中。


可以看到,客户端发送数据到服务端。

3 客户端

3.1 MyRPCClient

package cn.oldlu.myrpc.client;
import cn.oldlu.myrpc.client.handler.MyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyRPCClient {
    public void start(String host, int port) throws Exception {
//定义⼯作线程组
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
//注意:client使⽤的是Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker)
                    .channel(NioSocketChannel.class) //注意:client使⽤的是
            NioSocketChannel
                    .handler(new MyClientHandler());
//连接到远程服务
            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
        }
    }
}

3.2 MyClientHandler

package cn.oldlu.myrpc.client.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws
            Exception {
        System.out.println("接收到服务端的消息:" +
                msg.toString(CharsetUtil.UTF_8));
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 向服务端发送数据
        String msg = "hello";
        ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

3.3 测试用例

package cn.oldlu.myrpc;
import cn.oldlu.myrpc.client.MyRPCClient;
import org.junit.Test;
public class TestClient {
    @Test
    public void testClient() throws Exception{
        new MyRPCClient().start("127.0.0.1", 5566);
    }
}

客户端:

服务端:

目录
相关文章
|
Dubbo Java 应用服务中间件
Netty入门到超神系列-手撸简单版RPC框架(仿Dubbo)
原理还是比较简单 : 代理 + 线程池 + Netty 下面做一些解释: 首先需要定义一个统一的API接口,例:UserApi , 服务端(provider)需要实现这个接口,提供相应的方法UserApiImpl#save,客户端通过远程来调用该接口。 然后需要约定一个协议,服务器如何才能识别到客户端要调用哪个接口?:我这里用 “接口权限定名#方法名#参数” ,的方式来,因为是一个简单版本的RPC。服务端解析该内容就能匹配对应的接口的实现类,然后调用该方法。并把方法的返回值通过Netty写回给客户端 使用的编解码器都是比价简单的String的编解码器
180 0
|
前端开发
Netty手写RPC框架
创建Request类,继承Message,klass是调用的Class目标,name,parameterType,argument分别是方法名称,参数类型,参数
96 0
|
7月前
|
Java Spring
Spring Boot+Netty实现远程过程调用(RPC)
Spring Boot+Netty实现远程过程调用(RPC)
170 0
|
安全 网络协议 Java
Netty快速入门
Netty快速入门
232 0
|
负载均衡
06RPC - netty实现RPC以及Zookeeper
06RPC - netty实现RPC以及Zookeeper
65 0
|
编解码 架构师 网络协议
美团架构师熬夜整理:Netty权威指南2.0版+英雄传说项目
什么?你现在会觉得使用Netty编程的难度和工作量大了吗?Netty是一个令人惊讶的项目,在短短几年内成为众多Java高并发异步通信的首选框架。
130 0
|
前端开发 JavaScript Java
Seata 高性能RPC通信的实现基石-Netty篇
Netty是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。
186 0
|
JSON Java 关系型数据库
JAVA基于Swing和Netty,仿QQ界面聊天小项目
先说一下这个小项目也算是我在大学做得第一个应该算的上是的项目的项目,前前后后用了20天左右吧。先是用swing写好了仿QQ界面(界面很丑)最后逻辑实现都是后面断断续续加进去的。写这个项目之前没有很好的规划在逻辑实现方面与数据库逻辑交互过于频繁。走了很多的弯路
|
XML 存储 JSON
JAVA面试——Netty 与 RPC(二)
JAVA面试——Netty 与 RPC
143 0
JAVA面试——Netty 与 RPC(二)
|
XML 编解码 弹性计算
JAVA面试——Netty 与 RPC(一)
JAVA面试——Netty 与 RPC
225 0
JAVA面试——Netty 与 RPC(一)