Kafka压缩

简介: 在某些情况下,整个应用的瓶颈不在于CPU或者磁盘,而是受网络带宽的影响。当然你可以选择在业务代码中对每一条消息做压缩处理,之后再发送到kafka中,之后业务消费端再进行解压处理,这种方式对应消息的压缩效率是非常低。

在某些情况下,整个应用的瓶颈不在于CPU或者磁盘,而是受网络带宽的影响。当然你可以选择在业务代码中对每一条消息做压缩处理,之后再发送到kafka中,之后业务消费端再进行解压处理,这种方式对应消息的压缩效率是非常低。而真正有效的压缩是对一批消息进行压缩而不是单独的为每条消息进行压缩。

Kafka(本文是以0.8.2.x的版本做基准的)本身可以支持几种类型的压缩,比如gzip和snappy,更高的版本还支持lz4。默认是none,即不采用任何压缩。开启压缩的方式是在客户端调用的时候设置producer的参数。与压缩有关的参数有:

名称 默认值 在哪使用 描述
compression.type none new producer configs(kafka-client) The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, or snappy. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
compression.codec none kafka-scala-client This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are “none”, “gzip” and “snappy”.
compressed.topics null kafka-scala-client This parameter allows you to set whether compression should be turned on for particular topics. If the compression codec is anything other than NoCompressionCodec, enable compression only for specified topics if any. If the list of compressed topics is empty, then enable the specified compression codec for all topics. If the compression codec is NoCompressionCodec, compression is disabled for all topics

上面表格中提及了kafka-client与kafka-scala-client,这两者之间有什么区别呢?kafka-client是kafka的一个分支,其全部使用java语言来开发kafka的客户端。而kafka-scala-client是使用scala语言开发的客户端,两者之间采用的参数会有所不同,注意区分。

下面演示两个demo来便于区分两者之间的用法。
1.kafka-client的使用demo:

        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put("compression.type", "gzip");

        Producer<String,byte[]> producer = new KafkaProducer<String,byte[]>(properties);

        ProducerRecord<String,byte[]> producerRecord = new ProducerRecord<String,byte[]>(topic, "messages".getBytes());
        Future<RecordMetadata> future =  producer.send(producerRecord, new Callback() 
        {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                System.out.println(metadata.offset());
            }
        });

2.kafka-scala-client的使用demo:

Properties props = new Properties();
props.put("serializer.class", "com.kafka.compression.kafka.MessageEncoder");
props.put("metadata.broker.list", brokerList);
props.put("producer.type", "async");
props.put("compression.codec", "gzip");

kafka.javaapi.producer.Producer<Integer, byte[]> producer = new kafka.javaapi.producer.Producer<Integer,
        byte[]>(new kafka.producer.ProducerConfig(props));
producer.send(new KeyedMessage<Integer, byte[]>(topic, "messages".getBytes()));

下面我们分别采用5个测试样本来进行测试,包括结构化log日志,xls,docx以及随机字符串这4种类型,分别采用Linux gzip工具(gzip -c * > *)、java内置的gzip压缩以及kafka内置的压缩进行测试。(其他的压缩方式可以类推,在此不做赘述。)

查看kafka压缩之后消息的大小,可以通过查看kafka-log文件的大小,没发生一次消息,将前后的文件大小相差即为消息的大小。只需check **.log文件即可。

得到测试接入如下表所示:

Item 1.log 2.log 3.xls 4.docx 随机字符串
原始文件大小 35136B 51354B 412160B 322737B 204800B
linux gzip压缩 930B 3557B 90098B 302378B 166474B
java gzip压缩 924B 3542B 90531B 302687B 166524B
kafka-client 未压缩 35162B 51380B
kafka-client 压缩 980B/981B 3607B
kafka-scala-client sync 未压缩 35162B 51380B
kafka-scala-client sync 压缩 980B/981B 3606B
kafka-scala-client async 未压缩 35162B 51380B 412290B 322841B 204852B
kafka-scala-client async 压缩 980B/981B 3619B 90660B 302803B 166626B
压缩率 约3% 约7% 约22% 约94% 约81%

压缩率=文件压缩后的大小与压缩前的大小之比。压缩率一般是越小越好,但是也得越小,一般解压时间也越长。

