netty系列之:netty对marshalling的支持

简介: 在之前的文章中我们讲过了,jboss marshalling是一种非常优秀的java对象序列化的方式,它可以兼容JDK自带的序列化,同时也提供了性能和使用上的优化。那么这么优秀的序列化工具可不可以用在netty中作为消息传递的方式呢?答案当然是肯定的,在netty中一切皆有可能。

简介

在之前的文章中我们讲过了,jboss marshalling是一种非常优秀的java对象序列化的方式,它可以兼容JDK自带的序列化,同时也提供了性能和使用上的优化。

那么这么优秀的序列化工具可不可以用在netty中作为消息传递的方式呢?

答案当然是肯定的,在netty中一切皆有可能。

netty中的marshalling provider

回顾一下jboss marshalling的常用用法,我们需要从MarshallerFactory中创建出Marshaller,因为mashaller有不同的实现,所以需要指定具体的实现来创建MarshallerFactory,如下所示:

MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("river");

这个MarshallerFactory实际上就是一个MarshallerProvider。

netty中定义了这样的一个接口:

public interface MarshallerProvider {
    Marshaller getMarshaller(ChannelHandlerContext ctx) throws Exception;
}

MarshallerProvider实际上就做了和MarshallerFactory等同的工作。

既然MarshallerProvider是一个接口,那么它有哪些实现呢?

在netty中它有两个实现类,分别是DefaultMarshallerProvider和ThreadLocalMarshallerProvider。

两者有什么区别呢?

先来看一下DefaultMarshallerProvider:

public class DefaultMarshallerProvider implements MarshallerProvider {
    private final MarshallerFactory factory;
    private final MarshallingConfiguration config;
    public DefaultMarshallerProvider(MarshallerFactory factory, MarshallingConfiguration config) {
        this.factory = factory;
        this.config = config;
    }
    public Marshaller getMarshaller(ChannelHandlerContext ctx) throws Exception {
        return factory.createMarshaller(config);
    }
}

顾名思义,DefaultMarshallerProvider就是marshallerProvider的默认实现,从具体的实现代码中,我们可以看出,DefaultMarshallerProvider实际上需要传入MarshallerFactory和MarshallingConfiguration作为参数,然后使用传入的MarshallerFactory来创建具体的marshaller Provider,和我们手动创建marshaller的方式是一致的。

但是上面的实现中每次getMarshaller都需要重新从factory中创建一个新的,性能上可能会有问题。所以netty又实现了一个新的ThreadLocalMarshallerProvider:

public class ThreadLocalMarshallerProvider implements MarshallerProvider {
    private final FastThreadLocal<Marshaller> marshallers = new FastThreadLocal<Marshaller>();
    private final MarshallerFactory factory;
    private final MarshallingConfiguration config;
    public ThreadLocalMarshallerProvider(MarshallerFactory factory, MarshallingConfiguration config) {
        this.factory = factory;
        this.config = config;
    }
    @Override
    public Marshaller getMarshaller(ChannelHandlerContext ctx) throws Exception {
        Marshaller marshaller = marshallers.get();
        if (marshaller == null) {
            marshaller = factory.createMarshaller(config);
            marshallers.set(marshaller);
        }
        return marshaller;
    }
}

ThreadLocalMarshallerProvider和DefaultMarshallerProvider的不同之处在于,ThreadLocalMarshallerProvider中保存了一个FastThreadLocal的对象,FastThreadLocal是JDK中ThreadLocal的优化版本,比ThreadLocal更快。

在getMarshaller方法中,先从FastThreadLocal中get出Marshaller对象,如果Marshaller对象不存在,才从factory中创建出一个Marshaller对象,最后将Marshaller对象放到ThreadLocal中。

有MarshallerProvider就有和他对应的UnMarshallerProvider:

public interface UnmarshallerProvider {
    Unmarshaller getUnmarshaller(ChannelHandlerContext ctx) throws Exception;
}

netty中的UnmarshallerProvider有三个实现类,分别是DefaultUnmarshallerProvider,ThreadLocalUnmarshallerProvider和ContextBoundUnmarshallerProvider.

前面的两个DefaultUnmarshallerProvider,ThreadLocalUnmarshallerProvider跟marshaller的是实现是一样的,这里就不重复讲解了。

