简单的Java实现Netty进行通信

简介:

使用Java搭建一个简单的Netty通信例子

看过dubbo源码的同学应该都清楚,使用dubbo协议的底层通信是使用的netty进行交互,而最近看了dubbo的Netty部分后,自己写了个简单的Netty通信例子。

本文源地址:实现Netty进行通信

准备
工程截图

模块详解

rpc-common
rpc-common作为各个模块都需使用的模块,工程中出现的是一些通信时请求的参数以及返回的参数,还有一些序列化的工具。

rpc-client
rpc-client中目前只是单单的一个NettyClient启动类。

rpc-server
rpc-client中目前也只是单单的一个NettyServer服务启动类。

需要的依赖

目前所有的依赖项都出现在 rpc-common 下的 pom.xml中。

<!-- Netty -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.10.Final</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>

<!-- Protostuff -->
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.0.9</version>
</dependency>

<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.0.9</version>
</dependency>

<!-- Objenesis -->
<dependency>
    <groupId>org.objenesis</groupId>
    <artifactId>objenesis</artifactId>
    <version>2.1</version>
</dependency>

<!-- fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.38</version>
</dependency>


实现
首先我们在common中先定义本次的Request和Response的基类对象。

public class Request {

private String requestId;

private Object parameter;

public String getRequestId() {
    return requestId;
}

public void setRequestId(String requestId) {
    this.requestId = requestId;
}

public Object getParameter() {
    return parameter;
}

public void setParameter(Object parameter) {
    this.parameter = parameter;
}

}

public class Response {

private String requestId;

private Object result;

public String getRequestId() {
    return requestId;
}

public void setRequestId(String requestId) {
    this.requestId = requestId;
}

public Object getResult() {
    return result;
}

public void setResult(Object result) {
    this.result = result;
}

}

使用fastJson进行本次序列化

Netty对象的序列化转换很好懂, ByteToMessageDecoder 和 MessageToByteEncoder 分别只要继承它们,重写方法后,获取到Object和Byte,各自转换就OK。

不过如果是有要用到生产上的同学,建议不要使用 fastJson,因为它的漏洞补丁真的是太多了,可以使用google的 protostuff。

public class RpcDecoder extends ByteToMessageDecoder {

// 目标对象类型进行解码
private Class<?> target;

public RpcDecoder(Class target) {
    this.target = target;
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    if (in.readableBytes() < 4) {   // 不够长度丢弃
        return;
    }
    in.markReaderIndex();   // 标记一下当前的readIndex的位置
    int dataLength = in.readInt();  // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4

    if (in.readableBytes() < dataLength) {  // 读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
        in.resetReaderIndex();
        return;
    }
    byte[] data = new byte[dataLength];
    in.readBytes(data);

    Object obj = JSON.parseObject(data, target);    // 将byte数据转化为我们需要的对象
    out.add(obj);
}

}

public class RpcEncoder extends MessageToByteEncoder {

//目标对象类型进行编码
private Class<?> target;

public RpcEncoder(Class target) {
    this.target = target;
}

@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
    if (target.isInstance(msg)) {
        byte[] data = JSON.toJSONBytes(msg);    // 使用fastJson将对象转换为byte
        out.writeInt(data.length);  // 先将消息长度写入,也就是消息头
        out.writeBytes(data);   // 消息体中包含我们要发送的数据
    }
}

}
NetyServer

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    Request request = (Request) msg;

    System.out.println("Client Data:" + JSON.toJSONString(request));

    Response response = new Response();
    response.setRequestId(request.getRequestId());
    response.setResult("Hello Client !");

    // client接收到信息后主动关闭掉连接
    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
}

}

public class NettyServer {

private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

private String ip;
private int port;

public NettyServer(String ip, int port) {
    this.ip = ip;
    this.port = port;
}

public void server() throws Exception {

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {

        final ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_SNDBUF, 32 * 1024)
                .option(ChannelOption.SO_RCVBUF, 32 * 1024)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new RpcDecoder(Request.class))
                                .addLast(new RpcEncoder(Response.class))
                                .addLast(new NettyServerHandler());
                    }
                });

        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);  // 开启长连接

        ChannelFuture future = serverBootstrap.bind(ip, port).sync();

// if (future.isSuccess()) {
//
// new Register().register("/yanzhenyidai/com.yanzhenyidai.server", ip + ":" + port);
// }

        future.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

public static void main(String[] args) throws Exception {
    new NettyServer("127.0.0.1", 20000).server();
}

}

关键名词:

EventLoopGroup
workerGroup
bossGroup
Server端的EventLoopGroup分为两个,一般workerGroup作为处理请求,bossGroup作为接收请求。

