The error is: **Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has full primary keys if it is updated.** The code is as follows:
```scala
tableEnv.connect(
    new Kafka()
      .property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "30.23.13.231:9092")
      .property("zookeeper.connect", "....:2181"))
  .withFormat(new Json())
  .withSchema(new Schema()
    .field("componentType", Types.STRING)
    .field("endDate", Types.STRING)
    .field("envName", Types.STRING)
    .field("resultId", Types.STRING)
    .field("spendTime", Types.INT)
    .field("returnDataNum", Types.INT)
    .field("startDate", Types.STRING)
    .field("tableName", Types.STRING)
    .field("tenantName", Types.STRING))
  .inAppendMode()
  .createTemporaryTable("MyTable")
```
```scala
val hbaseDDL: String =
  """
    |CREATE TABLE flink_log1 (
    |  rowkey STRING,
    |  cf ROW(f1 BIGINT, f2 BIGINT, f3 INT, f4 INT, f5 BIGINT, tenantName VARCHAR)
    |) WITH (
    |  'connector.type' = 'hbase',
    |  'connector.version' = '1.4.3',
    |  'connector.table-name' = 'flink_log1',
    |  'connector.zookeeper.quorum' = '....:2181,....:2181',
    |  'connector.zookeeper.znode.parent' = '/hbase',
    |  'connector.write.buffer-flush.max-size' = '10mb',
    |  'connector.write.buffer-flush.max-rows' = '1000'
    |)
  """.stripMargin
tableEnv.sqlUpdate(hbaseDDL)
```
```scala
// Note: COUNT counts every non-null value, so `else ''` still counts all
// rows — as written, f2 and f5 will equal f1.
val sql =
  "select CONCAT_WS('_',tenantName,tenantName) as tm_tenantName, " +
    "count(tenantName) as f1, " +
    "count(case when resultId = '2' then resultId else '' end) as f2, " +
    "avg(spendTime) as f3, " +
    "sum(returnDataNum) as f4, " +
    "count(case when resultId = '1' then tenantName else '' end) as f5, " +
    "tenantName " +
    "from MyTable where substring(endDate,1,10)='2020-06-28' " +
    "group by CONCAT_WS('_',tenantName,tenantName),tenantName"
val table: Table = tableEnv.sqlQuery(sql)
tableEnv.createTemporaryView("tmp", table)
tableEnv.sqlUpdate("insert into flink_log1 " +
  "select tm_tenantName, ROW(f1,f2,f3,f4,f5,tenantName) as cf from tmp")
streamEnv.execute("my insert hbase sql")
```
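Editor's hedged note for context (not part of the original question): the HBase table sink is an upsert sink, so the planner must be able to derive a *full* unique key from the query being inserted. The grouped query's unique key is the pair `(CONCAT_WS('_',tenantName,tenantName), tenantName)`, but the final `SELECT` into `flink_log1` keeps only `tm_tenantName`, so the key is no longer fully present and the exception above is thrown. A minimal sketch of one possible rewrite, assuming Flink 1.10's Scala Table API, is to group by the rowkey expression alone so that the single selected key column is itself the full unique key:

```scala
// Hedged sketch, not the poster's original code. Grouping only by the
// expression that becomes the HBase rowkey lets the planner derive
// tm_tenantName as the full unique key of the upserted result.
// `ELSE ''` is dropped so the CASE yields NULL for non-matching rows and
// COUNT counts only the matches; MAX(tenantName) stands in for the bare
// tenantName column, which can no longer appear ungrouped.
val fixedSql =
  """
    |SELECT CONCAT_WS('_', tenantName, tenantName)           AS tm_tenantName,
    |       COUNT(tenantName)                                AS f1,
    |       COUNT(CASE WHEN resultId = '2' THEN resultId END) AS f2,
    |       AVG(spendTime)                                   AS f3,
    |       SUM(returnDataNum)                               AS f4,
    |       COUNT(CASE WHEN resultId = '1' THEN tenantName END) AS f5,
    |       MAX(tenantName)                                  AS tenantName
    |FROM MyTable
    |WHERE SUBSTRING(endDate, 1, 10) = '2020-06-28'
    |GROUP BY CONCAT_WS('_', tenantName, tenantName)
  """.stripMargin

val fixedTable: Table = tableEnv.sqlQuery(fixedSql)
tableEnv.createTemporaryView("tmp2", fixedTable)
tableEnv.sqlUpdate(
  "insert into flink_log1 " +
    "select tm_tenantName, ROW(f1,f2,f3,f4,f5,tenantName) as cf from tmp2")
```

Calling `tableEnv.explain(fixedTable)` before the insert can help confirm what plan (and key) the planner actually derives; whether the aggregate result types line up with the `BIGINT`/`INT` columns declared in the DDL should also be checked against the running Flink version.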
Copyright notice: this content was contributed by a registered user of the Alibaba Cloud Developer Community; copyright remains with the original author.