Netty中序列化框架MessagePack的简单实现

简介: MessagePack是一个高效的二进制序列化框架,它像JSON一样支持不同语言间的数据交换,但是它的性能更快,序列化之后的码流也更小。MessagePack的特点如下: 编解码高效,性能高; 序列化之后码流小 支持跨语言


 MessagePack是一个高效的二进制序列化框架,它像JSON一样支持不同语言间的数据交换,但是它的性能更快,序列化之后的码流也更小。MessagePack的特点如下:

   编解码高效,性能高;

   序列化之后码流小

   支持跨语言

MessagePack使用

1.依赖

 使用maven构建项目

<dependency>
  <groupId>org.msgpack</groupId>
  <artifactId>msgpack</artifactId>
  <version>0.6.12</version>
</dependency>

2.创建编码和解码器

编码器

/**
 * @param ctx 上下文
 * @param msg 需要编码的对象
 * @param out 编码后的数据
 */
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
  MessagePack msgpack = new MessagePack();
  // 对对象进行序列化
  byte[] raw = msgpack.write(msg);
  // 返回序列化的数据
  out.writeBytes(raw);
}

解码器

/**
 * @param ctx 上下文
 * @param msg 需要解码的数据
 * @param out 解码列表
 */
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
  final byte[] array;
  final int length = msg.readableBytes();
  array = new byte[length];
  // 获取需要解码的字节数组
  msg.getBytes(msg.readerIndex(), array,0,length);
  MessagePack msgpack = new MessagePack();
  // 反序列化并将结果保存到了解码列表中
  out.add(msgpack.read(array));
}

3.客户端

EchoClient

