flume 单机问题解决与架构更改-阿里云开发者社区

开发者社区> 千狼> 正文

flume 单机问题解决与架构更改

简介: [TOC] 引言 今天针对线上生产环境下单机 flume 拉取kafka数据并存储数据入Hdfs 出现大批量数据延迟. 在网上官网各种搜索数据,并结合官网数据,现进行以下总结 1. 线上单机存在问题简述 当前flume拉取kafa数据量并不大 ,根据flume客户端日志 ,每半分钟hdfs文件写入一次数据生成文件 发现问题: **拉取kafka数据过慢** 2.
+关注继续查看

[TOC]

引言

今天针对线上生产环境下单机 flume 拉取kafka数据并存储数据入Hdfs 出现大批量数据延迟. 在网上官网各种搜索数据,并结合官网数据,现进行以下总结

1. 线上单机存在问题简述

当前flume拉取kafa数据量并不大 ,根据flume客户端日志 ,每半分钟hdfs文件写入一次数据生成文件
发现问题:

**拉取kafka数据过慢**

2. 解决思路

  1. 加大kafka拉取数据量
  2. 加大flume中channel,source,sink 各通道的单条数据量
  3. 将flume拉取数据单机版本改成多数据拉取,通过flume-avore-sink-> flume-avore-source 进行数据多数据采取并合并

3 加大kafka拉取数据量

3.1 kafka-source简述

  • flume 输入单线程拉取数据并将数据发送内置channel并通过sink组件进行数据转发和处理,故对于kafka集群多副本方式拉取数据的时候,应适当考虑多个flume节点拉取kafka多副本数据,以避免flume节点在多个kafka集群副本中轮询。加大flume拉取kafka数据的速率。
  • flume-kafka-source 是flume内置的kafka source数据组件,是为了拉取kafka数据,配置如下:
agent.sources = r1
 agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest
  • flume-kafka-source 的offset是交由zk集群去维护offset

3.2 kafka-source配置详解

Kafka Source是一个Apache Kafka消费者,它从Kafka主题中读取消息。 如果您正在运行多个Kafka源,则可以使用相同的使用者组配置它们,以便每个源都读取一组唯一的主题分区。

Property Name Default Description
channels 配置的channels 可配置多个channels 后续文章会说到
type org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers 配置kafka集群地址
kafka.consumer.group.id flume 唯一确定的消费者群体。 在多个源或代理中设置相同的ID表示它们是同一个使用者组的一部分
kafka.topics 你需要消费的topic
kafka.topics.regex 正则表达式,用于定义源订阅的主题集。 此属性的优先级高于kafka.topics,如果存在则覆盖kafka.topics
batchSize 1000 一批中写入Channel的最大消息数 (优化项)
batchDurationMillis 1000 将批次写入通道之前的最长时间(以毫秒为单位)只要达到第一个大小和时间,就会写入批次。(优化项)
backoffSleepIncrement 1000 Kafka主题显示为空时触发的初始和增量等待时间。 等待时间将减少对空kafka主题的激进ping操作。 一秒钟是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。
maxBackoffSleep 5000 Kafka主题显示为空时触发的最长等待时间。 5秒是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。
useFlumeEventFormat false 默认情况下,事件从Kafka主题直接作为字节直接进入事件主体。 设置为true以将事件读取为Flume Avro二进制格式。 与KafkaSink上的相同属性或Kafka Channel上的parseAsFlumeEvent属性一起使用时,这将保留在生成端发送的任何Flume标头。
setTopicHeader true 设置为true时,将检索到的消息的主题存储到标题中,该标题由topicHeader属性定义。
topicHeader topic 如果setTopicHeader属性设置为true,则定义用于存储接收消息主题名称的标题的名称。 如果与Kafka SinktopicHeader属性结合使用,应该小心,以避免在循环中将消息发送回同一主题。
migrateZookeeperOffsets true 如果找不到Kafka存储的偏移量,请在Zookeeper中查找偏移量并将它们提交给Kafka。 这应该是支持从旧版本的Flume无缝Kafka客户端迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果未找到Zookeeper偏移量,则Kafka配置kafka.consumer.auto.offset.reset定义如何处理偏移量。 查看[Kafka文档](http://kafka.apache.org/documentation.html#newconsumerconfigs)了解详细信息
kafka.consumer.security.protocol PLAINTEXT 如果使用某种级别的安全性写入Kafka,则设置为SASL_PLAINTEXT,SASL_SSL或SSL。
Other Kafka Consumer Properties 这些属性用于配置Kafka Consumer。 可以使用Kafka支持的任何消费者财产。 唯一的要求是在前缀为“kafka.consumer”的前缀中添加属性名称。 例如:kafka.consumer.auto.offset.reset

注意:
Kafka Source会覆盖两个Kafka使用者参数:source.com将auto.commit.enable设置为“false”,并提交每个批处理。 Kafka源至少保证一次消息检索策略。 源启动时可以存在重复项。 Kafka Source还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer)和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值。 不建议修改这些参数。
官方配置示例:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
Example for topic subscription by regex

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used

