Netty中序列化框架MessagePack的简单实现

简介: MessagePack是一个高效的二进制序列化框架,它像JSON一样支持不同语言间的数据交换,但是它的性能更快,序列化之后的码流也更小。MessagePack的特点如下: 编解码高效,性能高; 序列化之后码流小 支持跨语言


 MessagePack是一个高效的二进制序列化框架,它像JSON一样支持不同语言间的数据交换,但是它的性能更快,序列化之后的码流也更小。MessagePack的特点如下:

   编解码高效,性能高;

   序列化之后码流小

   支持跨语言

MessagePack使用

1.依赖

 使用maven构建项目

<dependency>
  <groupId>org.msgpack</groupId>
  <artifactId>msgpack</artifactId>
  <version>0.6.12</version>
</dependency>

2.创建编码和解码器

编码器

/**
 * @param ctx 上下文
 * @param msg 需要编码的对象
 * @param out 编码后的数据
 */
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
  MessagePack msgpack = new MessagePack();
  // 对对象进行序列化
  byte[] raw = msgpack.write(msg);
  // 返回序列化的数据
  out.writeBytes(raw);
}

解码器

/**
 * @param ctx 上下文
 * @param msg 需要解码的数据
 * @param out 解码列表
 */
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
  final byte[] array;
  final int length = msg.readableBytes();
  array = new byte[length];
  // 获取需要解码的字节数组
  msg.getBytes(msg.readerIndex(), array,0,length);
  MessagePack msgpack = new MessagePack();
  // 反序列化并将结果保存到了解码列表中
  out.add(msgpack.read(array));
}

3.客户端

EchoClient

