Question 1: Problems encountered implementing Kafka-to-MySQL End-To-End Exactly-Once in Flink
I recently ran into the following two problems while implementing End-To-End Exactly-Once from Kafka to MySQL:
1: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Communications link failure during commit(). Transaction resolution unknown.
2: org.apache.flink.streaming.runtime.tasks.TimerException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
I have put together a minimal demo that reproduces the problem and would appreciate any advice: git clone https://github.com/lusecond/flink_help --depth=1
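During testing I found that the four overridden methods of my TwoPhaseCommitSinkFunction subclass, beginTransaction, preCommit, commit, and abort, each run on a different thread. I suspected that this thread switching was breaking the JDBC transaction commit, but I have run tests that rule it out as the cause.
*From the Flink mailing-list archive compiled by volunteers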
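The four methods above follow a two-phase commit contract. Below is a minimal, JDK-only model of that contract; it is not Flink or JDBC code, and the class, the write method standing in for invoke, and the id-based bookkeeping are all hypothetical. It illustrates one property worth checking in the demo: according to the TwoPhaseCommitSinkFunction Javadoc, if commit fails the job is restarted and recoverAndCommit is called again for the same transaction, so commit has to cope with a transaction that may already have taken effect. "Transaction resolution unknown" is exactly the case where the client cannot tell whether the first commit succeeded.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// JDK-only model of the beginTransaction/preCommit/commit/abort contract.
// It does NOT use Flink or JDBC; all names here are illustrative.
class TwoPhaseCommitModel {

    // Stand-in for the database: committed rows plus the ids of transactions
    // whose commit has already taken effect.
    static final List<String> committedRows = new ArrayList<>();
    static final Set<String> committedTxnIds = new HashSet<>();

    // Hypothetical transaction handle holding only plain data (an id and the
    // buffered rows) rather than a live connection.
    static final class Txn {
        final String id;
        final List<String> pending = new ArrayList<>();

        Txn(String id) {
            this.id = id;
        }
    }

    static Txn beginTransaction(String id) {
        return new Txn(id);
    }

    // Plays the role of invoke(): records are only buffered in the transaction.
    static void write(Txn txn, String row) {
        txn.pending.add(row);
    }

    // Phase one, at checkpoint time. A real sink would flush pending work here;
    // this model has nothing else to do.
    static void preCommit(Txn txn) {
    }

    // Phase two. Because commit may be retried for the same transaction after
    // a failure, it is written to be idempotent: a repeated id is a no-op.
    static void commit(Txn txn) {
        if (committedTxnIds.add(txn.id)) {
            committedRows.addAll(txn.pending);
        }
    }

    // Discards everything written in the transaction.
    static void abort(Txn txn) {
        txn.pending.clear();
    }

    public static void main(String[] args) {
        Txn t1 = beginTransaction("checkpoint-1");
        write(t1, "a");
        write(t1, "b");
        preCommit(t1);
        commit(t1);
        commit(t1); // retry after an "unknown" outcome: no duplicate rows
        Txn t2 = beginTransaction("checkpoint-2");
        write(t2, "c");
        abort(t2);
        System.out.println(committedRows); // [a, b]
    }
}
```

In a real MySQL sink, making commit safe to repeat would need some persistent record of which transactions already committed; the in-memory set here only stands in for that.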
Reference answer:
Could you try using idempotent writes to achieve end-to-end consistency?
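The idempotent approach can be sketched without a database. This is a minimal JDK-only model, assuming each record carries a deterministic primary key; the Event class and the orders table with its columns are hypothetical. Every write is an upsert, so replaying records after a restart or an offset rewind leaves the table unchanged, which gives exactly-once results without relying on MySQL transactions spanning checkpoints.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// JDK-only sketch of an idempotent sink. A LinkedHashMap stands in for a MySQL
// table whose primary key is Event.id; each write is an upsert, so replaying
// the same events leaves the table unchanged. The MySQL equivalent would be
// something like (table and column names are hypothetical):
//   INSERT INTO orders (id, amount) VALUES (?, ?)
//   ON DUPLICATE KEY UPDATE amount = VALUES(amount)
class IdempotentUpsertSketch {

    // A record whose key is derived deterministically from the source data.
    static final class Event {
        final String id;
        final long amount;

        Event(String id, long amount) {
            this.id = id;
            this.amount = amount;
        }
    }

    // Upsert every event: insert a new key or overwrite the existing row.
    static Map<String, Long> applyAll(Map<String, Long> table, List<Event> events) {
        for (Event e : events) {
            table.put(e.id, e.amount);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Event> batch = List.of(new Event("order-1", 10), new Event("order-2", 20));
        Map<String, Long> table = new LinkedHashMap<>();
        applyAll(table, batch);
        applyAll(table, batch); // replay, e.g. after a restart from a checkpoint
        System.out.println(table); // {order-1=10, order-2=20}
    }
}
```

This only holds when each write overwrites the row with a value determined by the record itself; an accumulating update such as amount = amount + ? would double-count on replay.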
For more answers to this question, see the original post: https://developer.aliyun.com/ask/373592
Question 2: Rewind offset to a previous position and ensure certainty
Rewind offset to a previous position and ensure certainty. I'm trying to use Kafka as an event store, and I want to create several partitions to improve read/write throughput. Occasionally I need to rewind the offset to a previous position to recompute results. Since ordering isn't guaranteed across partitions in Kafka, does this mean that Flink won't produce the same results as before after a rewind, even if it uses event time? For example, the consumer for one partition progresses extremely fast and raises the watermark, so events from other partitions are discarded. Is there any way to prevent this from happening?
Reference answer:
Are you looking for a way to generate watermarks per Kafka partition? Flink provides Kafka-partition-aware watermark generation.
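A minimal JDK-only model of the idea behind partition-aware watermarks follows; it is not Flink code, and the class and method names are hypothetical. Each partition tracks its own highest timestamp, and the emitted watermark is the minimum of the per-partition watermarks, which is how the Flink documentation describes merging them. The naive variant computes one watermark over the interleaved stream and reproduces the problem from the question. In Flink itself this is enabled by assigning the WatermarkStrategy on the Kafka source (for example FlinkKafkaConsumer#assignTimestampsAndWatermarks, or the strategy passed to env.fromSource for KafkaSource, depending on the version) rather than on the stream downstream of it.

```java
import java.util.HashMap;
import java.util.Map;

// JDK-only model of partition-aware watermarks (not Flink code). Each
// partition tracks its own max event timestamp; the emitted watermark is the
// minimum of the per-partition watermarks.
class PerPartitionWatermarks {

    private final long maxOutOfOrdernessMs;
    private final Map<Integer, Long> maxTsPerPartition = new HashMap<>();
    private long globalMaxTs = Long.MIN_VALUE;

    PerPartitionWatermarks(long maxOutOfOrdernessMs, int partitions) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        for (int p = 0; p < partitions; p++) {
            maxTsPerPartition.put(p, Long.MIN_VALUE); // no events seen yet
        }
    }

    void onEvent(int partition, long timestampMs) {
        maxTsPerPartition.merge(partition, timestampMs, Math::max);
        globalMaxTs = Math.max(globalMaxTs, timestampMs);
    }

    // Partition-aware: a partition with no events keeps the watermark at
    // Long.MIN_VALUE, and a fast partition cannot overtake a slow one.
    long partitionAwareWatermark() {
        long wm = Long.MAX_VALUE;
        for (long maxTs : maxTsPerPartition.values()) {
            long partitionWm = maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - maxOutOfOrdernessMs;
            wm = Math.min(wm, partitionWm);
        }
        return wm;
    }

    // For comparison: one watermark over the interleaved stream, driven by
    // whichever partition happens to be read fastest.
    long naiveWatermark() {
        return globalMaxTs == Long.MIN_VALUE ? Long.MIN_VALUE : globalMaxTs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        PerPartitionWatermarks w = new PerPartitionWatermarks(1_000, 2);
        w.onEvent(0, 100_000);
        w.onEvent(0, 200_000); // partition 0 is consumed far ahead
        w.onEvent(1, 5_000);   // partition 1 lags behind
        long pending = 6_000;  // next event still waiting in partition 1
        System.out.println(w.partitionAwareWatermark()); // 4000
        System.out.println(w.naiveWatermark());          // 199000
        System.out.println(pending <= w.naiveWatermark());          // true: would be treated as late
        System.out.println(pending <= w.partitionAwareWatermark()); // false: still on time
    }
}
```

The Long.MIN_VALUE case shows the trade-off: a partition that receives no data holds the combined watermark back, which is the situation WatermarkStrategy#withIdleness is meant for.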
For more answers to this question, see the original post: https://developer.aliyun.com/ask/373594
Question 3: No completed jobs in the Flink HistoryServer
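I started Flink's HistoryServer, but it does not list any completed jobs.
[Screenshots of the configuration, the resulting web UI, and the HDFS directory not available]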
Reference answer:
flink-conf.yaml needs to contain these settings:
historyserver.web.port: 8082
historyserver.web.address: 0.0.0.0
historyserver.archive.fs.refresh-interval: 10000
historyserver.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/
jobmanager.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/
# after how many seconds a finished job is submitted to the history server
jobstore.expiration-time: 14400
jobmanager.archive.fs.dir just needs to be the same as historyserver.archive.fs.dir.
Then start it with bin/historyserver.sh start
Open ip:8082. You need to run a job and wait for jobstore.expiration-time to elapse before any data appears.
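The requirement that the two archive directories match can be checked mechanically. Below is a hypothetical helper (not part of Flink) that reads the simple key: value lines of a flink-conf.yaml and reports problems. It assumes flat keys with no YAML nesting and compares paths literally, so a missing trailing slash counts as a mismatch. Since historyserver.archive.fs.dir accepts a comma-separated list of directories, it only requires jobmanager.archive.fs.dir to be one of them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: reads simple "key: value" lines from
// a flink-conf.yaml string and checks the HistoryServer archive settings.
class HistoryServerConfigCheck {

    static Map<String, String> parse(String yaml) {
        Map<String, String> conf = new HashMap<>();
        for (String raw : yaml.split("\n")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int sep = line.indexOf(": "); // first ": " separates key from value
            if (sep > 0) {
                conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
            }
        }
        return conf;
    }

    // Returns a list of problems; an empty list means the check passed.
    static List<String> check(Map<String, String> conf) {
        List<String> problems = new ArrayList<>();
        String jmDir = conf.get("jobmanager.archive.fs.dir");
        String hsDirs = conf.get("historyserver.archive.fs.dir");
        if (jmDir == null) {
            problems.add("jobmanager.archive.fs.dir is not set");
        }
        if (hsDirs == null) {
            problems.add("historyserver.archive.fs.dir is not set");
        }
        // The JobManager's archive dir must be one of the (comma-separated)
        // directories the HistoryServer polls; comparison is literal.
        if (jmDir != null && hsDirs != null
                && Arrays.stream(hsDirs.split(",")).map(String::trim).noneMatch(jmDir::equals)) {
            problems.add("jobmanager.archive.fs.dir is not watched by the HistoryServer");
        }
        return problems;
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
                "historyserver.web.port: 8082",
                "historyserver.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/",
                "jobmanager.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/");
        System.out.println(check(parse(conf))); // []
    }
}
```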
For more answers to this question, see the original post: https://developer.aliyun.com/ask/373597