Flink工作中常用__Kafka SourceAPI

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink工作中常用__Kafka SourceAPI

大家好,我是ChinaManor,直译过来就是中国码农的意思,我希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。

记录一下工作中可能用的到的FlinkAPI:

4.6Kafka Source

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/index.html

4.6.1API及其版本

Flink 里已经提供了一些绑定的 Connector,例如 Kafka Source 和 Sink,Elasticsearch Sink 等。读写 Kafka、ES、RabbitMQ 时可以直接使用相应 connector 的 API 即可,虽然该部分是Flink 项目源代码里的一部分,但是真正意义上不算作 Flink 引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交 Job 时候需要注意, job 代码 jar 包中一定要将相应的connetor 相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

4.6.2参数设置

以下参数都必须/建议设置1.订阅的主题:topic

2.反序列化规则:deserialization

3.消费者属性-集群地址:bootstrap.servers

4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理):groupId

5.消费者属性-offset重置规则,如earliest/latest…:offset

6.动态分区检测:dynamic partition detection

4.6.3Kafka命令

启动Kafka和Zookeeper命令,针对讲师提供虚拟机:

zookeeper-daemon.sh start
kafka-daemon.sh start

●查看当前服务器中的所有topic

/export/server/kafka/bin/kafka-topics.sh --list --bootstrap-server node1.itcast.cn:9092

●创建topic

/export/server/kafka/bin/kafka-topics.sh --create --topic flink-topic \
--bootstrap-server node1.itcast.cn:9092 --replication-factor 1 --partitions 3

●查看某个Topic的详情

/export/server/kafka/bin/kafka-topics.sh --describe --topic flink-topic \
--bootstrap-server node1.itcast.cn:9092

●删除topic

/export/server/kafka/bin/kafka-topics.sh --delete --topic flink-topic \
--bootstrap-server node1.itcast.cn:9092

●发送消息

/export/server/kafka/bin/kafka-console-producer.sh --topic flink-topic \
--broker-list node1.itcast.cn:9092

●消费消息

/export/server/kafka/bin/kafka-console-consumer.sh --topic flink-topic \
--bootstrap-server node1.itcast.cn:9092 --from-beginning

●修改分区

/export/server/kafka/bin/kafka-topics.sh --alter --topic flink-topic \
--bootstrap-server node1.itcast.cn:9092 --partitions 4

4.6.4代码实现

Flink 实时从Kafka消费数据,底层调用Kafka New Consumer API,演示案例代码如下:

package cn.itcast.flink.source.kafka;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.CommonClientConfigs; import java.util.Properties;
public class StreamSourceKafkaDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3) ;
// 2. 数据源-source:从Kafka 消费数据
// a. Kafka Consumer消费者配置属性设置Properties props = new Properties() ;
props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092"); props.setProperty("group.id", "test-1001");
// b. 创建FlinkKafkaConsumer对象
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>( "flink-topic", // Topic 名称
new SimpleStringSchema(), // props //
) ;
// c. 添加数据源
DataStreamSource<String> kafkaDataStream = env.addSource(kafkaConsumer);
// 3. 数 据 终 端 -sink kafkaDataStream.printToErr();
// 4. 触 发 执 行 -execute env.execute(StreamSourceKafkaDemo.class.getSimpleName()) ;
}
}

4.6.5Kafka 消费起始位置

Kafka 可以被看成一个无限的流,里面的流数据是短暂存在的,如果不消费,消息就过期滚动没了。涉及一个问题:如果开始消费,就要定一下从什么位置开始。

 第一、earliest:从最起始位置开始消费,当然不一定是从0开始,因为如果数据过期就清掉了,所以可以理解为从现存的数据里最小位置开始消费;

第二、latest:从最末位置开始消费;

第三、per-partition assignment:对每个分区都指定一个offset,再从offset位置开始消费;

默认情况下,从Kafka消费数据时,采用的是:latest,最新偏移量开始消费数据。

在Flink Kafka Consumer 库中,允许用户配置从每个分区的哪个位置position开始消费数

据,具体说明如下所示:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

在代码中设置消费数据起始位置相关API如下所示:

案例演示代码:

package cn.itcast.flink.source.kafka;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.kafka.clients.CommonClientConfigs;
import java.util.HashMap; import java.util.Map;
import java.util.Properties;
public class StreamSourceKafkaOffsetDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3) ;
// 2. 数据源-source:从Kafka 消费数据
// a. Kafka Consumer消费者配置属性设置Properties props = new Properties() ;
props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092"); props.setProperty("group.id", "test-1001");
// b. 创建FlinkKafkaConsumer对象
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>( "flink-topic", // Topic 名称
new SimpleStringSchema(), // props //
) ;
package cn.itcast.flink.source.kafka;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.kafka.clients.CommonClientConfigs;
import java.util.HashMap; import java.util.Map;
import java.util.Properties;
public class StreamSourceKafkaOffsetDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3) ;
// 2. 数据源-source:从Kafka 消费数据
// a. Kafka Consumer消费者配置属性设置Properties props = new Properties() ;
props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092"); props.setProperty("group.id", "test-1001");
// b. 创建FlinkKafkaConsumer对象
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>( "flink-topic", // Topic 名称
new SimpleStringSchema(), // props //
) ;

注意:开启 checkpoint 时 offset 是 Flink 通过状态 state 管理和恢复的,并不是从 kafka 的offset 位置恢复。在 checkpoint 机制下,作业从最近一次checkpoint 恢复,本身是会回放部分历史数据,导致部分数据重复消费,Flink 引擎仅保证计算状态的精准一次,要想做到端到端精准一次需要依赖一些幂等的存储系统或者事务操作。

4.6.6Kafka 分区发现

实际的生产环境中可能有这样一些需求,比如:

场景一:有一个 Flink 作业需要将五份数据聚合到一起,五份数据对应五个 kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感知新的 topic。

场景二:作业从一个固定的 kafka topic 读数据,开始该 topic 有 6 个 partition,但随着业务的增长数据量变大,需要对 kafka partition 个数进行扩容,由 6 个扩容到 12。该情况下如何在不重启作业情况下动态感知新扩容的 partition?

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka- consumers-topic-and-partition-discovery

针对上面的两种场景,首先在构建 FlinkKafkaConsumer 时的 properties 中设置flink.partition-discovery.interval-millis 参数为非负值,表示开启动态发现的开关,及设置的时间间隔。此时FlinkKafkaConsumer内部会启动一个单独的线程定期去Kafka获取最新的meta信息。

针对场景一,还需在构建 FlinkKafkaConsumer 时,topic 的描述可以传一个正则表达式描述的 pattern。每次获取最新 kafka meta 时获取正则匹配的最新 topic 列表。

针对场景二,设置前面的动态发现参数,在定期获取 kafka 最新 meta 信息时会匹配新的partition。为了保证数据的正确性,新发现的 partition 从最早的位置开始读取。


目录
相关文章
|
3月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
256 0
|
3月前
|
消息中间件 Java Kafka
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
48 7
|
3月前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
82 4
|
3月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
226 0
|
3月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
67 0
|
3月前
|
消息中间件 NoSQL Java
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
54 0
|
3月前
|
消息中间件 NoSQL Kafka
Flink-05 Flink Java 3分钟上手 Redis FlinkJedisPoolConfig 从Kafka写入Redis FlinkKafkaConsumer消费 结果写入Redis
Flink-05 Flink Java 3分钟上手 Redis FlinkJedisPoolConfig 从Kafka写入Redis FlinkKafkaConsumer消费 结果写入Redis
63 0
|
消息中间件 Kafka 流计算
|
4月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
2月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1449 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