本案例kafka-source配置

agent.sources = r1
agent.sources.r1.channels=c1
 agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest

官网配置文件地址kafka-source

3.3 配置优化

主要是在放入flume-channels 的批量数据加大
更改参数:
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
更改解释:

**即每2秒钟拉取 kafka 一批数据 批数据大小为50000  放入到flume-channels 中 。即flume该节点 flume-channels 输入端数据已放大**

更改依据:

  • 需要配置kafka单条数据 broker.conf 中配置 message.max.bytes
  • 当前flume channel sink 组件最大消费能力如何?

4. 加大flume中channel,source,sink 各通道的单条数据量

4.1 source 发送至channels 数据量大小已配置 见 3.3

4.2 channel 配置

Property Name Default Description
type The component type name, needs to be memory
capacity 100 通道中存储的最大事件数 (优化项)
transactionCapacity 100 每个事务通道从源或提供给接收器的最大事件数 (优化项)
keep-alive 3 添加或删除事件的超时(以秒为单位)
byteCapacityBufferPercentage 20 定义byteCapacity与通道中所有事件的估计总大小之间的缓冲区百分比,以计算标头中的数据。 见下文。
byteCapacity see description 允许的最大总字节作为此通道中所有事件的总和。 实现只计算Eventbody,这也是提供byteCapacityBufferPercentage配置参数的原因。 默认为计算值,等于JVM可用的最大内存的80%(即命令行传递的-Xmx值的80%)。 请注意,如果在单个JVM上有多个内存通道,并且它们碰巧保持相同的物理事件(即,如果您使用来自单个源的复制通道选择器),那么这些事件大小可能会因为通道byteCapacity目的而被重复计算。 将此值设置为“0”将导致此值回退到大约200 GB的内部硬限制。

配置 capacity 和 transactionCapacity 值 。默认配置规则为:

$$ channels.capacity >= channels.transactionCapacity >= source.batchSize $$

官方channels配置示例

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

本案例修改之后的channels 配置

agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000

5. 将flume拉取数据单机版本改成多数据拉取,通过flume-avore-sink-> flume-avore-source 进行数据多数据采取并合并

5.1 存在问题

通过上续修改会发现单机版本的flume会在多副本kafka轮询造成效率浪费
单机版本flume处理数据时会存在单机瓶颈,单机channels可能最多只能处理最大数据无法扩充
单机flume配置多个数据源不方便,不能适合后续多需求开发

5.2 修改架构

多数据聚合架构

5.3采集节点配置文件

收集节点配置(3台):

agent.sources = r1
agent.channels = c1
agent.sinks = k1

agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = qcloud-test-hadoop03:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest

agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000

agent.sinks.k1.type = avro
agent.sinks.k1.hostname = test-hadoop03
agent.sinks.k1.port=4545

agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

汇总节点配置(1台):

agent.sources = r1
agent.channels = memoryChannel
agent.sinks = hdfsSink


agent.sources.r1.type = avro
agent.sources.r1.bind = ip
agent.sources.r1.port = 4545
agent.sources.r1.batchSize = 100000
agent.sources.r1.batchDurationMillis = 1000


agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.keep-alive=30
agent.channels.memoryChannel.capacity=120000
agent.channels.memoryChannel.transactionCapacity=100000


agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.hdfs.path=hdfs://nameser/data/hm2/%Y-%m-%d-%H
agent.sinks.hdfsSink.hdfs.writeFormat=Text

agent.sinks.hdfsSink.hdfs.rollCount = 0

agent.sinks.hdfsSink.hdfs.rollSize = 134217728

agent.sinks.hdfsSink.hdfs.rollInterval = 60
agent.sinks.hdfsSink.hdfs.fileType=DataStream
agent.sinks.hdfsSink.hdfs.idleTimeout=65
agent.sinks.hdfsSink.hdfs.callTimeout=65000
agent.sinks.hdfsSink.hdfs.threadsPoolSize=300

agent.sinks.hdfsSink.channel = memoryChannel
agent.sources.r1.channels = memoryChannel

5.4 架构注意点

  • 当前架构需要保证聚合节点机器的性能
  • 当前架构新的瓶颈可能会存在存储Hdfs数据时过慢 ,导致聚合节点Channels 占用率居高不下,导致堵塞 。
  • 需要关注avro 自定义source sink 的发送效率

6.flume 监控工具(http)

flume 监控工具总共有三种方式 ,我们这里为方便简单,使用内置http接口监控方式进行操作

6.1 配置

在启动脚本处设置 参数 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 即可

6.2 访问 地址 :

http://flumeIp:34545

6.3 返回结果示例 和字段解释 :

