
Primary key declaration issue in Flink SQL 1.11

While using PyFlink 1.11, I declared a primary key (PRIMARY KEY) when joining against a dimension table in Flink SQL, but I still get an error saying that no primary key was declared. Also, when I use an inner join instead of a left join the problem goes away. What is causing this?

The error message and the code are attached below. Thanks!

Error log

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.execute.
: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "mysql_join.py", line 90, in <module>
    from_kafka_to_kafka_demo()
  File "mysql_join.py", line 22, in from_kafka_to_kafka_demo
    st_env.execute("2-from_kafka_to_kafka")
  File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 1057, in execute
    return JobExecutionResult(self._j_tenv.execute(job_name))
  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'UpsertStreamTableSink requires that Table has a full primary keys if it is updated.'

Code

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings, CsvTableSource, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
from pyflink.table.window import Tumble


def from_kafka_to_kafka_demo():
    # use blink table planner
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
    st_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=env_settings)

    # register source and sink
    register_rides_source(st_env)
    register_rides_sink(st_env)
    register_mysql_source(st_env)

    st_env.sql_update(
        "insert into flink_result "
        "select cast(t1.id as int) as id, cast(t2.type as varchar), cast(t1.time1 as bigint) as rowtime "
        "from source1 t1 left join dim_mysql t2 on t1.type = cast(t2.id as varchar)")
    st_env.execute("2-from_kafka_to_kafka")


def register_rides_source(st_env):
    source_ddl = """
        create table source1 (
            id int,
            time1 varchar,
            type string
        ) with (
            'connector.type' = 'kafka',
            'connector.topic' = 'tp1',
            'connector.startup-mode' = 'latest-offset',
            'connector.properties.bootstrap.servers' = 'localhost:9092',
            'connector.properties.zookeeper.connect' = 'localhost:2181',
            'format.type' = 'json',
            'connector.version' = 'universal',
            'update-mode' = 'append'
        )
        """
    st_env.sql_update(source_ddl)


def register_mysql_source(st_env):
    source_ddl = """
        CREATE TABLE dim_mysql (
            id int,
            type varchar
        ) WITH (
            'connector.type' = 'jdbc',
            'connector.url' = 'jdbc:mysql://localhost:3390/test',
            'connector.table' = 'flink_test',
            'connector.driver' = 'com.mysql.cj.jdbc.Driver',
            'connector.username' = '***',
            'connector.password' = '***',
            'connector.lookup.cache.max-rows' = '5000',
            'connector.lookup.cache.ttl' = '1min',
            'connector.lookup.max-retries' = '3'
        )
        """
    st_env.sql_update(source_ddl)


def register_rides_sink(st_env):
    sink_ddl = """
        CREATE TABLE flink_result (
            id int,
            type varchar,
            rtime bigint,
            primary key(id) NOT ENFORCED
        ) WITH (
            'connector.type' = 'jdbc',
            'connector.url' = 'jdbc:mysql://localhost:3390/test',
            'connector.table' = 'flink_result',
            'connector.driver' = 'com.mysql.cj.jdbc.Driver',
            'connector.username' = '***',
            'connector.password' = '***',
            'connector.write.flush.max-rows' = '5000',
            'connector.write.flush.interval' = '2s',
            'connector.write.max-retries' = '3'
        )
        """
    st_env.sql_update(sink_ddl)


if __name__ == '__main__':
    from_kafka_to_kafka_demo()

*From the Flink mailing list archive, compiled by volunteers

小阿矿 2021-12-06 16:51:39
1 answer
  • Your connector WITH options are still written in the old style rather than the new connector style [1], so the job goes down the legacy code path, and the legacy code does not support a declared PK. On the legacy path the PK is derived from the query; once you replace the left join with an inner join the PK can presumably be derived, which is why the error disappears. I believe updating the connector WITH options to the new style will solve the problem. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options
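    For reference, here is a minimal sketch of what the two JDBC registration functions could look like with the new-style options from the Flink 1.11 JDBC connector page linked above. The `_new` suffix on the function names is purely illustrative, and the connection details are the placeholders from the original post:

    def register_mysql_source_new(st_env):
        # New-style options ('connector' = 'jdbc', 'table-name', 'lookup.*')
        # resolve to the new connector code path instead of the legacy sink/source.
        source_ddl = """
            CREATE TABLE dim_mysql (
                id INT,
                type VARCHAR
            ) WITH (
                'connector' = 'jdbc',
                'url' = 'jdbc:mysql://localhost:3390/test',
                'table-name' = 'flink_test',
                'driver' = 'com.mysql.cj.jdbc.Driver',
                'username' = '***',
                'password' = '***',
                'lookup.cache.max-rows' = '5000',
                'lookup.cache.ttl' = '1min',
                'lookup.max-retries' = '3'
            )
            """
        st_env.execute_sql(source_ddl)

    def register_rides_sink_new(st_env):
        # On the new code path the declared PRIMARY KEY is honored, so the
        # left-join result can be written to MySQL as an upsert stream.
        sink_ddl = """
            CREATE TABLE flink_result (
                id INT,
                type VARCHAR,
                rtime BIGINT,
                PRIMARY KEY (id) NOT ENFORCED
            ) WITH (
                'connector' = 'jdbc',
                'url' = 'jdbc:mysql://localhost:3390/test',
                'table-name' = 'flink_result',
                'driver' = 'com.mysql.cj.jdbc.Driver',
                'username' = '***',
                'password' = '***',
                'sink.buffer-flush.max-rows' = '5000',
                'sink.buffer-flush.interval' = '2s',
                'sink.max-retries' = '3'
            )
            """
        st_env.execute_sql(sink_ddl)

    The Kafka source can be migrated the same way ('connector' = 'kafka', 'topic', 'properties.bootstrap.servers', 'scan.startup.mode' = 'latest-offset', 'format' = 'json'). Note also that in 1.11 sql_update and TableEnvironment.execute are deprecated in favor of execute_sql and execute_insert.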

    *From the Flink mailing list archive, compiled by volunteers

    2021-12-06 17:09:49