Has anyone used Flink CDC to do MySQL -> Hive synchronization??
Flink CDC can be used to synchronize data from MySQL to Hive. Here is an example. First, add the MySQL CDC connector and the MySQL driver as dependencies:
<dependency>
    <!-- Flink CDC 2.x is published under the com.ververica groupId -->
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.23</version>
</dependency>
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class MySqlToHiveSync {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment and its table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The Hive sink commits files on checkpoints, so checkpointing must be enabled
        env.enableCheckpointing(60_000);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register a Hive catalog (catalog name, default database and hive-conf dir
        // are placeholders; adjust them to your environment)
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/etc/hive/conf");
        tableEnv.registerCatalog("myhive", hiveCatalog);

        // Source table: read the MySQL table's changelog with the Flink CDC connector
        String mySqlDdl = "CREATE TABLE my_sql_source (" +
                "  id INT NOT NULL," +
                "  name STRING," +
                "  age INT," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'localhost'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = 'password'," +
                "  'database-name' = 'test'," +
                "  'table-name' = 'user'" +
                ")";
        tableEnv.executeSql(mySqlDdl);

        // Sink table: create the Hive table using the Hive SQL dialect
        tableEnv.useCatalog("myhive");
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        String hiveDdl = "CREATE TABLE IF NOT EXISTS my_hive_sink (" +
                "  id INT," +
                "  name STRING," +
                "  age INT" +
                ") STORED AS parquet" +
                " LOCATION '/user/hive/warehouse/my_hive_sink'" +
                " TBLPROPERTIES ('parquet.compression'='SNAPPY')";
        tableEnv.executeSql(hiveDdl);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Continuously insert the source data into the Hive table
        tableEnv.executeSql(
                "INSERT INTO my_hive_sink " +
                "SELECT * FROM default_catalog.default_database.my_sql_source");
    }
}
This example program reads data from the user table in MySQL and inserts it into the my_hive_sink table in Hive. Adjust the MySQL and Hive connection settings to match your environment.
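Once submitted, the job first takes a snapshot of the MySQL table and then continues reading the binlog, so it runs until cancelled. One caveat: Flink's Hive sink only accepts insert-only data in streaming mode, so if the source table receives updates or deletes, the planner will reject the INSERT statement; in that case an upsert-capable sink (or a periodic batch-style sync) is needed instead of writing directly to Hive.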