snappy流式编解码总结

简介: snappy流式编解码总结

介绍


snappy是谷歌开源的用于数据快速压缩和解压的程序库,它的目标并非实现最大压缩率,而是同时实现非常高的压缩速度和合理的压缩率。snappy被广泛应用于google内部和开源的项目中,例如Hadoop, LevelDB, Spark。官方库地址:


https://github.com/google/snappy


由于最近的工作需要,发现snappy还实现了两个变种,这两个变种分别实现了snappy算法在本地文件系统和hadoop中的流式编解码。以下分别称之为snappy stream code和hadoop snappy stream codec, 以区分于raw snappy codec.


  • raw snappy codec: 用于对整个文件或整块数据进行编解码.


  • stream snappy codec: 流式编解码。在raw snappy codec上增加了chunk的概念。编解码的最小粒度是chunk。


  • hadoop stream snappy codec: 流式编解码。在raw snappy codec的基础上增加了block的概念。编解码最小粒度是block


那么为什么我们需要snappy流式编解码呢?


因为如果没有流式编解码算法,要对snappy数据进行解压,就必须将数据全部读入到内存,如果文件特别大的话内存可能不够用。因此们需要流式编解码算法,每次从文件流中读取一个frame的数据,解压,再读取,再解压,这样内存占用不至于特别大。


snappy的google官方库并没有实现上述的流式codec, 而python snappy实现了:


https://github.com/andrix/python-snappy


以下是个范例


#/usr/bin/env python
# coding: utf-8
import snappy
text_file = "1.txt"
snappy_file = "1.snappy"
with open (text_file, "r") as input_file:
    uncompressed_data = input_file.read().encode('utf-8')
    # raw snappy codec
    compressed_data = snappy.compress(uncompressed_data)
    assert uncompressed_data == snappy.uncompress(compressed_data)
    # stream snappy codec
    c = snappy.StreamCompressor()
    d = snappy.StreamDecompressor()
    compressed_data = c.compress(uncompressed_data)
    assert uncompressed_data == d.decompress(compressed_data)
    # hadoop stream snappy codec
    c = snappy.hadoop_snappy.StreamCompressor()
    d = snappy.hadoop_snappy.StreamDecompressor()
    compressed_data = c.compress(uncompressed_data)
    assert uncompressed_data == d.decompress(compressed_data)



raw snappy codec


算法比较复杂,可参考:https://zzjw.cc/post/snappy/



stream snappy codec


使用stream snappy codec算法压缩的文件由一连串的chunk组成: chunk | chunk | chunk | ... | chunk


每一个chunk的组成:header | body


chunk header的组成: chunk_type(1B) | chunk_size(3B)    其中chunk_size是chunk中body部分的长度,以byte为单位


chunk_type取值如下


  • stream identifier(chunk_type = 0xff). 压缩文件中, 第一个chunk必须是该类型。chunk size = 6, body = "sNaPpY"


  • compressed data(chunk_type = 0x00). 包含实际数据。body组成:crc32校验码(4B) | compressed_data. 其中crc32校验码为使用raw snappy codec解压compressed_data之后得到的uncompressed_data的crc校验码。注意:size(compressed_data) <= 2^24-1, size(uncompressed_data) <= 65535


  • uncompressed_data(chunk_type = 0x01). 包含实际数据。body组成: crc32校验码(4B) | uncompressed_data. 其中crc32校验码为之后的uncompressed_data的crc校验码。注意:size(uncompressed_data) <= 65535


  • padding(0xfe). 主要用于补零对齐。body = 00000..


  • Reserved unskippable chunks (chunk_types介于0x02-0x7f) 预留给未来扩展的的chunk type。解码器遇到这种chunk type应该立即报错


  • Reserved skippable chunks (chunk types介于0x80-0xfd) 预留给未来扩展的chunk type, 解码器遇到这种chunk应该立即跳过,


crc32校验算法如下:


def _masked_crc32c(data):
    # see the framing format specification
    crc = _crc32c(data)
    return (((crc >> 15) | (crc << 17)) + 0xa282ead8) & 0xffffffff


这种编码格式保证了两个经过stream snappy编码的文件通过linux cat命令合并之后,仍然是合法的压缩文件。



snappy python库中解压算法实现:

