Netty中序列化框架Protobuf的简单实现

简介: Protobuf是一种平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储。


什么是protocol buffers

 Protobuf是一种平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储。

image.png

Protobuf入门

1.开发环境搭建

 protobuf现在官方的最新版本是3.7.x版本,https://github.com/protocolbuffers/protobuf/releases ,protobuf2和protobuf3版本区别还是蛮大的,hadoop中使用的就是protobuf来实现序列化的,我们在此处使用的版本是2.5,官网对于此版本已经没有下载链接了,我在百度云盘上提供有(windows,linux):

链接:https://pan.baidu.com/s/1kxhlNqlu2Z3_E65Zi7W7Pg

提取码:vcqh

定义proto文件

SubscribeReq.proto

package netty;
option java_package = "com.dpb.netty.codec.protobuf";
option java_outer_classname = "SubscribeReqProto";
message SubscribeReq{
 required int32 subReqID = 1;
 required string userName = 2;
 required string preductName = 3;
 repeated string address = 4;
}

SubscribeResp.proto

package netty;
option java_package = "com.dpb.netty.codec.protobuf";
option java_outer_classname = "SubscribeRespProto";
message SubscribeResp{
 required int32 subReqID = 1;
 required int32 respCode = 2;
 required string desc = 3;
}

生成java文件

 &esmp;cmd进入命令行模式,进入相关文件夹。

image.png

分别指向下面两条命令

C:\tools>protoc-2.5.0-windows-x86_64.exe --java_out=./ SubscribeReq.proto
C:\tools>protoc-2.5.0-windows-x86_64.exe --java_out=./ SubscribeResp.proto

在相关文件夹下会生成对于的java文件,将文件拷贝到eclipse工作空间中。

2.编解码案例

 演示protobuf编解码操作。

package com.dpb.netty.codec;
import java.util.ArrayList;
import java.util.List;
import com.dpb.netty.codec.protobuf.SubscribeReqProto;
import com.dpb.netty.codec.protobuf.SubscribeReqProto.SubscribeReq;
import com.google.protobuf.InvalidProtocolBufferException;
/**
 * protobuf 编解码操作案例
 * 
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class TestSubscribeReqProto {
  /**
   * 编码
   * @return
   */
  private static byte[] encode(SubscribeReqProto.SubscribeReq req){
    return req.toByteArray();
  }
  /**
   * 解码
   * @param body
   * @return
   * @throws InvalidProtocolBufferException
   */
  private static SubscribeReqProto.SubscribeReq decode(byte[] body) 
      throws InvalidProtocolBufferException{
    return SubscribeReqProto.SubscribeReq.parseFrom(body);
  }
  /**
   * 构建SubscribeReq对象
   * @return
   */
  private static SubscribeReqProto.SubscribeReq createSubscribeReq(){
    SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
    builder.setSubReqID(1);
    builder.setUserName("bobo");
    builder.setPreductName("Netty");
    List<String> address = new ArrayList<>();
    address.add("beijing");
    address.add("guangzhou");
    address.add("shezheng");
    builder.addAllAddress(address);
    return builder.build();
  }
  public static void main(String[] args) throws InvalidProtocolBufferException {
    SubscribeReqProto.SubscribeReq req = createSubscribeReq();
    System.out.println("编码前:"+req.toString());
    SubscribeReq req2 = decode(encode(req));
    System.out.println("编码后:"+req);
    System.out.println("编码后:"+req2);
    System.out.println(req2.equals(req));
  }
}

输出结果:

编码前:subReqID: 1
userName: "bobo"
preductName: "Netty"
address: "beijing"
address: "guangzhou"
address: "shezheng"
编码后:subReqID: 1
userName: "bobo"
preductName: "Netty"
address: "beijing"
address: "guangzhou"
address: "shezheng"
编码后:subReqID: 1
userName: "bobo"
preductName: "Netty"
address: "beijing"
address: "guangzhou"
address: "shezheng"
true

 通过结果我们发现编码前后的结果是一致的而且前后对象是等价的。

Netty中Protobuf案例

服务端程序

SubReqServer

package com.dpb.netty.codec;
import com.dpb.netty.codec.protobuf.SubscribeReqProto;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
 * 图书订购服务端
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class SubReqServer {
  private void bind(int port)throws Exception{
    // 配置服务端的NIO线程组
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 100)
          .handler(new LoggingHandler(LogLevel.INFO))
          .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) {
          // 处理半包问题
            ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
            // 添加解码器
            ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
            // 处理半包问题
            ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
            // 添加编码器
            ch.pipeline().addLast(new ProtobufEncoder());
            ch.pipeline().addLast(new SubReqServerHandler());
        }
          });
        // 绑定端口,同步等待成功
        ChannelFuture f = b.bind(port).sync();
        // 等待服务端监听端口关闭
        f.channel().closeFuture().sync();
    } finally {
        // 优雅退出,释放线程池资源
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
  }
  public static void main(String[] args) throws Exception {
    new SubReqServer().bind(8080);
  }
}

SubReqServerHandler

