Netty中序列化框架Protobuf的简单实现-阿里云开发者社区

开发者社区> 游客a74jvhcp7vclg> 正文

Netty中序列化框架Protobuf的简单实现

简介: Protobuf是一种平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储。
+关注继续查看


什么是protocol buffers

 Protobuf是一种平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储。

image.png

Protobuf入门

1.开发环境搭建

 protobuf现在官方的最新版本是3.7.x版本,https://github.com/protocolbuffers/protobuf/releases ,protobuf2和protobuf3版本区别还是蛮大的,hadoop中使用的就是protobuf来实现序列化的,我们在此处使用的版本是2.5,官网对于此版本已经没有下载链接了,我在百度云盘上提供有(windows,linux):

链接:https://pan.baidu.com/s/1kxhlNqlu2Z3_E65Zi7W7Pg

提取码:vcqh

定义proto文件

SubscribeReq.proto

package netty;
option java_package = "com.dpb.netty.codec.protobuf";
option java_outer_classname = "SubscribeReqProto";

message SubscribeReq{
 required int32 subReqID = 1;
 required string userName = 2;
 required string preductName = 3;
 repeated string address = 4;
}

SubscribeResp.proto

package netty;
option java_package = "com.dpb.netty.codec.protobuf";
option java_outer_classname = "SubscribeRespProto";

message SubscribeResp{
 required int32 subReqID = 1;
 required int32 respCode = 2;
 required string desc = 3;
}

生成java文件

 &esmp;cmd进入命令行模式,进入相关文件夹。

image.png

分别指向下面两条命令

C:\tools>protoc-2.5.0-windows-x86_64.exe --java_out=./ SubscribeReq.proto
C:\tools>protoc-2.5.0-windows-x86_64.exe --java_out=./ SubscribeResp.proto

在相关文件夹下会生成对于的java文件,将文件拷贝到eclipse工作空间中。

2.编解码案例

 演示protobuf编解码操作。

package com.dpb.netty.codec;

import java.util.ArrayList;
import java.util.List;

import com.dpb.netty.codec.protobuf.SubscribeReqProto;
import com.dpb.netty.codec.protobuf.SubscribeReqProto.SubscribeReq;
import com.google.protobuf.InvalidProtocolBufferException;
/**
 * protobuf 编解码操作案例
 * 
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class TestSubscribeReqProto {

    /**
     * 编码
     * @return
     */
    private static byte[] encode(SubscribeReqProto.SubscribeReq req){
        return req.toByteArray();
    }
    
    /**
     * 解码
     * @param body
     * @return
     * @throws InvalidProtocolBufferException
     */
    private static SubscribeReqProto.SubscribeReq decode(byte[] body) 
            throws InvalidProtocolBufferException{
        return SubscribeReqProto.SubscribeReq.parseFrom(body);
    }
    
    /**
     * 构建SubscribeReq对象
     * @return
     */
    private static SubscribeReqProto.SubscribeReq createSubscribeReq(){
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqID(1);
        builder.setUserName("bobo");
        builder.setPreductName("Netty");
        List<String> address = new ArrayList<>();
        address.add("beijing");
        address.add("guangzhou");
        address.add("shezheng");
        builder.addAllAddress(address);
        return builder.build();
    }
    
    public static void main(String[] args) throws InvalidProtocolBufferException {
        SubscribeReqProto.SubscribeReq req = createSubscribeReq();
        System.out.println("编码前:"+req.toString());
        SubscribeReq req2 = decode(encode(req));
        System.out.println("编码后:"+req);
        System.out.println("编码后:"+req2);
        System.out.println(req2.equals(req));
    }
}

输出结果:

编码前:subReqID: 1
userName: "bobo"
preductName: "Netty"
address: "beijing"
address: "guangzhou"
address: "shezheng"

编码后:subReqID: 1
userName: "bobo"
preductName: "Netty"
address: "beijing"
address: "guangzhou"
address: "shezheng"

编码后:subReqID: 1
userName: "bobo"
preductName: "Netty"
address: "beijing"
address: "guangzhou"
address: "shezheng"

true

 通过结果我们发现编码前后的结果是一致的而且前后对象是等价的。

Netty中Protobuf案例

服务端程序

SubReqServer

package com.dpb.netty.codec;

import com.dpb.netty.codec.protobuf.SubscribeReqProto;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * 图书订购服务端
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class SubReqServer {
    
    private void bind(int port)throws Exception{
        // 配置服务端的NIO线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    // 处理半包问题
                    ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    // 添加解码器
                    ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
                    // 处理半包问题
                    ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                    // 添加编码器
                    ch.pipeline().addLast(new ProtobufEncoder());
                    ch.pipeline().addLast(new SubReqServerHandler());
                }
                });

            // 绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();

            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new SubReqServer().bind(8080);
    }
}

SubReqServerHandler

package com.dpb.netty.codec;

