【Flink-API】取消Kafka记录偏移量,flink管理偏移量

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink-API】取消Kafka记录偏移量,flink管理偏移量

一、偏移量


1.1 取消kafka偏移量


默认是使用kafka来管理偏移量,下面我们来取消他的管理机制,使用flink来管理偏移量,开启flink的容错重启机制,还有中间数据state的保存机制。

1.开启重启策略

2.stateBackend存储位置

3.取消任务checkpoint不删除

4.设置checkpoint的 EXACTLY_ONCE 模式

5.【重点】取消kafka管理偏移量,让flink来管理偏移量


1.2 kafka


创建topic如下,两个副本,四个分区


bin/kafka-topics.sh --create --zookeeper  hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication


1.3 KafkaSourceV2.java

public class KafkaSourceV2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 开启重启策略
        env.enableCheckpointing(5000);
        // stateBackend
        env.setStateBackend(new FsStateBackend("file:///D://APP//IDEA//workplace//FlinkTurbineFaultDiagnosis//src//main//resources//checkpoint//chk001"));
        // 取消任务checkpoint不删除
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 设置checkpoint的 EXACTLY_ONCE 模式
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //kafka配置
        String topic = "superman2";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","192.168.52.200:9092");//多个的话可以指定
        prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset","earliest");
        prop.setProperty("group.id","consumer3");
        // 【重点】取消kafka管理偏移量,让flink来管理偏移量
        prop.setProperty("enable.auto.commit","false");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        DataStreamSource<String> lines = env.addSource(kafkaSource);
        DataStreamSource<String> lines2 = env.socketTextStream("192.168.52.200", 8888);
        // 模拟异常
        lines2.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                if (s.startsWith("wangyining")){
                    System.out.println(1/0);
                }
                return s;
            }
        }).print();
        lines.print();
        env.execute();
    }
}


以上程序中的偏移量保存到了StateBackend的中间数据目录,来进行checkpoint,不但把保存到了这里,还保存到了kafka中偏移量,只是用来监控

20200923205140982.png


1.4 启动测试


1.开启端口号

nc -lk 8888

2.启动程序

3.启动kafka生产者,生产数据

bin/kafka-console-producer.sh --broker-list 192.168.52.200:9092,192.168.52.201:9092,192.168.52.202:9092 --topic superman2

20200923204307587.png

4.模拟异常

if (s.startsWith(“wangyining”)){
System.out.println(1/0);
}

20200923204457432.png


5.再次kafka输入命令,会发现已经重启成功,偏移量已经纪录成功。

20200923204613219.png

6.问题:程序出现了异常,恢复subTask,读取偏移量,从StateBackend恢复还是kafka中恢复?

答案:

6.1.StateBackend 偏移量目录,你会发现kafka中特殊的topic也有偏移量,但是它不是用来恢复的,是用来做监控的。

6.2.如果你不想让kafka中特殊的topic有偏移量,也可以取消。官网中这样记载。


20200923205441844.png

6.3.只需要添加一下:


// kafka中的偏移量取消掉【不建议false,默认是true,它用来监控这个偏移量】
        kafkaSource.setCommitOffsetsOnCheckpoints(false);


6.4.从kafka生产者中继续写数据的话,并没有从头读取,也不会从kafka中特殊的topic继续读取。


7.如果停掉程序呢?

7.1.如果开启了checkpointing,一定会保存到statebackend中去

7.2.没有指定savepoint的话,首先查看kafka特殊的topic,然后查看恢复文件。

目录
相关文章
|
1天前
|
消息中间件 Kubernetes Java
实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
11 0
|
1天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
19 0
|
1天前
|
SQL 消息中间件 Kafka
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
14 0
|
1天前
|
Oracle 关系型数据库 数据库
实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
11 0
|
1天前
|
Java API 数据安全/隐私保护
实时计算 Flink版操作报错合集之变更数据流转换为Insert-Only记录时,报错"datastream api record contains: Delete"如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
13 0
|
1天前
|
存储 关系型数据库 对象存储
实时计算 Flink版操作报错合集之变更数据流转换为Insert-Only记录时,报错"datastream api record contains: Delete"如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
22 1
|
1天前
|
网络安全 流计算 Python
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
14 1
|
5天前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
519 2
2024年了,如何更好的搭建Kafka集群?
|
5天前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
49 0
|
7月前
|
消息中间件 存储 Kubernetes
Helm方式部署 zookeeper+kafka 集群 ——2023.05
Helm方式部署 zookeeper+kafka 集群 ——2023.05
281 0

热门文章

最新文章