使用E-MapReduce服务将Kafka数据导入OSS

本文涉及的产品
EMR Serverless Spark 免费试用,1000 CU*H 有效期3个月
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: kafka是一个开源社区常用的消息队列,虽然kafka官方(Confluent公司)提供插件从Kafka直接导入数据到HDFS的connector,但对阿里云对文件存储系统OSS却没有官方的支持。本文会举一个简单的例子,实现kafka的数据写入阿里云OSS。因为阿里云E-MapReduce服...

概述

kafka是一个开源社区常用的消息队列,虽然kafka官方(Confluent公司)提供插件从Kafka直接导入数据到HDFS的connector,但对阿里云对文件存储系统OSS却没有官方的支持。本文会举一个简单的例子,实现kafka的数据写入阿里云OSS。因为阿里云E-MapReduce服务集成了大量开源组件和阿里云产品的对接工具,所以本文直接在E-MapReduce集群上运行这个例子。

这个例子使用开源的Flume工具作为中转,将kafka和OSS连接起来。Flume开源组件将来也可能出现在E-MapReduce平台上。

场景举例

下面举一个最简单的例子,如果已经有一个线上的Kafka集群,则可以直接跳到第4步。

  1. 在Kafka Home目录下启动Kafka服务进程,配置文件中Zookeeper的地址配置为E-MapReduce自带的服务地址 emr-header-1:2181
    bin/kafka-server-start.sh config/server.properties
  2. 创建一个Kafka的topic,名字为test
    bin/kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 1 --partitions 1 --topic test
  3. 向Kafka test topic内写入数据,数据内容为本机的性能监控数据
    vmstat 1 | bin/kafka-console-producer.sh --broker-list emr-header-1:9092 --topic test
  4. 在Flume Home目录下配置并启动Flume服务

新建一个配置文件:conf/kafka-example.conf。其中source指定为kafka的对应topic,sink使用HDFS Sinker,并且路径指定为OSS的路径。因为E-MapReduce服务为我们实现了一个高效的OSS FileSystem(兼容Hadoop FileSystem),所以可以直接指定OSS路径,HDFS Sinker自动将数据写入OSS。

# Name the components on this agent
a1.sources = source1
a1.sinks = oss1
a1.channels = c1

# Describe/configure the source
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.zookeeperConnect = emr-header-1:2181
a1.sources.source1.topic = test
a1.sources.source1.groupId = flume
a1.sources.source1.channels = c1
a1.sources.source1.interceptors = i1
a1.sources.source1.interceptors.i1.type = timestamp
a1.sources.source1.kafka.consumer.timeout.ms = 100

# Describe the sink
a1.sinks.oss1.type = hdfs
a1.sinks.oss1.hdfs.path = oss://emr-examples/kafka/%{topic}/%y-%m-%d
a1.sinks.oss1.hdfs.rollInterval = 10
a1.sinks.oss1.hdfs.rollSize = 0
a1.sinks.oss1.hdfs.rollCount = 0
a1.sinks.oss1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.oss1.channel = c1

启动Flume服务:

bin/flume-ng agent --conf conf --conf-file conf/kafka-example.conf --name a1 -Dflume.root.logger=INFO,console --classpath '/usr/lib/hadoop-current/share/hadoop/tools/lib/*'

从日志中可以看到Flume HDFS sinker将数据写到了OSS,并且是每10秒钟轮转一次。

