Flink1.9整合Kafka

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 本文基于Flink1.9版本简述如何连接Kafka。

流式连接器


image.png

我们知道可以自己来开发Source 和 Sink ,但是一些比较基本的 Source 和 Sink 已经内置在 Flink 里。

预定义的source支持从文件、目录、socket,以及 collections 和 iterators 中读取数据。

预定义的sink支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。

连接器可以和多种多样的第三方系统进行交互。目前支持以下系统:

  • Apache Kafka
  • Apache Cassandra(sink)
  • Amazon Kinesis Streams(source/sink)
  • Elasticsearch(sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ(source/sink)
  • Apache NiFi(source/sink)
  • Twitter Streaming API(source)

请记住,在使用一种连接器时,通常需要额外的第三方组件,比如:数据存储服务器或者消息队列。

Apache Bahir 中定义了其他一些连接器

  • Apache ActiveMQ(source/sink)
  • Apache Flume(sink)
  • Redis(sink)
  • Akka (sink)
  • Netty (source)

使用connector并不是唯一可以使数据进入或者流出Flink的方式。一种常见的模式是从外部数据库或者 Web 服务查询数据得到初始数据流,然后通过 Map 或者 FlatMap 对初始数据流进行丰富和增强,这里要使用Flink的异步IO。

而向外部存储推送大量数据时会导致 I/O 瓶颈问题出现。在这种场景下,如果对数据的读操作远少于写操作,可以让外部应用从 Flink 拉取所需的数据,需要用到Flink的可查询状态接口。

本文重点介绍Apache Kafka Connector


Kafka连接器


此连接器提供对Apache Kafka提供的事件流的访问。

Flink提供特殊的Kafka连接器,用于从/向Kafka主题读取和写入数据。Flink Kafka Consumer集成了Flink的检查点机制,可提供一次性处理语义。为实现这一目标,Flink并不完全依赖Kafka 的消费者组的偏移量,而是在内部跟踪和检查这些偏移。

下表为不同版本的kafka与Flink Kafka Consumer的对应关系。

Maven Dependency Supported since Consumer and Producer Class name Kafka version
flink-connector-kafka-0.8_2.11 1.0.0 FlinkKafkaConsumer08 FlinkKafkaProducer08 0.8.x
flink-connector-kafka-0.9_2.11 1.0.0 FlinkKafkaConsumer09 FlinkKafkaProducer09 0.9.x
flink-connector-kafka-0.10_2.11 1.2.0 FlinkKafkaConsumer010 FlinkKafkaProducer010 0.10.x
flink-connector-kafka-0.11_2.11 1.4.0 FlinkKafkaConsumer011 FlinkKafkaProducer011 0.11.x
flink-connector-kafka_2.11 1.7.0 FlinkKafkaConsumer FlinkKafkaProducer >= 1.0.0

而从最新的Flink1.9.0版本开始,使用Kafka 2.2.0客户端。

下面简述使用步骤。

导入maven依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.9.0</version>
</dependency>

安装Kafka:

可以参照 Kafka入门宝典(详细截图版)

兼容性:

从Flink 1.7开始,它不跟踪特定的Kafka主要版本。相反,它在Flink发布时跟踪最新版本的Kafka。如果您的Kafka代理版本是1.0.0或更高版本,则应使用此Kafka连接器。如果使用旧版本的Kafka(0.11,0.10,0.9或0.8),则应使用与代理版本对应的连接器。

升级Connect要注意Flink升级作业,同时

  • 在整个过程中使用Flink 1.9或更新版本。
  • 不要同时升级Flink和运营商。
  • 确保您作业中使用的Kafka Consumer和/或Kafka Producer分配了唯一标识符(uid)。
  • 使用stop with savepoint功能获取保存点(例如,使用stop --withSavepoint)。

用法:

引入依赖后,实例化新的source(FlinkKafkaConsumer)和sink(FlinkKafkaProducer)。

Kafka Consumer

先分步骤介绍构建过程,文末附Flink1.9连接Kafka完整代码。

Kafka consumer 根据版本分别叫做FlinkKafkaConsumer08 FlinkKafkaConsumer09等等

Kafka >= 1.0.0 的版本就叫FlinkKafkaConsumer。

构建FlinkKafkaConsumer

java示例代码如下:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));

