Kafka修炼日志(二):Connect简明使用教程

简介: Connect是Kafka 0.9版本新增的功能,可以方便的从其它源导入数据到Kafka数据流(指定Topic中),也可以方便的从Kafka数据流(指定Topic中)导出数据到其它源。

Connect是Kafka 0.9版本新增的功能,可以方便的从其它源导入数据到Kafka数据流(指定Topic中),也可以方便的从Kafka数据流(指定Topic中)导出数据到其它源。

     下面结合官方教程详述如何使用File Connector导入数据到Kafka Topic,和导出数据到File:

(1)创建文本文件test.txt,作为其它数据源。

[root@localhost home]# echo -e "connector\ntest" > test.txt

image.gifimage.gif

(2)启动Connect实验脚本,此脚本为官方提供的实验脚本,默认Connector是 File Connector。

[root@localhost kafka_2.12-0.10.2.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

image.gifimage.gif

出现下方错误,是因为文件位置不对,默认应将test.txt文件建立在Kafka目录下,和bin目录同级。

[2017-03-20 13:36:14,879] WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask:106)

image.gifimage.gif

出现下方错误,是因为Standalone模式Zookeeper会自动停止工作,重启Zookeeper服务器即可,如错误继续出现,重启Kafka服务器即可。

[2017-03-20 13:38:07,832] ERROR Failed to commit offsets for WorkerSourceTask{id=local-file-source-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:112)
[2017-03-20 13:38:22,833] ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:304)

image.gifimage.gif

(3)查看导出文件,test.sink.txt,可以看到消费到的消息。

[root@localhost kafka_2.12-0.10.2.0]# cat test.sink.txt
connector
test

image.gifimage.gif

(4)消息已被存储到Topic:connect-test ,也可以启动一个消费者消费消息。

[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning &

image.gifimage.gif

消费者消费的消息 :

[root@localhost kafka_2.12-0.10.2.0]# {"schema":{"type":"string","optional":false},"payload":"connector"}
{"schema":{"type":"string","optional":false},"payload":"test"}

image.gifimage.gif

(5)编辑文件test.txt,新增一条消息,由于Connector此时已经启动,可以实时的看到消费者消费到的新消息。

[root@localhost kafka_2.12-0.10.2.0]# echo "Another line" >> test.txt

image.gifimage.gif

新的消息,已被实时消费:

[root@localhost kafka_2.12-0.10.2.0]# {"schema":{"type":"string","optional":false},"payload":"connector"}
{"schema":{"type":"string","optional":false},"payload":"test"}
{"schema":{"type":"string","optional":false},"payload":"Another line"}

image.gifimage.gif

 

本文属作者原创,转贴请声明!

相关文章
|
2月前
|
消息中间件 Kafka
mac 搭建kafka系列教程
mac 搭建kafka系列教程
33 0
mac 搭建kafka系列教程
|
4月前
|
消息中间件 存储 Kafka
阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来
阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来
|
4月前
|
消息中间件 分布式计算 Kafka
亿万级别Kafka演进之路:可靠性+事务+消息中间件+源码+日志
Kafka起初是由LinkedIn公司采用Scala语言开发的-一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
|
3月前
|
消息中间件 数据可视化 关系型数据库
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
151 0
|
2月前
|
消息中间件 Kafka Apache
【Kafka专栏】windows搭建Kafka环境 & 详细教程(01)
【Kafka专栏】windows搭建Kafka环境 & 详细教程(01)
|
3月前
|
监控 Android开发 iOS开发
【完整版教程】查看APP崩溃日志
本文介绍了在Android和iOS双端设备上查看APP崩溃日志的方法,包括使用adb命令获取Android崩溃日志以及通过克魔助手工具查看iOS设备的崩溃日志。同时提供了操作步骤和相关代码案例演示。
|
3月前
|
存储 监控 iOS开发
【精品教程】如何查看iOS崩溃日志
当一个应用程序崩溃,会产生一个崩溃报告(crash report),并存储到设备中。崩溃报告描述了应用程序崩溃的条件,通常包含每个执行线程的完整回溯。查看崩溃报告可以帮助我们了解应用程序的崩溃情况,并尝试修复问题。
|
4月前
|
消息中间件 关系型数据库 MySQL
在kafka connect 同步 mysql 主从数据库
在kafka connect 同步 mysql 主从数据库
45 0
|
4月前
|
消息中间件 Kafka Windows
使用Kafka Connect 导入导出数据
使用Kafka Connect 导入导出数据
75 0
|
4月前
|
消息中间件 关系型数据库 MySQL
Kafka Connect :构建强大分布式数据集成方案
Kafka Connect 是 Apache Kafka 生态系统中的关键组件,专为构建可靠、高效的分布式数据集成解决方案而设计。本文将深入探讨 Kafka Connect 的核心架构、使用方法以及如何通过丰富的示例代码解决实际的数据集成挑战。