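I've run into a problem and would appreciate some help. Environment: Flink 1.11, running in IDEA. Following wuchong's demo, I wanted to rewrite the client in Java. In the first example (counting purchases per hour), the data is read correctly but no results are produced. Writing to Elasticsearch had no effect, and after switching to the print sink there is still no output. https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN Any help would be appreciated.
Below are the code, the pom, and the run output.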
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Create the execution environment
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// Set the state backend
bsEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/chkdir"));
EnvironmentSettings bsSettings = EnvironmentSettings
        .newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

// Kafka source
String sourceDDL = "CREATE TABLE user_behavior ("
        + "user_id BIGINT,"
        + "item_id BIGINT,"
        + "category_id BIGINT,"
        + "behavior STRING,"
        + "ts TIMESTAMP(3),"
        + "proctime AS PROCTIME(),"
        + "WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) "
        + "WITH ("
        + "'connector'='kafka',"
        + "'topic'='user_behavior',"
        + "'scan.startup.mode'='earliest-offset',"
        + "'properties.bootstrap.servers'='localhost:9092',"
        + "'format'='json'"
        + ")";

// Originally wrote to Elasticsearch; switched to the print sink
/*
String sinkDDL = "CREATE TABLE buy_cnt_per_hour ("
        + "hour_of_day BIGINT,"
        + "buy_cnt BIGINT"
        + ") WITH ("
        + "'connector'='elasticsearch-7',"
        + "'hosts'='http://localhost:9200',"
        + "'index'='buy_cnt_per_hour')";
*/
String sinkDDL = "CREATE TABLE buy_cnt_per_hour (\n"
        + "hour_of_day BIGINT,"
        + "buy_cnt BIGINT"
        + ") WITH (\n"
        + " 'connector' = 'print'\n"
        + ")";

String transformationDDL = "INSERT INTO buy_cnt_per_hour\n"
        + "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as hour_of_day , COUNT(*) as buy_cnt\n"
        + "FROM user_behavior\n"
        + "WHERE behavior = 'buy'\n"
        + "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";

// Register the source and sink
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// tableResult.print();
tEnv.executeSql(transformationDDL);
pom (dependencies, written as groupId:artifactId:version, with scope where given)
org.apache.flink:flink-table-api-java-bridge_${scala.version}:${flink.version}
org.apache.flink:flink-table-planner-blink_${scala.version}:${flink.version}
org.apache.flink:flink-table-common:${flink.version} (scope: provided)
org.apache.flink:flink-clients_${scala.version}:${flink.version}
org.apache.flink:flink-json:${flink.version}
org.apache.flink:flink-connector-elasticsearch7_${scala.version}:${flink.version}
org.apache.flink:flink-sql-connector-kafka_${scala.version}:${flink.version}
org.apache.flink:flink-connector-jdbc_${scala.version}:${flink.version}
mysql:mysql-connector-java:${mysql.version}
org.apache.flink:flink-runtime-web_${scala.version}:${flink.version} (scope: provided)
Run output
01:15:12,358 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka version: unknown
01:15:12,358 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: unknown
01:15:12,358 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1597338912355
01:15:12,361 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-20, groupId=null] Subscribed to partition(s): user_behavior-0
01:15:12,365 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-20, groupId=null] Seeking to EARLIEST offset of partition user_behavior-0
01:15:12,377 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-20, groupId=null] Cluster ID: txkqox8yRL6aWBNsOcS67g
01:15:12,387 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-20, groupId=null] Resetting offset for partition user_behavior-0 to offset 0.
01:15:12,545 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 (type=CHECKPOINT) @ 1597338912539 for job c10220b65246e8269defa48f441a7e09.
01:15:12,709 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job c10220b65246e8269defa48f441a7e09 (14080 bytes in 169 ms).
01:15:17,541 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1597338917540 for job c10220b65246e8269defa48f441a7e09.
01:15:17,553 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job c10220b65246e8269defa48f441a7e09 (14752 bytes in 11 ms).
01:15:22,546 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 3 (type=CHECKPOINT) @ 1597338922545 for job c10220b65246e8269defa48f441a7e09.
01:15:22,558 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 3 for job c10220b65246e8269defa48f441a7e09 (15004 bytes in 12 ms).
Raw data
3> +I(999602,4024409,883960,cart,2017-11-27T00:07:36,2020-08-13T17:16:20.440)
3> +I(30616,1693200,4022701,pv,2017-11-27T00:07:36,2020-08-13T17:16:20.440)
3> +I(145183,3533745,1102540,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(323010,3376212,1574064,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(944547,2640409,2465336,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(232939,1976318,411153,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(355996,5161162,1582197,buy,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(443987,3791622,1464116,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
*From the Flink mailing list archive compiled by volunteers
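This is because in Flink 1.11 executeSql is an asynchronous interface: it submits the job and returns right away, so when you run the program in IDEA it simply ends. You need to keep the TableResult returned by executeSql (here, the call that runs the INSERT INTO statement) and then call
tableResult.getJobClient().get()
    .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
    .get();
to wait for the job to finish.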
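To illustrate the submit-then-wait pattern described above, here is a minimal, dependency-free sketch in plain Java. It does not use Flink: FakeTableResult, FakeJobClient, submitAsync and awaitJob are hypothetical stand-ins invented for this example, and they only mimic the shape of the real calls (an Optional job handle that exposes a CompletableFuture). The point it shows is that the submitting thread only sees the job's results once it blocks on that future.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncSubmitSketch {

    /** Hypothetical stand-in for a job handle; this is NOT Flink's JobClient. */
    static final class FakeJobClient {
        private final CompletableFuture<Integer> result;

        FakeJobClient(CompletableFuture<Integer> result) {
            this.result = result;
        }

        /** Future that completes when the simulated job has finished. */
        CompletableFuture<Integer> getJobExecutionResult() {
            return result;
        }
    }

    /** Hypothetical stand-in for the value an async submit returns; NOT Flink's TableResult. */
    static final class FakeTableResult {
        private final FakeJobClient client;

        FakeTableResult(FakeJobClient client) {
            this.client = client;
        }

        /** Empty when no job was started (for example, for a pure DDL statement). */
        Optional<FakeJobClient> getJobClient() {
            return Optional.ofNullable(client);
        }
    }

    /** Starts the simulated job on a background thread and returns immediately. */
    static FakeTableResult submitAsync(AtomicInteger sink, int rows) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            for (int i = 0; i < rows; i++) {
                sink.incrementAndGet(); // simulated "write one row to the sink"
            }
            return rows;
        });
        return new FakeTableResult(new FakeJobClient(future));
    }

    /** Blocks until the job is done; returns the row count, or -1 if no job was started. */
    static int awaitJob(FakeTableResult result) throws InterruptedException, ExecutionException {
        Optional<FakeJobClient> client = result.getJobClient();
        if (!client.isPresent()) {
            return -1;
        }
        return client.get().getJobExecutionResult().get();
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger sink = new AtomicInteger();
        FakeTableResult result = submitAsync(sink, 1000);
        // Without this blocking call, main could return before the job has written anything.
        int processed = awaitJob(result);
        System.out.println("processed=" + processed + ", sink=" + sink.get());
    }
}
```

In the real program the same idea would apply to the TableResult returned by the executeSql call for the INSERT INTO statement. Checking the Optional before calling get() is a defensive choice in this sketch, since a statement that starts no job has no job handle to wait on.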