Stream processing on a Flink DataStream inevitably involves time-window computations, and those require time attributes. The Flink Table API/SQL can likewise use time attributes to support time-windowed operations.
Time attributes can be part of every table schema. They are defined when a table is created from a CREATE TABLE DDL statement or from a DataStream. Once a time attribute has been defined, it can be referenced as a field and used in time-based operations. As long as a time attribute is not modified and is simply forwarded from one part of a query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be used in calculations; when used in a calculation, a time attribute is materialized and acts as a standard timestamp. However, an ordinary timestamp cannot be used in place of a time attribute or converted into one: only time attribute fields can work together with watermarks.
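As an illustration of "used in time-based operations", here is a minimal sketch of a windowed query. It assumes a registered table named clicks whose cTime column has already been declared as an event-time attribute (the table and column names are only illustrative; the windowing TVF syntax requires Flink 1.13+):

// Hedged sketch: "clicks" and its event-time attribute cTime are assumed to exist.
// Counts clicks per user over 10-second tumbling event-time windows.
Table windowed = tEnv.sqlQuery(
        "SELECT `user`, COUNT(url) AS cnt " +
        "FROM TABLE(TUMBLE(TABLE clicks, DESCRIPTOR(cTime), INTERVAL '10' SECOND)) " +
        "GROUP BY `user`, window_start, window_end");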
Event Time
Defining it when converting a DataStream to a Table
Method 1
package com.blink.sb.time;

import com.blink.sb.beans.ClickLogs;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Specify the EventTime attribute when converting a DataStream to a Table
 */
public class EventTimeDefDataStream2Table1 {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2. Create the table execution environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        //3. Read data
        DataStream<ClickLogs> clickLogs = env.fromElements(
                "Mary,./home,2022-02-02 12:00:00",
                "Bob,./cart,2022-02-02 12:00:00",
                "Mary,./prod?id=1,2022-02-02 12:00:05",
                "Liz,./home,2022-02-02 12:01:00",
                "Bob,./prod?id=3,2022-02-02 12:01:30",
                "Mary,./prod?id=7,2022-02-02 12:01:45"
        ).map(event -> {
            String[] props = event.split(",");
            return ClickLogs
                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        });

        //4. Convert the stream into a dynamic table
        final Schema schema = Schema.newBuilder()
                .column("user", DataTypes.STRING())
                .column("url", DataTypes.STRING())
                //precision 3 means milliseconds, 0 means seconds
                .column("cTime", DataTypes.TIMESTAMP(0))
                //use the cTime field minus 5 seconds as the watermark
                //.watermark("cTime", $("cTime").minus(lit(5).seconds()))
                .watermark("cTime", "cTime - INTERVAL '5' SECOND")
                .build();
        Table table = tEnv.fromDataStream(clickLogs, schema);
        table.printSchema();
    }
}
Note: with this approach the watermark and the time semantics are defined together, so there is no need to call assignTimestampsAndWatermarks to specify a watermark generation strategy.
Method 2
package com.blink.sb.time;

import com.blink.sb.beans.ClickLogs;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Specify the EventTime attribute when converting a DataStream to a Table
 */
public class EventTimeDefDataStream2Table2 {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2. Create the table execution environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        //3. Read data, extract timestamps and specify the watermark generation strategy
        WatermarkStrategy<ClickLogs> watermarkStrategy = WatermarkStrategy
                .<ClickLogs>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<ClickLogs>() {
                    @Override
                    public long extractTimestamp(ClickLogs element, long recordTimestamp) {
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        try {
                            return sdf.parse(element.getCTime()).getTime();
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                        return 0;
                    }
                });

        DataStream<ClickLogs> clickLogs = env.fromElements(
                "Mary,./home,2022-02-02 12:00:00",
                "Bob,./cart,2022-02-02 12:00:00",
                "Mary,./prod?id=1,2022-02-02 12:00:05",
                "Liz,./home,2022-02-02 12:01:00",
                "Bob,./prod?id=3,2022-02-02 12:01:30",
                "Mary,./prod?id=7,2022-02-02 12:01:45"
        ).map(event -> {
            String[] props = event.split(",");
            return ClickLogs
                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        }).assignTimestampsAndWatermarks(watermarkStrategy);

        //4. Convert the stream into a dynamic table
        //If the field name does not exist yet, append a new field as the time attribute
        Table table = tEnv.fromDataStream(clickLogs, $("user"), $("url"), $("cTime"), $("eTime").rowtime());
        //If the field already exists and its type is Timestamp or Long, use it directly as the time attribute
        //(here this would definitely fail, because cTime is a String)
        //Table table = tEnv.fromDataStream(clickLogs, $("user"), $("url"), $("cTime").rowtime());
        table.printSchema();

        //5. Convert back to a stream and print
        tEnv.toDataStream(table, Row.class).print();

        //6. Execute
        env.execute("EventTimeDefDataStream2Table2");
    }
}
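Once the eTime rowtime attribute exists, the table above can be fed straight into a Table API group window. A minimal sketch continuing from the code above (the 10-second window size is arbitrary; Tumble and lit are assumed to be imported from org.apache.flink.table.api):

// Sketch only: tumbling event-time window on the eTime attribute defined above.
// Additional imports assumed:
//   import org.apache.flink.table.api.Tumble;
//   import static org.apache.flink.table.api.Expressions.lit;
Table windowed = table
        .window(Tumble.over(lit(10).seconds()).on($("eTime")).as("w"))
        .groupBy($("user"), $("w"))
        .select($("user"), $("url").count().as("cnt"));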
Defining it in a Flink SQL CREATE TABLE statement
package com.blink.sb.time;

import org.apache.flink.table.api.*;

/**
 * Specify the EventTime attribute in a DDL statement
 */
public class EventTimeDefDDL {
    public static void main(String[] args) throws Exception {
        //1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        //2. Create the source table (DDL statement) - the table is registered automatically
        tEnv.executeSql("CREATE TABLE user_behavior_log (" +
                " user_id BIGINT," +
                " item_id BIGINT," +
                " category_id INT," +
                " behavior VARCHAR," +
                //must be TIMESTAMP
                " ts TIMESTAMP(3)," +
                //declare ts as the EventTime attribute with a 5-second delayed watermark strategy
                " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'data/user_behavior/input/'," +
                " 'format' = 'csv'" +
                ")");//no trailing semicolon at the end; mind the spaces

        Table user_behavior_log = tEnv.from("user_behavior_log");
        user_behavior_log.printSchema();
        user_behavior_log.execute().print();
    }
}
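In practice the raw timestamp often arrives as epoch milliseconds rather than a TIMESTAMP string. A hedged variant of the DDL above for that case (the table name user_behavior_log_ms and the column ts_ms are made up for illustration; TO_TIMESTAMP_LTZ is the standard conversion function in Flink 1.13+):

// Hypothetical variant: ts_ms is epoch milliseconds, and the computed column rt
// becomes the event-time attribute that the watermark is declared on.
tEnv.executeSql("CREATE TABLE user_behavior_log_ms (" +
        " user_id BIGINT," +
        " item_id BIGINT," +
        " category_id INT," +
        " behavior VARCHAR," +
        " ts_ms BIGINT," +
        " rt AS TO_TIMESTAMP_LTZ(ts_ms, 3)," +
        " WATERMARK FOR rt AS rt - INTERVAL '5' SECOND" +
        ") WITH (" +
        " 'connector' = 'filesystem'," +
        " 'path' = 'data/user_behavior/input/'," +
        " 'format' = 'csv'" +
        ")");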
Processing Time
Defining it when converting a DataStream to a Table
package com.blink.sb.time;

import com.blink.sb.beans.ClickLogs;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Specify the ProcessTime attribute when converting a DataStream to a Table
 */
public class ProcessTimeDefDataStream2Table {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2. Create the table execution environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        //3. Read data
        DataStream<ClickLogs> clickLogs = env.fromElements(
                "Mary,./home,2022-02-02 12:00:00",
                "Bob,./cart,2022-02-02 12:00:00",
                "Mary,./prod?id=1,2022-02-02 12:00:05",
                "Liz,./home,2022-02-02 12:01:00",
                "Bob,./prod?id=3,2022-02-02 12:01:30",
                "Mary,./prod?id=7,2022-02-02 12:01:45"
        ).map(event -> {
            String[] props = event.split(",");
            return ClickLogs
                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        });

        //4. Convert the stream into a dynamic table
        Table table = tEnv.fromDataStream(
                clickLogs,
                $("user"),
                $("url"),
                $("cTime"),
                $("pt").proctime()
        );
        table.printSchema();
        table.execute().print();
    }
}
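The pt attribute can be used in windows just like an event-time attribute. A sketch continuing from the code above (with this small bounded fromElements source the job may finish before a processing-time window fires, so this only shows the API shape; Tumble and lit are assumed to be imported from org.apache.flink.table.api):

// Sketch only: 5-second tumbling processing-time window on the pt attribute.
// Additional imports assumed:
//   import org.apache.flink.table.api.Tumble;
//   import static org.apache.flink.table.api.Expressions.lit;
Table ptWindowed = table
        .window(Tumble.over(lit(5).seconds()).on($("pt")).as("w"))
        .groupBy($("user"), $("w"))
        .select($("user"), $("url").count().as("cnt"));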
Defining it in a Flink SQL CREATE TABLE statement
In the DDL statement we can declare an additional field as the processing-time attribute:
package com.blink.sb.time;

import org.apache.flink.table.api.*;

/**
 * Specify the ProcessTime attribute in a DDL statement
 */
public class ProcessTimeDefDDL {
    public static void main(String[] args) throws Exception {
        //1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        //2. Create the source table (DDL statement) - the table is registered automatically
        tEnv.executeSql("CREATE TABLE user_behavior_log (" +
                " user_id BIGINT," +
                " item_id BIGINT," +
                " category_id INT," +
                " behavior VARCHAR," +
                " ts BIGINT," +
                //declare an additional field as the processing time attribute
                " user_action_time AS PROCTIME()" +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'data/user_behavior/input/'," +
                " 'format' = 'csv'" +
                ")");//no trailing semicolon at the end; mind the spaces

        Table user_behavior_log = tEnv.sqlQuery("select * from user_behavior_log");
        user_behavior_log.printSchema();
        user_behavior_log.execute().print();
    }
}
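The user_action_time attribute can then drive processing-time windows in SQL. A minimal sketch against the table above, using the grouped-window syntax (the 10-second window size is arbitrary):

// Hedged sketch: count behaviors per 10-second tumbling processing-time window.
Table counts = tEnv.sqlQuery(
        "SELECT behavior, COUNT(*) AS cnt, " +
        " TUMBLE_START(user_action_time, INTERVAL '10' SECOND) AS window_start " +
        "FROM user_behavior_log " +
        "GROUP BY behavior, TUMBLE(user_action_time, INTERVAL '10' SECOND)");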