ChannelOption
SO_BACKLOG
SO_SNDBUF
SO_RCVBUF
SO_KEEPALIVE
以上四个常量作为TCP连接中的属性。

ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
NettyServerHandler中出现的 ChannelFutureListener.CLOSE ,作为Server端主动关闭与Client端的通信,如果没有主动Close,那么NettyClient将会一直处于阻塞状态,得不到NettyServer的返回信息。

NettyClient

public class NettyClient extends SimpleChannelInboundHandler {

private final String ip;
private final int port;
private Response response;

public NettyClient(String ip, int port) {
    this.ip = ip;
    this.port = port;
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
    this.response = response;
}

public Response client(Request request) throws Exception {
    EventLoopGroup group = new NioEventLoopGroup();

    try {

        // 创建并初始化 Netty 客户端 Bootstrap 对象
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel channel) throws Exception {
                ChannelPipeline pipeline = channel.pipeline();

                pipeline.addLast(new RpcDecoder(Response.class));
                pipeline.addLast(new RpcEncoder(Request.class));
                pipeline.addLast(NettyClient.this);
            }
        });
        bootstrap.option(ChannelOption.TCP_NODELAY, true);

// String[] discover = new Discover().discover("/yanzhenyidai/com.yanzhenyidai.server").split(":");

        // 连接 RPC 服务器
        ChannelFuture future = bootstrap.connect(ip, port).sync();

        // 写入 RPC 请求数据并关闭连接
        Channel channel = future.channel();

        channel.writeAndFlush(request).sync();
        channel.closeFuture().sync();

        return response;
    } finally {
        group.shutdownGracefully();
    }
}

public static void main(String[] args) throws Exception {
    Request request = new Request();
    request.setRequestId(UUID.randomUUID().toString());
    request.setParameter("Hello Server !");
    System.out.println(JSON.toJSONString(new NettyClient("127.0.0.1", 30000).client(request)));
}

}

测试

如果以上所有内容都准备就绪,那么就可以进行调试了。

启动顺序,先启动NettyServer,再启动NettyClient。

总结
记得刚出来工作时,有工作很多年的同事问我了不了解Netty,当时工作太短,直说听过Putty,现在回想起来真的挺丢人的,哈哈。😋

Netty作为通信框架,如果你了解TCP,而且项目中有类似传输信息的需求,又不想集成HTTP或者Socket,那么Netty真的挺实用的。

参考资料:

Dubbo-Netty

Netty.io

本项目Github地址:Netty-RPC

原文地址https://www.cnblogs.com/yanzhenyidai/p/12901527.html

相关文章
|
3月前
|
Java 调度
[Java]线程生命周期与线程通信
本文详细探讨了线程生命周期与线程通信。文章首先分析了线程的五个基本状态及其转换过程,结合JDK1.8版本的特点进行了深入讲解。接着,通过多个实例介绍了线程通信的几种实现方式,包括使用`volatile`关键字、`Object`类的`wait()`和`notify()`方法、`CountDownLatch`、`ReentrantLock`结合`Condition`以及`LockSupport`等工具。全文旨在帮助读者理解线程管理的核心概念和技术细节。
47 1
[Java]线程生命周期与线程通信
|
2月前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
46 3
|
3月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
2月前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
3月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
35 1
|
3月前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
71 1
|
3月前
|
Java
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件成立时被唤醒,从而有效解决数据一致性和同步问题。本文通过对比其他通信机制,展示了 `wait()` 和 `notify()` 的优势,并通过生产者-消费者模型的示例代码,详细说明了其使用方法和重要性。
59 1
|
3月前
|
Java
|
3月前
|
Java
用java实现Client和Server之间的互相通信
本文介绍了如何使用Java实现客户端和服务器之间的通信,包括服务器端创建ServerSocket、接受客户端连接、读取和发送消息,以及客户端创建Socket连接、发送和接收消息的完整过程。
100 0
用java实现Client和Server之间的互相通信
|
4月前
|
传感器 网络协议 Java
三大硬核方式揭秘:Java如何与底层硬件和工业设备轻松通信!
大家好,我是V哥。最近与一位从事工业互联网项目的学员交流,启发我分享Java如何与底层硬件和工业设备通信。本文将介绍三种方法:1)使用`jLibModbus`库通过Modbus协议读取设备寄存器数据;2)使用JNI(Java Native Interface)直接访问硬件;3)使用`JSerialComm`库通过串口通信读取数据。每种方法都有详细步骤和示例代码,帮助你轻松实现与硬件设备的通信。无论是工业自动化还是物联网应用,这些方法都能派上用场。欢迎关注和支持!
178 0