flume 单机问题解决与架构更改

简介: [TOC] 引言 今天针对线上生产环境下单机 flume 拉取kafka数据并存储数据入Hdfs 出现大批量数据延迟. 在网上官网各种搜索数据,并结合官网数据,现进行以下总结 1. 线上单机存在问题简述 当前flume拉取kafa数据量并不大 ,根据flume客户端日志 ,每半分钟hdfs文件写入一次数据生成文件 发现问题: **拉取kafka数据过慢** 2.

[TOC]

引言

今天针对线上生产环境下单机 flume 拉取kafka数据并存储数据入Hdfs 出现大批量数据延迟. 在网上官网各种搜索数据,并结合官网数据,现进行以下总结

1. 线上单机存在问题简述

当前flume拉取kafa数据量并不大 ,根据flume客户端日志 ,每半分钟hdfs文件写入一次数据生成文件
发现问题:

**拉取kafka数据过慢**

2. 解决思路

  1. 加大kafka拉取数据量
  2. 加大flume中channel,source,sink 各通道的单条数据量
  3. 将flume拉取数据单机版本改成多数据拉取,通过flume-avore-sink-> flume-avore-source 进行数据多数据采取并合并

3 加大kafka拉取数据量

3.1 kafka-source简述

  • flume 输入单线程拉取数据并将数据发送内置channel并通过sink组件进行数据转发和处理,故对于kafka集群多副本方式拉取数据的时候,应适当考虑多个flume节点拉取kafka多副本数据,以避免flume节点在多个kafka集群副本中轮询。加大flume拉取kafka数据的速率。
  • flume-kafka-source 是flume内置的kafka source数据组件,是为了拉取kafka数据,配置如下:
agent.sources = r1
 agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest
  • flume-kafka-source 的offset是交由zk集群去维护offset

3.2 kafka-source配置详解

Kafka Source是一个Apache Kafka消费者,它从Kafka主题中读取消息。 如果您正在运行多个Kafka源,则可以使用相同的使用者组配置它们,以便每个源都读取一组唯一的主题分区。

Property Name Default Description
channels 配置的channels 可配置多个channels 后续文章会说到
type org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers 配置kafka集群地址
kafka.consumer.group.id flume 唯一确定的消费者群体。 在多个源或代理中设置相同的ID表示它们是同一个使用者组的一部分
kafka.topics 你需要消费的topic
kafka.topics.regex 正则表达式,用于定义源订阅的主题集。 此属性的优先级高于kafka.topics,如果存在则覆盖kafka.topics
batchSize 1000 一批中写入Channel的最大消息数 (优化项)
batchDurationMillis 1000 将批次写入通道之前的最长时间(以毫秒为单位)只要达到第一个大小和时间,就会写入批次。(优化项)
backoffSleepIncrement 1000 Kafka主题显示为空时触发的初始和增量等待时间。 等待时间将减少对空kafka主题的激进ping操作。 一秒钟是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。
maxBackoffSleep 5000 Kafka主题显示为空时触发的最长等待时间。 5秒是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。
useFlumeEventFormat false 默认情况下,事件从Kafka主题直接作为字节直接进入事件主体。 设置为true以将事件读取为Flume Avro二进制格式。 与KafkaSink上的相同属性或Kafka Channel上的parseAsFlumeEvent属性一起使用时,这将保留在生成端发送的任何Flume标头。
setTopicHeader true 设置为true时,将检索到的消息的主题存储到标题中,该标题由topicHeader属性定义。
topicHeader topic 如果setTopicHeader属性设置为true,则定义用于存储接收消息主题名称的标题的名称。 如果与Kafka SinktopicHeader属性结合使用,应该小心,以避免在循环中将消息发送回同一主题。
migrateZookeeperOffsets true 如果找不到Kafka存储的偏移量,请在Zookeeper中查找偏移量并将它们提交给Kafka。 这应该是支持从旧版本的Flume无缝Kafka客户端迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果未找到Zookeeper偏移量,则Kafka配置kafka.consumer.auto.offset.reset定义如何处理偏移量。 查看[Kafka文档](http://kafka.apache.org/documentation.html#newconsumerconfigs)了解详细信息
kafka.consumer.security.protocol PLAINTEXT 如果使用某种级别的安全性写入Kafka,则设置为SASL_PLAINTEXT,SASL_SSL或SSL。
Other Kafka Consumer Properties 这些属性用于配置Kafka Consumer。 可以使用Kafka支持的任何消费者财产。 唯一的要求是在前缀为“kafka.consumer”的前缀中添加属性名称。 例如:kafka.consumer.auto.offset.reset

注意:
Kafka Source会覆盖两个Kafka使用者参数:source.com将auto.commit.enable设置为“false”,并提交每个批处理。 Kafka源至少保证一次消息检索策略。 源启动时可以存在重复项。 Kafka Source还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer)和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值。 不建议修改这些参数。
官方配置示例:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
Example for topic subscription by regex

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used