def decompress(self, data):
    """Decompress 'data', returning a string containing the uncompressed
    data corresponding to at least part of the data in string. This data
    should be concatenated to the output produced by any preceding calls to
    the decompress() method. Some of the input data may be preserved in
    internal buffers for later processing.
    """
    self._buf.extend(data)
    uncompressed = bytearray()
    while True:
        if len(self._buf) < 4:
            return bytes(uncompressed)
        chunk_type = struct.unpack("<L", self._buf[:4])[0]
        size = (chunk_type >> 8)
        chunk_type &= 0xff
        if not self._header_found:
            if (chunk_type != _IDENTIFIER_CHUNK or
                    size != len(_STREAM_IDENTIFIER)):
                raise UncompressError("stream missing snappy identifier")
            self._header_found = True
        if (_RESERVED_UNSKIPPABLE[0] <= chunk_type and
                chunk_type < _RESERVED_UNSKIPPABLE[1]):
            raise UncompressError(
                "stream received unskippable but unknown chunk")
        if len(self._buf) < 4 + size:
            return bytes(uncompressed)
        chunk, self._buf = self._buf[4:4 + size], self._buf[4 + size:]
        if chunk_type == _IDENTIFIER_CHUNK:
            if chunk != _STREAM_IDENTIFIER:
                raise UncompressError(
                    "stream has invalid snappy identifier")
            continue
        if (_RESERVED_SKIPPABLE[0] <= chunk_type and
                chunk_type < _RESERVED_SKIPPABLE[1]):
            continue
        assert chunk_type in (_COMPRESSED_CHUNK, _UNCOMPRESSED_CHUNK)
        crc, chunk = chunk[:4], chunk[4:]
        if chunk_type == _COMPRESSED_CHUNK:
            chunk = _uncompress(chunk)
        if struct.pack("<L", _masked_crc32c(chunk)) != crc:
            raise UncompressError("crc mismatch")
        uncompressed += chunk



执行流程如下:


  • 首先判断第一个chunk的类型是否为stream identifier, 如果不是,说明格式非法,返回错误。如果是,校验chunk body是否为“sNaPpY”


  • 判断chunk类型是否为reserved unskippable, 如果是,说明格式非法,返回错误


  • 判断chunk类型是否为reserved skippable, 如果是,则跳过当前chunk


  • 对chunk body进行crc校验


  • 如果chunk类型是uncompressed_data, 则将uncompressed_data追加到解压结果当中


  • 如果chunk类型是compressed_data, 则使用raw snappy codec对compressed_data解压,得到uncompressed_data追加到解压结果中


  • 当前chunk处理完成后,接着下一个,直到文件结束




hadoop stream snappy codec


hadoop stream snappy codec主要用于hdfs file的snappy编解码场景。其格式相比stream snappy codec更为简单


它由多个block组成:block | ... |block


每个block的格式:total_len(4字节)|compressed_len(4字

节)|compressed_data(compressed_len长度,变长)| compressed_len | compressed_data



其中


  • total_len: 当前block中解压结果的总长度


  • compressed_len: 对应的compressed data长度


  • compressed_data: 压缩数据


snappy python库中解压算法如下:


def decompress(self, data):
        """Decompress 'data', returning a string containing the uncompressed
        data corresponding to at least part of the data in string. This data
        should be concatenated to the output produced by any preceding calls to
        the decompress() method. Some of the input data may be preserved in
        internal buffers for later processing.
        """
        int_size = _INT_SIZE
        self._buf += data
        uncompressed = []
        while True:
            if len(self._buf) < int_size:
                return b"".join(uncompressed)
            next_start = 0
            if not self._block_length:
                self._block_length = unpack_int(self._buf[:int_size])
                self._buf = self._buf[int_size:]
                if len(self._buf) < int_size:
                    return b"".join(uncompressed)
            compressed_length = unpack_int(
                self._buf[next_start:next_start + int_size]
            )
            next_start += int_size
            if len(self._buf) < compressed_length + next_start:
                return b"".join(uncompressed)
            chunk = self._buf[
                next_start:next_start + compressed_length
            ]
            self._buf = self._buf[next_start + compressed_length:]
            uncompressed_chunk = _uncompress(chunk)
            self._uncompressed_length += len(uncompressed_chunk)
            uncompressed.append(uncompressed_chunk)
            if self._uncompressed_length == self._block_length:
                # Here we have uncompressed all subblocks of the current block
                self._uncompressed_length = 0
                self._block_length = 0
                continue


代码逻辑相对简单,流程如下:


  • 读取block_length, 并初始化uncompressed_length, 表示当前解压结果的长度


  • 读取compressed_length和compressed_data, 使用raw snappy codec解压,得到uncompressed_data, append到输出中,并更新uncompressed_length.


  • 当uncompressed_length达到block_length时,表示当前block处理完毕,重置内部状态,准备处理下一个block,  如此循环往复,直到文件结束
相关文章
|
分布式计算 Hadoop
30 MAPREDUCE数据压缩
30 MAPREDUCE数据压缩
55 0
|
3月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
7月前
|
存储 分布式计算 算法
MapReduce【数据压缩】
MapReduce【数据压缩】
|
存储 算法 Java
解压缩流和压缩流
解压缩流和压缩流
|
测试技术
如何使用AMR M分析rtp流中的amr语音
笔者分享最近找到一款amr工具amr master。该工具结合wireshark可以网卡包中的amr语音流转出.amr文件,采用vlc player可以进行播放。
如何使用AMR M分析rtp流中的amr语音
|
存储 编解码 API
FFmpeg编解码处理3-视频编码
基于 FFmpeg 4.1 版本。
344 0
FFmpeg编解码处理3-视频编码
|
编解码 缓存 API
|
存储 编解码 API
FFmpeg编解码处理4-音频编码
基于 FFmpeg 4.1 版本。
377 0
FFmpeg编解码处理4-音频编码