Flume - Kafka日志平台整合

简介: 1. Flume介绍Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

1. Flume介绍

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

image

  • agent
    agent本身是一个Java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。
    agent里面包含3个核心的组件:source—->channel—–>sink,类似生产者、仓库、消费者的架构。

  • source
    source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。

  • channel
    source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。

  • sink
    sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定义。

  • event
    将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

2. Kafka Channel && Kafka Sink

2.1 Kafka channel

Kafka channel可以应用在多样的场景中:

  1. Flume source and sink:
    可以为event提供一个高可靠性和高可用的channel;
  2. Flume source and interceptor but no sink:
    其他应用可以将Fluem event写入kafka topic中;
  3. With Flume sink, but no source:
    提供低延迟、高容错的方式将Fluem event从kafka中写入其他sink,例如:HDFS,HBase或者Solr。
  • Kafka Channel配置

    加粗部分为必填属性。

Property Name Default Description
type The component type name, needs to be org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers List of brokers in the Kafka cluster used by the channel This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port
kafka.topic flume-channel Kafka topic which the channel will use
kafka.consumer.group.id flume Consumer group ID the channel uses to register with Kafka. Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data Note that having non-channel consumers with the same ID can lead to data loss.
parseAsFlumeEvent true Expecting Avro datums with FlumeEvent schema in the channel. This should be true if Flume source is writing to the channel and false if other producers are writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset configuration defines how offsets are handled.
pollTimeout 500 The amount of time(in milliseconds) to wait in the “poll()” call of the consumer. https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
defaultPartitionId Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader When set, the producer will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition the event will not be accepted into the channel. If the header value is present then this setting overrides defaultPartitionId.
kafka.consumer.auto.offset.reset latest What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offset latest: automatically reset the offset to the latest offset none: throw exception to the consumer if no previous offset is found for the consumer’s group anything else: throw exception to the consumer.
kafka.producer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
kafka.consumer.security.protocol PLAINTEXT Same as kafka.producer.security.protocol but for reading/consuming from Kafka.
more producer/consumer security props If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer/consumer.

2.2 Kafka Sink

Flume 支持将数据发布到一个kafka topic。目前支持Kafka 0.9.x版本。

  • KafkaSink 配置

    加粗部分为必填配置

Property Name Default Description
type Must be set to org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers List of brokers Kafka-Sink will connect to, to get the list of topic partitions This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port
kafka.topic default-flume-topic The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic overriding the topic configured here. Arbitrary header substitution is supported, eg. %{header} is replaced with value of event header named “header”. (If using the substitution, it is recommended to set “auto.create.topics.enable” property of Kafka broker to true.)
flumeBatchSize 100 How many messages to process in one batch. Larger batches improve throughput while adding latency.
kafka.producer.acks 1 How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) Set this to -1 to avoid data loss in some cases of leader failure.
useFlumeEventFormat false By default events are put as bytes onto the Kafka topic directly from the event body. Set to true to store events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers for the producing side.
defaultPartitionId Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader When set, the sink will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition, an EventDeliveryException will be thrown. If the header value is present then this setting overrides defaultPartitionId.
allowTopicOverride true When set, the sink will allow a message to be produced into a topic specified by the topicHeader property (if provided).
topicHeader topic When set in conjunction with allowTopicOverride will produce a message into the value of the header named using the value of this property. Care should be taken when using in conjunction with the Kafka Source topicHeader property to avoid creating a loopback.
kafka.producer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
more producer security props If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer.
Other Kafka Producer Properties These properties are used to configure the Kafka Producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.producer. For example: kafka.producer.linger.ms

3. Flume - Kafka配置示例

切换到flume/conf目录下,编辑配置文件:

agent.sources = s1
agent.channels = c1
agent.sinks = k1

# Source Config
agent.sources.s1.type = spooldir
agent.sources.s1.channels = c1
agent.sources.s1.bind = 192.168.100.105
agent.sources.s1.port = 9696
agent.sources.s1.includePattern = *.log
agent.sources.s1.spoolDir = /home/usr/tomcat-test/logs