/**
 * MsgPack 编解码器
 * 
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoClient {
  public static void main(String[] args) throws Exception {
    int port = 8080;
    if (args != null && args.length > 0) {
      try {
        port = Integer.valueOf(args[0]);
      } catch (NumberFormatException e) {
        // 采用默认值
      }
    }
    new EchoClient().connector(port, "127.0.0.1",10);
  }
  public void connector(int port, String host,final int sendNumber) throws Exception {
    // 配置客户端NIO线程组
    EventLoopGroup group = new NioEventLoopGroup();
    try {
      Bootstrap b = new Bootstrap();
      b.group(group).channel(NioSocketChannel.class)
              .option(ChannelOption.TCP_NODELAY, true)
              .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
              .handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
          //这里设置通过增加包头表示报文长度来避免粘包
                ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(1024, 0, 2,0,2));
                //增加解码器
                ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
                //这里设置读取报文的包头长度来避免粘包
                ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
                //增加编码器
                ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
          // 4.添加自定义的处理器
          ch.pipeline().addLast(new EchoClientHandler(sendNumber));
        }
      });
      // 发起异步连接操作
      ChannelFuture f = b.connect(host, port).sync();
      // 等待客户端链路关闭
      f.channel().closeFuture().sync();
    }catch(Exception e){
      e.printStackTrace();
    }  finally {
      // 优雅退出,释放NIO线程组
      group.shutdownGracefully();
    }
  }
}

EchoClientHandler

/**
 * DelimiterBasedFrameDecoder 案例
 *    自定义处理器
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoServerHandler extends ChannelHandlerAdapter{
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //UserInfo user = (UserInfo) msg;
    System.out.println("server receive the msgpack message :"+msg);
    //ctx.writeAndFlush(user);
    ctx.writeAndFlush(msg);
  }
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close(); // 发生异常关闭链路
  }
}

4.服务端

EchoServer

/**
 * MsgPack 编解码器
 *    服务端
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoServer {
  public void bind(int port) throws Exception {
    // 配置服务端的NIO线程组
    // 服务端接受客户端的连接
    NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    // 进行SocketChannel的网络读写
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap b = new ServerBootstrap();
      b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 100)
          .handler(new LoggingHandler(LogLevel.INFO))
          .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
              ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535, 0, 2,0,2));
              // 添加msgpack的编码和解码器
              ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
              ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
              ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
              // 添加自定义的处理器
              ch.pipeline().addLast(new EchoServerHandler());
            }
          });
      // 绑定端口,同步等待成功
      ChannelFuture f = b.bind(port).sync();
      // 等待服务端监听端口关闭
      f.channel().closeFuture().sync();
    }catch(Exception e){
      e.printStackTrace();
    } finally {
      // 优雅退出,释放线程池资源
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
  public static void main(String[] args) throws Exception {
    int port = 8080;
    if(args!=null && args.length > 0){
      try{
        port = Integer.valueOf(args[0]);
      }catch(NumberFormatException e){
        // 采用默认值
      }
    }
    new EchoServer().bind(port);
  }
}

EchoServerHandler

/**
 * DelimiterBasedFrameDecoder 案例
 *    自定义处理器
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoServerHandler extends ChannelHandlerAdapter{
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //UserInfo user = (UserInfo) msg;
    System.out.println("server receive the msgpack message :"+msg);
    //ctx.writeAndFlush(user);
    ctx.writeAndFlush(msg);
  }
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close(); // 发生异常关闭链路
  }
}

5.注意点(POJO)

 消息类上加上注解Message,还有就是必须要有默认的无参构造器

/**
 * Msgpack 中必须添加@Message注解 及 无参构造方法
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
@Message
public class UserInfo {
  private String name;
  private int age;
  public String getName() {
    return name;
  }
  public void setName(String name) {
    this.name = name;
  }
  public int getAge() {
    return age;
  }
  public void setAge(int age) {
    this.age = age;
  }
  @Override
  public String toString() {
    return "UserInfo [name=" + name + ", age=" + age + "]";
  }
}

6.测试

服务端输出

server receive the msgpack message :["bobo烤鸭:0",0]
server receive the msgpack message :["bobo烤鸭:1",1]
server receive the msgpack message :["bobo烤鸭:2",2]
server receive the msgpack message :["bobo烤鸭:3",3]
server receive the msgpack message :["bobo烤鸭:4",4]
server receive the msgpack message :["bobo烤鸭:5",5]
server receive the msgpack message :["bobo烤鸭:6",6]
server receive the msgpack message :["bobo烤鸭:7",7]
server receive the msgpack message :["bobo烤鸭:8",8]
server receive the msgpack message :["bobo烤鸭:9",9]

客户端输出

Client receive the msgpack message :["bobo烤鸭:0",0]
Client receive the msgpack message :["bobo烤鸭:1",1]
Client receive the msgpack message :["bobo烤鸭:2",2]
Client receive the msgpack message :["bobo烤鸭:3",3]
Client receive the msgpack message :["bobo烤鸭:4",4]
Client receive the msgpack message :["bobo烤鸭:5",5]
Client receive the msgpack message :["bobo烤鸭:6",6]
Client receive the msgpack message :["bobo烤鸭:7",7]
Client receive the msgpack message :["bobo烤鸭:8",8]
Client receive the msgpack message :["bobo烤鸭:9",9]

至此Netty中就可以通过MessagePack来处理序列化的情况了~


相关文章
|
8月前
|
缓存 网络协议 Dubbo
异步编程 - 12 异步、基于事件驱动的网络编程框架 Netty
异步编程 - 12 异步、基于事件驱动的网络编程框架 Netty
53 0
|
10天前
|
网络协议 Java 物联网
Netty是什么?深入理解高性能网络框架
Netty是什么?深入理解高性能网络框架
40 1
|
1月前
|
XML 网络协议 前端开发
Netty网络框架(三)
Netty网络框架
33 1
|
1月前
|
存储 设计模式 网络协议
Netty网络框架(一)
Netty网络框架
76 1
|
1月前
|
存储 编解码 网络协议
Netty网络框架(二)
Netty网络框架
42 0
|
1月前
|
Dubbo Java 应用服务中间件
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)
今天,我要向大家实现一个基于Netty实现的高性能远程通信框架!这个框架利用了 Netty 的强大功能,提供了快速、可靠的远程通信能力。 无论是构建大规模微服务架构还是实现分布式计算,这个分布式通信框架都是一个不可或缺的利器。
93 2
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)
|
1月前
|
前端开发 Java 数据库连接
认识Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty
Spring框架 Spring是一个轻量级的开源框架,用于构建企业级应用。它提供了广泛的功能,包括依赖注入、面向切面编程、事务管理、消息传递等。Spring的核心思想是控制反转(IoC)和面向切面编程(AOP)。
141 3
|
1月前
|
前端开发 Java 数据库连接
探索Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty
探索Java中最常用的框架:Spring、Spring MVC、Spring Boot、MyBatis和Netty
|
1月前
|
负载均衡 Java 调度
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)
经过阅读《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。
52 0
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)
|
1月前
|
前端开发 网络协议 Dubbo
Netty - 回顾Netty高性能原理和框架架构解析
Netty - 回顾Netty高性能原理和框架架构解析
268 0