
Flink Stream + StreamTableEnvironment used together: checkpo

Hi:

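I have a question for the experts here. I am studying join operations on streams: I use FlinkKafkaConsumer to consume Kafka data as the source and then join it with HBase dimension-table data. The join itself works, but the Kafka source never performs a checkpoint, even though checkpointing is configured in the code. Is there some other configuration I need? The code is as follows: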

DataStream kafkaSource = env.addSource(source);

// One side-output tag per main table
Map<String, OutputTag> sideOutStreamMap = new HashMap<>();
for (RowToColumnBean bean : lists) {
    OutputTag app = new OutputTag(bean.getMainTable()) {
    };
    sideOutStreamMap.put(bean.getMainTable(), app);
}

RowToNumberProcessFunction rowToNumberProcessFunction = new RowToNumberProcessFunction(sideOutStreamMap, lists);
SingleOutputStreamOperator process = kafkaSource.process(rowToNumberProcessFunction);

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, settings, new TableConfig());

// Set the checkpoint interval
tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "10 s");

for (RowToColumnBean bean : lists) {
    DataStream dataStream = process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));
    // The dot must be escaped, because split() takes a regular expression
    String mainTable = bean.getMainTable().split(" ")[0].split("\\.")[1].toLowerCase();
    //Table tmpTable = tableEnv.fromDataStream(dataStream, StrUtil.list2Str(bean.getQueryColumns()));
    tableEnv.createTemporaryView(mainTable, dataStream);

    String joinTable = mainTable + "_join";
    tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
            "rowkey STRING,\n" +
            "info ROW ,\n" +
            "PRIMARY KEY (rowkey) NOT ENFORCED\n" +
            ") WITH (\n" +
            "'connector' = 'hbase-2.2',\n" +
            "'table-name' = 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" +
            "'zookeeper.quorum' = '192.168.0.115:2181',\n" +
            "'zookeeper.znode.parent' = '/hbase'\n" +
            ")");

    // Query the data
    //Table table = tableEnv.sqlQuery("select b.* from tmp a left join dformfiled b on a.key = b.rowkey");
    Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a left join " + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550' where b.rowkey is not null");

    // Print the schema of the joined result
    TableSchema schema = table.getSchema();
    schema.getTableColumns().forEach(column -> {
        System.err.println(column.asSummaryString());
    });

    DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class);
    tuple2DataStream.print(mainTable);
    dataStream.print(mainTable);
}

(From the Flink mailing list archive, compiled by volunteers)

moonlightdisco 2021-12-02 17:14:07
1 answer
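  • Judging from the code, you are using a regular join between the Kafka source and the HBase source. The HBase connector does not currently support acting as a streaming (unbounded) source. You can confirm this for the submitted job on the job dashboard: the HBase source part should switch to the FINISHED state after running for a while, and Flink checkpointing does not currently support running on FINISHED tasks.

    You could consider rewriting the SQL to join the HBase table with a processing time temporal join [1]. Each record consumed from Kafka then looks up the current data of the HBase table in real time to perform the join.

    [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#processing-time-temporal-join

    (From the Flink mailing list archive, compiled by volunteers)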
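
The suggested rewrite could look roughly like the sketch below. It only builds the query string, so it runs without Flink on the classpath; in the job it would be passed to `tableEnv.sqlQuery(...)` in place of the regular join. Several things here are assumptions rather than part of the original post: the class and method names are hypothetical, the main table is assumed to expose a processing-time attribute named `proctime` (it would have to be declared when the view is registered), and `a.key` is assumed to already match the HBase rowkey exactly, since the lookup is expected to happen by rowkey. The `lower(...)` call and the `formid` filter from the original query are left out for that reason. A plain `JOIN` is used because the original `left join ... where b.rowkey is not null` keeps only matched rows anyway.

```java
public class LookupJoinSql {

    /**
     * Builds a processing-time temporal join query between a main table and
     * an HBase-backed table.
     *
     * Assumptions (not from the original post): the main table has a
     * processing-time attribute called "proctime", and its "key" column
     * equals the HBase rowkey.
     */
    public static String buildLookupJoinSql(String mainTable, String joinTable) {
        return "SELECT a.*, b.* FROM " + mainTable + " AS a"
                // FOR SYSTEM_TIME AS OF turns the regular join into a temporal join:
                // each row of "a" is joined against the HBase table as of a.proctime
                + " JOIN " + joinTable + " FOR SYSTEM_TIME AS OF a.proctime AS b"
                + " ON a.key = b.rowkey";
    }

    public static void main(String[] args) {
        // Table names follow the mainTable / mainTable + "_join" pattern of the question
        System.out.println(buildLookupJoinSql("dformfiled", "dformfiled_join"));
    }
}
```

With this form the HBase table is queried per incoming record instead of being read once as a bounded source, so no task should end up FINISHED while the Kafka source is still running.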

    2021-12-02 17:40:56