各位, 请问: flink cdc, 用 flink sql 的方式 sink 到 kafka 可以

各位, 请问: flink cdc, 用 flink sql 的方式 sink 到 kafka 可以指定输出 schema 信息嘛? 看到好像只有 api 中可以指定 .deserializer(new JsonDebeziumDeserializationSchema(true)). flink sql 没办法嘛?

展开
收起
雪哥哥 2022-10-30 10:27:13 1349 发布于浙江 分享
分享
版权
举报
2 条回答
写回答
取消 提交回答
  • 资深技术专家。主攻技术开发,擅长分享、写文、测评。

    flink-cdc-mysql2kafka 建立同步任务,可以使用sql如下: insert into product_view_kafka_sink select * from product_view_source; 这个时候是可以退出flink sql-client的,然后进入flink web-ui,可以看到mysql表数据已经同步到kafka中了,对mysql进行插入,kafka都是同步更新的。 image-20220914171441498 通过kafka控制台消费,可以看到数据已经从mysql同步到kafka了

    2022-11-30 07:47:58 发布于内蒙古 举报
    赞同 评论

    评论

    全部评论 (0)

    登录后可评论
  • 通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列。除了输出到文件,也可以输出到 Kafka。

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.DataTypes
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, Schema}
    
    /**
     * @Package
     * @author 大数据老哥
     * @date 2020/12/18 16:51
     * @version V1.0
     */
    object FlinkSqlSourceFileSinkKafka {
      def main(args: Array[String]): Unit = {
        // 构建运行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 构建表运行环境
        val tableEnv = StreamTableEnvironment.create(env)
        // 读取文件数据
        tableEnv.connect(new FileSystem().path("./data/user.txt"))
          .withFormat(new Csv())
          .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("name", DataTypes.STRING())
          ).createTemporaryTable("FileInput")
        // 设置kafka的输出
        tableEnv.connect(new Kafka()
          .version("0.11") // 设置kafka的版本
          .topic("FlinkSqlTest") // 设置要连接的主题
          .property("zookeeper.connect", "node01:2181,node02:2181,node03:2181") //设置zookeeper的连接地址跟端口号
          .property("bootstrap.servers", "node01:9092,node02:9092,node03:9092") //设置kafka的连接地址跟端口号
        ).withFormat(new Csv())
          .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("name", DataTypes.STRING())
          ).createTemporaryTable("outPutKafka")
    
        val res = tableEnv.sqlQuery("select  * from  FileInput")
    
        res.insertInto("outPutKafka")
        env.execute("FlinkSqlSourceFileSinkKafka")
      }
    
    }
    
    
    
    2022-11-24 07:30:26 发布于北京 举报
    赞同 评论

    评论

    全部评论 (0)

    登录后可评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

收录在圈子:
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
还有其他疑问?
咨询AI助理