// NOTE: the code below is for reference only; adapt it to your own environment before testing
// These are simple examples; later posts will dig into more complex SQL and the underlying principles
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author 857hub
*/
public class ClickhouseSinkApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
env,
// useBlinkPlanner() is unnecessary on recent Flink versions, where the Blink planner is the default
EnvironmentSettings.newInstance().build()
);
tEnv.getConfig().getConfiguration().setString(PipelineOptions.NAME, "sql test");
// sources
String source = "CREATE TABLE source (\n" +
" `id` int,\n" +
" `name` varchar.\n" +
" `ts` timestamp(3),\n" +
// 指定watermark 允许延迟5s
"WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"+
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'test1',\n" +
" 'properties.bootstrap.servers' = '172.16.100.109:9092',\n" +
" 'properties.group.id' = 'xzw',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json'\n" +
")";
String source2 = "CREATE TABLE source2 (\n" +
" `id` int,\n" +
" `name` varchar,\n" +
" `ts` timestamp(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'test2',\n" +
" 'properties.bootstrap.servers' = '172.16.100.109:9092',\n" +
" 'properties.group.id' = 'xzw',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json'\n" +
")";
// the clickhouse sink uses a connector I wrote myself; custom SQL sources and sinks will be covered in a later post
String sink = "CREATE TABLE sink (\n" +
" `id` INT,\n" +
" `name` VARCHAR\n" +
") WITH (\n" +
// connection options required by the custom connector
" 'connector' = 'xzw_ck',\n" +
" 'url' = 'jdbc:clickhouse://localhost:8123/default',\n" +
" 'table-name' = 'test',\n" +
" 'username' = 'default',\n" +
" 'password' = '123456'\n" +
" )";
// execute the source and sink DDL statements to register the tables
tEnv.executeSql(source);
tEnv.executeSql(source2);
tEnv.executeSql(sink);
/*
Since this is a simple demo without a concrete business scenario, here is a quick summary of the
join types, so you can pick the right one for your own project:
left join       : always returns rows from the left table, whether or not they match
inner join      : only returns rows that match on the join condition
full outer join : returns rows from both sides whether or not they match; unmatched sides are null
interval join   : a join bounded by a time range; only rows within the configured window are matched
(an interval join sketch follows the query below)
*/
String joinSql = "select * from source1 s1 " +
// alternatives: "inner join source2 s2" (or simply "join source2 s2"), "full outer join source2 s2"
"left join source2 s2 " +
"on s1.id = s2.id";
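// A sketch of the interval join variant mentioned above: besides the equi-join condition,
// the time attributes are constrained to a window (here: s2.ts within the 10 minutes before s1.ts)
// String intervalJoinSql = "select * from source1 s1, source2 s2 " +
//         "where s1.id = s2.id " +
//         "and s1.ts >= s2.ts and s1.ts < s2.ts + INTERVAL '10' MINUTE";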
Table joinTable = tEnv.sqlQuery(joinSql);
// rank within each group and take the top N; the filter on rnum must sit on the outer query
String insertSql = "insert into sink select id, name from (" +
"select *, " +
"row_number() over(partition by id order by ts) as rnum " +
"from " + joinTable +
") where rnum < 5";
// collect the insert statements and execute them together
TableResult tableResult = executeSql(tEnv, "insert into sink select id, name from source1", insertSql);
// use the result however you need, e.g.:
// Optional<JobClient> jobClient = tableResult.getJobClient();
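// a minimal sketch of using the JobClient, e.g. to log the id of the submitted job:
// tableResult.getJobClient().ifPresent(client -> System.out.println("job id: " + client.getJobID()));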
}
// add multiple insert statements and execute them as a single StatementSet
private static TableResult executeSql(StreamTableEnvironment tEnv, final String... sqls) {
StatementSet statementSet = tEnv.createStatementSet();
for (String sql : sqls) {
if ("*".equals(sql) || sql.length()>=27) {
continue;
}
statementSet.addInsertSql(sql);
}
return statementSet.execute();
}
}