编解码-marshalling

简介: JBoss的Marshalling序列化框架,它是JBoss内部使用的序列化框架,Netty提供了Marshalling编码和解码器,方便用户在Netty中使用Marshalling。 JBoss Marshalling是一个Java对象序列化包,对JDK默认的序列化框架进行了优化,但又保持跟java.io.Serializable接口的兼容,同时增加了一些可调的参数和附加的特性,这些参数和特性可通过工厂类进行配置。

JBoss的Marshalling序列化框架,它是JBoss内部使用的序列化框架,Netty提供了Marshalling编码和解码器,方便用户在Netty中使用Marshalling。

JBoss Marshalling是一个Java对象序列化包,对JDK默认的序列化框架进行了优化,但又保持跟java.io.Serializable接口的兼容,同时增加了一些可调的参数和附加的特性,这些参数和特性可通过工厂类进行配置。

import lombok.Data;

import java.io.Serializable;

@Data
public class SubscribeReq implements Serializable {

    /**
     * 默认的序列号ID
     */
    private static final long serialVersionUID = 1L;

    private int subReqID;

    private String userName;

    private String productName;

    private String phoneNumber;

    private String address;

    @Override
    public String toString() {
        return "SubscribeReq [subReqID=" + subReqID + ", userName=" + userName
                + ", productName=" + productName + ", phoneNumber="
                + phoneNumber + ", address=" + address + "]";
    }
}

import lombok.Data;

import java.io.Serializable;

@Data
public class SubscribeResp implements Serializable {

   /**
   * 默认序列ID
   */
 private static final long serialVersionUID = 1L;

   private int subReqID;

   private int respCode;

   private String desc;

   @Override
   public String toString() {
   return "SubscribeResp [subReqID=" + subReqID + ", respCode=" + respCode
     + ", desc=" + desc + "]";
   }
 }

编解码工厂类:

import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

public final class MarshallingCodeCFactory {

