Netty中序列化框架Protobuf的简单实现

简介: Protobuf是一种平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储。


什么是protocol buffers

 Protobuf是一种平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储。

image.png

Protobuf入门

1.开发环境搭建

 protobuf现在官方的最新版本是3.7.x版本,https://github.com/protocolbuffers/protobuf/releases ,protobuf2和protobuf3版本区别还是蛮大的,hadoop中使用的就是protobuf来实现序列化的,我们在此处使用的版本是2.5,官网对于此版本已经没有下载链接了,我在百度云盘上提供有(windows,linux):

链接:https://pan.baidu.com/s/1kxhlNqlu2Z3_E65Zi7W7Pg

提取码:vcqh

定义proto文件

SubscribeReq.proto

package netty;
option java_package = "com.dpb.netty.codec.protobuf";
option java_outer_classname = "SubscribeReqProto";
message SubscribeReq{
 required int32 subReqID = 1;
 required string userName = 2;
 required string preductName = 3;
 repeated string address = 4;
}

SubscribeResp.proto

package netty;
option java_package = "com.dpb.netty.codec.protobuf";
option java_outer_classname = "SubscribeRespProto";
message SubscribeResp{
 required int32 subReqID = 1;
 required int32 respCode = 2;
 required string desc = 3;
}

生成java文件

 &esmp;cmd进入命令行模式,进入相关文件夹。

image.png

分别指向下面两条命令

C:\tools>protoc-2.5.0-windows-x86_64.exe --java_out=./ SubscribeReq.proto
C:\tools>protoc-2.5.0-windows-x86_64.exe --java_out=./ SubscribeResp.proto

在相关文件夹下会生成对于的java文件,将文件拷贝到eclipse工作空间中。

2.编解码案例

 演示protobuf编解码操作。

package com.dpb.netty.codec;
import java.util.ArrayList;
import java.util.List;
import com.dpb.netty.codec.protobuf.SubscribeReqProto;
import com.dpb.netty.codec.protobuf.SubscribeReqProto.SubscribeReq;
import com.google.protobuf.InvalidProtocolBufferException;
/**
 * protobuf 编解码操作案例
 * 
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class TestSubscribeReqProto {
  /**
   * 编码
   * @return
   */
  private static byte[] encode(SubscribeReqProto.SubscribeReq req){
    return req.toByteArray();
  }
  /**
   * 解码
   * @param body
   * @return
   * @throws InvalidProtocolBufferException
   */
  private static SubscribeReqProto.SubscribeReq decode(byte[] body) 
      throws InvalidProtocolBufferException{
    return SubscribeReqProto.SubscribeReq.parseFrom(body);
  }
  /**
   * 构建SubscribeReq对象
   * @return
   */
  private static SubscribeReqProto.SubscribeReq createSubscribeReq(){
    SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
    builder.setSubReqID(1);
    builder.setUserName("bobo");
    builder.setPreductName("Netty");
    List<String> address = new ArrayList<>();
    address.add("beijing");
    address.add("guangzhou");
    address.add("shezheng");
    builder.addAllAddress(address);
    return builder.build();
  }
  public static void main(String[] args) throws InvalidProtocolBufferException {
    SubscribeReqProto.SubscribeReq req = createSubscribeReq();
    System.out.println("编码前:"+req.toString());
    SubscribeReq req2 = decode(encode(req));
    System.out.println("编码后:"+req);
    System.out.println("编码后:"+req2);
    System.out.println(req2.equals(req));
  }
}

输出结果:

编码前:subReqID: 1
userName: "bobo"
preductName: "Netty"
address: "beijing"
address: "guangzhou"
address: "shezheng"
编码后:subReqID: 1
userName: "bobo"
preductName: "Netty"
address: "beijing"
address: "guangzhou"
address: "shezheng"
编码后:subReqID: 1
userName: "bobo"
preductName: "Netty"
address: "beijing"
address: "guangzhou"
address: "shezheng"
true

 通过结果我们发现编码前后的结果是一致的而且前后对象是等价的。

Netty中Protobuf案例

服务端程序

SubReqServer

package com.dpb.netty.codec;
import com.dpb.netty.codec.protobuf.SubscribeReqProto;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
 * 图书订购服务端
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class SubReqServer {
  private void bind(int port)throws Exception{
    // 配置服务端的NIO线程组
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 100)
          .handler(new LoggingHandler(LogLevel.INFO))
          .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) {
          // 处理半包问题
            ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
            // 添加解码器
            ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
            // 处理半包问题
            ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
            // 添加编码器
            ch.pipeline().addLast(new ProtobufEncoder());
            ch.pipeline().addLast(new SubReqServerHandler());
        }
          });
        // 绑定端口,同步等待成功
        ChannelFuture f = b.bind(port).sync();
        // 等待服务端监听端口关闭
        f.channel().closeFuture().sync();
    } finally {
        // 优雅退出,释放线程池资源
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
  }
  public static void main(String[] args) throws Exception {
    new SubReqServer().bind(8080);
  }
}

SubReqServerHandler