scala:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
stream = env
    .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
    .print()

必须有的:

1.topic名称

2.用于反序列化Kafka数据的DeserializationSchema / KafkaDeserializationSchema

3.配置参数:“bootstrap.servers” “group.id” (kafka0.8还需要 “zookeeper.connect”)

配置消费起始位置

java:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour
//指定位置
//Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
DataStream<String> stream = env.addSource(myConsumer);

scala:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val myConsumer = new FlinkKafkaConsumer[String](...)
myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()  // the default behaviour
//指定位置
//val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
val stream = env.addSource(myConsumer)
检查点

启用Flink的检查点后,Flink Kafka Consumer将使用主题中的记录,并以一致的方式定期检查其所有Kafka偏移以及其他操作的状态。如果作业失败,Flink会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用Kafka的记录。

如果禁用了检查点,则Flink Kafka Consumer依赖于内部使用的Kafka客户端的自动定期偏移提交功能。

如果启用了检查点,则Flink Kafka Consumer将在检查点完成时提交存储在检查点状态中的偏移量。

java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs

scala

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
分区发现

Flink Kafka Consumer支持发现动态创建的Kafka分区,并使用一次性保证消费它们。

还可以使用正则:

java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);
DataStream<String> stream = env.addSource(myConsumer);
...

scala

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val myConsumer = new FlinkKafkaConsumer08[String](
  java.util.regex.Pattern.compile("test-topic-[0-9]"),
  new SimpleStringSchema,
  properties)
val stream = env.addSource(myConsumer)
...
时间戳和水印

在许多情况下,记录的时间戳(显式或隐式)嵌入记录本身。另外,用户可能想要周期性地或以不规则的方式发出水印。

我们可以定义好Timestamp Extractors / Watermark Emitters,通过以下方式将其传递给您的消费者:

java

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
DataStream<String> stream = env
    .addSource(myConsumer)
    .print();

scala

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
stream = env
    .addSource(myConsumer)
    .print()

Kafka Producer

Kafka Producer 根据版本分别叫做FlinkProducer011 FlinkKafkaProducer010等等

Kafka >= 1.0.0 的版本就叫FlinkKafkaProducer 。

构建FlinkKafkaConsumer

java

DataStream<String> stream = ...;
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "localhost:9092",            // broker list
        "my-topic",                  // target topic
        new SimpleStringSchema());   // serialization schema
// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true);
stream.addSink(myProducer);

scala

val stream: DataStream[String] = ...
val myProducer = new FlinkKafkaProducer011[String](
        "localhost:9092",         // broker list
        "my-topic",               // target topic
        new SimpleStringSchema)   // serialization schema
// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true)
stream.addSink(myProducer)

需要指定broker list , topic,序列化类。

自定义分区:默认情况下,将使用FlinkFixedPartitioner将每个Flink Kafka Producer并行子任务映射到单个Kafka分区。

可以实现FlinkKafkaPartitioner类自定义分区。

Flink1.9消费Kafka完整代码:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        //构建FlinkKafkaConsumer
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
        //指定偏移量
        myConsumer.setStartFromEarliest();
        DataStream<String> stream = env
                .addSource(myConsumer);
        env.enableCheckpointing(5000);
        stream.print();
        env.execute("Flink Streaming Java API Skeleton");
    }
相关文章
|
3月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
229 0
|
3月前
|
消息中间件 Java Kafka
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
48 7
|
3月前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
78 4
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
63 1
|
3月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
211 0
|
3月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
62 0
|
3月前
|
消息中间件 NoSQL Java
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
50 0
|
3月前
|
消息中间件 NoSQL Kafka
Flink-05 Flink Java 3分钟上手 Redis FlinkJedisPoolConfig 从Kafka写入Redis FlinkKafkaConsumer消费 结果写入Redis
Flink-05 Flink Java 3分钟上手 Redis FlinkJedisPoolConfig 从Kafka写入Redis FlinkKafkaConsumer消费 结果写入Redis
60 0
|
3月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
100 0
|
5月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。