package com.dpb.netty.codec;
import com.dpb.netty.codec.protobuf.SubscribeReqProto;
import com.dpb.netty.codec.protobuf.SubscribeRespProto;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class SubReqServerHandler extends ChannelHandlerAdapter {
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 获取消息
    SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
    // 验证账号
    if ("bobo".equalsIgnoreCase(req.getUserName())) {
      System.out.println("Service accept client subscribe req : [" + req.toString() + "]");
      // 回写消息
      ctx.writeAndFlush(resp(req.getSubReqID()));
    }
  }
  private SubscribeRespProto.SubscribeResp resp(int subReqID) {
    SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
    builder.setSubReqID(subReqID);
    builder.setRespCode(0);
    builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
    return builder.build();
  }
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();// 发生异常,关闭链路
  }
}

客户端程序

SubReqClient

package com.dpb.netty.codec;
import com.dpb.netty.codec.protobuf.SubscribeRespProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class SubReqClient {
  public void connect(int port, String host) throws Exception {
    // 配置客户端NIO线程组
    EventLoopGroup group = new NioEventLoopGroup();
    try {
      Bootstrap b = new Bootstrap();
      b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
          .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
              ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
              ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
              ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
              ch.pipeline().addLast(new ProtobufEncoder());
              ch.pipeline().addLast(new SubReqClientHandler());
            }
          });
      // 发起异步连接操作
      ChannelFuture f = b.connect(host, port).sync();
      // 当代客户端链路关闭
      f.channel().closeFuture().sync();
    } finally {
      // 优雅退出,释放NIO线程组
      group.shutdownGracefully();
    }
  }
  /**
   * @param args
   * @throws Exception
   */
  public static void main(String[] args) throws Exception {
    int port = 8080;
    new SubReqClient().connect(port, "127.0.0.1");
  }
}

SubReqClientHandler

package com.dpb.netty.codec;
import java.util.ArrayList;
import java.util.List;
import com.dpb.netty.codec.protobuf.SubscribeReqProto;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class SubReqClientHandler extends ChannelHandlerAdapter {
  /**
   * Creates a client-side handler.
   */
  public SubReqClientHandler() {
  }
  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    for (int i = 0; i < 10; i++) {
      ctx.write(subReq(i));
    }
    ctx.flush();
  }
  private SubscribeReqProto.SubscribeReq subReq(int i) {
    SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
    builder.setSubReqID(i);
    builder.setUserName("bobo");
    builder.setPreductName("Netty Book For Protobuf");
    List<String> address = new ArrayList<>();
    address.add("NanJing");
    address.add("BeiJing");
    address.add("ShenZhen");
    builder.addAllAddress(address);
    return builder.build();
  }
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    System.out.println("Receive server response : [" + msg + "]");
  }
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
  }
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
  }
}

测试

服务端的输出:

Service accept client subscribe req : [subReqID: 0
userName: "bobo"
preductName: "Netty Book For Protobuf"
address: "NanJing"
address: "BeiJing"
address: "ShenZhen"
]
.......
Service accept client subscribe req : [subReqID: 9
userName: "bobo"
preductName: "Netty Book For Protobuf"
address: "NanJing"
address: "BeiJing"
address: "ShenZhen"
]

客户端的输出:

Receive server response : [subReqID: 0
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
....
Receive server response : [subReqID: 9
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]

 运行结果表明,我们基于Netty protobuf编解码框架开发的案例可以正常工作,利用Netty提供的Protobuf编解码能力,我们在不需要了解Protobuf实现和使用细节的情况下就能轻松支持Protobuf编解码,可以方便地实现跨语言的远程服务调用和与周边异构系统进行通信对接。


相关文章
|
3月前
|
NoSQL 前端开发 Java
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
|
2月前
|
编解码 分布式计算 网络协议
Netty高性能网络框架(一)
Netty高性能网络框架(一)
|
1月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
50 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
2月前
|
设计模式 缓存 算法
Netty框架的重要性
Netty框架的重要性
|
3月前
|
Java
JDK序列化原理问题之Hessian框架不支持writeObject/readObject方法如何解决
JDK序列化原理问题之Hessian框架不支持writeObject/readObject方法如何解决
|
3月前
|
前端开发 Java Spring
springboot 整合 netty框架, 实现 心跳检测,自动重连
springboot 整合 netty框架, 实现 心跳检测,自动重连
|
3月前
|
编解码 NoSQL Redis
(十一)Netty实战篇:基于Netty框架打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。
|
3月前
|
Java 应用服务中间件 Linux
(九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽!
现如今的开发环境中,分布式/微服务架构大行其道,而分布式/微服务的根基在于网络编程,而Netty恰恰是Java网络编程领域的无冕之王。Netty这个框架相信大家定然听说过,其在Java网络编程中的地位,好比JavaEE中的Spring。
143 3
|
3月前
|
开发框架 缓存 前端开发
基于SqlSugar的开发框架循序渐进介绍(24)-- 使用Serialize.Linq对Lambda表达式进行序列化和反序列化
基于SqlSugar的开发框架循序渐进介绍(24)-- 使用Serialize.Linq对Lambda表达式进行序列化和反序列化
|
3月前
|
XML 存储 JSON
(十二)探索高性能通信与RPC框架基石:Json、ProtoBuf、Hessian序列化详解
如今这个分布式风靡的时代,网络通信技术,是每位技术人员必须掌握的技能,因为无论是哪种分布式技术,都离不开心跳、选举、节点感知、数据同步……等机制,而究其根本,这些技术的本质都是网络间的数据交互。正因如此,想要构建一个高性能的分布式组件/系统,不得不思考一个问题:怎么才能让数据传输的速度更快?