开发者社区> 问答> 正文

flink 注册kafka得csv格式

已解决

用flink注册kafka得消息源,format选择csv报错,选择avro可以。是不是kafka连接器不支持csv格式。
报错如下:Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath。

示例代码如下:

tableEnv.connect(
  new Kafka()
    .version("0.11")
      .topic("result_count")
    .property("bootstrap.servers", "**")
)
  .withFormat(
    new Csv().fieldDelimiter(",")
  )
  .withSchema(
    new Schema()
      .field("world", Types.STRING)
      .field("count", Types.INT)
  )
  .inAppendMode()
  .registerTableSource("result_count")

展开
收起
jocean 2018-11-01 20:41:59 5727 0
2 条回答
写回答
取消 提交回答
  • 采纳回答

    就是现在kafka connector不支持schema是 csv。仅支持 avro和json。
    以后提问能否提供一下flink版本和详细的异常栈信息,便于排查问题。

    2019-07-17 23:11:45
    赞同 展开评论 打赏
  • 1

    2019-07-17 23:11:45
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载