/**
 * MsgPack 编解码器
 * 
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoClient {
  public static void main(String[] args) throws Exception {
    int port = 8080;
    if (args != null && args.length > 0) {
      try {
        port = Integer.valueOf(args[0]);
      } catch (NumberFormatException e) {
        // 采用默认值
      }
    }
    new EchoClient().connector(port, "127.0.0.1",10);
  }
  public void connector(int port, String host,final int sendNumber) throws Exception {
    // 配置客户端NIO线程组
    EventLoopGroup group = new NioEventLoopGroup();
    try {
      Bootstrap b = new Bootstrap();
      b.group(group).channel(NioSocketChannel.class)
              .option(ChannelOption.TCP_NODELAY, true)
              .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
              .handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
          //这里设置通过增加包头表示报文长度来避免粘包
                ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(1024, 0, 2,0,2));
                //增加解码器
                ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
                //这里设置读取报文的包头长度来避免粘包
                ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
                //增加编码器
                ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
          // 4.添加自定义的处理器
          ch.pipeline().addLast(new EchoClientHandler(sendNumber));
        }
      });
      // 发起异步连接操作
      ChannelFuture f = b.connect(host, port).sync();
      // 等待客户端链路关闭
      f.channel().closeFuture().sync();
    }catch(Exception e){
      e.printStackTrace();
    }  finally {
      // 优雅退出,释放NIO线程组
      group.shutdownGracefully();
    }
  }
}

EchoClientHandler

/**
 * DelimiterBasedFrameDecoder 案例
 *    自定义处理器
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoServerHandler extends ChannelHandlerAdapter{
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //UserInfo user = (UserInfo) msg;
    System.out.println("server receive the msgpack message :"+msg);
    //ctx.writeAndFlush(user);
    ctx.writeAndFlush(msg);
  }
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close(); // 发生异常关闭链路
  }
}

4.服务端

EchoServer

/**
 * MsgPack 编解码器
 *    服务端
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoServer {
  public void bind(int port) throws Exception {
    // 配置服务端的NIO线程组
    // 服务端接受客户端的连接
    NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    // 进行SocketChannel的网络读写
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap b = new ServerBootstrap();
      b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 100)
          .handler(new LoggingHandler(LogLevel.INFO))
          .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
              ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535, 0, 2,0,2));
              // 添加msgpack的编码和解码器
              ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
              ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
              ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
              // 添加自定义的处理器
              ch.pipeline().addLast(new EchoServerHandler());
            }
          });
      // 绑定端口,同步等待成功
      ChannelFuture f = b.bind(port).sync();
      // 等待服务端监听端口关闭
      f.channel().closeFuture().sync();
    }catch(Exception e){
      e.printStackTrace();
    } finally {
      // 优雅退出,释放线程池资源
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
  public static void main(String[] args) throws Exception {
    int port = 8080;
    if(args!=null && args.length > 0){
      try{
        port = Integer.valueOf(args[0]);
      }catch(NumberFormatException e){
        // 采用默认值
      }
    }
    new EchoServer().bind(port);
  }
}

EchoServerHandler

/**
 * DelimiterBasedFrameDecoder 案例
 *    自定义处理器
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoServerHandler extends ChannelHandlerAdapter{
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //UserInfo user = (UserInfo) msg;
    System.out.println("server receive the msgpack message :"+msg);
    //ctx.writeAndFlush(user);
    ctx.writeAndFlush(msg);
  }
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close(); // 发生异常关闭链路
  }
}

5.注意点(POJO)

 消息类上加上注解Message,还有就是必须要有默认的无参构造器

/**
 * Msgpack 中必须添加@Message注解 及 无参构造方法
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
@Message
public class UserInfo {
  private String name;
  private int age;
  public String getName() {
    return name;
  }
  public void setName(String name) {
    this.name = name;
  }
  public int getAge() {
    return age;
  }
  public void setAge(int age) {
    this.age = age;
  }
  @Override
  public String toString() {
    return "UserInfo [name=" + name + ", age=" + age + "]";
  }
}

6.测试

服务端输出

server receive the msgpack message :["bobo烤鸭:0",0]
server receive the msgpack message :["bobo烤鸭:1",1]
server receive the msgpack message :["bobo烤鸭:2",2]
server receive the msgpack message :["bobo烤鸭:3",3]
server receive the msgpack message :["bobo烤鸭:4",4]
server receive the msgpack message :["bobo烤鸭:5",5]
server receive the msgpack message :["bobo烤鸭:6",6]
server receive the msgpack message :["bobo烤鸭:7",7]
server receive the msgpack message :["bobo烤鸭:8",8]
server receive the msgpack message :["bobo烤鸭:9",9]

客户端输出

Client receive the msgpack message :["bobo烤鸭:0",0]
Client receive the msgpack message :["bobo烤鸭:1",1]
Client receive the msgpack message :["bobo烤鸭:2",2]
Client receive the msgpack message :["bobo烤鸭:3",3]
Client receive the msgpack message :["bobo烤鸭:4",4]
Client receive the msgpack message :["bobo烤鸭:5",5]
Client receive the msgpack message :["bobo烤鸭:6",6]
Client receive the msgpack message :["bobo烤鸭:7",7]
Client receive the msgpack message :["bobo烤鸭:8",8]
Client receive the msgpack message :["bobo烤鸭:9",9]

至此Netty中就可以通过MessagePack来处理序列化的情况了~


相关文章
|
3月前
|
NoSQL 前端开发 Java
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
|
2月前
|
编解码 分布式计算 网络协议
Netty高性能网络框架(一)
Netty高性能网络框架(一)
|
1月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
50 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
2月前
|
设计模式 缓存 算法
Netty框架的重要性
Netty框架的重要性
|
3月前
|
Java
JDK序列化原理问题之Hessian框架不支持writeObject/readObject方法如何解决
JDK序列化原理问题之Hessian框架不支持writeObject/readObject方法如何解决
|
3月前
|
前端开发 Java Spring
springboot 整合 netty框架, 实现 心跳检测,自动重连
springboot 整合 netty框架, 实现 心跳检测,自动重连
|
3月前
|
编解码 NoSQL Redis
(十一)Netty实战篇:基于Netty框架打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。
|
3月前
|
Java 应用服务中间件 Linux
(九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽!
现如今的开发环境中,分布式/微服务架构大行其道,而分布式/微服务的根基在于网络编程,而Netty恰恰是Java网络编程领域的无冕之王。Netty这个框架相信大家定然听说过,其在Java网络编程中的地位,好比JavaEE中的Spring。
143 3
|
3月前
|
开发框架 缓存 前端开发
基于SqlSugar的开发框架循序渐进介绍(24)-- 使用Serialize.Linq对Lambda表达式进行序列化和反序列化
基于SqlSugar的开发框架循序渐进介绍(24)-- 使用Serialize.Linq对Lambda表达式进行序列化和反序列化
|
3月前
|
XML 存储 JSON
(十二)探索高性能通信与RPC框架基石:Json、ProtoBuf、Hessian序列化详解
如今这个分布式风靡的时代,网络通信技术,是每位技术人员必须掌握的技能,因为无论是哪种分布式技术,都离不开心跳、选举、节点感知、数据同步……等机制,而究其根本,这些技术的本质都是网络间的数据交互。正因如此,想要构建一个高性能的分布式组件/系统,不得不思考一个问题:怎么才能让数据传输的速度更快?