通过查看以上测试结果可以发现,kafka的压缩率和linux gzip或者java gzip的压缩率基本相同。

对于不同的测试样本来说,压缩率会展示出天差地别,不过如果业务方可以先将业务本身的消息采用gzip工具压缩,然后可以估算出开启kafka的压缩率,之后再进行决策。

你或许会注意到上表中的kafka压缩之后的消息大小会比其他的压缩方式压缩之后的消息会大一点点,这是因为kafka的日志存储结构造成的,kafka将消息存盘并不只是存储消息本身,而是包含了以下结构:

字段 占用空间大小
offset 8B
message size 4B
crc32 4B
magic 1B
attributes 1B
key length 4B
key kB
payload length 4B
payload nB

对于一个没有key的消息来说,其占用的存储空间比消息体本身多占用=8B+4B+4B+1B+1B+4B+0B+4B=26B的大小。

上面的测试都是单条发送整个测试样本的数据的,如果同时发送多条消息,那么会有什么变化呢?我们知道kafka的发送方式有同步和异步之分,异步的发送方式可以批量发送消息,那么会不会批量压缩消息进而使得消息得到更有效的压缩呢?

我们采用前面的样本1.log进行测试,读取样本中的内容,然后拆分成1KB一条消息进行发送。由于kafka-client没有producer.type的设置,我们这里采用kafka-scala-client进行测试,分为同步和异步:

Item sync async
未压缩 36046B 36046B
压缩 7008B 1578B

由上表数据可知:如果采用async的发送方式,那么消息会批量发送,在存储上会比sync的方式更节省磁盘空间

在同步的发送方式中,kafka-logs中消息存储为:
offset:1 position:0
offset:2 position:xxx
…..
一共分成35(25136B/1024B)条消息存储。
而异步的发送方式中,消息的存储为:
offset:34 posiotion:0
一共分成1条消息存储。
所以可以理解为何async的发送方式比sync的发送方式所呈现的压缩效率更好。

查看log日志的内容: bin/kafka-run-class.sh kafka.tools.DumpLogSegments –files /tmp/kafka-logs/[topicName]-0/00000000000000000000.log –print-data-log

附:Java内置的gzip压缩代码:

package com.kafka.compression.gzip;

import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
 * Created by hidden on 2017/6/5.
 */
public class GzipUtil {

    public static void compress(String inFileName, String outFileName) {
        try {
            System.out.println("Createing the Gzip output Stream.");
            GZIPOutputStream out = null;
            try {
                out = new GZIPOutputStream(new FileOutputStream(outFileName));
            } catch (IOException e) {
                System.out.println("Could not create file: " + outFileName);
                e.printStackTrace();
            }

            System.out.println("Opening the input file.");
            FileInputStream in = null;
            try {
                in = new FileInputStream(inFileName);
            } catch (FileNotFoundException e) {
                System.out.println("File not found: "+inFileName);
                e.printStackTrace();
            }

            System.out.println("Transfering bytes from input file to Gzip format");
            byte[] buf = new byte[1024];
            int len;
            while ((len = in.read(buf)) > 0) {
                out.write(buf, 0, len);
            }
            in.close();
            System.out.println("Completing the Gzip file");
            out.finish();
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void uncompress(String inFileName, String outFileName){
        try {
            System.out.println("Opening the compressed file.");
            GZIPInputStream in = null;
            try {
                in = new GZIPInputStream(new FileInputStream(inFileName));
            } catch (IOException e) {
                e.printStackTrace();
            }

            System.out.println("Open the output file.");
            FileOutputStream out = null;
            try {
                out = new FileOutputStream(outFileName);
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }


            byte[] buf = new byte[1024];
            int len;
            while ((len = in.read(buf)) > 0) {
                out.write(buf, 0, len);
            }
            out.flush();
            in.close();
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
目录
相关文章
|
1月前
|
存储 消息中间件 大数据
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
39 1
|
2月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
消息中间件 负载均衡 算法
【Kafka从入门到成神系列 三】Kafka 生产者消息分区及压缩算法
【Kafka从入门到成神系列 三】Kafka 生产者消息分区及压缩算法
【Kafka从入门到成神系列 三】Kafka 生产者消息分区及压缩算法
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
46 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
267 9
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
67 3
|
3月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
133 0
|
3月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。