Flink CDC我自定义的DebeziumDeserializationSchema 里面是把数据和类型组装成了row 类型,在通过把流转为table tEnv.fromChangelogStream(mapStream)
你说的json是干哈的?
在 Flink CDC 中,JSON(JavaScript Object Notation)是一种常用的数据格式,用于表示和交换结构化数据。Flink CDC 中使用 JSON 格式来序列化和传输变更数据(change data),即记录数据库中发生的数据变化。
Flink CDC 通过 CDC 消息(Change Data Capture messages)以 JSON 格式的形式将变更数据发布到外部系统,如消息队列或日志文件。这样,外部系统可以基于这些变更数据进行进一步的处理、分析和消费。
CDC 消息的 JSON 格式通常包括以下字段:
database
: 数据库的名称。table
: 表的名称。type
: 变更类型,如插入、更新、删除。data
: 变更的数据,以 JSON 对象的形式表示。ts_ms
: 变更发生的时间戳。使用 JSON 格式的好处是它的灵活性和可读性,以及广泛的支持和工具。JSON 格式易于解析和处理,可以方便地在各种编程语言和系统之间进行数据传递和交换。
在 Flink CDC 中,您可以通过自定义的 DeserializationSchema 或使用内置的 JSON DeserializationSchema 将 JSON 格式的变更数据转换为 Flink 中的数据类型,以进行进一步的处理和分析。
需要注意的是,Flink CDC 还支持其他的序列化格式,如 Avro、Debezium 格式等,您可以根据实际需求选择适合的数据格式和序列化/反序列化方案。
Flink CDC,指的是Flink的一组Source连接器,它是Flink的核心组件之一。这些连接器的主要职责是从MySQL、PostgreSQL、Oracle、MongoDB等不同的数据库中读取存量的历史数据以及增量的变更数据。CDC全称为Change Data Capture,是一种用于捕获并处理数据库中数据变更的技术。
关于你提到的json,在Flink 1.11版本中,它主要支持Kafka作为现成的变更日志源和JSON编码的变更日志。因此,json在这里主要是用于记录和传输数据库的变更信息。具体到你的自定义的DebeziumDeserializationSchema,它将数据和类型组装成了row类型,并通过把流转为table的方式使用Flink CDC,这一流程并未涉及到json的使用。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。