# Sink Config
## 输出到kafka
agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.channel = c1
agent.sinks.s1.topic = test_tomcat_logs
agent.sinks.s1.serializer.class = kafka.serializer.StringEncoder
agent.sinks.s1.brokerList = 192.168.100.105:9092


# Channel Config
agent.channels.c1.type = memory
agent.channels.c1.keep-alive = 10
agent.channels.c1.capacity = 65535

很明显,由配置文件可以了解到:

  1. 我们需要读取目录:/home/usr/tomcat-test/logs下日志文件;

  2. flume连接到kafka的地址是 192.168.100.105:9092,注意不要配置出错了;
  3. flume会将采集后的内容输出到Kafka topic 为test_tomcat_logs,所以我们启动zk,kafka后需要打开一个终端消费topic kafkatest的内容。这样就可以看到flume与kafka之间开始工作了。

4. 运行

运行flume直接切换到flume目录执行以下命令即可:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

参考资料:
[1] Flume Doc:
http://flume.apache.org/FlumeUserGuide.html#kafka-channel

目录
相关文章
|
10月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
657 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
10月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
301 11
|
5月前
|
数据采集 缓存 大数据
【赵渝强老师】大数据日志采集引擎Flume
Apache Flume 是一个分布式、可靠的数据采集系统,支持从多种数据源收集日志信息,并传输至指定目的地。其核心架构由Source、Channel、Sink三组件构成,通过Event封装数据,保障高效与可靠传输。
354 1
|
6月前
|
消息中间件 监控 Java
Apache Kafka 分布式流处理平台技术详解与实践指南
本文档全面介绍 Apache Kafka 分布式流处理平台的核心概念、架构设计和实践应用。作为高吞吐量、低延迟的分布式消息系统,Kafka 已成为现代数据管道和流处理应用的事实标准。本文将深入探讨其生产者-消费者模型、主题分区机制、副本复制、流处理API等核心机制,帮助开发者构建可靠、可扩展的实时数据流处理系统。
589 4
|
存储 运维 监控
超越传统模型:从零开始构建高效的日志分析平台——基于Elasticsearch的实战指南
【10月更文挑战第8天】随着互联网应用和微服务架构的普及,系统产生的日志数据量日益增长。有效地收集、存储、检索和分析这些日志对于监控系统健康状态、快速定位问题以及优化性能至关重要。Elasticsearch 作为一种分布式的搜索和分析引擎,以其强大的全文检索能力和实时数据分析能力成为日志处理的理想选择。
1017 6
|
12月前
|
监控 Shell Linux
Android调试终极指南:ADB安装+多设备连接+ANR日志抓取全流程解析,覆盖环境变量配置/多设备调试/ANR日志分析全流程,附Win/Mac/Linux三平台解决方案
ADB(Android Debug Bridge)是安卓开发中的重要工具,用于连接电脑与安卓设备,实现文件传输、应用管理、日志抓取等功能。本文介绍了 ADB 的基本概念、安装配置及常用命令。包括:1) 基本命令如 `adb version` 和 `adb devices`;2) 权限操作如 `adb root` 和 `adb shell`;3) APK 操作如安装、卸载应用;4) 文件传输如 `adb push` 和 `adb pull`;5) 日志记录如 `adb logcat`;6) 系统信息获取如屏幕截图和录屏。通过这些功能,用户可高效调试和管理安卓设备。
|
存储 运维 监控
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
中信银行信用卡中心每日新增日志数据 140 亿条(80TB),全量归档日志量超 40PB,早期基于 Elasticsearch 构建的日志云平台,面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此使用 Apache Doris 替换 Elasticsearch,实现资源投入降低 50%、查询速度提升 2~4 倍,同时显著提高了运维效率。
871 3
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
416 4
|
存储 消息中间件 大数据
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
261 1
|
存储 消息中间件 大数据
大数据-68 Kafka 高级特性 物理存储 日志存储概述
大数据-68 Kafka 高级特性 物理存储 日志存储概述
153 1