使用Kafka Connect 同步Kafka数据到日志服务

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000次 1年
对象存储 OSS,恶意文件检测 1000次 1年
简介: 使用Kafka Connect 同步Kafka数据到日志服务 简介 Kafka作为最流行的消息队列,在业界有这非常广泛的使用。不少用户把日志投递到Kafka之后,再使用其他的软件如ElasticSearch进行分析。

简介

Kafka作为最流行的消息队列,在业界有这非常广泛的使用。不少用户把日志投递到Kafka之后,再使用其他的软件如ElasticSearch进行分析。Kafka Connect 是一个专门用于在Kafka 和其他数据系统直接进行数据搬运插件,如将Kafka数据写入到S3,数据库等。

image


Kafka Connect

阿里云日志服务是一个日志采集,查询分析与可视化的平台,服务于阿里云上数十万用户。借助于日志服务提供的Kafka Connect插件,我们可以使用Kafka Connect 把Kafka里面的数据同步到阿里云日志服务,利用日志服务强大的查询能力与丰富的可视化图表类型,对数据进行分析与结果展示。

环境准备

  1. 如果还没有开通日志服务,前往 日志服务控制台开通。
  2. 准备测试用的 Kafka 集群。
  3. 创建用于访问阿里云日志服务的Access Key。
  4. 在日志服务控制台创建Project 和Logstore,并开启索引。

Kafka Connect安装

下载Kafka 日志服务connect 插件并打包:

git clone https://github.com/liketic/kafka-connect-logservice.git
mvn clean compile assembly:single 

打包之后,在项目根目录下,会生成一个压缩包 target/kafka-connect-logservice-1.0.0-jar-with-dependencies.jar 。 这个文件包含了插件和所有依赖,把这个文件复制到Kafka运行的机器上。

Kafka connect的工作模式分为两种,分别是standalone模式和distributed模式。 standalone模式可以简单理解为只有一个单独的worker,只需在启动时指定配置文件即可。而distributed模式可以启动多个worker,可以水平扩展和failover,插件本身的配置通过REST API的方式传递。这里我们为了演示方便仅演示standalone模式,在生产环境中建议使用distributed模式。更多部署细节可以参考官方文档

启动Connect

1)修改日志服务插件配置文件
在项目目录下config目录内有一个配置文件sink.properties,里面包含了日志服务插件运行所必须的配置信息:

name=LoghubSinkConnector
topics=<Kafka topic>
tasks.max=10
connector.class=com.aliyun.openservices.log.kafka.connect.LoghubSinkConnector
loghub.endpoint=your log service endpoint
loghub.project=your log service project
loghub.logstore=your log service logstore
loghub.accessKeyId=your access key id
loghub.accessKeySecret=your access key secret
loghub.batchSize=1000
format=json

除了放日志服务必需的配置外,还可以指定数据格式。目前日志服务Connector只支持字符串类型的数据,format可以选择 json 或者 raw:

  • json:每条纪录的value作为一个JSON字符串解析,自动提取字段并写入日志服务。
  • raw:每条纪录的value作为一个字段,写入日志服务。

2)修改connect配置文件
在Kafka下载目录下,找到 config/connect-standalone.properties,修改如下配置:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
plugin.path=

plugin.path即为上文构建的jar 所在目录。在Kafka 下载目录内执行启动命令:

./bin/connect-standalone.sh ./config/connect-standalone.properties <your sink config path>/sink.properties

生成测试数据

git clone https://github.com/liketic/logservice-samples.git
cd logservice-samples

替换其中的Kafka配置:

        String topicName = "mytopic";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < randomNum(100); j++) {
                String r = generateLog();
                producer.send(new ProducerRecord<>(topicName, r));
            }
            Thread.sleep(randomNum(1000));
        }
        producer.close();

在IDE中运行产生测试数据的程序,会通过Kafka Producer往Kafka中写入一些模拟数据。

写入结果查询

日志服务控制台查看数据写入成功:

image
写入结果

参考资料

日志服务官方文档:https://help.aliyun.com/product/28958.html?spm=a2c4g.750001.list.102.4cc17b13hpRH8b
Kafka Connect 官方文档:https://kafka.apache.org/documentation/#connect

