Hi,

I have a question for the more experienced folks here. I am experimenting with joins on streams: I use FlinkKafkaConsumer to consume Kafka data as the source and then enrich it against an HBase dimension table. The join itself works, but the Kafka source never checkpoints, even though checkpointing is configured in the code. Is there any other configuration I need to set? The code is below:
// Element type Row is assumed below; the generics were stripped from the original mail.
DataStream<Row> kafkaSource = env.addSource(source);

Map<String, OutputTag<Row>> sideOutStreamMap = new HashMap<>();
for (RowToColumnBean bean : lists) {
    OutputTag<Row> app = new OutputTag<Row>(bean.getMainTable()) {
    };
    sideOutStreamMap.put(bean.getMainTable(), app);
}

RowToNumberProcessFunction rowToNumberProcessFunction =
        new RowToNumberProcessFunction(sideOutStreamMap, lists);
SingleOutputStreamOperator<Row> process = kafkaSource.process(rowToNumberProcessFunction);

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
// Public factory instead of the internal StreamTableEnvironmentImpl.
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// Set the checkpoint interval
tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "10 s");

for (RowToColumnBean bean : lists) {
    DataStream<Row> dataStream = process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));
    String mainTable = bean.getMainTable().split(" ")[0].split("\\.")[1].toLowerCase();
    //Table tmpTable = tableEnv.fromDataStream(dataStream, StrUtil.list2Str(bean.getQueryColumns()));
    tableEnv.createTemporaryView(mainTable, dataStream);

    String joinTable = mainTable + "_join";
    // The ROW<...> column list was lost in the original mail; formid is inferred from the query below.
    tableEnv.executeSql("CREATE TABLE " + joinTable + " (\n" +
            "rowkey STRING,\n" +
            "info ROW<formid STRING>,\n" +
            "PRIMARY KEY (rowkey) NOT ENFORCED\n" +
            ") WITH (\n" +
            "'connector' = 'hbase-2.2',\n" +
            "'table-name' = 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" +
            "'zookeeper.quorum' = '192.168.0.115:2181',\n" +
            "'zookeeper.znode.parent' = '/hbase'\n" +
            ")");

    // Query the data
    //Table table = tableEnv.sqlQuery("select b.* from tmp a left join dformfiled b on a.key = b.rowkey");
    Table table = tableEnv.sqlQuery("select a.*, b.* from " + mainTable + " a left join " + joinTable +
            " b on a.key = lower(b.rowkey) and b.formid = '550' where b.rowkey is not null");

    TableSchema schema = table.getSchema();
    schema.getTableColumns().forEach(column -> {
        System.err.println(column.asSummaryString());
    });

    DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class);
    tuple2DataStream.print(mainTable);
    dataStream.print(mainTable);
}

*From the volunteer-curated Flink mailing list archive
Judging from the code, you are using a regular join between the Kafka source and the HBase source. The HBase connector currently does not support acting as a streaming (unbounded) source. You can confirm this on the dashboard for the submitted job: the HBase source tasks should switch to FINISHED after running for a while, and Flink checkpointing does not yet support running on FINISHED tasks.

You could instead rewrite the SQL to use a processing-time temporal join [1] against the HBase table: each record consumed from Kafka then looks up the current data in the HBase table in real time.
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#processing-time-temporal-join

*From the volunteer-curated Flink mailing list archive
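[Editor's note] To make that suggestion concrete, here is a minimal sketch of the rewritten join, assuming Flink 1.13 and the table names from the code above. The processing-time attribute name proc_time is an assumption, as is the Row element type of the stream; note also that the join condition matches the HBase rowkey directly, so it can serve as the lookup key.

// Register the side-output stream as a view that carries a processing-time
// attribute, which FOR SYSTEM_TIME AS OF requires.
// Uses the Flink 1.13 fromDataStream(DataStream, Schema) API and
// org.apache.flink.table.api.Schema.
Table withProcTime = tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
                .columnByExpression("proc_time", "PROCTIME()") // assumed attribute name
                .build());
tableEnv.createTemporaryView(mainTable, withProcTime);

// Processing-time temporal join: each Kafka record looks up the current
// HBase row for its key, so the HBase side runs as a lookup table rather
// than a bounded scan that finishes and blocks checkpointing.
Table joined = tableEnv.sqlQuery(
        "SELECT a.*, b.info.formid" +
        " FROM " + mainTable + " AS a" +
        " LEFT JOIN " + joinTable + " FOR SYSTEM_TIME AS OF a.proc_time AS b" +
        " ON a.key = b.rowkey");

With this shape, the checkpoint barrier only has to flow through the Kafka pipeline, and no source task reaches FINISHED while the job runs.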