Flink CDC不支持scan.startup.mode
参数。
Flink CDC 支持的参数代码如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.TableDescriptor;
import org.apache.flink.table.descriptors.connectors.DebeziumCatalogConnector;
import org.apache.flink.table.descriptors.types.RowType;
public class FlinkCDCExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
HiveCatalog hiveCatalog = new HiveCatalog("default", "localhost", 31010, null, null);
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");
TableDescriptor tableDescriptor = TableDescriptor.builder()
.name("your_table_name")
.schema(Schema.builder().field("id", DataTypes.BIGINT()).field("name", DataTypes.STRING()).build())
.connector(DebeziumCatalogConnector.newBuilder()
.type("mysql")
.version("8.0.26")
.hostname("your_mysql_host")
.port(3306)
.username("your_mysql_username")
.password("your_mysql_password")
.databaseName("your_mysql_database")
.tableNamePrefix("your_mysql_table_prefix")
.build())
.build();
tableEnv.createTemporaryView("your_table_name", tableDescriptor);
tableEnv.toAppendStream(tableDescriptor, RowType.of(DataTypes.BIGINT(), DataTypes.STRING()))
.print();
env.execute("Flink CDC Example");
}
}
请将上述代码中的 your_table_name
、your_mysql_host
、your_mysql_username
、your_mysql_password
、your_mysql_database
和 your_mysql_table_prefix
替换为实际的值。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。