使用EMR-Flume将非EMR集群的数据同步至EMR集群的HDFS

简介: E-MapReduce从3.20.0版本开始对EMR-Flume新增了Log Service Source。借助Log Service的Logtail等工具,可以将需要同步的数据实时采集并上传到LogHub,再使用EMR-Flume将LogHub的数据同步至EMR集群的HDFS

1.前言

E-MapReduce从3.20.0版本开始对EMR-Flume新增了Log Service Source。借助Log Service的Logtail等工具,可以将需要同步的数据实时采集并上传到LogHub,再使用EMR-Flume将LogHub的数据同步至EMR集群的HDFS。
本文将介绍使用EMR-Flume实时同步Log Service的数据至EMR集群的HDFS,并根据record timestamp将数据存入HDFS相应的partition中。
有关采集数据到Log Service的LogHub的详细方法及步骤参见采集方式

2.准备工作

创建Hadoop集群,在可选软件中选择Flume,详细步骤参考创建集群

3.配置Flume

3.1 配置source

配置项 说明
type org.apache.flume.source.loghub.LogHubSource
endpoint Lohub的endpoint 如果使用vpc/经典网络的endpoint,要保证与emr集群在同一个region;如果使用公网endpoint,要保证运行Flume agent的节点有公网IP
project Lohub的project
logstore Lohub的logstore
accessKeyId Aliyun的access key id
accessKey Aliyun的access key
useRecordTime true
consumerGroup consumer_1 消费组名称,默认值为consumer_1

配置项说明如下

  1. useRecordTime
    默认值为false。如果header中没有timestamp属性,接收event的时间戳会被加入到header中;
    但是在Flume Agent启停或者同步滞后等情况下,会将数据放入错误的时间分区中。为避免这种情况,可以将该值设置为true,使用数据收集到LogHub的时间作为timestamp。
  2. consumerPosition
    消费组在第一次消费LogHub数据时的位置,默认值为end,即从最近的数据开始消费;
    可以设置的其他值为begin或special。begin表示从最早的数据开始消费;special表示从指定的时间点开始消费,在配置为special时,需要配置startTime为开始消费的时间点,单位是秒。
    首次运行后LogHub服务端会记录消费组的消费点,此时如果想更改consumerPosition,可以清除LogHub的消费组状态,参考消费组状态;或者更改配置consumerGroup为新的消费组。
  3. heartbeatInterval和fetchInOrder
    heartbeatInterval表示消费组与服务端维持心跳的间隔,单位是毫秒,默认为30秒;fetchInOrder表示相同key的数据是否按序消费,默认值为false。
  4. batchSize和batchDurationMillis
    通用的source batch配置,表示触发event写入channel的阈值。
  5. backoffSleepIncrement和maxBackoffSleep
    通用的source sleep配置,表示LogHub没有数据时触发sleep的时间和增量。

3.2配置channel和sink

此处使用memory channel和hdfs sink,hdfs sink配置如下

type hdfs
hdfs.path /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H
hdfs.fileType DataStream
hdfs.rollInterval 3600
hdfs.round true
hdfs.roundValue 60
hdfs.roundUnit minute
hdfs.rollSize 0
hdfs.rollCount 0

memory channel配置如下

type memory
capacity 2000
transactionCapacity 2000

4.运行Flume agent

在Console页面启动Flume agent的具体操作参见Flume使用说明。启动后,可以看到配置的HDFS路径下按照record timestamp存储的日志数据。

1

查看Log Service上的消费组状态

2

目录
相关文章
|
2月前
|
存储 机器学习/深度学习 弹性计算
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
|
2月前
|
分布式计算 流计算
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
|
3月前
|
存储 数据库
zookeeper 集群环境搭建及集群选举及数据同步机制
zookeeper 集群环境搭建及集群选举及数据同步机制
54 2
|
5月前
|
存储 缓存 安全
阿里云EMR数据湖文件系统: 面向开源和云打造下一代 HDFS
本文作者详细地介绍了阿里云EMR数据湖文件系统JindoFS的起源、发展迭代以及性能。
72641 79
|
2月前
|
安全 数据安全/隐私保护
阿里云EMR数据湖文件系统问题之JindoFS的INode定义与HDFS有何不同
阿里云EMR数据湖文件系统问题之JindoFS的INode定义与HDFS有何不同
|
4月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之独立集群与hdfs集群不在一起,何配置checkpoint目录为hdfs
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL JSON 数据处理
实时计算 Flink版产品使用问题之把hdfs集群里的core-site.xml hdfs.xml两个文件放到flink/conf/目录下,启动集群说找不到hdfs,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
分布式计算 Java Hadoop
HDFS 集群读写压测
在虚拟机中配置集群时,需设置每台服务器网络为百兆,以模拟实际网络环境。使用Hadoop的`TestDFSIO`进行HDFS性能测试,包括写入和读取数据。写测试中,创建11个128MB文件,平均写入速度为3.86 MB/sec,总处理数据量1408 MB,测试时间137.46秒。资源分配合理,传输速度超过单台服务器理论最大值12.5M/s,说明网络资源已充分利用。读测试主要依赖硬盘传输速率,速度快。测试完成后使用`TestDFSIO -clean`删除测试数据。
|
5月前
|
SQL 分布式计算 监控
Flume实时读取本地/目录文件到HDFS
Flume实时读取本地/目录文件到HDFS
92 7
|
5月前
|
分布式计算 资源调度 Hadoop
Hadoop【基础知识 03+04】【Hadoop集群资源管理器yarn】(图片来源于网络)(hadoop fs + hadoop dfs + hdfs dfs 使用举例)
【4月更文挑战第5天】Hadoop【基础知识 03】【Hadoop集群资源管理器yarn】(图片来源于网络)Hadoop【基础知识 04】【HDFS常用shell命令】(hadoop fs + hadoop dfs + hdfs dfs 使用举例)
124 9

热门文章

最新文章

下一篇
无影云桌面