
Flink 1.11 SQL: debugging in IDEA produces no data and no errors. What is going on?

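I ran into a problem and would like to ask for help. Environment: Flink 1.11, IDEA. I am following wuchong's demo and want to switch the client to Java. In the first example, which counts the number of purchases per hour, the data can be read but no result is output. Writing to ES had no effect, and after switching to a print sink there is still no output. https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN Any help is appreciated.

Below are the pom, the code, and the run output.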

// Create the execution environment
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// Set the StateBackend
bsEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/chkdir"));
EnvironmentSettings bsSettings = EnvironmentSettings
        .newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

// Kafka
String sourceDDL = "CREATE TABLE user_behavior ("
        + "user_id BIGINT,"
        + "item_id BIGINT,"
        + "category_id BIGINT,"
        + "behavior STRING,"
        + "ts TIMESTAMP(3),"
        + "proctime AS PROCTIME(),"
        + "WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) "
        + "WITH ("
        + "'connector'='kafka',"
        + "'topic'='user_behavior',"
        + "'scan.startup.mode'='earliest-offset',"
        + "'properties.bootstrap.servers'='localhost:9092',"
        + "'format'='json'"
        + ")";

// Originally wrote to ES; switched to a print sink
/*
String sinkDDL = "CREATE TABLE buy_cnt_per_hour ("
        + "hour_of_day BIGINT,"
        + "buy_cnt BIGINT"
        + ") WITH ("
        + "'connector'='elasticsearch-7',"
        + "'hosts'='http://localhost:9200',"
        + "'index'='buy_cnt_per_hour')";
*/
String sinkDDL = "CREATE TABLE buy_cnt_per_hour (\n"
        + "hour_of_day BIGINT,"
        + "buy_cnt BIGINT"
        + ") WITH (\n"
        + " 'connector' = 'print'\n"
        + ")";

String transformationDDL = "INSERT INTO buy_cnt_per_hour\n"
        + "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as hour_of_day , COUNT(*) as buy_cnt\n"
        + "FROM user_behavior\n"
        + "WHERE behavior = 'buy'\n"
        + "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";

// Register the source and sink
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// tableResult.print();

tEnv.executeSql(transformationDDL);

pom

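<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>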

Run output

01:15:12,358 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka version: unknown
01:15:12,358 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: unknown
01:15:12,358 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1597338912355
01:15:12,361 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-20, groupId=null] Subscribed to partition(s): user_behavior-0
01:15:12,365 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-20, groupId=null] Seeking to EARLIEST offset of partition user_behavior-0
01:15:12,377 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-20, groupId=null] Cluster ID: txkqox8yRL6aWBNsOcS67g
01:15:12,387 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-20, groupId=null] Resetting offset for partition user_behavior-0 to offset 0.
01:15:12,545 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 (type=CHECKPOINT) @ 1597338912539 for job c10220b65246e8269defa48f441a7e09.
01:15:12,709 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job c10220b65246e8269defa48f441a7e09 (14080 bytes in 169 ms).
01:15:17,541 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1597338917540 for job c10220b65246e8269defa48f441a7e09.
01:15:17,553 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job c10220b65246e8269defa48f441a7e09 (14752 bytes in 11 ms).
01:15:22,546 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 3 (type=CHECKPOINT) @ 1597338922545 for job c10220b65246e8269defa48f441a7e09.
01:15:22,558 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 3 for job c10220b65246e8269defa48f441a7e09 (15004 bytes in 12 ms).

Raw data

3> +I(999602,4024409,883960,cart,2017-11-27T00:07:36,2020-08-13T17:16:20.440)
3> +I(30616,1693200,4022701,pv,2017-11-27T00:07:36,2020-08-13T17:16:20.440)
3> +I(145183,3533745,1102540,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(323010,3376212,1574064,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(944547,2640409,2465336,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(232939,1976318,411153,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(355996,5161162,1582197,buy,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(443987,3791622,1464116,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)

*From the Flink mailing list archive compiled by volunteers

Asked by 游客sadna6pkvqnz6, 2021-12-07 17:28:48
1 answer
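  • This is because in Flink 1.11 executeSql is an asynchronous interface: it submits the job and returns immediately, so when you run it in IDEA the main method finishes right away. You need to take the TableResult returned by executeSql yourself and then call the following (written here in Java syntax, since your client is Java):

    tableResult.getJobClient().get()
        .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
        .get();

    to wait for the job to finish.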
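The point of the answer (an asynchronous submit call returns before the work is done, so the caller has to block on the handle it gets back) can be illustrated with a stdlib-only sketch. This is an analogy, not Flink's API: `AsyncSubmitSketch`, `submit`, and `emitted` are hypothetical names, and `CompletableFuture.join()` merely plays the role that `getJobExecutionResult(...).get()` plays in the answer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncSubmitSketch {
    // Counts rows "emitted" by the background job (hypothetical stand-in for a print sink).
    static final AtomicInteger emitted = new AtomicInteger();

    // Hypothetical stand-in for an asynchronous submit call: it returns a handle
    // at once while the work continues on a background thread.
    static CompletableFuture<Integer> submit(int rows) {
        return CompletableFuture.supplyAsync(() -> {
            for (int i = 0; i < rows; i++) {
                pause(10);                 // simulate per-row processing time
                emitted.incrementAndGet(); // "emit" one row
            }
            return rows;
        });
    }

    // Sleeps without forcing callers to handle the checked exception.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> job = submit(5);
        // The submit call has already returned; the job has probably emitted little or nothing yet.
        System.out.println("right after submit, emitted = " + emitted.get());
        // Blocking on the handle is what keeps the caller alive until the job is done.
        int total = job.join();
        System.out.println("after waiting, emitted = " + total);
    }
}
```

If the `join()` call is removed, `main` returns right after submitting, which mirrors the symptom described in the question: the program ends without showing the results.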

    2021-12-07 21:05:16