Hi all,
I have recently been experimenting with the Flink HBase connector. My test writes aggregated data into HBase, and it fails with an error. I would appreciate any help. The code is as follows:
package com.shiji.sdp.flink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.Before;
import org.junit.Test;

/**
 * @Author: ellis.guan
 * @Description: HBase test class
 * @Date: 2020/3/6 15:41
 * @Version: 1.0
 */
public class HbaseTest {

    private StreamExecutionEnvironment env;
    private StreamTableEnvironment tableEnv;

    @Before
    public void init() {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        tableEnv = StreamTableEnvironment.create(env, settings);
        tableEnv.sqlUpdate("create table resume01 (\n" +
                "  rowkey string,\n" +
                "  sdp_columns_family ROW<age string, mobile BIGINT>\n" +
                // "  binfo ROW<age string, mobile string, site string>,\n" +
                // "  edu ROW,\n" +
                // "  work ROW\n" +
                ") with (\n" +
                "  'connector.type' = 'hbase',\n" +
                "  'connector.version' = '1.4.3',\n" +
                "  'connector.table-name' = 'resume01',\n" +
                "  'connector.zookeeper.quorum' = 'localhost:2181',\n" +
                "  'connector.zookeeper.znode.parent' = '/hbase'\n" +
                ")");
    }

    @Test
    public void testReadFromHBase() throws Exception {
        // HBaseTableSource resume = new HBaseTableSource();
        Table table = tableEnv.sqlQuery("select * from resume01");
        DataStream<Tuple2<Boolean, Row>> out = tableEnv.toRetractStream(table, Row.class);
        out.print();
        env.execute();
    }

    @Test
    public void testWriterToHBase() throws Exception {
        DataStream<Row> source = env.fromElements(
                Row.of("ellis", "2015-03-27", "17352837822", "changsha", "hun nan", "shiji"),
                Row.of("ellis", "2015-03-28", "17352837825", "changsha1", "hun nan", "shiji"),
                Row.of("ellis", "2015-03-279", "17352837826", "changsha2", "hun nan", "shiji"));
        tableEnv.createTemporaryView("source_resume", source, "name,age,mobile,site,university,company1");
        tableEnv.sqlUpdate("insert into resume01 " +
                "select CONCAT_WS('_', age, name), ROW(age, mobile) from " +
                "(select name, age, sum(cast(mobile as bigint)) as mobile " +
                " from source_resume group by name, age) as tt");
        env.execute();
    }
}
Running it fails with the following error:
org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at com.shiji.sdp.flink.HbaseTest.testWriterToHBase(HbaseTest.java:59)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59)
at org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98)
at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79)
at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87)
at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77)
at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42)
at org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88)
at org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51)
at org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44)
at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27)
at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37)
at org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42)
at org.junit.runner.JUnitCore.run(JUnitCore.java:130)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
Currently, when Flink SQL inserts data into an external store, it requires that the query's key match the sink table's key. For HBase the sink key is always the rowkey, but in your query the key is lost: CONCAT_WS over the grouping columns does not preserve the key attribute. You therefore need to GROUP BY the CONCAT_WS(..) expression directly, so that the query carries a key that maps onto HBase's rowkey. Your query needs to be rewritten like this:
insert into resume01
select age_name, ROW(age, mobile)
from (
    select CONCAT_WS('_', age, name) as age_name,
           max(age) as age,
           sum(cast(mobile as bigint)) as mobile
    from source_resume
    group by CONCAT_WS('_', age, name)
) as tt

Note that once age is no longer a grouping column it must be wrapped in an aggregate such as MAX to remain selectable.
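Folded back into the test class above, the corrected write test would look roughly like this. This is a minimal sketch under the same Flink 1.10 connector setup; the method name testWriterToHBaseFixed is hypothetical, and the MAX(age) aggregate is the assumption explained above rather than part of the archived reply:

    @Test
    public void testWriterToHBaseFixed() throws Exception {
        DataStream<Row> source = env.fromElements(
                Row.of("ellis", "2015-03-27", "17352837822", "changsha", "hun nan", "shiji"),
                Row.of("ellis", "2015-03-28", "17352837825", "changsha1", "hun nan", "shiji"));
        tableEnv.createTemporaryView("source_resume", source, "name,age,mobile,site,university,company1");
        // Grouping by the CONCAT_WS expression itself makes it the query's
        // derived key, so it lines up with the sink table's rowkey column.
        // age is wrapped in MAX (an assumption, see above) because it is no
        // longer a grouping column.
        tableEnv.sqlUpdate("insert into resume01 " +
                "select age_name, ROW(age, mobile) from (" +
                " select CONCAT_WS('_', age, name) as age_name," +
                "        max(age) as age," +
                "        sum(cast(mobile as bigint)) as mobile" +
                " from source_resume" +
                " group by CONCAT_WS('_', age, name)) as tt");
        env.execute();
    }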