使用EMR-Flume将非EMR集群的数据同步至EMR集群的HDFS

简介: E-MapReduce从3.20.0版本开始对EMR-Flume新增了Log Service Source。借助Log Service的Logtail等工具,可以将需要同步的数据实时采集并上传到LogHub,再使用EMR-Flume将LogHub的数据同步至EMR集群的HDFS

1.前言

E-MapReduce从3.20.0版本开始对EMR-Flume新增了Log Service Source。借助Log Service的Logtail等工具,可以将需要同步的数据实时采集并上传到LogHub,再使用EMR-Flume将LogHub的数据同步至EMR集群的HDFS。
本文将介绍使用EMR-Flume实时同步Log Service的数据至EMR集群的HDFS,并根据record timestamp将数据存入HDFS相应的partition中。
有关采集数据到Log Service的LogHub的详细方法及步骤参见采集方式

2.准备工作

创建Hadoop集群,在可选软件中选择Flume,详细步骤参考创建集群

3.配置Flume

3.1 配置source

配置项 说明
type org.apache.flume.source.loghub.LogHubSource
endpoint Lohub的endpoint 如果使用vpc/经典网络的endpoint,要保证与emr集群在同一个region;如果使用公网endpoint,要保证运行Flume agent的节点有公网IP
project Lohub的project
logstore Lohub的logstore
accessKeyId Aliyun的access key id
accessKey Aliyun的access key
useRecordTime true
consumerGroup consumer_1 消费组名称,默认值为consumer_1

配置项说明如下

  1. useRecordTime
    默认值为false。如果header中没有timestamp属性,接收event的时间戳会被加入到header中;
    但是在Flume Agent启停或者同步滞后等情况下,会将数据放入错误的时间分区中。为避免这种情况,可以将该值设置为true,使用数据收集到LogHub的时间作为timestamp。
  2. consumerPosition
    消费组在第一次消费LogHub数据时的位置,默认值为end,即从最近的数据开始消费;
    可以设置的其他值为begin或special。begin表示从最早的数据开始消费;special表示从指定的时间点开始消费,在配置为special时,需要配置startTime为开始消费的时间点,单位是秒。
    首次运行后LogHub服务端会记录消费组的消费点,此时如果想更改consumerPosition,可以清除LogHub的消费组状态,参考消费组状态;或者更改配置consumerGroup为新的消费组。
  3. heartbeatInterval和fetchInOrder
    heartbeatInterval表示消费组与服务端维持心跳的间隔,单位是毫秒,默认为30秒;fetchInOrder表示相同key的数据是否按序消费,默认值为false。
  4. batchSize和batchDurationMillis
    通用的source batch配置,表示触发event写入channel的阈值。
  5. backoffSleepIncrement和maxBackoffSleep
    通用的source sleep配置,表示LogHub没有数据时触发sleep的时间和增量。

3.2配置channel和sink

此处使用memory channel和hdfs sink,hdfs sink配置如下

type hdfs
hdfs.path /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H
hdfs.fileType DataStream
hdfs.rollInterval 3600
hdfs.round true
hdfs.roundValue 60
hdfs.roundUnit minute
hdfs.rollSize 0
hdfs.rollCount 0

memory channel配置如下

type memory
capacity 2000
transactionCapacity 2000

4.运行Flume agent

在Console页面启动Flume agent的具体操作参见Flume使用说明。启动后,可以看到配置的HDFS路径下按照record timestamp存储的日志数据。

1

查看Log Service上的消费组状态

2

目录
相关文章
|
1月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
129 6
|
1月前
|
SQL 分布式计算 监控
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
59 3
|
30天前
|
消息中间件 分布式计算 关系型数据库
大数据-140 - ClickHouse 集群 表引擎详解5 - MergeTree CollapsingMergeTree 与其他数据源 HDFS MySQL
大数据-140 - ClickHouse 集群 表引擎详解5 - MergeTree CollapsingMergeTree 与其他数据源 HDFS MySQL
38 0
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
82 3
|
1月前
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
39 3
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
42 2
|
1月前
|
分布式计算 Java Hadoop
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
37 2
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
79 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
34 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
44 0