snappy流式编解码总结

简介: snappy流式编解码总结

介绍


snappy是谷歌开源的用于数据快速压缩和解压的程序库,它的目标并非实现最大压缩率,而是同时实现非常高的压缩速度和合理的压缩率。snappy被广泛应用于google内部和开源的项目中,例如Hadoop, LevelDB, Spark。官方库地址:


https://github.com/google/snappy


由于最近的工作需要,发现snappy还实现了两个变种,这两个变种分别实现了snappy算法在本地文件系统和hadoop中的流式编解码。以下分别称之为snappy stream code和hadoop snappy stream codec, 以区分于raw snappy codec.


  • raw snappy codec: 用于对整个文件或整块数据进行编解码.


  • stream snappy codec: 流式编解码。在raw snappy codec上增加了chunk的概念。编解码的最小粒度是chunk。


  • hadoop stream snappy codec: 流式编解码。在raw snappy codec的基础上增加了block的概念。编解码最小粒度是block


那么为什么我们需要snappy流式编解码呢?


因为如果没有流式编解码算法,要对snappy数据进行解压,就必须将数据全部读入到内存,如果文件特别大的话内存可能不够用。因此们需要流式编解码算法,每次从文件流中读取一个frame的数据,解压,再读取,再解压,这样内存占用不至于特别大。


snappy的google官方库并没有实现上述的流式codec, 而python snappy实现了:


https://github.com/andrix/python-snappy


以下是个范例


#/usr/bin/env python
# coding: utf-8
import snappy
text_file = "1.txt"
snappy_file = "1.snappy"
with open (text_file, "r") as input_file:
    uncompressed_data = input_file.read().encode('utf-8')
    # raw snappy codec
    compressed_data = snappy.compress(uncompressed_data)
    assert uncompressed_data == snappy.uncompress(compressed_data)
    # stream snappy codec
    c = snappy.StreamCompressor()
    d = snappy.StreamDecompressor()
    compressed_data = c.compress(uncompressed_data)
    assert uncompressed_data == d.decompress(compressed_data)
    # hadoop stream snappy codec
    c = snappy.hadoop_snappy.StreamCompressor()
    d = snappy.hadoop_snappy.StreamDecompressor()
    compressed_data = c.compress(uncompressed_data)
    assert uncompressed_data == d.decompress(compressed_data)



raw snappy codec


算法比较复杂,可参考:https://zzjw.cc/post/snappy/



stream snappy codec


使用stream snappy codec算法压缩的文件由一连串的chunk组成: chunk | chunk | chunk | ... | chunk


每一个chunk的组成:header | body


chunk header的组成: chunk_type(1B) | chunk_size(3B)    其中chunk_size是chunk中body部分的长度,以byte为单位


chunk_type取值如下


  • stream identifier(chunk_type = 0xff). 压缩文件中, 第一个chunk必须是该类型。chunk size = 6, body = "sNaPpY"


  • compressed data(chunk_type = 0x00). 包含实际数据。body组成:crc32校验码(4B) | compressed_data. 其中crc32校验码为使用raw snappy codec解压compressed_data之后得到的uncompressed_data的crc校验码。注意:size(compressed_data) <= 2^24-1, size(uncompressed_data) <= 65535


  • uncompressed_data(chunk_type = 0x01). 包含实际数据。body组成: crc32校验码(4B) | uncompressed_data. 其中crc32校验码为之后的uncompressed_data的crc校验码。注意:size(uncompressed_data) <= 65535


  • padding(0xfe). 主要用于补零对齐。body = 00000..


  • Reserved unskippable chunks (chunk_types介于0x02-0x7f) 预留给未来扩展的的chunk type。解码器遇到这种chunk type应该立即报错


  • Reserved skippable chunks (chunk types介于0x80-0xfd) 预留给未来扩展的chunk type, 解码器遇到这种chunk应该立即跳过,


crc32校验算法如下:


def _masked_crc32c(data):
    # see the framing format specification
    crc = _crc32c(data)
    return (((crc >> 15) | (crc << 17)) + 0xa282ead8) & 0xffffffff