package com.dpb.netty.codec;
import com.dpb.netty.codec.protobuf.SubscribeReqProto;
import com.dpb.netty.codec.protobuf.SubscribeRespProto;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class SubReqServerHandler extends ChannelHandlerAdapter {
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 获取消息
    SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
    // 验证账号
    if ("bobo".equalsIgnoreCase(req.getUserName())) {
      System.out.println("Service accept client subscribe req : [" + req.toString() + "]");
      // 回写消息
      ctx.writeAndFlush(resp(req.getSubReqID()));
    }
  }
  private SubscribeRespProto.SubscribeResp resp(int subReqID) {
    SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
    builder.setSubReqID(subReqID);
    builder.setRespCode(0);
    builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
    return builder.build();
  }
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();// 发生异常,关闭链路
  }
}

客户端程序

SubReqClient

package com.dpb.netty.codec;
import com.dpb.netty.codec.protobuf.SubscribeRespProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class SubReqClient {
  public void connect(int port, String host) throws Exception {
    // 配置客户端NIO线程组
    EventLoopGroup group = new NioEventLoopGroup();
    try {
      Bootstrap b = new Bootstrap();
      b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
          .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
              ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
              ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
              ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
              ch.pipeline().addLast(new ProtobufEncoder());
              ch.pipeline().addLast(new SubReqClientHandler());
            }
          });
      // 发起异步连接操作
      ChannelFuture f = b.connect(host, port).sync();
      // 当代客户端链路关闭
      f.channel().closeFuture().sync();
    } finally {
      // 优雅退出,释放NIO线程组
      group.shutdownGracefully();
    }
  }
  /**
   * @param args
   * @throws Exception
   */
  public static void main(String[] args) throws Exception {
    int port = 8080;
    new SubReqClient().connect(port, "127.0.0.1");
  }
}

SubReqClientHandler

package com.dpb.netty.codec;
import java.util.ArrayList;
import java.util.List;
import com.dpb.netty.codec.protobuf.SubscribeReqProto;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class SubReqClientHandler extends ChannelHandlerAdapter {
  /**
   * Creates a client-side handler.
   */
  public SubReqClientHandler() {
  }
  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    for (int i = 0; i < 10; i++) {
      ctx.write(subReq(i));
    }
    ctx.flush();
  }
  private SubscribeReqProto.SubscribeReq subReq(int i) {
    SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
    builder.setSubReqID(i);
    builder.setUserName("bobo");
    builder.setPreductName("Netty Book For Protobuf");
    List<String> address = new ArrayList<>();
    address.add("NanJing");
    address.add("BeiJing");
    address.add("ShenZhen");
    builder.addAllAddress(address);
    return builder.build();
  }
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    System.out.println("Receive server response : [" + msg + "]");
  }
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
  }
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
  }
}

测试

服务端的输出:

Service accept client subscribe req : [subReqID: 0
userName: "bobo"
preductName: "Netty Book For Protobuf"
address: "NanJing"
address: "BeiJing"
address: "ShenZhen"
]
.......
Service accept client subscribe req : [subReqID: 9
userName: "bobo"
preductName: "Netty Book For Protobuf"
address: "NanJing"
address: "BeiJing"
address: "ShenZhen"
]

客户端的输出:

Receive server response : [subReqID: 0
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
....
Receive server response : [subReqID: 9
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]

 运行结果表明,我们基于Netty protobuf编解码框架开发的案例可以正常工作,利用Netty提供的Protobuf编解码能力,我们在不需要了解Protobuf实现和使用细节的情况下就能轻松支持Protobuf编解码,可以方便地实现跨语言的远程服务调用和与周边异构系统进行通信对接。


相关文章
|
6月前
|
缓存 网络协议 Dubbo
异步编程 - 12 异步、基于事件驱动的网络编程框架 Netty
异步编程 - 12 异步、基于事件驱动的网络编程框架 Netty
45 0
|
存储 设计模式 网络协议
Netty网络框架(一)
Netty网络框架
30 1
|
1月前
|
前端开发 Java 数据库连接
探索Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty
探索Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty
|
2月前
|
C++
[序列化协议] --- protobuf
[序列化协议] --- protobuf
20 0
|
2月前
|
前端开发 Java 数据库连接
认识Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty
Spring框架 Spring是一个轻量级的开源框架,用于构建企业级应用。它提供了广泛的功能,包括依赖注入、面向切面编程、事务管理、消息传递等。Spring的核心思想是控制反转(IoC)和面向切面编程(AOP)。
76 3
|
3月前
|
负载均衡 Java 调度
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)
经过阅读《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。
40 0
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)
|
3月前
|
Dubbo Java 应用服务中间件
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)
今天,我要向大家实现一个基于Netty实现的高性能远程通信框架!这个框架利用了 Netty 的强大功能,提供了快速、可靠的远程通信能力。 无论是构建大规模微服务架构还是实现分布式计算,这个分布式通信框架都是一个不可或缺的利器。
61 2
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)
|
3月前
|
前端开发 网络协议 Dubbo
Netty - 回顾Netty高性能原理和框架架构解析
Netty - 回顾Netty高性能原理和框架架构解析
248 0
|
4月前
protobuf 序列化和反序列化
protobuf 序列化和反序列化
21 0
|
4月前
|
存储 XML JSON
日常小知识点之序列化结构(protobuf使用及简单原理)
日常小知识点之序列化结构(protobuf使用及简单原理)
59 0