使用E-MapReduce服务将Kafka数据导入OSS

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: kafka是一个开源社区常用的消息队列,虽然kafka官方(Confluent公司)提供插件从Kafka直接导入数据到HDFS的connector,但对阿里云对文件存储系统OSS却没有官方的支持。本文会举一个简单的例子,实现kafka的数据写入阿里云OSS。因为阿里云E-MapReduce服...

概述

kafka是一个开源社区常用的消息队列,虽然kafka官方(Confluent公司)提供插件从Kafka直接导入数据到HDFS的connector,但对阿里云对文件存储系统OSS却没有官方的支持。本文会举一个简单的例子,实现kafka的数据写入阿里云OSS。因为阿里云E-MapReduce服务集成了大量开源组件和阿里云产品的对接工具,所以本文直接在E-MapReduce集群上运行这个例子。

这个例子使用开源的Flume工具作为中转,将kafka和OSS连接起来。Flume开源组件将来也可能出现在E-MapReduce平台上。

场景举例

下面举一个最简单的例子,如果已经有一个线上的Kafka集群,则可以直接跳到第4步。

  1. 在Kafka Home目录下启动Kafka服务进程,配置文件中Zookeeper的地址配置为E-MapReduce自带的服务地址 emr-header-1:2181
    bin/kafka-server-start.sh config/server.properties
  2. 创建一个Kafka的topic,名字为test
    bin/kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 1 --partitions 1 --topic test
  3. 向Kafka test topic内写入数据,数据内容为本机的性能监控数据
    vmstat 1 | bin/kafka-console-producer.sh --broker-list emr-header-1:9092 --topic test
  4. 在Flume Home目录下配置并启动Flume服务

新建一个配置文件:conf/kafka-example.conf。其中source指定为kafka的对应topic,sink使用HDFS Sinker,并且路径指定为OSS的路径。因为E-MapReduce服务为我们实现了一个高效的OSS FileSystem(兼容Hadoop FileSystem),所以可以直接指定OSS路径,HDFS Sinker自动将数据写入OSS。

# Name the components on this agent
a1.sources = source1
a1.sinks = oss1
a1.channels = c1

# Describe/configure the source
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.zookeeperConnect = emr-header-1:2181
a1.sources.source1.topic = test
a1.sources.source1.groupId = flume
a1.sources.source1.channels = c1
a1.sources.source1.interceptors = i1
a1.sources.source1.interceptors.i1.type = timestamp
a1.sources.source1.kafka.consumer.timeout.ms = 100

# Describe the sink
a1.sinks.oss1.type = hdfs
a1.sinks.oss1.hdfs.path = oss://emr-examples/kafka/%{topic}/%y-%m-%d
a1.sinks.oss1.hdfs.rollInterval = 10
a1.sinks.oss1.hdfs.rollSize = 0
a1.sinks.oss1.hdfs.rollCount = 0
a1.sinks.oss1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.oss1.channel = c1

启动Flume服务:

bin/flume-ng agent --conf conf --conf-file conf/kafka-example.conf --name a1 -Dflume.root.logger=INFO,console --classpath '/usr/lib/hadoop-current/share/hadoop/tools/lib/*'

从日志中可以看到Flume HDFS sinker将数据写到了OSS,并且是每10秒钟轮转一次。