    /**
     * 创建Jboss Marshalling解码器MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        //首先通过Marshalling工具类的getProvidedMarshallerFactory静态方法获取MarshallerFactory实例
        //参数“serial”表示创建的是Java序列化工厂对象,它由jboss-marshalling-serial-1.3.0.CR9.jar提供。
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        //创建了MarshallingConfiguration对象
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        //将它的版本号设置为5
        configuration.setVersion(5);
        //然后根据MarshallerFactory和MarshallingConfiguration创建UnmarshallerProvider实例
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        //最后通过构造函数创建Netty的MarshallingDecoder对象
        //它有两个参数,分别是UnmarshallerProvider和单个消息序列化后的最大长度。
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }

    /**
     * 创建Jboss Marshalling编码器MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        //创建MarshallerProvider对象,它用于创建Netty提供的MarshallingEncoder实例
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        //MarshallingEncoder用于将实现序列化接口的POJO对象序列化为二进制数组。
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}

服务端代码示例:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SubReqServer {
    public void bind(int port) throws Exception {
        // 配置服务端的NIO线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer() {
                        @Override
                        public void initChannel(Channel ch) {
                            //通过工厂类创建MarshallingEncoder解码器,并添加到ChannelPipeline.
                            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            //通过工厂类创建MarshallingEncoder编码器,并添加到ChannelPipeline中。
                            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            ch.pipeline().addLast(new SubReqServerHandler());
                        }
                    });

            // 绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();

            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }
        }
        new SubReqServer().bind(port);
    }
}

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

@ChannelHandler.Sharable
public class SubReqServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        //经过解码器handler ObjectDecoder的解码,
        //SubReqServerHandler接收到的请求消息已经被自动解码为SubscribeReq对象,可以直接使用。
        SubscribeReq req = (SubscribeReq) msg;
        if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) {
            System.out.println("Service accept client subscribe req : ["
                    + req.toString() + "]");
            //对订购者的用户名进行合法性校验,校验通过后打印订购请求消息,构造订购成功应答消息立即发送给客户端。
            ctx.writeAndFlush(resp(req.getSubReqID()));
        }
    }

    private SubscribeResp resp(int subReqID) {
        SubscribeResp resp = new SubscribeResp();
        resp.setSubReqID(subReqID);
        resp.setRespCode(0);
        resp.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
        return resp;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();// 发生异常,关闭链路
    }
}

客户端代码示例:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class SubReqClient {

    public void connect(int port, String host) throws Exception {
        // 配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer() {
                        @Override
                        public void initChannel(Channel ch)
                                throws Exception {
                            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            ch.pipeline().addLast(new SubReqClientHandler());
                        }
                    });

            // 发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();

            // 等待客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }
        }
        new SubReqClient().connect(port, "127.0.0.1");
    }
}

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class SubReqClientHandler extends ChannelHandlerAdapter {

    public SubReqClientHandler() {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        //在链路激活的时候循环构造10条订购请求消息,最后一次性地发送给服务端。
        for (int i = 0; i < 10; i++) {
            ctx.write(subReq(i));
        }
        ctx.flush();
    }

    private SubscribeReq subReq(int i) {
        SubscribeReq req = new SubscribeReq();
        req.setAddress("南京市江宁区方山国家地质公园");
        req.setPhoneNumber("138xxxxxxxxx");
        req.setProductName("Netty For Marshalling");
        req.setSubReqID(i);
        req.setUserName("Lilinfeng");
        return req;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        //由于对象解码器已经对订购请求应答消息进行了自动解码,
        //因此,SubReqClientHandler接收到的消息已经是解码成功后的订购应答消息。
        System.out.println("Receive server response : [" + msg + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

运行结果:

服务端结果:

14:48:45.475 [nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x71ea5def, /0:0:0:0:0:0:0:0:8080] RECEIVED: [id: 0x876eb7b4, /127.0.0.1:57423 => /127.0.0.1:8080]
14:48:45.707 [nioEventLoopGroup-3-1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetectionLevel: simple
Service accept client subscribe req : [SubscribeReq [subReqID=0, userName=Lilinfeng, productName=Netty For Marshalling, phoneNumber=138xxxxxxxxx, address=南京市江宁区方山国家地质公园]]
Service accept client subscribe req : [SubscribeReq [subReqID=1, userName=Lilinfeng, productName=Netty For Marshalling, phoneNumber=138xxxxxxxxx, address=南京市江宁区方山国家地质公园]]
..........................................................................
Service accept client subscribe req : [SubscribeReq [subReqID=9, userName=Lilinfeng, productName=Netty For Marshalling, phoneNumber=138xxxxxxxxx, address=南京市江宁区方山国家地质公园]]

客户端结果:

Receive server response : [SubscribeResp [subReqID=0, respCode=0, desc=Netty book order succeed, 3 days later, sent to the designated address]]
..........................................................................
Receive server response : [SubscribeResp [subReqID=9, respCode=0, desc=Netty book order succeed, 3 days later, sent to the designated address]]

由于我们模拟了TCP的粘包/拆包场景,但是程序的运行结果仍然正确,说明Netty的Marshalling编解码器支持半包和粘包的处理,对于开发者而言,只需要正确地将Marshalling编码器和解码器加入到ChannelPipeline中,就能实现对Marshalling序列化的支持。

利用Netty的Marshalling编解码器,可以轻松地开发出与JBoss内部模块进行远程通信的程序,而且支持异步非阻塞,这无疑降低了基于Netty开发的应用程序与JBoss内部模块对接的难度。

pom.xml

        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling</artifactId>
            <version>1.3.0.GA</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>1.3.0.GA</version>
        </dependency>

 

 

目录
相关文章
|
网络协议 网络架构 Windows
【Windows】MCSM面板搭建Mycraft服务器,实现公网远程联机
【Windows】MCSM面板搭建Mycraft服务器,实现公网远程联机
652 0
|
4月前
|
应用服务中间件 nginx Docker
静态资源管理:Nginx在Docker中的部署
部署Nginx到Docker中作为静态资源服务器是一种既简单又高效的方法,可以节省时间和资源,并能确保一致性和可扩展性。我们通过编写Dockerfile指定了基础镜像和所需指令,编写Nginx配置管理请求处理,构建自定义Docker镜像,并运行容器以启动服务。这一过程即符合开发规范,也保证了资源的高效管理和访问速度。
293 13
|
6月前
|
机器学习/深度学习 人工智能 算法
破解生成式AI认知边界:框架思维引擎如何重塑产业智能化未来
该内容深入解析了核心技术架构,涵盖思维链强化系统(DTT)、认知框架建模体系和实时纠偏算法体系。DTT通过多级问题拆解、混合精度推理及分布式验证,大幅提升复杂问题处理能力;认知框架结合知识图谱与逻辑推理,实现精准医疗诊断等应用;实时纠偏算法则通过多级验证机制保障事实与逻辑准确性。整体架构分应用层、框架层和基础层,支持高效、可信的跨领域适配。技术创新体现在混合计算加速、持续学习机制等方面,显著优于传统模型,在事实准确性、逻辑连续性及响应速度上优势明显。
269 28
|
6月前
|
人工智能 Java API
通义灵码 2.5 版深度评测:智能编程的边界在哪里?
通义灵码 2.5 版深度评测:智能编程的边界在哪里?
256 2
|
运维 负载均衡 Linux
阿里云轻量服务器最新收费标准与价格参考
阿里云轻量服务器具有灵活的镜像选择、快速上手、简便运维等优势,轻量服务器适合个人开发者和学生用来搭建网站、云端学习等场景使用,2024年截至目前国内地域有60元/月、80元/月等套餐可选,国外地域有24元/月、34元/月、67元/月等套餐可选,目前轻量应用服务器2核2G3M带宽82元1年、2核4G4M带宽298元1年。
|
8月前
|
SQL 人工智能 自然语言处理
DataV Note | 又在为年终报告头疼?让 AI 成为你的得力助手!
DataV Note | 又在为年终报告头疼?让 AI 成为你的得力助手!
254 2
|
SQL Java 关系型数据库
实时计算 Flink版操作报错合集之通过flink sql形式同步数据到hudi中,本地启动mian方法报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
479 8
|
缓存 监控 Kubernetes
go-zero解读与最佳实践(上)
go-zero解读与最佳实践(上)
|
并行计算 算法
【MATLAB 】 辛几何模态分解信号分解+模糊熵(近似熵)算法
【MATLAB 】 辛几何模态分解信号分解+模糊熵(近似熵)算法
414 0
|
缓存 监控 前端开发
前端开发中的性能瓶颈分析与优化
【7月更文挑战第27天】前端开发中的性能优化是一个系统工程,需要从多个角度入手,综合运用多种策略。通过减少网络延迟、优化资源加载、优化DOM操作、优化JavaScript执行以及第三方服务优化等措施,可以显著提升前端应用的性能。同时,通过性能监控和调优工具的使用,可以持续监控和优化应用性能,确保用户获得流畅、高效的体验。