这种编码格式保证了两个经过stream snappy编码的文件通过linux cat命令合并之后,仍然是合法的压缩文件。



snappy python库中解压算法实现:

def decompress(self, data):
    """Decompress 'data', returning a string containing the uncompressed
    data corresponding to at least part of the data in string. This data
    should be concatenated to the output produced by any preceding calls to
    the decompress() method. Some of the input data may be preserved in
    internal buffers for later processing.
    """
    self._buf.extend(data)
    uncompressed = bytearray()
    while True:
        if len(self._buf) < 4:
            return bytes(uncompressed)
        chunk_type = struct.unpack("<L", self._buf[:4])[0]
        size = (chunk_type >> 8)
        chunk_type &= 0xff
        if not self._header_found:
            if (chunk_type != _IDENTIFIER_CHUNK or
                    size != len(_STREAM_IDENTIFIER)):
                raise UncompressError("stream missing snappy identifier")
            self._header_found = True
        if (_RESERVED_UNSKIPPABLE[0] <= chunk_type and
                chunk_type < _RESERVED_UNSKIPPABLE[1]):
            raise UncompressError(
                "stream received unskippable but unknown chunk")
        if len(self._buf) < 4 + size:
            return bytes(uncompressed)
        chunk, self._buf = self._buf[4:4 + size], self._buf[4 + size:]
        if chunk_type == _IDENTIFIER_CHUNK:
            if chunk != _STREAM_IDENTIFIER:
                raise UncompressError(
                    "stream has invalid snappy identifier")
            continue
        if (_RESERVED_SKIPPABLE[0] <= chunk_type and
                chunk_type < _RESERVED_SKIPPABLE[1]):
            continue
        assert chunk_type in (_COMPRESSED_CHUNK, _UNCOMPRESSED_CHUNK)
        crc, chunk = chunk[:4], chunk[4:]
        if chunk_type == _COMPRESSED_CHUNK:
            chunk = _uncompress(chunk)
        if struct.pack("<L", _masked_crc32c(chunk)) != crc:
            raise UncompressError("crc mismatch")
        uncompressed += chunk



执行流程如下:


  • 首先判断第一个chunk的类型是否为stream identifier, 如果不是,说明格式非法,返回错误。如果是,校验chunk body是否为“sNaPpY”


  • 判断chunk类型是否为reserved unskippable, 如果是,说明格式非法,返回错误


  • 判断chunk类型是否为reserved skippable, 如果是,则跳过当前chunk


  • 对chunk body进行crc校验


  • 如果chunk类型是uncompressed_data, 则将uncompressed_data追加到解压结果当中


  • 如果chunk类型是compressed_data, 则使用raw snappy codec对compressed_data解压,得到uncompressed_data追加到解压结果中


  • 当前chunk处理完成后,接着下一个,直到文件结束




hadoop stream snappy codec


hadoop stream snappy codec主要用于hdfs file的snappy编解码场景。其格式相比stream snappy codec更为简单


它由多个block组成:block | ... |block


每个block的格式:total_len(4字节)|compressed_len(4字

节)|compressed_data(compressed_len长度,变长)| compressed_len | compressed_data



其中


  • total_len: 当前block中解压结果的总长度


  • compressed_len: 对应的compressed data长度


  • compressed_data: 压缩数据


snappy python库中解压算法如下:


def decompress(self, data):
        """Decompress 'data', returning a string containing the uncompressed
        data corresponding to at least part of the data in string. This data
        should be concatenated to the output produced by any preceding calls to
        the decompress() method. Some of the input data may be preserved in
        internal buffers for later processing.
        """
        int_size = _INT_SIZE
        self._buf += data
        uncompressed = []
        while True:
            if len(self._buf) < int_size:
                return b"".join(uncompressed)
            next_start = 0
            if not self._block_length:
                self._block_length = unpack_int(self._buf[:int_size])
                self._buf = self._buf[int_size:]
                if len(self._buf) < int_size:
                    return b"".join(uncompressed)
            compressed_length = unpack_int(
                self._buf[next_start:next_start + int_size]
            )
            next_start += int_size
            if len(self._buf) < compressed_length + next_start:
                return b"".join(uncompressed)
            chunk = self._buf[
                next_start:next_start + compressed_length
            ]
            self._buf = self._buf[next_start + compressed_length:]
            uncompressed_chunk = _uncompress(chunk)
            self._uncompressed_length += len(uncompressed_chunk)
            uncompressed.append(uncompressed_chunk)
            if self._uncompressed_length == self._block_length:
                # Here we have uncompressed all subblocks of the current block
                self._uncompressed_length = 0
                self._block_length = 0
                continue


