使用Kafka Connect 同步Kafka数据到日志服务

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 使用Kafka Connect 同步Kafka数据到日志服务 简介 Kafka作为最流行的消息队列,在业界有这非常广泛的使用。不少用户把日志投递到Kafka之后,再使用其他的软件如ElasticSearch进行分析。

简介

Kafka作为最流行的消息队列,在业界有这非常广泛的使用。不少用户把日志投递到Kafka之后,再使用其他的软件如ElasticSearch进行分析。Kafka Connect 是一个专门用于在Kafka 和其他数据系统直接进行数据搬运插件,如将Kafka数据写入到S3,数据库等。

image


Kafka Connect

阿里云日志服务是一个日志采集,查询分析与可视化的平台,服务于阿里云上数十万用户。借助于日志服务提供的Kafka Connect插件,我们可以使用Kafka Connect 把Kafka里面的数据同步到阿里云日志服务,利用日志服务强大的查询能力与丰富的可视化图表类型,对数据进行分析与结果展示。

环境准备

  1. 如果还没有开通日志服务,前往 日志服务控制台开通。
  2. 准备测试用的 Kafka 集群。
  3. 创建用于访问阿里云日志服务的Access Key。
  4. 在日志服务控制台创建Project 和Logstore,并开启索引。

Kafka Connect安装

下载Kafka 日志服务connect 插件并打包:

git clone https://github.com/liketic/kafka-connect-logservice.git
mvn clean compile assembly:single 

打包之后,在项目根目录下,会生成一个压缩包 target/kafka-connect-logservice-1.0.0-jar-with-dependencies.jar 。 这个文件包含了插件和所有依赖,把这个文件复制到Kafka运行的机器上。

Kafka connect的工作模式分为两种,分别是standalone模式和distributed模式。 standalone模式可以简单理解为只有一个单独的worker,只需在启动时指定配置文件即可。而distributed模式可以启动多个worker,可以水平扩展和failover,插件本身的配置通过REST API的方式传递。这里我们为了演示方便仅演示standalone模式,在生产环境中建议使用distributed模式。更多部署细节可以参考官方文档

启动Connect

1)修改日志服务插件配置文件
在项目目录下config目录内有一个配置文件sink.properties,里面包含了日志服务插件运行所必须的配置信息:

name=LoghubSinkConnector
topics=<Kafka topic>
tasks.max=10
connector.class=com.aliyun.openservices.log.kafka.connect.LoghubSinkConnector
loghub.endpoint=your log service endpoint
loghub.project=your log service project
loghub.logstore=your log service logstore
loghub.accessKeyId=your access key id
loghub.accessKeySecret=your access key secret
loghub.batchSize=1000
format=json

除了放日志服务必需的配置外,还可以指定数据格式。目前日志服务Connector只支持字符串类型的数据,format可以选择 json 或者 raw:

  • json:每条纪录的value作为一个JSON字符串解析,自动提取字段并写入日志服务。
  • raw:每条纪录的value作为一个字段,写入日志服务。

2)修改connect配置文件
在Kafka下载目录下,找到 config/connect-standalone.properties,修改如下配置:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
plugin.path=

plugin.path即为上文构建的jar 所在目录。在Kafka 下载目录内执行启动命令:

./bin/connect-standalone.sh ./config/connect-standalone.properties <your sink config path>/sink.properties

生成测试数据

git clone https://github.com/liketic/logservice-samples.git
cd logservice-samples

替换其中的Kafka配置:

        String topicName = "mytopic";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < randomNum(100); j++) {
                String r = generateLog();
                producer.send(new ProducerRecord<>(topicName, r));
            }
            Thread.sleep(randomNum(1000));
        }
        producer.close();

在IDE中运行产生测试数据的程序,会通过Kafka Producer往Kafka中写入一些模拟数据。

写入结果查询

日志服务控制台查看数据写入成功:

image
写入结果

参考资料

日志服务官方文档:https://help.aliyun.com/product/28958.html?spm=a2c4g.750001.list.102.4cc17b13hpRH8b
Kafka Connect 官方文档:https://kafka.apache.org/documentation/#connect

目录
相关文章
|
19小时前
|
消息中间件 Kafka 数据处理
实时计算 Flink版产品使用合集之消费Kafka数据时,实现限流如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2天前
|
定位技术 调度
自定义同步日志系统
自定义同步日志系统
8 2
|
3天前
|
消息中间件 数据采集 分布式计算
【数据采集与预处理】数据接入工具Kafka
【数据采集与预处理】数据接入工具Kafka
17 1
【数据采集与预处理】数据接入工具Kafka
|
4天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
23 0
|
4天前
|
SQL 消息中间件 Kafka
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
19 0
|
4天前
|
网络安全 流计算 Python
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
17 1
|
5天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
68 5
|
6天前
|
消息中间件 关系型数据库 网络安全
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
18 1
|
6天前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之想要加快消费 Kafka 数据的速度,该怎么配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
18 2
|
7天前
|
消息中间件 SQL 关系型数据库
实时计算 Flink版产品使用合集之读取kafka数据然后入库到starrocks,出现未知问题如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
16 1

相关产品

  • 日志服务