
Flink SQL: writing group-aggregated data to an HBase table fails with a primary-keys error

Hi all,

I have recently been experimenting with the Flink HBase connector, and my test that writes aggregated data to HBase fails with an error. I'd appreciate your help. The code is as follows:

/**
 * @Author: ellis.guan
 * @Description: HBase test class
 * @Date: 2020/3/6 15:41
 * @Version: 1.0
 */
public class HbaseTest {

    private StreamExecutionEnvironment env;
    private StreamTableEnvironment tableEnv;

    @Before
    public void init() {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        tableEnv = StreamTableEnvironment.create(env, settings);
        tableEnv.sqlUpdate("create table resume01(\n" +
                " rowkey string, sdp_columns_family ROW<age string, mobile BIGINT> \n" +
                // " binfo ROW<age string,mobile string,site string>,\n" +
                // " edu ROW , \n" +
                // " work ROW \n" +
                ") with (" +
                " 'connector.type' = 'hbase', " +
                " 'connector.version' = '1.4.3', " +
                " 'connector.table-name' = 'resume01'," +
                " 'connector.zookeeper.quorum' = 'localhost:2181'," +
                " 'connector.zookeeper.znode.parent' = '/hbase'" +
                ")");
    }

    @Test
    public void testReadFromHBase() throws Exception {
        Table table = tableEnv.sqlQuery("select * from resume01");
        DataStream<Tuple2<Boolean, Row>> out = tableEnv.toRetractStream(table, Row.class);
        out.print();
        env.execute();
    }

    @Test
    public void testWriterToHBase() throws Exception {
        DataStream<Row> source = env.fromElements(
                Row.of("ellis", "2015-03-27", "17352837822", "changsha", "hun nan", "shiji"),
                Row.of("ellis", "2015-03-28", "17352837825", "changsha1", "hun nan", "shiji"),
                Row.of("ellis", "2015-03-279", "17352837826", "changsha2", "hun nan", "shiji"));
        tableEnv.createTemporaryView("source_resume", source, "name,age,mobile,site,university,company1");
        tableEnv.sqlUpdate("insert into resume01 select CONCAT_WS('_', age, name), ROW(age, mobile) from " +
                " (select name, age, sum(cast(mobile as bigint)) as mobile from source_resume group by name, age) as tt");
        env.execute();
    }
}

Running it produces the following error:

org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. 

at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113) 

at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) 

at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) 

at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) 

at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) 

at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) 

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 

at scala.collection.Iterator$class.foreach(Iterator.scala:891) 

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 

at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 

at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 

at scala.collection.AbstractTraversable.map(Traversable.scala:104) 

at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) 

at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) 

at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) 

at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) 

at com.shiji.sdp.flink.HbaseTest.testWriterToHBase(HbaseTest.java:59) 

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 

at java.lang.reflect.Method.invoke(Method.java:498) 

at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59) 

at org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98) 

at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79) 

at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87) 

at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77) 

at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42) 

at org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88) 

at org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51) 

at org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44) 

at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27) 

at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37) 

at org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42) 

at org.junit.runner.JUnitCore.run(JUnitCore.java:130) 

at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)

(From the volunteer-curated Flink mailing-list archive)

CCCC 2021-12-02 15:12:24
1 answer
  • Currently, when Flink SQL inserts data into a table, it requires that the query's key match the sink table's key. For HBase the key is always the rowkey, but here the query's key gets lost (CONCAT_WS does not preserve the key attribute), so you need to GROUP BY CONCAT_WS(...) directly in order to derive a key that lines up with HBase's rowkey. Your query therefore needs to be rewritten like this:

    insert into resume01
    select age_name, ROW(age, mobile)
    from (
      select CONCAT_WS('', age, name) as age_name, sum(cast(mobile as bigint)) as mobile
      from source_resume
      group by CONCAT_WS('', age, name)
    ) as tt

    (From the volunteer-curated Flink mailing-list archive)
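    The requirement above can be illustrated with a small simulation, with no Flink dependencies and purely hypothetical names: an upsert sink applies each change by key, so the query's grouping key must equal the sink's rowkey for an updated aggregate to replace the old row rather than produce an unresolvable change.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    public class UpsertSketch {
        // Stand-in for the HBase table: rowkey -> aggregated mobile sum.
        static Map<String, Long> sink = new HashMap<>();

        // An upsert sink applies every change BY KEY: a new value for the
        // same rowkey overwrites the previous one.
        static void upsert(String rowkey, long mobileSum) {
            sink.put(rowkey, mobileSum);
        }

        public static void main(String[] args) {
            // The grouping key doubles as the HBase rowkey, mirroring
            // GROUP BY CONCAT_WS(...) over age and name in the corrected query.
            upsert("2015-03-27_ellis", 17352837822L);
            // An updated aggregate for the SAME key replaces the earlier row.
            upsert("2015-03-27_ellis", 17352837822L + 17352837825L);
            upsert("2015-03-279_ellis", 17352837826L);

            System.out.println(sink.size());                  // prints 2
            System.out.println(sink.get("2015-03-27_ellis")); // prints 34705675647
        }
    }
    ```

    Without a key on the query side (as in the original INSERT, where CONCAT_WS is applied after the aggregation), the planner cannot tell which sink row each update targets, which is exactly what the UpsertStreamTableSink exception complains about.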

    2021-12-02 15:44:39