目录
相关文章
|
9天前
|
消息中间件 Kafka API
python之kafka日志
python之kafka日志
14 3
|
26天前
|
消息中间件 存储 监控
Kafka的logs目录下的文件都是什么日志?
Kafka的logs目录下的文件都是什么日志?
40 11
|
1月前
|
SQL 人工智能 运维
在阿里云日志服务轻松落地您的AI模型服务——让您的数据更容易产生洞见和实现价值
您有大量的数据,数据的存储和管理消耗您大量的成本,您知道这些数据隐藏着巨大的价值,但是您总觉得还没有把数据的价值变现出来,对吗?来吧,我们用一系列的案例帮您轻松落地AI模型服务,实现数据价值的变现......
133 3
|
2月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
70 0
|
2月前
|
数据库 Java 监控
Struts 2 日志管理化身神秘魔法师,洞察应用运行乾坤,演绎奇幻篇章!
【8月更文挑战第31天】在软件开发中,了解应用运行状况至关重要。日志管理作为 Struts 2 应用的关键组件,记录着每个动作和决策,如同监控摄像头,帮助我们迅速定位问题、分析性能和使用情况,为优化提供依据。Struts 2 支持多种日志框架(如 Log4j、Logback),便于配置日志级别、格式和输出位置。通过在 Action 类中添加日志记录,我们能在开发过程中获取详细信息,及时发现并解决问题。合理配置日志不仅有助于调试,还能分析用户行为,提升应用性能和稳定性。
38 0
|
2月前
|
开发者 前端开发 编解码
Vaadin解锁移动适配新境界:一招制胜,让你的应用征服所有屏幕!
【8月更文挑战第31天】在移动互联网时代,跨平台应用开发备受青睐。作为一款基于Java的Web应用框架,Vaadin凭借其组件化设计和强大的服务器端渲染能力,助力开发者轻松构建多设备适应的Web应用。本文探讨Vaadin与移动设备的适配策略,包括响应式布局、CSS媒体查询、TouchKit插件及服务器端优化,帮助开发者打造美观且实用的移动端体验。通过这些工具和策略的应用,可有效应对屏幕尺寸、分辨率及操作系统的多样性挑战,满足广大移动用户的使用需求。
35 0
|
2月前
|
存储 运维 监控
Entity Framework Core 实现审计日志记录超棒!多种方法助你跟踪数据变化、监控操作,超实用!
【8月更文挑战第31天】在软件开发中,审计日志记录对于跟踪数据变化、监控用户操作及故障排查至关重要。Entity Framework Core (EF Core) 作为强大的对象关系映射框架,提供了多种实现审计日志记录的方法。例如,可以使用 EF Core 的拦截器在数据库操作前后执行自定义逻辑,记录操作类型、时间和执行用户等信息。此外,也可通过在实体类中添加审计属性(如 `CreatedBy`、`CreatedDate` 等),并在保存实体时更新这些属性来记录审计信息。这两种方法都能有效帮助我们追踪数据变更并满足合规性和安全性需求。
22 0
|
2月前
|
SQL 安全 测试技术
【数据守护者必备】SQL数据备份与恢复策略全解析:从全量到日志备份,手把手教你确保企业信息万无一失的实战技巧!
【8月更文挑战第31天】数据库是企业核心业务数据的基石,为防止硬件故障、软件错误或人为失误导致的数据丢失,制定可靠的备份与恢复策略至关重要。本文通过一个在线购物平台的案例,详细介绍了使用 SQL Server 进行全量备份、差异备份及事务日志备份的方法,并演示了如何利用 SQL Server Agent 实现自动化备份任务。此外,还提供了数据恢复的具体步骤和测试建议,确保数据安全与业务连续性。
50 0
|
2月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
Kubernetes Ubuntu Windows
【Azure K8S | AKS】分享从AKS集群的Node中查看日志的方法(/var/log)
【Azure K8S | AKS】分享从AKS集群的Node中查看日志的方法(/var/log)

相关产品

  • 日志服务
  • 下一篇
    无影云桌面