开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink如何更改SQLServerCDC连接器工厂类的Identifier?

Flink如何更改SQLServerCDC连接器工厂类的Identifier?

展开
收起
真的很搞笑 2024-01-04 14:32:58 58 0
3 条回答
写回答
取消 提交回答
  • Flink的SQLServerCDC连接器工厂类的Identifier是在创建SQLServerCDC连接器时指定的,可以通过Flink的SQLServerCDC连接器工厂类的Identifier是在创建SQLServerCDC连接器时指定的,可以通过修改连接器的配置参数来更改。

    具体来说,可以在创建SQLServerCDC连接器时,通过.withFactoryIdentifier()方法指定新的工厂类标识符。例如:

    ```java
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("schema.registry.url", "http://localhost:8081");
    properties.setProperty("table.name", "my-topic");
    properties.setProperty("format", "json");
    properties.setProperty("timestamp.column", "ts_ms");
    properties.setProperty("debezium.sqlserver.include.schema.changes", "false");
    properties.setProperty("debezium.sqlserver.exclude.tables", "dbo\\sysdiagrams, dbo\\dtproperties, dbo\\users, dbo\\sysfiles, dbo\\systypes, dbo\\syscomments, dbo\\sp_tables_info, dbo\\sp_columns_info, dbo\\sp_procedures, dbo\\sp_help");
    properties.setProperty("connector.class", "io.debezium.connector.sqlservercdc.SqlServerSourceConnector");
    properties.setProperty("tasks.max", "1");
    properties.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
    properties.setProperty("offset.storage.file.filename", "/path/to/offset/storage/file");
    properties.setProperty("offset.flush.interval.ms", "60000");
    properties.setProperty("offset.flush.threshold.rows", "0");
    properties.setProperty("name", "sqlserver-source");
    properties.setProperty("database.hostname", "localhost");
    properties.setProperty("database.port", "1433");
    properties.setProperty("database.user", "sa");
    properties.setProperty("database.password", "password");
    properties.setProperty("database.server.name", "localhost");
    properties

    2024-01-05 14:48:17
    赞同 展开评论 打赏
  • 在实时计算 Flink 中,更改 SQLServerCDC 连接器工厂类的 Identifier 需要进行以下操作:
    克隆 GitHub 仓库,切换到指定版本的分支代码。
    更改 SQLServerCDC 连接器工厂类的 Identifier。具体操作为:在 com.ververica.cdc.connectors.sqlserver.table.SqlServerTableFactory 类中,将 factoryIdentifier() 方法的返回值改为所需的连接器名字,例如 'sqlserver-cdc-test'。 ,此回答整理自钉群“实时计算Flink产品交流群”

    2024-01-04 17:16:47
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    要更改Flink SQLServerCDC连接器工厂类的Identifier,需要修改SqlServerSourceOptions类中的identifier属性。

    具体步骤如下:

    1. 找到SqlServerSourceOptions类,该类位于org.apache.flink.connector.sqlservercdc.options包中。

    2. 打开该类,找到identifier属性,将其修改为你想要的新值。例如,将identifier的值从"my-sqlserver-cdc-connector"改为"my-new-sqlserver-cdc-connector"。

    3. 保存修改后的代码。

    4. 重新编译并运行你的Flink应用程序,新的SQLServerCDC连接器工厂类的Identifier将会生效。

    2024-01-04 15:54:47
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载