本案例kafka-source配置

agent.sources = r1
agent.sources.r1.channels=c1
 agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest

官网配置文件地址kafka-source

3.3 配置优化

主要是在放入flume-channels 的批量数据加大
更改参数:
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
更改解释:

**即每2秒钟拉取 kafka 一批数据 批数据大小为50000  放入到flume-channels 中 。即flume该节点 flume-channels 输入端数据已放大**

更改依据:

  • 需要配置kafka单条数据 broker.conf 中配置 message.max.bytes
  • 当前flume channel sink 组件最大消费能力如何?

4. 加大flume中channel,source,sink 各通道的单条数据量

4.1 source 发送至channels 数据量大小已配置 见 3.3

4.2 channel 配置

Property Name Default Description
type The component type name, needs to be memory
capacity 100 通道中存储的最大事件数 (优化项)
transactionCapacity 100 每个事务通道从源或提供给接收器的最大事件数 (优化项)
keep-alive 3 添加或删除事件的超时(以秒为单位)
byteCapacityBufferPercentage 20 定义byteCapacity与通道中所有事件的估计总大小之间的缓冲区百分比,以计算标头中的数据。 见下文。
byteCapacity see description 允许的最大总字节作为此通道中所有事件的总和。 实现只计算Eventbody,这也是提供byteCapacityBufferPercentage配置参数的原因。 默认为计算值,等于JVM可用的最大内存的80%(即命令行传递的-Xmx值的80%)。 请注意,如果在单个JVM上有多个内存通道,并且它们碰巧保持相同的物理事件(即,如果您使用来自单个源的复制通道选择器),那么这些事件大小可能会因为通道byteCapacity目的而被重复计算。 将此值设置为“0”将导致此值回退到大约200 GB的内部硬限制。

配置 capacity 和 transactionCapacity 值 。默认配置规则为:

$$ channels.capacity >= channels.transactionCapacity >= source.batchSize $$

官方channels配置示例

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

本案例修改之后的channels 配置

agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000

5. 将flume拉取数据单机版本改成多数据拉取,通过flume-avore-sink-> flume-avore-source 进行数据多数据采取并合并

5.1 存在问题

通过上续修改会发现单机版本的flume会在多副本kafka轮询造成效率浪费
单机版本flume处理数据时会存在单机瓶颈,单机channels可能最多只能处理最大数据无法扩充
单机flume配置多个数据源不方便,不能适合后续多需求开发

5.2 修改架构

多数据聚合架构

5.3采集节点配置文件

收集节点配置(3台):

agent.sources = r1
agent.channels = c1
agent.sinks = k1

agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = qcloud-test-hadoop03:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest

agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000

agent.sinks.k1.type = avro
agent.sinks.k1.hostname = test-hadoop03
agent.sinks.k1.port=4545

agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

汇总节点配置(1台):

agent.sources = r1
agent.channels = memoryChannel
agent.sinks = hdfsSink


agent.sources.r1.type = avro
agent.sources.r1.bind = ip
agent.sources.r1.port = 4545
agent.sources.r1.batchSize = 100000
agent.sources.r1.batchDurationMillis = 1000


agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.keep-alive=30
agent.channels.memoryChannel.capacity=120000
agent.channels.memoryChannel.transactionCapacity=100000


agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.hdfs.path=hdfs://nameser/data/hm2/%Y-%m-%d-%H
agent.sinks.hdfsSink.hdfs.writeFormat=Text

agent.sinks.hdfsSink.hdfs.rollCount = 0

agent.sinks.hdfsSink.hdfs.rollSize = 134217728

agent.sinks.hdfsSink.hdfs.rollInterval = 60
agent.sinks.hdfsSink.hdfs.fileType=DataStream
agent.sinks.hdfsSink.hdfs.idleTimeout=65
agent.sinks.hdfsSink.hdfs.callTimeout=65000
agent.sinks.hdfsSink.hdfs.threadsPoolSize=300

agent.sinks.hdfsSink.channel = memoryChannel
agent.sources.r1.channels = memoryChannel

5.4 架构注意点

  • 当前架构需要保证聚合节点机器的性能
  • 当前架构新的瓶颈可能会存在存储Hdfs数据时过慢 ,导致聚合节点Channels 占用率居高不下,导致堵塞 。
  • 需要关注avro 自定义source sink 的发送效率

6.flume 监控工具(http)

flume 监控工具总共有三种方式 ,我们这里为方便简单,使用内置http接口监控方式进行操作

6.1 配置

在启动脚本处设置 参数 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 即可

6.2 访问 地址 :

http://flumeIp:34545

6.3 返回结果示例 和字段解释 :

{
    "CHANNEL.memoryChannel": {
        "ChannelCapacity": "550000",
        "ChannelFillPercentage": "0.18181818181818182",
        "Type": "CHANNEL",
        "ChannelSize": "1000",
        "EventTakeSuccessCount": "33541400",
        "EventTakeAttemptCount": "33541527",
        "StartTime": "1536572886273",
        "EventPutAttemptCount": "33542500",
        "EventPutSuccessCount": "33542500",
        "StopTime": "0"
    },
    "SINK.hdfsSink": {
        "ConnectionCreatedCount": "649",
        "ConnectionClosedCount": "648",
        "Type": "SINK",
        "BatchCompleteCount": "335414",
        "BatchEmptyCount": "27",
        "EventDrainAttemptCount": "33541500",
        "StartTime": "1536572886275",
        "EventDrainSuccessCount": "33541400",
        "BatchUnderflowCount": "0",
        "StopTime": "0",
        "ConnectionFailedCount": "0"
    },
    "SOURCE.avroSource": {
        "EventReceivedCount": "33542500",
        "AppendBatchAcceptedCount": "335425",
        "Type": "SOURCE",
        "EventAcceptedCount": "33542500",
        "AppendReceivedCount": "0",
        "StartTime": "1536572886465",
        "AppendAcceptedCount": "0",
        "OpenConnectionCount": "3",
        "AppendBatchReceivedCount": "335425",
        "StopTime": "0"
    }
}

参数定义:

字段名称 含义 备注
SOURCE.OpenConnectionCount 打开的连接数
SOURCE.TYPE 组件类型
SOURCE.AppendBatchAcceptedCount 追加到channel中的批数量
SOURCE.AppendBatchReceivedCount source端刚刚追加的批数量
SOURCE.EventAcceptedCount 成功放入channel的event数量
SOURCE.AppendReceivedCount source追加目前收到的数量
SOURCE.StartTime(StopTIme) 组件开始时间、结束时间
SOURCE.EventReceivedCount source端成功收到的event数量
SOURCE.AppendAcceptedCount source追加目前放入channel的数量
CHANNEL.EventPutSuccessCount 成功放入channel的event数量
CHANNEL.ChannelFillPercentage 通道使用比例
CHANNEL.EventPutAttemptCount 尝试放入将event放入channel的次数
CHANNEL.ChannelSize 目前在channel中的event数量
CHANNEL.EventTakeSuccessCount 从channel中成功取走的event数量
CHANNEL.ChannelCapacity 通道容量
CHANNEL.EventTakeAttemptCount 尝试从channel中取走event的次数
SINK.BatchCompleteCount 完成的批数量
SINK.ConnectionFailedCount 连接失败数
SINK.EventDrainAttemptCount 尝试提交的event数量
SINK.ConnectionCreatedCount 创建连接数
SINK.Type 组件类型
SINK.BatchEmptyCount 批量取空的数量
SINK.ConnectionClosedCount 关闭连接数量
SINK.EventDrainSuccessCount 成功发送event的数量
SINK.BatchUnderflowCount 正处于批量处理的batch数