{
    "CHANNEL.memoryChannel": {
        "ChannelCapacity": "550000",
        "ChannelFillPercentage": "0.18181818181818182",
        "Type": "CHANNEL",
        "ChannelSize": "1000",
        "EventTakeSuccessCount": "33541400",
        "EventTakeAttemptCount": "33541527",
        "StartTime": "1536572886273",
        "EventPutAttemptCount": "33542500",
        "EventPutSuccessCount": "33542500",
        "StopTime": "0"
    },
    "SINK.hdfsSink": {
        "ConnectionCreatedCount": "649",
        "ConnectionClosedCount": "648",
        "Type": "SINK",
        "BatchCompleteCount": "335414",
        "BatchEmptyCount": "27",
        "EventDrainAttemptCount": "33541500",
        "StartTime": "1536572886275",
        "EventDrainSuccessCount": "33541400",
        "BatchUnderflowCount": "0",
        "StopTime": "0",
        "ConnectionFailedCount": "0"
    },
    "SOURCE.avroSource": {
        "EventReceivedCount": "33542500",
        "AppendBatchAcceptedCount": "335425",
        "Type": "SOURCE",
        "EventAcceptedCount": "33542500",
        "AppendReceivedCount": "0",
        "StartTime": "1536572886465",
        "AppendAcceptedCount": "0",
        "OpenConnectionCount": "3",
        "AppendBatchReceivedCount": "335425",
        "StopTime": "0"
    }
}

参数定义:

字段名称 含义 备注
SOURCE.OpenConnectionCount 打开的连接数
SOURCE.TYPE 组件类型
SOURCE.AppendBatchAcceptedCount 追加到channel中的批数量
SOURCE.AppendBatchReceivedCount source端刚刚追加的批数量
SOURCE.EventAcceptedCount 成功放入channel的event数量
SOURCE.AppendReceivedCount source追加目前收到的数量
SOURCE.StartTime(StopTIme) 组件开始时间、结束时间
SOURCE.EventReceivedCount source端成功收到的event数量
SOURCE.AppendAcceptedCount source追加目前放入channel的数量
CHANNEL.EventPutSuccessCount 成功放入channel的event数量
CHANNEL.ChannelFillPercentage 通道使用比例
CHANNEL.EventPutAttemptCount 尝试放入将event放入channel的次数
CHANNEL.ChannelSize 目前在channel中的event数量
CHANNEL.EventTakeSuccessCount 从channel中成功取走的event数量
CHANNEL.ChannelCapacity 通道容量
CHANNEL.EventTakeAttemptCount 尝试从channel中取走event的次数
SINK.BatchCompleteCount 完成的批数量
SINK.ConnectionFailedCount 连接失败数
SINK.EventDrainAttemptCount 尝试提交的event数量
SINK.ConnectionCreatedCount 创建连接数
SINK.Type 组件类型
SINK.BatchEmptyCount 批量取空的数量
SINK.ConnectionClosedCount 关闭连接数量
SINK.EventDrainSuccessCount 成功发送event的数量
SINK.BatchUnderflowCount 正处于批量处理的batch数

参考地址
flume-document : http://flume.apache.org/FlumeUserGuide.html

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Spring Boot 基础教程:集成 Swagger2,构建强大的 API 文档
主要对 Swagger 进行了简单介绍,并用 Spring Boot 集成 Swagger,同时还进行简单的测试,构建我们自己的 API 接口文档。
6 0
【Elastic Engineering】Elasticsearch:使用 Elasticsearch 在键入时实现类似 Linkedin 的搜索
Elasticsearch:使用 Elasticsearch 在键入时实现类似 Linkedin 的搜索
10 0
看完这篇,你不可能不懂「动态代理」
四个版本迭代,带你一步步模拟JDK动态代理
3 0
Spring Boot 基础教程:使用 Swagger3 生成 API 接口文档
主要介绍如何使用 Spring Boot 集成 Swagger3,构建我们自己的 API 接口文档,并对比了 Swagger2 和 Swagger3 的区别,让我们从 Swagger2 向 Swagger3 过渡更加顺滑。
6 0
Spring Boot Serverless 实战系列 | 性能调优
Spring Boot Serverless 实战系列第四篇来啦,本文将向大家介绍如何对 Serverless 应用进行性能调优。
4 0
相约在冬季实战营——第一期学习报告
冬奥云小宝带你玩转五期实战营 覆盖五大技术方向 “0成本”实践学习 快速晋级为专业的“云上开发者” -- 摘自云起冬季实战营https://developer.aliyun.com/adc/series/wintercamp
13 0
漫谈“架构团队”之组织架构(下)
漫谈“架构团队”之组织架构(下)
4 0
也谈分库分表在实际应用的实践(上)
也谈分库分表在实际应用的实践(上)
4 0
+关注
千狼
常年在 javaWeb 大数据领域中 突破自我 ,希望可以有更多志同道合之士一起交流
12
文章
2
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载