1.前言
E-MapReduce从3.20.0版本开始对EMR-Flume新增了Log Service Source。借助Log Service的Logtail等工具,可以将需要同步的数据实时采集并上传到LogHub,再使用EMR-Flume将LogHub的数据同步至EMR集群的HDFS。
本文将介绍使用EMR-Flume实时同步Log Service的数据至EMR集群的HDFS,并根据record timestamp将数据存入HDFS相应的partition中。
有关采集数据到Log Service的LogHub的详细方法及步骤参见采集方式。
2.准备工作
创建Hadoop集群,在可选软件中选择Flume,详细步骤参考创建集群。
3.配置Flume
3.1 配置source
配置项 | 值 | 说明 |
---|---|---|
type | org.apache.flume.source.loghub.LogHubSource | |
endpoint | Lohub的endpoint | 如果使用vpc/经典网络的endpoint,要保证与emr集群在同一个region;如果使用公网endpoint,要保证运行Flume agent的节点有公网IP |
project | Lohub的project | |
logstore | Lohub的logstore | |
accessKeyId | Aliyun的access key id | |
accessKey | Aliyun的access key | |
useRecordTime | true | |
consumerGroup | consumer_1 | 消费组名称,默认值为consumer_1 |
配置项说明如下
- useRecordTime
默认值为false。如果header中没有timestamp属性,接收event的时间戳会被加入到header中;
但是在Flume Agent启停或者同步滞后等情况下,会将数据放入错误的时间分区中。为避免这种情况,可以将该值设置为true,使用数据收集到LogHub的时间作为timestamp。
- consumerPosition
消费组在第一次消费LogHub数据时的位置,默认值为end,即从最近的数据开始消费;
可以设置的其他值为begin或special。begin表示从最早的数据开始消费;special表示从指定的时间点开始消费,在配置为special时,需要配置startTime为开始消费的时间点,单位是秒。
首次运行后LogHub服务端会记录消费组的消费点,此时如果想更改consumerPosition,可以清除LogHub的消费组状态,参考消费组状态;或者更改配置consumerGroup为新的消费组。
- heartbeatInterval和fetchInOrder
heartbeatInterval表示消费组与服务端维持心跳的间隔,单位是毫秒,默认为30秒;fetchInOrder表示相同key的数据是否按序消费,默认值为false。
- batchSize和batchDurationMillis
通用的source batch配置,表示触发event写入channel的阈值。
- backoffSleepIncrement和maxBackoffSleep
通用的source sleep配置,表示LogHub没有数据时触发sleep的时间和增量。
3.2配置channel和sink
此处使用memory channel和hdfs sink,hdfs sink配置如下
type | hdfs |
---|---|
hdfs.path | /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H |
hdfs.fileType | DataStream |
hdfs.rollInterval | 3600 |
hdfs.round | true |
hdfs.roundValue | 60 |
hdfs.roundUnit | minute |
hdfs.rollSize | 0 |
hdfs.rollCount | 0 |
memory channel配置如下
type | memory |
---|---|
capacity | 2000 |
transactionCapacity | 2000 |
4.运行Flume agent
在Console页面启动Flume agent的具体操作参见Flume使用说明。启动后,可以看到配置的HDFS路径下按照record timestamp存储的日志数据。
查看Log Service上的消费组状态