使用E-MapReduce服务将Kafka数据导入OSS

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: kafka是一个开源社区常用的消息队列,虽然kafka官方(Confluent公司)提供插件从Kafka直接导入数据到HDFS的connector,但对阿里云对文件存储系统OSS却没有官方的支持。本文会举一个简单的例子,实现kafka的数据写入阿里云OSS。因为阿里云E-MapReduce服...

概述

kafka是一个开源社区常用的消息队列,虽然kafka官方(Confluent公司)提供插件从Kafka直接导入数据到HDFS的connector,但对阿里云对文件存储系统OSS却没有官方的支持。本文会举一个简单的例子,实现kafka的数据写入阿里云OSS。因为阿里云E-MapReduce服务集成了大量开源组件和阿里云产品的对接工具,所以本文直接在E-MapReduce集群上运行这个例子。

这个例子使用开源的Flume工具作为中转,将kafka和OSS连接起来。Flume开源组件将来也可能出现在E-MapReduce平台上。

场景举例

下面举一个最简单的例子,如果已经有一个线上的Kafka集群,则可以直接跳到第4步。

  1. 在Kafka Home目录下启动Kafka服务进程,配置文件中Zookeeper的地址配置为E-MapReduce自带的服务地址 emr-header-1:2181
    bin/kafka-server-start.sh config/server.properties
  2. 创建一个Kafka的topic,名字为test
    bin/kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 1 --partitions 1 --topic test
  3. 向Kafka test topic内写入数据,数据内容为本机的性能监控数据
    vmstat 1 | bin/kafka-console-producer.sh --broker-list emr-header-1:9092 --topic test
  4. 在Flume Home目录下配置并启动Flume服务

新建一个配置文件:conf/kafka-example.conf。其中source指定为kafka的对应topic,sink使用HDFS Sinker,并且路径指定为OSS的路径。因为E-MapReduce服务为我们实现了一个高效的OSS FileSystem(兼容Hadoop FileSystem),所以可以直接指定OSS路径,HDFS Sinker自动将数据写入OSS。

# Name the components on this agent
a1.sources = source1
a1.sinks = oss1
a1.channels = c1

# Describe/configure the source
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.zookeeperConnect = emr-header-1:2181
a1.sources.source1.topic = test
a1.sources.source1.groupId = flume
a1.sources.source1.channels = c1
a1.sources.source1.interceptors = i1
a1.sources.source1.interceptors.i1.type = timestamp
a1.sources.source1.kafka.consumer.timeout.ms = 100

# Describe the sink
a1.sinks.oss1.type = hdfs
a1.sinks.oss1.hdfs.path = oss://emr-examples/kafka/%{topic}/%y-%m-%d
a1.sinks.oss1.hdfs.rollInterval = 10
a1.sinks.oss1.hdfs.rollSize = 0
a1.sinks.oss1.hdfs.rollCount = 0
a1.sinks.oss1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.oss1.channel = c1

启动Flume服务:

bin/flume-ng agent --conf conf --conf-file conf/kafka-example.conf --name a1 -Dflume.root.logger=INFO,console --classpath '/usr/lib/hadoop-current/share/hadoop/tools/lib/*'

从日志中可以看到Flume HDFS sinker将数据写到了OSS,并且是每10秒钟轮转一次。

2016-12-05 18:41:04,794 (hdfs-oss1-call-runner-1) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming oss://emr-perform/kafka/test/16-12-05/Flume
Data.1480934454657.tmp to oss://emr-perform/kafka/test/16-12-05/FlumeData.1480934454657
2016-12-05 18:41:04,852 (hdfs-oss1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.

备注:如果遇到如下 Exception,是因为Flume自带的httpclient jar包和EMR冲突:

Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchFieldError: INSTANCE

通过删除Flume自带的 httpclient jar 包可以避免冲突(统一使用EMR Hadoop带的httpclient):
rm ~/apache-flume-1.8.0-bin/lib/httpclient-4.2.1.jar

查看OSS上的结果

$ hadoop fs -ls oss://emr-examples/kafka/test/16-12-05/
Found 6 items
-rw-rw-rw-   1     162691 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934394566
-rw-rw-rw-   1        925 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934407580
-rw-rw-rw-   1       1170 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934418597
-rw-rw-rw-   1       1092 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934430613
-rw-rw-rw-   1       1254 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638
-rw-rw-rw-   1        588 2016-12-05 18:41 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934454657

$ hadoop fs -cat oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638 
 0  0      0 1911216  50036 1343828    0    0     0     0 1341 2396  1  1 98  0  0
 0  0      0 1896964  50052 1343824    0    0     0   112 1982 2511 15  1 84  0  0
 1  0      0 1896552  50052 1343828    0    0     0    76 2314 3329  3  4 94  0  0
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 5  0      0 1903016  50052 1343828    0    0     0     0 2277 3249  2  4 94  0  0
 0  0      0 1902892  50052 1343828    0    0     0     0 1417 2366  5  0 95  0  0
 0  0      0 1902892  50052 1343828    0    0     0     0 1072 2243  0  0 99  0  0
 0  0      0 1902892  50068 1343824    0    0     0   144 1275 2283  1  0 99  0  0
 1  0      0 1903024  50068 1343828    0    0     0    24 1099 2071  1  1 99  0  0
 0  0      0 1903272  50068 1343832    0    0     0     0 1294 2238  1  1 99  0  0
 1  0      0 1903412  50068 1343832    0    0     0     0 1024 2094  1  0 99  0  0
 2  0      0 1903148  50076 1343836    0    0     0    68 1879 2766  1  1 98  0  0
 1  0      0 1903288  50092 1343840    0    0     0    92 1147 2240  1  0 99  0  0
 0  0      0 1902792  50092 1343844    0    0     0    28 1456 2388  1  1 98  0  0

参考资料

  1. http://kafka.apache.org/quickstart
  2. https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html
相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
相关文章
|
1月前
|
数据采集 分布式计算 Hadoop
使用Hadoop MapReduce进行大规模数据爬取
使用Hadoop MapReduce进行大规模数据爬取
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
144 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
68 1
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
150 0
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
63 0
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
83 0
|
8月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
92 1
|
3月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
143 3
|
7月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
74 1
|
7月前
|
数据采集 SQL 分布式计算