3.3 Sink
需要支持幂等写入或事务写入(Flink的两阶段提交需要事务支持)
3.3.1 幂等写入(Idempotent Writes)
幂等写操作是指:任意多次向一个系统写入数据,只对目标系统产生一次结果影响。
例如,重复向一个HashMap里插入同一个Key-Value二元对,第一次插入时这个HashMap发生变化,后续的插入操作不会改变HashMap的结果,这就是一个幂等写操作。
HBase、Redis和Cassandra这样的KV数据库一般经常用来作为Sink,用以实现端到端的Exactly-Once。
需要注意的是,并不是说一个KV数据库就百分百支持幂等写。幂等写对KV对有要求,那就是Key-Value必须是可确定性(Deterministic)计算的。假如我们设计的Key是:name + curTimestamp,每次执行数据重发时,生成的Key都不相同,会产生多次结果,整个操作不是幂等的。因此,为了追求端到端的Exactly-Once,我们设计业务逻辑时要尽量使用确定性的计算逻辑和数据模型。
3.3.2 事务写入(Transactional Writes)
Flink借鉴了数据库中的事务处理技术,同时结合自身的Checkpoint机制来保证Sink只对外部输出产生一次影响。大致的流程如下:
- Flink先将待输出的数据保存下来暂时不向外部系统提交,等到Checkpoint结束时,Flink上下游所有算子的数据都是一致的时候,Flink将之前保存的数据全部提交(Commit)到外部系统。换句话说,只有经过Checkpoint确认的数据才向外部系统写入。
如下图所示,如果使用事务写,那只把时间戳3之前的输出提交到外部系统,时间戳3以后的数据(例如时间戳5和8生成的数据)暂时保存下来,等待下次Checkpoint时一起写入到外部系统。这就避免了时间戳5这个数据产生多次结果,多次写入到外部系统。
在事务写的具体实现上,Flink目前提供了两种方式:
- 预写日志(Write-Ahead-Log,WAL)
- 两阶段提交(Two-Phase-Commit,2PC)
这两种方式区别主要在于:
- WAL方式通用性更强,适合几乎所有外部系统,但也不能提供百分百端到端的Exactly-Once,因为WAL预习日志会先写内存,而内存是易失介质。
- 如果外部系统自身就支持事务(比如MySQL、Kafka),可以使用2PC方式,可以提供百分百端到端的Exactly-Once。
事务写的方式能提供端到端的Exactly-Once一致性,它的代价也是非常明显的,就是牺牲了延迟。输出数据不再是实时写入到外部系统,而是分批次地提交。目前来说,没有完美的故障恢复和Exactly-Once保障机制,对于开发者来说,需要在不同需求之间权衡。
04 Flink+Kafka的End-to-End Exactly-Once
接下来我们了解下Flink的TwoPhaseCommitSinkFunction是如何支持End-to-End Exactly-Once的
4.1 版本说明
Flink 1.4版本之前,支持Exactly Once语义,仅限于应用内部。
Flink 1.4版本之后,通过两阶段提交(TwoPhaseCommitSinkFunction)支持End-To-End Exactly Once,而且要求Kafka 0.11+。
利用TwoPhaseCommitSinkFunction是通用的管理方案,只要实现对应的接口,而且Sink的存储支持变乱提交,即可实现端到端的划一性语义。
4.2 两阶段提交-API
在 Flink 中的Two-Phase-Commit-2PC两阶段提交的实现方法被封装到了 TwoPhaseCommitSinkFunction 这个抽象类中,只需要实现其中的beginTransaction、preCommit、commit、abort 四个方法就可以实现“精确一次”的处理语义,如FlinkKafkaProducer就实现了该类并实现了这些方法
- beginTransaction,在开启事务之前,我们在目标文件系统的临时目录中创建一个临时文件,后面在处理数据时将数据写入此文件;
- preCommit,在预提交阶段,刷写(flush)文件,然后关闭文件,之后就不能写入到文件了,我们还将为属于下一个检查点的任何后续写入启动新事务;
- commit,在提交阶段,我们将预提交的文件原子性移动到真正的目标目录中,请注意,这会增加输出数据可见性的延迟;
- abort,在中止阶段,我们删除临时文件。
4.3 两阶段提交-简单流程
整个过程可以总结为下面四个阶段:
- 一旦 Flink 开始做 checkpoint 操作,那么就会进入 pre-commit “预提交”阶段,同时JobManager的Coordinator 会将 Barrier 注入数据流中 ;
- 当所有的 barrier 在算子中成功进行一遍传递(就是Checkpoint完成),并完成快照后,则“预提交”阶段完成;
- 等所有的算子完成“预提交”,就会发起一个commit“提交”动作,但是任何一个“预提交”失败都会导致 Flink 回滚到最近的 checkpoint;
4.4 两阶段提交-详细流程
需求:
- 接下来将介绍两阶段提交协议,以及它如何在一个读写Kafka的Flink程序中实现端到端的Exactly-Once语义。Kafka经常与Flink一起使用,且Kafka在最近的0.11版本中添加了对事务的支持。这意味着现在通过Flink读写Kafaka,并提供端到端的Exactly-Once语义有了必要的支持。
在上图中,我们有: - 从Kafka读取的数据源(Flink内置的KafkaConsumer)
- 窗口聚合
- 将数据写回Kafka的数据输出端(Flink内置的KafkaProducer)
要使数据输出端提供Exactly-Once保证,它必须将所有数据通过一个事务提交给Kafka。提交捆绑了两个checkpoint之间的所有要写入的数据。这可确保在发生故障时能回滚写入的数据。
但是在分布式系统中,通常会有多个并发运行的写入任务的,简单的提交或回滚是不够的,因为所有组件必须在提交或回滚时“一致”才能确保一致的结果。
Flink使用两阶段提交协议及预提交阶段来解决这个问题。
4.4.1 预提交-内部状态
在checkpoint开始的时候,即两阶段提交协议的“预提交”阶段。当checkpoint开始时,Flink的JobManager会将checkpoint barrier(将数据流中的记录分为进入当前checkpoint与进入下一个checkpoint)注入数据流。
brarrier在operator之间传递。对于每一个operator,它触发operator的状态快照写入到state backend。
数据源保存了消费Kafka的偏移量(offset),之后将checkpoint barrier传递给下一个operator。
这种方式仅适用于operator具有『内部』状态。所谓内部状态,是指Flink state backend保存和管理的 -例如,第二个operator中window聚合算出来的sum值。当一个进程有它的内部状态的时候,除了在checkpoint之前需要将数据变更写入到state backend,不需要在预提交阶段执行任何其他操作。Flink负责在checkpoint成功的情况下正确提交这些写入,或者在出现故障时中止这些写入。
4.4.2 预提交-外部状态
但是,当进程具有『外部』状态时,需要作些额外的处理。外部状态通常以写入外部系统(如Kafka)的形式出现。在这种情况下,为了提供Exactly-Once保证,外部系统必须支持事务,这样才能和两阶段提交协议集成。
在该示例中的数据需要写入Kafka,因此数据输出端(Data Sink)有外部状态。在这种情况下,在预提交阶段,除了将其状态写入state backend之外,数据输出端还必须预先提交其外部事务。
当checkpoint barrier在所有operator都传递了一遍,并且触发的checkpoint回调成功完成时,预提交阶段就结束了。所有触发的状态快照都被视为该checkpoint的一部分。checkpoint是整个应用程序状态的快照,包括预先提交的外部状态。如果发生故障,我们可以回滚到上次成功完成快照的时间点。
4.4.3 提交阶段
下一步是通知所有operator,checkpoint已经成功了。这是两阶段提交协议的提交阶段,JobManager为应用程序中的每个operator发出checkpoint已完成的回调。
数据源和widnow operator没有外部状态,因此在提交阶段,这些operator不必执行任何操作。但是,数据输出端(Data Sink)拥有外部状态,此时应该提交外部事务。
4.4.4 总结
我们对上述知识点总结下:
- 一旦所有operator完成预提交,就提交一个commit。
- 如果只要有一个预提交失败,则所有其他提交都将中止,我们将回滚到上一个成功完成的checkpoint。
- 在预提交成功之后,提交的commit需要保证最终成功 – operator和外部系统都需要保障这点。如果commit失败(例如,由于间歇性网络问题),整个Flink应用程序将失败,应用程序将根据用户的重启策略重新启动,还会尝试再提交。这个过程至关重要,因为如果commit最终没有成功,将会导致数据丢失。
- 完整的实现两阶段提交协议可能有点复杂,这就是为什么Flink将它的通用逻辑提取到抽象类TwoPhaseCommitSinkFunction中的原因。
05 案例
5.1 Flink+Kafka实现End-to-End Exactly-Once
https://ververica.cn/developers/flink-kafka-end-to-end-exactly-once-analysis/
/** * Kafka --> Flink-->Kafka 的End-To-End-Exactly-once * * @author : YangLinWei * @createTime: 2022/3/8 11:40 下午 */ public class Kafka_Flink_Kafka_EndToEnd_ExactlyOnce { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //===========Checkpoint参数设置==== //===========类型1:必须参数============= //设置Checkpoint的时间间隔为1000ms做一次Checkpoint/其实就是每隔1000ms发一次Barrier! env.enableCheckpointing(1000); //设置State状态存储介质 if (SystemUtils.IS_OS_WINDOWS) { env.setStateBackend(new FsStateBackend("file:///D:/ckp")); } else { env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint")); } //===========类型2:建议参数=========== //设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms(为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了) //如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之前的最小车距为500m env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默认是0 //设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是 false不是 //env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默认是true env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默认值为0,表示不容忍任何检查点失败 //设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除 //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值) //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //===========类型3:直接使用默认的即可=============== //设置checkpoint的执行模式为EXACTLY_ONCE(默认) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。 env.getCheckpointConfig().setCheckpointTimeout(60000);//默认10分钟 //设置同一时间有多少个checkpoint可以同时执行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默认为1 //=============重启策略=========== env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))); //2.Source Properties props_source = new Properties(); props_source.setProperty("bootstrap.servers", "node1:9092"); props_source.setProperty("group.id", "flink"); props_source.setProperty("auto.offset.reset", "latest"); props_source.setProperty("flink.partition-discovery.interval-millis", "5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况 //props_source.setProperty("enable.auto.commit", "true");//没有Checkpoint的时候使用自动提交偏移量到默认主题:__consumer_offsets中 //props_source.setProperty("auto.commit.interval.ms", "2000"); //kafkaSource就是KafkaConsumer FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props_source); kafkaSource.setStartFromLatest(); //kafkaSource.setStartFromGroupOffsets();//设置从记录的offset开始消费,如果没有记录从auto.offset.reset配置开始消费 //kafkaSource.setStartFromEarliest();//设置直接从Earliest消费,和auto.offset.reset配置无关 kafkaSource.setCommitOffsetsOnCheckpoints(true);//执行Checkpoint的时候提交offset到Checkpoint(Flink用),并且提交一份到默认主题:__consumer_offsets(外部其他系统想用的话也可以获取到) DataStreamSource<String> kafkaDS = env.addSource(kafkaSource); //3.Transformation //3.1切割出每个单词并直接记为1 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { //value就是每一行 String[] words = value.split(" "); for (String word : words) { Random random = new Random(); int i = random.nextInt(5); if (i > 3) { System.out.println("出bug了..."); throw new RuntimeException("出bug了..."); } out.collect(Tuple2.of(word, 1)); } } }); //3.2分组 //注意:批处理的分组是groupBy,流处理的分组是keyBy KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0); //3.3聚合 SingleOutputStreamOperator<Tuple2<String, Integer>> aggResult = groupedDS.sum(1); //3.4将聚合结果转为自定义的字符串格式 SingleOutputStreamOperator<String> result = (SingleOutputStreamOperator<String>) aggResult.map(new RichMapFunction<Tuple2<String, Integer>, String>() { @Override public String map(Tuple2<String, Integer> value) throws Exception { return value.f0 + ":::" + value.f1; } }); //4.sink //result.print(); Properties props_sink = new Properties(); props_sink.setProperty("bootstrap.servers", "node1:9092"); props_sink.setProperty("transaction.timeout.ms", 1000 * 5 + "");//设置事务超时时间,也可在kafka配置中设置 /*FlinkKafkaProducer<String> kafkaSink0 = new FlinkKafkaProducer<>( "flink_kafka", new SimpleStringSchema(), props_sink);*/ FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>( "flink_kafka2", new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), props_sink, FlinkKafkaProducer.Semantic.EXACTLY_ONCE ); result.addSink(kafkaSink); //5.execute env.execute(); //测试: //1.创建主题 /export/server/kafka/bin/kafka-topics.sh --zookeeper node1:2181 --create --replication-factor 2 --partitions 3 --topic flink_kafka2 //2.开启控制台生产者 /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka //3.开启控制台消费者 /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka2 } }
5.2 Flink+MySQL实现End-to-End Exactly-Once
需求:
- checkpoint每10s进行一次,此时用FlinkKafkaConsumer实时消费kafka中的消息
- 消费并处理完消息后,进行一次预提交数据库的操作
- 如果预提交没有问题,10s后进行真正的插入数据库操作,如果插入成功,进行一次checkpoint,flink会自动记录消费的offset,可以将checkpoint保存的数据放到hdfs中
- 如果预提交出错,比如在5s的时候出错了,此时Flink程序就会进入不断的重启中,重启的策略可以在配置中设置,checkpoint记录的还是上一次成功消费的offset,因为本次消费的数据在checkpoint期间,消费成功,但是预提交过程中失败了
- 注意此时数据并没有真正的执行插入操作,因为预提交(preCommit)失败,提交(commit)过程也不会发生。等将异常数据处理完成之后,再重新启动这个Flink程序,它会自动从上一次成功的checkpoint中继续消费数据,以此来达到Kafka到Mysql的Exactly-Once。
代码1:
/** * @author : YangLinWei * @createTime: 2022/3/8 11:42 下午 */ public class Kafka_Flink_MySQL_EndToEnd_ExactlyOnce { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);//方便测试 env.enableCheckpointing(10000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setStateBackend(new FsStateBackend("file:///D:/ckp")); //2.Source String topic = "flink_kafka"; Properties props = new Properties(); props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1:9092"); props.setProperty("group.id", "flink"); props.setProperty("auto.offset.reset", "latest");//如果有记录偏移量从记录的位置开始消费,如果没有从最新的数据开始消费 props.setProperty("flink.partition-discovery.interval-millis", "5000");//开一个后台线程每隔5s检查Kafka的分区状态 FlinkKafkaConsumer<ObjectNode> kafkaSource = new FlinkKafkaConsumer<>("topic_in", new JSONKeyValueDeserializationSchema(true), props); kafkaSource.setStartFromGroupOffsets();//从group offset记录的位置位置开始消费,如果kafka broker 端没有该group信息,会根据"auto.offset.reset"的设置来决定从哪开始消费 kafkaSource.setCommitOffsetsOnCheckpoints(true);//Flink执行Checkpoint的时候提交偏移量(一份在Checkpoint中,一份在Kafka的默认主题中__comsumer_offsets(方便外部监控工具去看)) DataStreamSource<ObjectNode> kafkaDS = env.addSource(kafkaSource); //3.transformation //4.Sink kafkaDS.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink"); //5.execute env.execute(); } } /** * 自定义kafka to mysql,继承TwoPhaseCommitSinkFunction,实现两阶段提交。 * 功能:保证kafak to mysql 的Exactly-Once * CREATE TABLE `t_test` ( * `id` bigint(20) NOT NULL AUTO_INCREMENT, * `value` varchar(255) DEFAULT NULL, * `insert_time` datetime DEFAULT NULL, * PRIMARY KEY (`id`) * ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 */ class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> { public MySqlTwoPhaseCommitSink() { super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE); } /** * 执行数据入库操作 */ @Override protected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception { System.err.println("start invoke......."); String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); System.err.println("===>date:" + date + " " + objectNode); String value = objectNode.get("value").toString(); String sql = "insert into `t_test` (`value`,`insert_time`) values (?,?)"; PreparedStatement ps = connection.prepareStatement(sql); ps.setString(1, value); ps.setTimestamp(2, new Timestamp(System.currentTimeMillis())); //执行insert语句 ps.execute(); //手动制造异常 if (Integer.parseInt(value) == 15) System.out.println(1 / 0); } /** * 获取连接,开启手动提交事务(getConnection方法中) */ @Override protected Connection beginTransaction() throws Exception { String url = "jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true"; Connection connection = DBConnectUtil.getConnection(url, "root", "root"); System.err.println("start beginTransaction......." + connection); return connection; } /** * 预提交,这里预提交的逻辑在invoke方法中 */ @Override protected void preCommit(Connection connection) throws Exception { System.err.println("start preCommit......." + connection); } /** * 如果invoke执行正常则提交事务 */ @Override protected void commit(Connection connection) { System.err.println("start commit......." + connection); DBConnectUtil.commit(connection); } @Override protected void recoverAndCommit(Connection connection) { System.err.println("start recoverAndCommit......." + connection); } @Override protected void recoverAndAbort(Connection connection) { System.err.println("start abort recoverAndAbort......." + connection); } /** * 如果invoke执行异常则回滚事务,下一次的checkpoint操作也不会执行 */ @Override protected void abort(Connection connection) { System.err.println("start abort rollback......." + connection); DBConnectUtil.rollback(connection); } } class DBConnectUtil { /** * 获取连接 */ public static Connection getConnection(String url, String user, String password) throws SQLException { Connection conn = null; conn = DriverManager.getConnection(url, user, password); //设置手动提交 conn.setAutoCommit(false); return conn; } /** * 提交事务 */ public static void commit(Connection conn) { if (conn != null) { try { conn.commit(); } catch (SQLException e) { e.printStackTrace(); } finally { close(conn); } } } /** * 事务回滚 */ public static void rollback(Connection conn) { if (conn != null) { try { conn.rollback(); } catch (SQLException e) { e.printStackTrace(); } finally { close(conn); } } } /** * 关闭连接 */ public static void close(Connection conn) { if (conn != null) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } }
代码2:
/** * @author : YangLinWei * @createTime: 2022/3/8 11:44 下午 */ public class DataProducer { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "node1:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); try { for (int i = 1; i <= 20; i++) { DataBean data = new DataBean(String.valueOf(i)); ProducerRecord record = new ProducerRecord<String, String>("flink_kafka", null, null, JSON.toJSONString(data)); producer.send(record); System.out.println("发送数据: " + JSON.toJSONString(data)); Thread.sleep(1000); } } catch (Exception e) { System.out.println(e); } producer.flush(); } @Data @NoArgsConstructor @AllArgsConstructor class DataBean { private String value; } }
06 流处理的数据处理语义
本文主要讲解Flink的高级特性其中之一的双流End-to-End Exactly-Once,谢谢大家的阅读,本文完!