可以参考下面这段示例代码来实现(使用 Flink SQL 将 datagen 生成的测试数据写入 Apache Doris):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Example Flink SQL job: generates random test orders with the built-in
 * {@code datagen} connector and streams them into an Apache Doris table
 * through the flink-doris-connector.
 *
 * <p>NOTE(review): the Doris FE address, username and password are
 * hard-coded for demo purposes only — load them from configuration or a
 * secret store in any real deployment, and never commit credentials.
 */
public class FlinkSQLSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        // Enable checkpointing with a 1000 ms interval. The Doris sink commits
        // data on checkpoints, so checkpointing must be on for it to flush.
        env.enableCheckpointing(1000);
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, environmentSettings);
        /* Source table: datagen produces random test rows.
           'rows-per-second' = '10'  -> emit 10 rows per second
           'number-of-rows'  = '100' -> stop after 100 rows; omit this option
                                        for an unbounded stream
        */
        tenv.executeSql("CREATE TABLE order_info_source (\n" +
                " order_date DATE,\n" +
                " order_id INT,\n" +
                " buy_num INT,\n" +
                " user_id INT,\n" +
                " create_time TIMESTAMP(3),\n" +
                " update_time TIMESTAMP(3)\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second' = '10',\n" +
                " 'fields.order_id.min' = '30001',\n" +
                " 'fields.order_id.max' = '30500',\n" +
                " 'fields.user_id.min' = '10001',\n" +
                " 'fields.user_id.max' = '20001',\n" +
                " 'fields.buy_num.min' = '10',\n" +
                " 'fields.buy_num.max' = '20',\n" +
                " 'number-of-rows' = '100'" +
                ")");
        /* Optional: un-comment to print the datagen rows for debugging.
           (Fixed from the original, which inserted into order_info_sink here
           instead of print_table.)
        tenv.executeSql("CREATE TABLE print_table (\n" +
                " order_date DATE,\n" +
                " order_id INT,\n" +
                " buy_num INT,\n" +
                " user_id INT,\n" +
                " create_time TIMESTAMP(3),\n" +
                " update_time TIMESTAMP(3)\n" +
                ") WITH (\n" +
                " 'connector' = 'print'\n" +
                ")");
        tenv.executeSql("insert into print_table select * from order_info_source");
        */
        // Register the Doris sink table.
        // NOTE(review): 'sink.label-prefix' presumably must differ between job
        // runs (the trailing "_8" looks like a manually bumped counter) — verify
        // against the Doris stream-load label-uniqueness rules before reuse.
        tenv.executeSql("CREATE TABLE order_info_sink ( \n" +
                "order_date DATE, \n" +
                "order_id INT, \n" +
                "buy_num INT,\n" +
                "user_id INT,\n" +
                "create_time TIMESTAMP(3),\n" +
                "update_time TIMESTAMP(3)\n" +
                ") \n" +
                "WITH (\n" +
                "'connector' = 'doris', \n" +
                "'fenodes' = '192.168.56.104:8030', \n" +
                "'table.identifier' = 'test.order_info_example', \n" +
                "'username' = 'test', \n" +
                "'password' = 'password123', \n" +
                "'sink.label-prefix' = 'sink_doris_label_8'\n" +
                ")"
        );
        // executeSql() submits the INSERT job asynchronously and returns
        // immediately; await() blocks until the bounded job (100 rows) finishes
        // so the JVM does not exit before all data reaches Doris.
        tenv.executeSql("insert into order_info_sink select * from order_info_source").await();
    }
}
完整示例代码仓库:https://github.com/baiyuelanshan/apache-doris-example
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。