开发者社区 问答 正文

flink 注册kafka得csv格式

已解决

用flink注册kafka得消息源,format选择csv报错,选择avro可以。是不是kafka连接器不支持csv格式。
报错如下:Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath。

示例代码如下:

tableEnv.connect(
  new Kafka()
    .version("0.11")
      .topic("result_count")
    .property("bootstrap.servers", "**")
)
  .withFormat(
    new Csv().fieldDelimiter(",")
  )
  .withSchema(
    new Schema()
      .field("world", Types.STRING)
      .field("count", Types.INT)
  )
  .inAppendMode()
  .registerTableSource("result_count")

展开
收起
jocean 2018-11-01 20:41:59 5826 分享 版权
2 条回答
写回答
取消 提交回答
  • 采纳回答

    就是现在kafka connector不支持schema是 csv。仅支持 avro和json。
    以后提问能否提供一下flink版本和详细的异常栈信息,便于排查问题。

    2019-07-17 23:11:45
    赞同 展开评论
  • 1

    2019-07-17 23:11:45
    赞同 展开评论