2016-12-05 18:41:04,794 (hdfs-oss1-call-runner-1) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming oss://emr-perform/kafka/test/16-12-05/Flume
Data.1480934454657.tmp to oss://emr-perform/kafka/test/16-12-05/FlumeData.1480934454657
2016-12-05 18:41:04,852 (hdfs-oss1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.

备注:如果遇到如下 Exception,是因为Flume自带的httpclient jar包和EMR冲突:

Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchFieldError: INSTANCE

通过删除Flume自带的 httpclient jar 包可以避免冲突(统一使用EMR Hadoop带的httpclient):
rm ~/apache-flume-1.8.0-bin/lib/httpclient-4.2.1.jar

查看OSS上的结果

$ hadoop fs -ls oss://emr-examples/kafka/test/16-12-05/
Found 6 items
-rw-rw-rw-   1     162691 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934394566
-rw-rw-rw-   1        925 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934407580
-rw-rw-rw-   1       1170 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934418597
-rw-rw-rw-   1       1092 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934430613
-rw-rw-rw-   1       1254 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638
-rw-rw-rw-   1        588 2016-12-05 18:41 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934454657

$ hadoop fs -cat oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638 
 0  0      0 1911216  50036 1343828    0    0     0     0 1341 2396  1  1 98  0  0
 0  0      0 1896964  50052 1343824    0    0     0   112 1982 2511 15  1 84  0  0
 1  0      0 1896552  50052 1343828    0    0     0    76 2314 3329  3  4 94  0  0
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 5  0      0 1903016  50052 1343828    0    0     0     0 2277 3249  2  4 94  0  0
 0  0      0 1902892  50052 1343828    0    0     0     0 1417 2366  5  0 95  0  0
 0  0      0 1902892  50052 1343828    0    0     0     0 1072 2243  0  0 99  0  0
 0  0      0 1902892  50068 1343824    0    0     0   144 1275 2283  1  0 99  0  0
 1  0      0 1903024  50068 1343828    0    0     0    24 1099 2071  1  1 99  0  0
 0  0      0 1903272  50068 1343832    0    0     0     0 1294 2238  1  1 99  0  0
 1  0      0 1903412  50068 1343832    0    0     0     0 1024 2094  1  0 99  0  0
 2  0      0 1903148  50076 1343836    0    0     0    68 1879 2766  1  1 98  0  0
 1  0      0 1903288  50092 1343840    0    0     0    92 1147 2240  1  0 99  0  0
 0  0      0 1902792  50092 1343844    0    0     0    28 1456 2388  1  1 98  0  0

参考资料

  1. http://kafka.apache.org/quickstart
  2. https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html
目录
相关文章
|
12天前
|
存储 监控 调度
阿里云对象存储OSS之间进行数据转移教程
讲解如何在阿里云对象存储OSS之间进行跨账号、跨地域、以及同地域内的数据迁移,包括数据迁移之前的准备工作和实施数据迁移以及一些后续操作
|
4月前
|
存储 人工智能 Kubernetes
AI 场景深度优化!K8s 集群 OSSFS 2.0 存储卷全面升级,高效访问 OSS 数据
阿里云对象存储OSS是一款海量、安全、低成本、高可靠的云存储服务,是用户在云上存储的高性价比选择…
|
14天前
|
存储 安全 数据管理
服务器违规资源被删,数据定时备份OSS 云存储才是 “救命稻草”
在数字化时代,数据已成为企业与个人的核心资产。然而,服务器违规、硬件故障等问题频发,导致数据丢失、业务中断,甚至造成不可挽回的损失。为保障数据安全与业务连续性,定时备份至关重要。阿里云国际站OSS提供高效、可靠的云存储解决方案,支持自动定时备份,帮助用户轻松应对数据风险。本文详解OSS备份操作步骤与注意事项,助你为数据穿上“防护甲”,实现安全无忧存储。
|
4月前
|
存储 人工智能 测试技术
AI 场景深度优化!K8s 集群 OSSFS 2.0 存储卷全面升级,高效访问 OSS 数据
OSSFS 2.0通过轻量化协议设计、协程化技术及FUSE3低级API重构,实现大文件顺序读写与小文件高并发加载的显著提升,在实际测试中表现出高达数十倍的吞吐量增长。适用于机器学习训练、推理等对高带宽低延迟要求严苛的场景,同时支持静态和动态挂载方式,方便用户在ACK集群中部署使用。
405 35
|
6月前
|
存储 Ubuntu 数据管理
使用s3cmd 2.x 与 Cyberduck 管理在 DigitalOcean Spaces 对象存储中的数据
通过 `s3cmd` 2.x 和 Cyberduck,你可以轻松管理 DigitalOcean Spaces 中的数据。`s3cmd` 提供了强大的命令行操作能力,适合脚本化和自动化任务,而 Cyberduck 提供了直观的图形界面,便于日常手动操作。掌握这两种工具的使用方法,将极大提高你的数据管理效率。希望本文能帮助你更好地使用 DigitalOcean Spaces。
119 7
|
9月前
|
数据采集 分布式计算 Hadoop
使用Hadoop MapReduce进行大规模数据爬取
使用Hadoop MapReduce进行大规模数据爬取
|
11月前
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
301 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
11月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
269 0
|
8月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
11月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
361 1