代码逻辑相对简单,流程如下:


  • 读取block_length, 并初始化uncompressed_length, 表示当前解压结果的长度


  • 读取compressed_length和compressed_data, 使用raw snappy codec解压,得到uncompressed_data, append到输出中,并更新uncompressed_length.


  • 当uncompressed_length达到block_length时,表示当前block处理完毕,重置内部状态,准备处理下一个block,  如此循环往复,直到文件结束
相关文章
|
Oracle Java 关系型数据库
Oracle jdk 的国内下载镜像
Oracle jdk 的国内下载镜像
55408 0
|
12月前
|
机器学习/深度学习 人工智能 监控
探索人工智能的伦理困境:我们如何确保AI的道德发展?
在人工智能(AI)技术飞速发展的今天,其伦理问题也日益凸显。本文将探讨AI伦理的重要性,分析当前面临的主要挑战,并提出相应的解决策略。我们将通过具体案例和代码示例,深入理解如何在设计和开发过程中嵌入伦理原则,以确保AI技术的健康发展。
699 11
|
4月前
|
监控 数据安全/隐私保护 Python
微信自动抢红包免费版,2025微信抢红包神器,微信红包挂苹果版【python仅供学习】
这个模拟项目包含5个模块:核心监控逻辑、用户界面、配置管理、实用工具和主程序入口
|
安全 API Windows
CreateMutex用法
CreateMutex用法
三个线程交替打印ABC:技术深度解析与实战应用
【8月更文挑战第14天】在并发编程中,实现多个线程之间的精确协同工作是一项既具挑战性又极具实用价值的任务。今天,我们将深入探讨一个经典问题:如何使用三个线程交替打印字符A、B、C,且每个字符连续打印三次,之后循环进行。这个问题不仅考验了我们对线程同步机制的理解,还锻炼了我们在复杂并发场景下的设计能力。
319 0
|
Java 开发者
Java IO流实战技巧:如何优化InputStream/OutputStream和Reader/Writer的使用?
【6月更文挑战第26天】Java IO流优化涉及缓冲、资源管理、字符编码和流式处理。使用Buffered流提高读写效率,如`BufferedInputStream`和`BufferedReader`。确保资源关闭使用try-with-resources,如`try (InputStream is = ...) {...}`。处理文本时指定编码,如`InputStreamReader(is, StandardCharsets.UTF_8)`防止乱码。流式处理大文件,分块读写避免内存溢出,以减少内存占用。这些技巧能提升程序性能和健壮性。
604 0
|
JSON Ubuntu Linux
Linux文本新宠:别再用cat,转投bat的怀抱吧!
在 Linux 操作系统中,有许多强大的命令行工具,它们为用户提供了广泛的功能和灵活性。其中之一就是 bat 命令,这是一个用于查看文件内容的工具,被许多用户认为是 cat 命令的增强版本。
351 1
Linux文本新宠:别再用cat,转投bat的怀抱吧!
|
存储 大数据
大数据集群规划的一点建议
大数据集群规划的一点建议
|
存储 缓存 Linux
linux下定位多线程内存越界问题实践总结
linux下定位多线程内存越界问题实践总结
|
人工智能 小程序 搜索推荐
宠物小程序开发:探索宠物行业的数字化创新之路
随着社会的进步和人们对宠物的热爱,宠物行业正迎来数字化创新的浪潮。宠物小程序作为一种新兴的移动应用形式,以其便捷、互动和个性化的特点,为宠物主人和宠物服务提供了全新的体验。本文将深入探讨宠物小程序开发的专业性和创新性,并探讨其在宠物行业中的应用前景。

热门文章

最新文章