开发者社区> 问答> 正文

flink1.9 blink planner table ddl 使用问题

flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 需要实现TableSourceFactory,还是其他什么.

提示: Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed. Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

代码:

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.{EnvironmentSettings, Types}
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row


object KafkaInDDL extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)


val sourceDDL =
"""create table sourceTable(
id int,
name varchar
) with (
'connector.type' = 'kafka',
'connector.property-version' = '1',
'update-mode' = 'append',
'bootstrap.servers' = '192.168.1.160:19092',
'connector.topic' = 'hbtest1',
'connector.startup-mode' = 'earliest-offset'
)
"""
tEnv.sqlUpdate(sourceDDL)
tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
tEnv.execute("")
}
```*来自志愿者整理的flink邮件归档

展开
收起
小阿怪 2021-12-07 22:12:40 966 0
1 条回答
写回答
取消 提交回答
  • Hi,初步看下来你的 DDL 中有这几部分定义的有问题。

    1. 缺少format properties
    2. 缺少 connector.version
    3. bootstrap.severs 的配置方式写的不对...

    你可以参考下面这个作为example:

    CREATE TABLE kafka_json_source ( rowtime TIMESTAMP, user_name VARCHAR, event ROW<message_type VARCHAR, message VARCHAR> ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'test-json', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.0.key' = 'zookeeper.connect', 'connector.properties.0.value' = 'localhost:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = 'localhost:9092', 'update-mode' = 'append', 'format.type' = 'json', 'format.derive-schema' = 'true' );

    Kafka 中的数据长这个样子:

    {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { "message_type": "WARNING", "message": "This is a warning."}} *来自志愿者整理的flink邮件归档

    2021-12-08 10:32:13
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载