参考地址
flume-document : http://flume.apache.org/FlumeUserGuide.html

目录
相关文章
|
机器学习/深度学习 Kubernetes 网络协议
K8s单机架构部署
这是我做了很多遍,参考很多文章得到的,为了便于大家参考和学习,我已经把每一步都整理出来了,步骤和提示都很清晰。 后续文档有什么问题那个地方写错了,大家都可以提出来。
1525 1
K8s单机架构部署
|
1月前
|
存储 消息中间件 druid
大数据-150 Apache Druid 安装部署 单机启动 系统架构
大数据-150 Apache Druid 安装部署 单机启动 系统架构
37 1
|
3月前
|
数据采集 存储 Java
Flume Agent 的内部原理分析:深入探讨 Flume 的架构与实现机制
【8月更文挑战第24天】Apache Flume是一款专为大规模日志数据的收集、聚合及传输而设计的分布式、可靠且高可用系统。本文深入解析Flume Agent的核心机制并提供实际配置与使用示例。Flume Agent由三大组件构成:Source(数据源)、Channel(数据缓存)与Sink(数据目的地)。工作流程包括数据采集、暂存及传输。通过示例配置文件和Java代码片段展示了如何设置这些组件以实现日志数据的有效管理。Flume的强大功能与灵活性使其成为大数据处理及实时数据分析领域的优选工具。
104 1
|
5月前
|
存储 缓存 NoSQL
了解Redis,第一弹,什么是RedisRedis主要适用于分布式系统,用来用缓存,存储数据,在内存中存储那么为什么说是分布式呢?什么叫分布式什么是单机架构微服务架构微服务的本质
了解Redis,第一弹,什么是RedisRedis主要适用于分布式系统,用来用缓存,存储数据,在内存中存储那么为什么说是分布式呢?什么叫分布式什么是单机架构微服务架构微服务的本质
|
存储 负载均衡 应用服务中间件
单机架构与主从架构简介
单机架构与主从架构简介
112 0
|
6月前
|
消息中间件 存储 SQL
Flume【基础知识 01】简介 + 基本架构及核心概念 + 架构模式 + Agent内部原理 + 配置格式(一篇即可入门Flume)
【2月更文挑战第18天】Flume【基础知识 01】简介 + 基本架构及核心概念 + 架构模式 + Agent内部原理 + 配置格式(一篇即可入门Flume)
1797 0
|
数据采集 存储 负载均衡
大数据数据采集的数据采集(收集/聚合)的Flume之架构模式的并联Agent
在Flume中,架构模式是数据采集和传输过程中最核心的部分。Flume支持多种不同的架构模式,其中并联Agent架构模式是一种比较常见的模式。
139 0
|
数据采集 存储 监控
大数据数据采集的数据采集(收集/聚合)的Flume之架构模式的串联Agent
在Flume中,架构模式是数据采集和传输过程中最核心的部分。Flume支持多种不同的架构模式,其中串联Agent架构模式是一种比较常见的模式。
108 0
|
数据采集 存储 监控
大数据数据采集的数据采集(收集/聚合)的Flume之架构模式的单Agent
在Flume中,架构模式是数据采集和传输过程中最核心的部分。Flume支持多种不同的架构模式,其中单Agent架构模式是最常见的一种。
121 0
|
关系型数据库 网络安全 数据库
MogDB/openGauss 手动部署(非OM工具)单机,主备,主备级联架构
MogDB/openGauss 手动部署(非OM工具)单机,主备,主备级联架构
392 0