我们主要来看一下ContextBoundUnmarshallerProvider的实现。

从名字上我们可以看出,这个unmarshaller是和ChannelHandlerContext相关的。

ChannelHandlerContext表示的是channel的上下文环境,它里面有一个方法叫做attr,可以保存和channel相关的属性:

<T> Attribute<T> attr(AttributeKey<T> key);

ContextBoundUnmarshallerProvider的做法就是将Unmarshaller存放到context中,每次使用的时候先从context中获取,如果没有取到再从factroy中获取。

我们来看下ContextBoundUnmarshallerProvider的实现:

public class ContextBoundUnmarshallerProvider extends DefaultUnmarshallerProvider {
    private static final AttributeKey<Unmarshaller> UNMARSHALLER = AttributeKey.valueOf(
            ContextBoundUnmarshallerProvider.class, "UNMARSHALLER");
    public ContextBoundUnmarshallerProvider(MarshallerFactory factory, MarshallingConfiguration config) {
        super(factory, config);
    }
    @Override
    public Unmarshaller getUnmarshaller(ChannelHandlerContext ctx) throws Exception {
        Attribute<Unmarshaller> attr = ctx.channel().attr(UNMARSHALLER);
        Unmarshaller unmarshaller = attr.get();
        if (unmarshaller == null) {
            unmarshaller = super.getUnmarshaller(ctx);
            attr.set(unmarshaller);
        }
        return unmarshaller;
    }
}

ContextBoundUnmarshallerProvider继承自DefaultUnmarshallerProvider,在getUnmarshaller方法首先从ctx取出unmarshaller,如果没有的话,则调用DefaultUnmarshallerProvider中的getUnmarshaller方法取出unmarshaller。

Marshalling编码器

上面的章节中我们获取到了marshaller,接下来看一下如何使用marshaller来进行编码和解码操作。

首先来看一下编码器MarshallingEncoder,MarshallingEncoder继承自MessageToByteEncoder,接收的泛型是Object:

public class MarshallingEncoder extends MessageToByteEncoder<Object>

是将Object对象编码成为ByteBuf。回顾一下之前我们讲到的通常对象的编码都需要用到一个对象长度的字段,用来分割对象的数据,同样的MarshallingEncoder也提供了一个4个字节的LENGTH_PLACEHOLDER,用来存储对象的长度。

具体的看一下它的encode方法:

protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        Marshaller marshaller = provider.getMarshaller(ctx);
        int lengthPos = out.writerIndex();
        out.writeBytes(LENGTH_PLACEHOLDER);
        ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);
        marshaller.start(output);
        marshaller.writeObject(msg);
        marshaller.finish();
        marshaller.close();
        out.setInt(lengthPos, out.writerIndex() - lengthPos - 4);
    }

encode的逻辑很简单,首先从provider中拿到marshaller对象,然后先向out中写入4个字节的LENGTH_PLACEHOLDER,接着使用marshaller向

out中写入编码的对象,最后根据写入对象长度填充out,得到最后的输出。

因为encode的数据保存的有长度数据,所以decode的时候就需要用到一个frame decoder叫做LengthFieldBasedFrameDecoder。

通常有两种方式来使用LengthFieldBasedFrameDecoder,一种是将LengthFieldBasedFrameDecoder加入到pipline handler中,decoder只需要处理经过frame decoder处理过后的对象即可。

还有一种方法就是这个decoder本身就是一个LengthFieldBasedFrameDecoder。

这里netty选择的是第二种方法,我们看下MarshallingDecoder的定义:

public class MarshallingDecoder extends LengthFieldBasedFrameDecoder

首先需要在构造函数中指定LengthFieldBasedFrameDecoder的字段长度,这里调用了super方法来实现:

public MarshallingDecoder(UnmarshallerProvider provider, int maxObjectSize) {
        super(maxObjectSize, 0, 4, 0, 4);
        this.provider = provider;
    }

并且重写了extractFrame方法:

protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
        return buffer.slice(index, length);
    }

最后再看下decode方法:

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null;
        }
        Unmarshaller unmarshaller = provider.getUnmarshaller(ctx);
        ByteInput input = new ChannelBufferByteInput(frame);
        try {
            unmarshaller.start(input);
            Object obj = unmarshaller.readObject();
            unmarshaller.finish();
            return obj;
        } finally {
            unmarshaller.close();
        }
    }

