Flink CDC3.0里面这种properties的参数怎么配置呀?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在Flink CDC 3.0中,properties
参数的配置主要用于传递自定义的Debezium属性或其他连接器相关的参数。这些参数可以通过WITH
子句进行设置,具体配置方式如下:
properties
参数的基本方法在Flink SQL中,properties
参数通常以键值对的形式添加到WITH
子句中。以下是一个通用的配置示例:
CREATE TABLE source_table (
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'your-hostname',
'port' = '3306',
'username' = 'your-username',
'password' = 'your-password',
'database-name' = 'your-database',
'table-name' = 'your-table',
'debezium.properties.bigint.unsigned.handling.mode' = 'long',
'debezium.properties.decimal.handling.mode' = 'double'
);
说明: - debezium.properties.*
是用于传递Debezium特定参数的前缀。 - 在上述示例中,bigint.unsigned.handling.mode
和decimal.handling.mode
是Debezium的配置项,分别用于控制无符号大整数和小数的处理方式。
properties
参数及其作用以下是一些常见的properties
参数及其用途:
bigint.unsigned.handling.mode
控制无符号大整数的处理方式,可选值为long
或precise
。
示例:
'debezium.properties.bigint.unsigned.handling.mode' = 'long'
decimal.handling.mode
控制小数的处理方式,可选值为double
或string
。
示例:
'debezium.properties.decimal.handling.mode' = 'double'
snapshot.mode
控制快照模式,可选值包括initial
、never
等。
示例:
'debezium.properties.snapshot.mode' = 'initial'
jdbc.properties.useSSL
控制是否使用SSL连接数据库,适用于MySQL等支持SSL的数据库。
示例:
'jdbc.properties.useSSL' = 'false'
jdbc.properties.characterEncoding
指定字符编码格式,例如utf-8
。
示例:
'jdbc.properties.characterEncoding' = 'utf-8'
obcdc.properties.sort_trans_participants
'obcdc.properties.sort_trans_participants' = '1'
参数前缀
不同的连接器可能需要不同的参数前缀,例如debezium.properties.*
、jdbc.properties.*
或obcdc.properties.*
。请根据具体的连接器类型选择正确的前缀。
版本兼容性
Flink CDC 3.0从2.0.0版本开始将group id
从com.alibaba.ververica
改为com.ververica
,因此在Maven仓库中查找依赖时,请确保路径为/com/ververica
。
自动清理Replication Slot
如果使用PostgreSQL CDC连接器,建议在作业停止时手动释放Replication Slot,或者通过配置'debezium.slot.drop.on.stop' = 'true'
实现自动清理。但需要注意,自动清理可能导致WAL日志被回收,从而无法保证At-Least-Once语义。
以下是一个完整的Flink CDC 3.0配置示例,包含properties
参数的使用:
CREATE TABLE mysql_source (
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'test_db',
'table-name' = 'products',
'debezium.properties.bigint.unsigned.handling.mode' = 'long',
'debezium.properties.decimal.handling.mode' = 'double',
'jdbc.properties.useSSL' = 'false',
'jdbc.properties.characterEncoding' = 'utf-8'
);
通过WITH
子句中的properties
参数,您可以灵活地配置Flink CDC连接器的行为。根据具体的连接器类型(如MySQL、PostgreSQL、OceanBase等),选择合适的参数前缀和配置项即可满足需求。务必注意版本兼容性和参数的作用范围,以避免潜在的问题。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。