2016-12-05 18:41:04,794 (hdfs-oss1-call-runner-1) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming oss://emr-perform/kafka/test/16-12-05/Flume
Data.1480934454657.tmp to oss://emr-perform/kafka/test/16-12-05/FlumeData.1480934454657
2016-12-05 18:41:04,852 (hdfs-oss1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.

备注:如果遇到如下 Exception,是因为Flume自带的httpclient jar包和EMR冲突:

Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchFieldError: INSTANCE

通过删除Flume自带的 httpclient jar 包可以避免冲突(统一使用EMR Hadoop带的httpclient):
rm ~/apache-flume-1.8.0-bin/lib/httpclient-4.2.1.jar

查看OSS上的结果

$ hadoop fs -ls oss://emr-examples/kafka/test/16-12-05/
Found 6 items
-rw-rw-rw-   1     162691 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934394566
-rw-rw-rw-   1        925 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934407580
-rw-rw-rw-   1       1170 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934418597
-rw-rw-rw-   1       1092 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934430613
-rw-rw-rw-   1       1254 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638
-rw-rw-rw-   1        588 2016-12-05 18:41 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934454657

$ hadoop fs -cat oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638 
 0  0      0 1911216  50036 1343828    0    0     0     0 1341 2396  1  1 98  0  0
 0  0      0 1896964  50052 1343824    0    0     0   112 1982 2511 15  1 84  0  0
 1  0      0 1896552  50052 1343828    0    0     0    76 2314 3329  3  4 94  0  0
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 5  0      0 1903016  50052 1343828    0    0     0     0 2277 3249  2  4 94  0  0
 0  0      0 1902892  50052 1343828    0    0     0     0 1417 2366  5  0 95  0  0
 0  0      0 1902892  50052 1343828    0    0     0     0 1072 2243  0  0 99  0  0
 0  0      0 1902892  50068 1343824    0    0     0   144 1275 2283  1  0 99  0  0
 1  0      0 1903024  50068 1343828    0    0     0    24 1099 2071  1  1 99  0  0
 0  0      0 1903272  50068 1343832    0    0     0     0 1294 2238  1  1 99  0  0
 1  0      0 1903412  50068 1343832    0    0     0     0 1024 2094  1  0 99  0  0
 2  0      0 1903148  50076 1343836    0    0     0    68 1879 2766  1  1 98  0  0
 1  0      0 1903288  50092 1343840    0    0     0    92 1147 2240  1  0 99  0  0
 0  0      0 1902792  50092 1343844    0    0     0    28 1456 2388  1  1 98  0  0

参考资料

  1. http://kafka.apache.org/quickstart
  2. https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html
相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
相关文章
|
20天前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
4月前
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
138 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
4月前
|
存储 Java 开发工具
【三方服务集成】最新版 | 阿里云OSS对象存储服务使用教程(包含OSS工具类优化、自定义阿里云OSS服务starter)
阿里云OSS(Object Storage Service)是一种安全、可靠且成本低廉的云存储服务,支持海量数据存储。用户可通过网络轻松存储和访问各类文件,如文本、图片、音频和视频等。使用OSS后,项目中的文件上传业务无需在服务器本地磁盘存储文件,而是直接上传至OSS,由其管理和保障数据安全。此外,介绍了OSS服务的开通流程、Bucket创建、AccessKey配置及环境变量设置,并提供了Java SDK示例代码,帮助用户快速上手。最后,展示了如何通过自定义starter简化工具类集成,实现便捷的文件上传功能。
【三方服务集成】最新版 | 阿里云OSS对象存储服务使用教程(包含OSS工具类优化、自定义阿里云OSS服务starter)
|
4月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
184 1
|
4月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
83 1
|
5月前
|
消息中间件 Java Kafka
windows服务器重装系统之后,Kafka服务如何恢复?
windows服务器重装系统之后,Kafka服务如何恢复?
52 8
|
5月前
|
Java API 对象存储
微服务魔法启动!Spring Cloud与Netflix OSS联手,零基础也能创造服务奇迹!
这段内容介绍了如何使用Spring Cloud和Netflix OSS构建微服务架构。首先,基于Spring Boot创建项目并添加Spring Cloud依赖项。接着配置Eureka服务器实现服务发现,然后创建REST控制器作为API入口。为提高服务稳定性,利用Hystrix实现断路器模式。最后,在启动类中启用Eureka客户端功能。此外,还可集成其他Netflix OSS组件以增强系统功能。通过这些步骤,开发者可以更高效地构建稳定且可扩展的微服务系统。
88 1
|
6月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
251 0
|
6月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。