decode的逻辑也很简单,首先调用super方法decode出frame ByteBuf。然后再调用unmarshaller实现对象的读取,最后将改对象返回。

Marshalling编码的另外一种实现

上面我们讲到对对象的编码使用的是LengthFieldBasedFrameDecoder,根据对象实际数据之前的一个length字段来确定字段的长度,从而读取真实的数据。

那么可不可以不指定对象长度也能够准确的读取对象呢?

其实也是可以的,我们可以不断的尝试读取数据,直到找到合适的对象数据为止。

看过我之前文章的朋友可能就想到了,ReplayingDecoder不就是做这个事情的吗?在ReplayingDecoder中会不断的重试,直到找到符合条件的消息为止。

于是netty基于ReplayingDecoder也有一个marshalling编码解码的实现,叫做CompatibleMarshallingEncoder和CompatibleMarshallingDecoder。

CompatibleMarshallingEncoder很简单,因为不需要对象的实际长度,所以直接使用marshalling编码即可。

public class CompatibleMarshallingEncoder extends MessageToByteEncoder<Object> {
    private final MarshallerProvider provider;
    public CompatibleMarshallingEncoder(MarshallerProvider provider) {
        this.provider = provider;
    }
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        Marshaller marshaller = provider.getMarshaller(ctx);
        marshaller.start(new ChannelBufferByteOutput(out));
        marshaller.writeObject(msg);
        marshaller.finish();
        marshaller.close();
    }
}

CompatibleMarshallingDecoder继承了ReplayingDecoder:

public class CompatibleMarshallingDecoder extends ReplayingDecoder<Void>

它的decode方法的核心就是调用unmarshaller的方法:

Unmarshaller unmarshaller = provider.getUnmarshaller(ctx);
        ByteInput input = new ChannelBufferByteInput(buffer);
        if (maxObjectSize != Integer.MAX_VALUE) {
            input = new LimitingByteInput(input, maxObjectSize);
        }
        try {
            unmarshaller.start(input);
            Object obj = unmarshaller.readObject();
            unmarshaller.finish();
            out.add(obj);
        } catch (LimitingByteInput.TooBigObjectException ignored) {
            discardingTooLongFrame = true;
            throw new TooLongFrameException();
        } finally {
            unmarshaller.close();
        }

注意,这里解码的时候会有两种异常,第一种异常就是unmarshaller.readObject时候的异常,这种异常会被ReplayingDecoder捕获从而重试。

还有一种就是字段太长的异常,这种异常无法处理只能放弃:

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof TooLongFrameException) {
            ctx.close();
        } else {
            super.exceptionCaught(ctx, cause);
        }
    }

总结

以上就是在netty中使用marshalling进行编码解码的实现。原理和对象编码解码是很类似的,大家可以对比分析一下。

本文已收录于 http://www.flydean.com/17-1-netty-marshalling/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

相关文章
|
编解码
Netty(二)之整合Marshalling传输实体类
Netty(二)之整合Marshalling传输实体类
67 0
|
Java 应用服务中间件 程序员
netty系列之:使用Jboss Marshalling来序列化java对象
在JAVA程序中经常会用到序列化的场景,除了JDK自身提供的Serializable之外,还有一些第三方的产品可以实现对JAVA对象的序列化。其中比较有名的就是Google protobuf。当然,也有其他的比较出名的序列化工具,比如Kryo和JBoss Marshalling。
|
Java 前端开发
Netty使用Marshalling传输信息
使用Marshalling传输信息,需要有以下两个包,可以在官网下载 jboss-marshalling-1.
1049 0
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13299 1
|
17天前
|
机器学习/深度学习 缓存 算法
netty源码解解析(4.0)-25 ByteBuf内存池:PoolArena-PoolChunk
netty源码解解析(4.0)-25 ByteBuf内存池:PoolArena-PoolChunk
|
2月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
61 1
|
2月前
|
编解码 前端开发 网络协议
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
66 0
|
2月前
|
编解码 安全 前端开发
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
85 0
|
7月前
|
NoSQL Java Redis
跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。 原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。
117 1
|
9月前
|
分布式计算 网络协议 前端开发
【Netty底层数据交互源码】
【Netty底层数据交互源码】