Introduction
snappy is an open-source library from Google for fast data compression and decompression. Its goal is not maximum compression ratio, but rather very high compression speed combined with a reasonable compression ratio. snappy is widely used inside Google and in open-source projects such as Hadoop, LevelDB, and Spark. Official repository:
https://github.com/google/snappy
Recent work led me to discover that snappy also has two framing variants, which provide streaming encoding/decoding of snappy data for local file systems and for Hadoop respectively. Below they are called the stream snappy codec and the hadoop stream snappy codec, to distinguish them from the raw snappy codec:
- raw snappy codec: encodes/decodes an entire file or an entire block of data at once.
- stream snappy codec: streaming codec. Adds the concept of a chunk on top of the raw snappy codec; the smallest unit of encoding/decoding is a chunk.
- hadoop stream snappy codec: streaming codec. Adds the concept of a block on top of the raw snappy codec; the smallest unit of encoding/decoding is a block.
So why do we need a streaming snappy codec?
Without a streaming codec, decompressing snappy data requires reading all of it into memory first, and for a very large file there may not be enough memory. With a streaming codec we read one frame at a time from the file stream, decompress it, read the next frame, decompress again, and so on, so memory usage stays modest.
Google's official snappy library does not implement the streaming codecs described above, but python-snappy does:
https://github.com/andrix/python-snappy
Here is an example:
```python
#!/usr/bin/env python
# coding: utf-8
import snappy

text_file = "1.txt"
snappy_file = "1.snappy"

with open(text_file, "r") as input_file:
    uncompressed_data = input_file.read().encode('utf-8')

# raw snappy codec
compressed_data = snappy.compress(uncompressed_data)
assert uncompressed_data == snappy.uncompress(compressed_data)

# stream snappy codec
c = snappy.StreamCompressor()
d = snappy.StreamDecompressor()
compressed_data = c.compress(uncompressed_data)
assert uncompressed_data == d.decompress(compressed_data)

# hadoop stream snappy codec
c = snappy.hadoop_snappy.StreamCompressor()
d = snappy.hadoop_snappy.StreamDecompressor()
compressed_data = c.compress(uncompressed_data)
assert uncompressed_data == d.decompress(compressed_data)
```
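Building on the example above, here is a minimal sketch of the frame-at-a-time decompression described earlier: read the compressed file in fixed-size pieces and feed each piece to a StreamDecompressor, which buffers incomplete chunks internally. The file paths and the 64 KiB read size are arbitrary choices of mine; python-snappy also ships stream_compress/stream_decompress helpers that work on file objects.

```python
import snappy

def stream_decompress_file(src_path, dst_path, read_size=65536):
    # Feed fixed-size reads to the decompressor; each call may return
    # anywhere from zero bytes to several chunks' worth of output,
    # so memory usage stays bounded by read_size plus one chunk.
    d = snappy.StreamDecompressor()
    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        while True:
            piece = src.read(read_size)
            if not piece:
                break
            dst.write(d.decompress(piece))
```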
raw snappy codec
The algorithm is fairly involved; see: https://zzjw.cc/post/snappy/
stream snappy codec
A file compressed with the stream snappy codec consists of a sequence of chunks: chunk | chunk | chunk | ... | chunk
Each chunk consists of: header | body
The chunk header consists of: chunk_type(1B) | chunk_size(3B, little-endian), where chunk_size is the length of the chunk's body in bytes.
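As a quick illustration (my sketch, not library code), the header can be parsed by unpacking the 4 bytes as a single little-endian 32-bit integer, exactly as the library's decompress() shown later does:

```python
import struct

def parse_chunk_header(header4):
    # One little-endian u32: low 8 bits = chunk_type,
    # high 24 bits = chunk_size (length of the body in bytes).
    word = struct.unpack("<L", header4)[0]
    return word & 0xff, word >> 8

# The stream identifier header: type 0xff, body length 6.
assert parse_chunk_header(b"\xff\x06\x00\x00") == (0xff, 6)
```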
The possible chunk_type values are:
- stream identifier (chunk_type = 0xff). The first chunk in a compressed file must be of this type. chunk_size = 6, body = "sNaPpY".
- compressed data (chunk_type = 0x00). Carries actual data. Body layout: crc32 checksum (4B) | compressed_data, where the crc32 checksum is computed over the uncompressed_data obtained by decompressing compressed_data with the raw snappy codec. Note: the whole body (checksum plus compressed_data) is limited to 2^24 - 1 bytes by the 3-byte size field, and size(uncompressed_data) <= 65536.
- uncompressed data (chunk_type = 0x01). Carries actual data. Body layout: crc32 checksum (4B) | uncompressed_data, where the crc32 checksum is computed over the uncompressed_data that follows. Note: size(uncompressed_data) <= 65536.
- padding (chunk_type = 0xfe). Used mainly for zero-padding/alignment. body = 00000...
- reserved unskippable chunks (chunk_type in 0x02-0x7f). Reserved for future extension; a decoder that encounters such a chunk must fail immediately.
- reserved skippable chunks (chunk_type in 0x80-0xfd). Reserved for future extension; a decoder that encounters such a chunk should skip it immediately.
The crc32 checksum is a masked CRC-32C, computed as follows:
```python
def _masked_crc32c(data):
    # see the framing format specification
    crc = _crc32c(data)
    return (((crc >> 15) | (crc << 17)) + 0xa282ead8) & 0xffffffff
```
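Putting the header layout and the masked CRC together, the following sketch (mine, not library code) hand-builds a minimal valid stream, a stream identifier chunk followed by one uncompressed-data chunk, and checks it against the library's decompressor. Using the crc32c package for the underlying CRC-32C is an assumption; any CRC-32C implementation would do.

```python
import struct
import crc32c  # assumption: pip install crc32c
import snappy

def masked_crc32c(data):
    # Same masking as the library's _masked_crc32c above.
    crc = crc32c.crc32c(data)
    return (((crc >> 15) | (crc << 17)) + 0xa282ead8) & 0xffffffff

def make_chunk(chunk_type, body):
    # Header: chunk_type in the low byte, body length in the high 3 bytes.
    return struct.pack("<L", (len(body) << 8) | chunk_type) + body

payload = b"hello snappy framing"
stream = make_chunk(0xff, b"sNaPpY")  # stream identifier chunk
stream += make_chunk(0x01, struct.pack("<L", masked_crc32c(payload)) + payload)

assert snappy.StreamDecompressor().decompress(stream) == payload
```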
This format guarantees that concatenating two stream-snappy-encoded files with the Linux cat command still produces a valid compressed file.
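A small sketch of that property: compress two buffers independently, byte-concatenate the outputs, and decompress them as one stream. The second stream identifier chunk that appears mid-stream is simply validated and skipped by the decoder.

```python
import snappy

s1 = snappy.StreamCompressor().compress(b"first file contents\n")
s2 = snappy.StreamCompressor().compress(b"second file contents\n")

# The concatenation of two framed streams is itself a valid framed stream.
d = snappy.StreamDecompressor()
assert d.decompress(s1 + s2) == b"first file contents\nsecond file contents\n"
```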
The decompression implementation in the python snappy library:
```python
def decompress(self, data):
    """Decompress 'data', returning a string containing the
    uncompressed data corresponding to at least part of the data in
    string. This data should be concatenated to the output produced by
    any preceding calls to the decompress() method. Some of the input
    data may be preserved in internal buffers for later processing.
    """
    self._buf.extend(data)
    uncompressed = bytearray()
    while True:
        if len(self._buf) < 4:
            return bytes(uncompressed)
        chunk_type = struct.unpack("<L", self._buf[:4])[0]
        size = (chunk_type >> 8)
        chunk_type &= 0xff
        if not self._header_found:
            if (chunk_type != _IDENTIFIER_CHUNK or
                    size != len(_STREAM_IDENTIFIER)):
                raise UncompressError("stream missing snappy identifier")
            self._header_found = True
        if (_RESERVED_UNSKIPPABLE[0] <= chunk_type and
                chunk_type < _RESERVED_UNSKIPPABLE[1]):
            raise UncompressError(
                "stream received unskippable but unknown chunk")
        if len(self._buf) < 4 + size:
            return bytes(uncompressed)
        chunk, self._buf = self._buf[4:4 + size], self._buf[4 + size:]
        if chunk_type == _IDENTIFIER_CHUNK:
            if chunk != _STREAM_IDENTIFIER:
                raise UncompressError(
                    "stream has invalid snappy identifier")
            continue
        if (_RESERVED_SKIPPABLE[0] <= chunk_type and
                chunk_type < _RESERVED_SKIPPABLE[1]):
            continue
        assert chunk_type in (_COMPRESSED_CHUNK, _UNCOMPRESSED_CHUNK)
        crc, chunk = chunk[:4], chunk[4:]
        if chunk_type == _COMPRESSED_CHUNK:
            chunk = _uncompress(chunk)
        if struct.pack("<L", _masked_crc32c(chunk)) != crc:
            raise UncompressError("crc mismatch")
        uncompressed += chunk
```
The execution flow is as follows:
- First check whether the first chunk is a stream identifier; if not, the format is invalid and an error is raised. If it is, verify that the chunk body is "sNaPpY".
- If the chunk type is reserved unskippable, the format is invalid and an error is raised.
- If the chunk type is reserved skippable, skip the current chunk.
- If the chunk type is compressed data, first decompress compressed_data with the raw snappy codec to obtain uncompressed_data.
- Verify the crc32 checksum against the (now uncompressed) chunk data.
- Append the uncompressed data to the output.
- After the current chunk is processed, move on to the next one, until the end of the file.
hadoop stream snappy codec
The hadoop stream snappy codec is mainly used for snappy encoding/decoding of HDFS files. Its format is simpler than that of the stream snappy codec.
A file consists of a number of blocks: block | ... | block
Each block has the format: total_len(4B) | compressed_len(4B) | compressed_data(compressed_len bytes, variable) | compressed_len | compressed_data | ..., i.e. a block may contain several (compressed_len, compressed_data) subblocks, whose decompressed lengths add up to total_len.
Where:
- total_len: total length of the decompressed data of the current block
- compressed_len: length of the compressed data that follows
- compressed_data: the compressed data itself (a hand-built example follows this list)
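Here is a hedged sketch (mine, not library code) that hand-assembles a single block with one subblock and verifies it against the library's decompressor. It assumes both length fields are big-endian 32-bit integers, following Hadoop's Java serialization convention.

```python
import struct
import snappy

payload = b"hello hadoop snappy " * 100
compressed = snappy.compress(payload)  # raw snappy codec for the subblock

# One block: total uncompressed length, then a single
# (compressed_len, compressed_data) subblock.
block = struct.pack(">I", len(payload)) + struct.pack(">I", len(compressed)) + compressed

d = snappy.hadoop_snappy.StreamDecompressor()
assert d.decompress(block) == payload
```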
The decompression implementation in the python snappy library:
```python
def decompress(self, data):
    """Decompress 'data', returning a string containing the
    uncompressed data corresponding to at least part of the data in
    string. This data should be concatenated to the output produced by
    any preceding calls to the decompress() method. Some of the input
    data may be preserved in internal buffers for later processing.
    """
    int_size = _INT_SIZE
    self._buf += data
    uncompressed = []
    while True:
        if len(self._buf) < int_size:
            return b"".join(uncompressed)
        next_start = 0
        if not self._block_length:
            self._block_length = unpack_int(self._buf[:int_size])
            self._buf = self._buf[int_size:]
            if len(self._buf) < int_size:
                return b"".join(uncompressed)
        compressed_length = unpack_int(
            self._buf[next_start:next_start + int_size]
        )
        next_start += int_size
        if len(self._buf) < compressed_length + next_start:
            return b"".join(uncompressed)
        chunk = self._buf[
            next_start:next_start + compressed_length
        ]
        self._buf = self._buf[next_start + compressed_length:]
        uncompressed_chunk = _uncompress(chunk)
        self._uncompressed_length += len(uncompressed_chunk)
        uncompressed.append(uncompressed_chunk)
        if self._uncompressed_length == self._block_length:
            # Here we have uncompressed all subblocks of the current block
            self._uncompressed_length = 0
            self._block_length = 0
        continue
```
The code logic is relatively simple; the flow is:
- Read block_length and initialize uncompressed_length, which tracks how much of the current block has been decompressed so far.
- Read compressed_length and compressed_data, decompress with the raw snappy codec to obtain uncompressed_data, append it to the output, and update uncompressed_length.
- When uncompressed_length reaches block_length, the current block is complete; reset the internal state and move on to the next block, repeating until the end of the file (the sketch below exercises this buffering byte by byte).
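To see the buffering described above in action, a toy sketch (based on the decompress() implementation shown earlier; real code would use larger reads) feeds the compressed bytes to the decompressor one byte at a time. Most calls return empty output; a subblock's worth of data appears once its final byte arrives.

```python
import snappy

payload = b"streaming one byte at a time " * 50
compressed = snappy.hadoop_snappy.StreamCompressor().compress(payload)

d = snappy.hadoop_snappy.StreamDecompressor()
out = b"".join(d.decompress(compressed[i:i + 1]) for i in range(len(compressed)))
assert out == payload
```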