
Flink SQL 1.11: consuming a multi-partition Kafka topic with event time shows no watermark, and the aggregation never produces results

Hi all, I hit a problem while testing Flink SQL on 1.11. I consume Kafka with the streaming API using event time, then convert the stream to a table and run a SQL aggregation. When the Kafka topic has multiple partitions, the Flink web UI shows "No Watermark" and the aggregation never fires; when the topic has only one partition, the aggregation triggers normally and the watermarks display correctly. The test code is below:

package com.test.ops

import java.util.Properties

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.types.Row

import scala.collection.JavaConversions._
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object FlinkSqlTest {

  // Case class for the parsed source JSON, plus the RhoLog (and PpoLog) case classes
  // split out from it and aggregated separately
  case class SrcLog(userid: String, guid: String, rho: JsonNode, ts: Long)
  case class RhoLog(userid: String, guid: String, ts: Long)

  def main(args: Array[String]): Unit = {
    val parallelism = 3
    val kafkaBrokers = "172.x.x.x:9092"
    val jobName = "flinksql-test"
    val topicNames = List("ops_nginx_logs")
    val groupName = "test-ops-100000"
    val properties = new Properties()

    // Build the streaming environment
    val conf: Configuration = new Configuration()
    import org.apache.flink.configuration.ConfigConstants
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    val streamEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    streamEnv.enableCheckpointing(10000)
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    streamEnv.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
    streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    streamEnv.setParallelism(parallelism)

    // Build the table environment
    val blinkTableEnvSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(streamEnv, blinkTableEnvSettings)

    properties.setProperty("bootstrap.servers", kafkaBrokers)
    properties.setProperty("group.id", groupName)
    val myConsumer = new FlinkKafkaConsumer[String](topicNames, new SimpleStringSchema(), properties)
    // Start consuming from the latest Kafka offsets
    myConsumer.setStartFromLatest()

    val mapper = new ObjectMapper
    val srcStream = streamEnv.addSource(myConsumer)
      .filter(_.nonEmpty)
      .filter(_.contains("\"message\":{"))
      .map(line => {
        val rootNode = mapper.readTree(line)
        val rowTime = rootNode.get("@timestamp").asText()
        // val timeStamp = dateTimeToTimestampJdk8(rowTime)
        val timeStamp = rowTime.toLong
        val messageNode = mapper.readTree(rootNode.get("message").toString)
        val rho = messageNode.get("rho")
        val userid = if (messageNode.has("u")) messageNode.get("u").asText else "nodata"
        val guid = if (messageNode.has("g")) messageNode.get("g").asText else "nodata"
        SrcLog(userid, guid, rho, timeStamp)
      })

    // Assign timestamps / watermarks
    val rhoStream = srcStream.map(src => {
      RhoLog(src.userid, src.guid, src.ts)
    }).assignAscendingTimestamps(row => {
      println(row.ts)
      row.ts
    })

    // Convert the stream to a table
    val table = tableEnv.fromDataStream(rhoStream, 'userid, 'guid, 'ts as 'ts1, 'ts.rowtime() as 'ts)

    // Tumbling-window aggregation over the source table
    val resTableEventtime = table
      .window(Tumble over 10.seconds on 'ts as 'window)
      .groupBy('userid, 'window, 'ts, 'ts1)
      .select('userid, 'guid.count, 'window.start(), 'window.end(), 'ts, 'ts1)

    // Print the windowed aggregation result
    resTableEventtime.toAppendStream[Row].print("test")

    streamEnv.execute()
  }
}

Symptoms. With a single-partition topic:

kafka-topics --zookeeper 172.x.x.x:2181 --describe --topic ops_nginx_logs1
Topic:ops_nginx_logs1  PartitionCount:1  ReplicationFactor:3  Configs:
  Topic: ops_nginx_logs1  Partition: 0  Leader: 104  Replicas: 104,107,105  Isr: 104,107,105

Flink SQL aggregation output:

test:1> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:17.932,1596979157932
test:2> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:19.932,1596979159932
test:3> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:11.930,1596979151930
test:3> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:15.931,1596979155931
test:3> aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:13.931,1596979153931

(screenshot: http://apache-flink.147419.n8.nabble.com/file/t837/3.png)
The web UI shows the watermarks normally (screenshot: http://apache-flink.147419.n8.nabble.com/file/t837/4.png).

With a multi-partition topic:

kafka-topics --zookeeper 172.x.x.x:2181 --describe --topic ops_nginx_logs
Topic:ops_nginx_logs  PartitionCount:3  ReplicationFactor:3  Configs:
  Topic: ops_nginx_logs  Partition: 0  Leader: 104  Replicas: 104,106,107  Isr: 104,106,107
  Topic: ops_nginx_logs  Partition: 1  Leader: 105  Replicas: 105,107,104  Isr: 105,107,104
  Topic: ops_nginx_logs  Partition: 2  Leader: 106  Replicas: 106,104,105  Isr: 106,104,105

The Flink SQL aggregation never fires and produces no output (screenshot: http://apache-flink.147419.n8.nabble.com/file/t837/2.png).
The web UI shows No Watermark (screenshot: http://apache-flink.147419.n8.nabble.com/file/t837/1.png).

-- Sent from: http://apache-flink.147419.n8.nabble.com/  *From the volunteer-curated Flink mailing-list archive
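Since the symptom only appears with the three-partition topic, a first thing to check is whether every partition of ops_nginx_logs is actually receiving records. Below is a minimal sketch of such a check (not from the original post) using only the plain kafka-clients consumer API; the broker address and topic name are reused from the post, and the class name PartitionOffsetCheck is made up for illustration.

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// Dumps the earliest and latest offset of every partition of the topic.
// A partition whose earliest and latest offsets are equal currently holds no records.
object PartitionOffsetCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "172.x.x.x:9092")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    try {
      // List all partitions of the topic, then ask the broker for offsets.
      val partitions = consumer.partitionsFor("ops_nginx_logs").asScala
        .map(info => new TopicPartition(info.topic(), info.partition()))
        .asJava
      val earliest = consumer.beginningOffsets(partitions).asScala
      val latest = consumer.endOffsets(partitions).asScala
      latest.foreach { case (tp, end) =>
        val begin = earliest(tp).longValue()
        println(s"$tp: earliest=$begin latest=${end.longValue()} records=${end.longValue() - begin}")
      }
    } finally {
      consumer.close()
    }
  }
}

Note that because the job starts from the latest offsets (setStartFromLatest), what ultimately matters is whether new records keep arriving on every partition while the job is running, not just the historical offset range.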

EXCEED 2021-12-08 11:02:21
1 reply
  • The font is tiny and the code has lost its formatting, so it is hard to read. But going by the problem description, my guess is that one of the Kafka partitions has no data: Flink's watermark mechanism takes the minimum watermark across multiple inputs, so if a partition never receives data, the overall watermark may never be generated and never advance. *From the volunteer-curated Flink mailing-list archive

    2021-12-08 11:16:14
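Following the explanation above, one common way to keep an empty or quiet partition from holding back the watermark in Flink 1.11 is to switch from assignAscendingTimestamps to the newer WatermarkStrategy API and mark idle inputs. Below is a minimal sketch against the RhoLog stream from the question; the 5-second out-of-orderness bound and the 30-second idleness timeout are illustrative values, not something from this thread.

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Sketch: replaces the assignAscendingTimestamps call from the question.
// Subtasks/partitions that see no data for 30 s are marked idle, so they no
// longer hold back the watermark of the downstream window operator.
val rhoStream = srcStream
  .map(src => RhoLog(src.userid, src.guid, src.ts))
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[RhoLog](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[RhoLog] {
        // Use the event-time field from the parsed record as the timestamp.
        override def extractTimestamp(element: RhoLog, recordTimestamp: Long): Long = element.ts
      })
      .withIdleness(Duration.ofSeconds(30))
  )

The same kind of strategy (built for the raw String records) could instead be passed to myConsumer.assignTimestampsAndWatermarks(...), in which case the Kafka source generates watermarks per partition; and independently of the API used, making sure test data is written to all three partitions would also let the window fire.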