import com.dpb.netty.codec.protobuf.SubscribeReqProto;
import com.dpb.netty.codec.protobuf.SubscribeRespProto;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class SubReqServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 获取消息
        SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
        // 验证账号
        if ("bobo".equalsIgnoreCase(req.getUserName())) {
            System.out.println("Service accept client subscribe req : [" + req.toString() + "]");
            // 回写消息
            ctx.writeAndFlush(resp(req.getSubReqID()));
        }
    }

    private SubscribeRespProto.SubscribeResp resp(int subReqID) {
        SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
        builder.setSubReqID(subReqID);
        builder.setRespCode(0);
        builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
        return builder.build();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();// 发生异常,关闭链路
    }
}

客户端程序

SubReqClient

package com.dpb.netty.codec;

import com.dpb.netty.codec.protobuf.SubscribeRespProto;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class SubReqClient {

    public void connect(int port, String host) throws Exception {
        // 配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                            ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
                            ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                            ch.pipeline().addLast(new ProtobufEncoder());
                            ch.pipeline().addLast(new SubReqClientHandler());
                        }
                    });

            // 发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();

            // 当代客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        new SubReqClient().connect(port, "127.0.0.1");
    }
}

SubReqClientHandler

package com.dpb.netty.codec;

import java.util.ArrayList;
import java.util.List;

import com.dpb.netty.codec.protobuf.SubscribeReqProto;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class SubReqClientHandler extends ChannelHandlerAdapter {

    /**
     * Creates a client-side handler.
     */
    public SubReqClientHandler() {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        for (int i = 0; i < 10; i++) {
            ctx.write(subReq(i));
        }
        ctx.flush();
    }

    private SubscribeReqProto.SubscribeReq subReq(int i) {
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqID(i);
        builder.setUserName("bobo");
        builder.setPreductName("Netty Book For Protobuf");
        List<String> address = new ArrayList<>();
        address.add("NanJing");
        address.add("BeiJing");
        address.add("ShenZhen");
        builder.addAllAddress(address);
        return builder.build();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Receive server response : [" + msg + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

测试

服务端的输出:

Service accept client subscribe req : [subReqID: 0
userName: "bobo"
preductName: "Netty Book For Protobuf"
address: "NanJing"
address: "BeiJing"
address: "ShenZhen"
]
.......
Service accept client subscribe req : [subReqID: 9
userName: "bobo"
preductName: "Netty Book For Protobuf"
address: "NanJing"
address: "BeiJing"
address: "ShenZhen"
]

客户端的输出:

Receive server response : [subReqID: 0
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
....
Receive server response : [subReqID: 9
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]

 运行结果表明,我们基于Netty protobuf编解码框架开发的案例可以正常工作,利用Netty提供的Protobuf编解码能力,我们在不需要了解Protobuf实现和使用细节的情况下就能轻松支持Protobuf编解码,可以方便地实现跨语言的远程服务调用和与周边异构系统进行通信对接。


版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
计算机基础2 | 学习笔记
快速学习计算机基础2。
11 0
计算机基础1 | 学习笔记
快速学习计算机基础1。
9 0
计算机基础3 | 学习笔记
快速学习计算机基础3。
11 0
Linux 命令执行过程 | 学习笔记
快速学习 Linux 命令执行过程。
10 0
PG+MySQL第9课-实时精准营销
通常业务场景会涉及基于标签条件圈选目标客户、基于用户特征值扩选相似人群、群体用户画像分析这些技术,本文将围绕这三个场景去介绍在实施精准营销里面的PG数据库的使用
8 0
冬季实战营第一期学习报告
通过五天学习与实操,对ECS云服务器入门、快速搭建LAMP环境、部署MySQL数据库、回顾搭建Docker环境和Spring Boot以及使用PolarDB和ECS搭建门户网站操作,对本期学习与实操的认识。
9 0
Java classloader详解
Java程序并不是一个可执行文件,而是由很多的Java类组成,其运行是由JVM来控制的。而JVM从内存中查找到类,而真正将类加载进内存的就是ClassLoader,可以说我们每天都在接触ClassLoader,但是很多时候我们没有明白其执行的流程和原理。
8 0
Redis高可用架构演进
Redis是目前使用最广泛的缓存程序之一,也被应用于多种场景,例如数据缓存、分布式锁等,Redis官方提供了多种部署架构,以满足不同应用场景下对于高可用和扩展性的要求。
9 0
MySQL高可用架构演进
MySQL是数据库领域当之无愧的霸主之一,其在各行各业被广泛应用,随着广泛使用,对于MySQL本身的高可用性的要求就是不可避免的话题,而MySQL的高可用方案也随着MySQL功能的完善经历了多次升级,本文将对MySQL的各种高可用架构进行分析,以此来了解架构的演进。
11 0
系统安装前准备 | 学习笔记
快速学习系统安装前准备。
7 0
+关注
游客a74jvhcp7vclg
10余年开发架构经验,同时乐于技术分享!
332
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载