开发者学堂课程【NiFi 知识精讲与项目实战(第三阶段):Kafka 使用-知识点】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/707/detail/12552
Kafka 使用-知识点
内容介绍:
一、简介
二、PublishKafka_0_10
三、ConsumeKafka_0_10
一、简介
Kafka 是一个由 Scala 和 java 编写的高吞吐量的分布式发布订阅消息。它拥有很高的吞吐量、稳定性和扩容能力,在OLTP 和 OLAP 中都会经常使用。使用NiFi可以简单快速的建立起 kafka 的生产者和消费者,而不需要编写繁杂的代码。Kafka 作用非常多,比如解耦和、流量削峰也可以做一些消息队列发送,还可以提高并发能力,接下来看两个与kafka 密切相关的处理器。
二、PublishKafka_0_10
1.描述
0_10代表兼容的版本号,nifi 中不仅支持0.1版本,还支持其它很多版本。因为在大数据服务器上安装的就是0.10版本[01]所以要使用对应版本号的处理器来进行处理。使用 Kafka 0.10.x Producer API 将 FlowFile 的内容作为消息发送到 Apache Kafka。要发送的消息可以是单独的 FlowFiles,也可以使用用户指定的定界符(例如换行符)进行定界。用于获取消息的辅助 NiFi 处理器是 consumeKafka_0_10。
2.属性配置
name |
Default value |
description |
Kafka Brokers |
localhost:9092(IP+端口号) |
大数据集群 Kafka 很有可能也是以集群方式进行安装部署的。集群模式的配置如下 逗号分隔的已知 Kafka Broker 列表,格式为<主机>:<端口 >支持表达式语言:true(仅使用变量注册表进行评估) |
Security Protocol |
纯文本 |
与经纪人通信的协议。对应于Kafka 的"security.protocol"属性。 |
Kerberos Service Name |
与代理 JAAS 文件中配置的 Kafka服务器的主要名称匹配的服务名称。可以在 Kafka 的 JAAS 配置或Kafka 的配置中定义。对应于Kafka 的"'security.protocol'属性,除非选择<Security Protoco1>的 SASL 选项之一,否则它将被忽略。支持表达式语言: true(仅使用变量注册表进行评估) |
|
Kerberos credentials service |
指定应用于 Kerberos 身份验证的Kerberos 凭据控制器服务 |
|
Kerberos Principal |
将用于连接到代理的 Kerberos 主体。如果未设置,则应在bootstrap.conf 文件中定义的 JVM属性中设置 JAAS 配置文 件。该主体将被设置为" sasl.jaas.config" Kafka 的属性。支持表达式语言: true(仅使用变量注册表进行评估) |
|
Kerberos Keytab |
使用变量注册表进行评估)用于连接代理的 Kerberos 密钥表。如果未设置,则应在 bootstrap.conf 文件中定义的 JVM 属性中设置 JAAS配置文 件。该主体将被设置为" sasl.jaas.config" Kafka 的属性。支持表达式语言: true(仅使用变量注册表进行评估) |
|
ssL Context service |
指定用于与 Kafka 通信的 SSL 上下文服务。 |
|
Topic Name |
发送 kfaka 数据时有一个 topic,针对于某一个 topic 去发送消息,所以 topic 也需要进行配置,发送端的 topic,需要和消费端的 topic保持一致,这样才能读取到消息内容。要发布到的 Kafka 主题的名称。支持表达式语言: true(将使用流文件属性和变量注册表进行评估) |
|
Delivery Guarantee |
相当于 kafka 里面的 ask 机制,这个机制是为了确保 kafka 的消息,是性能优先,还是确保消费的优先。指定保证消息发送到 Kafka 的要求。对应于 Kafka 的” acks"属性: <br> Best Effort<br>代表0,性能最快,但是最不安全的机制。 Guarantee Single Node Delivery代表1 <br>Guarantee Replicated Delivery 代表 all,最安全的机制,但是性能最慢。 是 publish 端最核心的参数。 |
三、ConsumeKafka_0_10
1.描述
主要是接收和消耗 Kafka 数据,消耗来自专门针对 Kafka 0.10.xConsumer API 构建的 Apache Kafka 的消息。用于发送消息的辅助 NiFi 处理器是 PublishKafka_0_10。
2.属性配置
在下面的列表中,必需属性的名称以粗体显示。其它任何属性(非粗体)均视为可选。该表还指示所有默认值,以及属性是否支持 NiFi 表达式语言。
名称 |
默认值 |
描述 |
Kafka Brokers |
需要配置 Kafka Broker 的地址。localhost:9092 |
逗号分隔的已知 Kafka Broker 列表,格式为<主机>:<端口>支持表达式语言: true(仅使用变量注册表进行评估) |
security Protocol |
纯文本 |
与经纪人通信的协议。对应于Kafka 的" security.protocol"属性。 |
Kerberos service Name |
Ⅰ 与代理 JAAS 文件中配置的Kafka 服务器的主要名称匹配的服务名称。可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。对应于Kafka 的'security.protocol"属性,除非选择<Security Protoco1>的SASL 选项之一,否则它将被忽略。支持表达式语言: true(仅使用变量注册表进行评估) |
|
Kerberos Credentials service |
指定应用于 Kerberos 身份验证的Kerberos 凭据控制器服务 |
|
Topic Name(s) |
要从中提取的 Kafka 主题的名称。如果逗号分隔,则可以提供多个。支持表达式语言: true(仅使用变量注册表进行评估),要与发送端一致。 |
|
Topic Name Format |
Names |
指定提供的主题是逗号分隔的名称列表还是单个正则表达式。<br>names.pattern |
Group ID |
分组消费数据;组 ID 用于标识同一使用者组内的使用者。对应于Kafka 的"group.id"属性。支持表达式语言:true(仅使用变量注册表进行评估) |
|
Offset Reset |
Latest(最后的) |
当 Kafka 中没有初始偏移量或服务器上不再存在当前偏移量时(例如,因为该数据已被删除),使您可以管理条件。对应于 Kafka的'auto.offset.reset"属性。<br>earliest、latest、none指的是偏移量。Earliest 从最开始的数据进行消费,latest 从当前最新的数据进行消费,none 代表不去消费,根据自己的需求进行选择。 |