Question 1: How does Flink SQL guarantee that the dimension table I join against is fresh? For example, my data source is Hive, processed once a day, so the table my Flink streaming job joins is the T-1 dimension table. When the job reaches day T+1, how do I make sure the dimension table I am joining is the day-T one?
Reference answer:
Put the dimension table in a store like HBase or MySQL and maintain it with real-time ETL, so that only the latest dimension data is kept. For scenarios that need historical versions, sync a daily snapshot to Hive; if you don't need history, skip that step.
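As a sketch of this pattern (all table names, columns, and connection settings below are illustrative, and fact_stream is assumed to be a source table with a proctime attribute):

-- Dimension table kept current in MySQL by the real-time ETL job
CREATE TABLE dim_user (
  user_id STRING,
  city STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/dim_db',
  'table-name' = 'dim_user'
);

-- A processing-time lookup join always reads the current row from MySQL,
-- so there is no T-1 staleness as with a daily Hive snapshot
SELECT f.user_id, f.amount, d.city
FROM fact_stream AS f
JOIN dim_user FOR SYSTEM_TIME AS OF f.proctime AS d
  ON f.user_id = d.user_id;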
For more answers to this question, see the original post: https://developer.aliyun.com/ask/296129?spm=a2c6h.14164896.0.0.3088d9dfv2rd2u
Question 2: With Flink using Hive as the dimension table and Kafka as the data source, the join reports an error. What should I do?
select .... FROM jdqTableSources AS a JOIN tmmmp FOR SYSTEM_TIME AS OF a.proctime AS b
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is:
FlinkLogicalJoin(condition=[AND(=($0, $4), __INITIAL_TEMPORAL_JOIN_CONDITION($3, __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner])
  FlinkLogicalCalc(select=[opt, src, cur, PROCTIME() AS proctime])
    FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, jdqTableSources]], fields=[mid, db, sch, tab, opt, ts, ddl, err, src, cur, cus])
  FlinkLogicalSnapshot(period=[$cor0.proctime])
    FlinkLogicalCalc(select=[item_sku_id, premium, cate_lev, type, borc])
      FlinkLogicalTableSourceScan(table=[[myhive, dev, dev_brokenscreen_insurance_sku_info]], fields=[item_sku_id, item_sku_name, premium, cate_lev, type, borc, plan_code, subjection_b, product_name, lev_low_price, lev_upp_price, jd_price, shelves_tm, item_first_cate_name, item_second_cate_name, item_third_cate_name, sure_cate_lev, flag])
*From the volunteer-compiled Flink mailing list archive
Reference answer:
Look at the exception message: it says a temporal table join requires a primary key on the versioned table, but you haven't defined one. Also, shouldn't your join have an ON clause?
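For reference, a minimal sketch of the shape the planner expects: the dimension table declares a primary key, and the join carries an explicit ON condition. The sketch uses a JDBC table because a primary key can be declared there in Flink DDL; the table definition, connector settings, and join key are all illustrative, not taken from the original job.

CREATE TABLE dim_sku (
  item_sku_id STRING,
  premium DOUBLE,
  -- the primary key the planner is asking for
  PRIMARY KEY (item_sku_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/dim_db',
  'table-name' = 'dim_sku'
);

SELECT a.opt, a.cur, b.premium
FROM jdqTableSources AS a
JOIN dim_sku FOR SYSTEM_TIME AS OF a.proctime AS b
  -- the ON condition that was missing; the key choice here is illustrative
  ON a.cur = b.item_sku_id;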
For more answers to this question, see the original post: https://developer.aliyun.com/ask/361214?spm=a2c6h.14164896.0.0.275fd9dfCWXRB5
Question 3: Flink 1.11 with Hive 1.2.1: DDL pipeline writes no data
I've confirmed the source has data; the full code is below, but no data ever lands in Hive.
package com.hive;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import java.time.Duration;
public class HiveTest {
private static final String path = "hdfs_path";
public static void main(String []args) {
System.setProperty("HADOOP_USER_NAME", "work");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// allow only one checkpoint at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setStateBackend(new FsStateBackend(path));
EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,tableEnvSettings);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));
String name = "myhive";
String defaultDatabase = "situation";
String hiveConfDir = "/load/data/hive/hive-conf"; // a local path
String version = "1.2.1";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog("myhive", hive);
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS situation");
tableEnv.executeSql("DROP TABLE IF EXISTS situation.source_table");
tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
"\thost STRING,\n" +
"\turl STRING,\n" +
"\tpublic_date STRING\n" +
") WITH (\n" +
"\t'connector.type' = 'kafka',\n" +
"\t'connector.version' = 'universal',\n" +
"\t'connector.startup-mode' = 'latest-offset',\n" +
"\t'connector.topic' = 'sendMessage',\n" +
"\t'connector.properties.group.id' = 'domain_testGroup',\n" +
"\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
"\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
"\t'update-mode' = 'append',\n" +
"\t'format.type' = 'json',\n" +
"\t'format.derive-schema' = 'true'\n" +
")");
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("DROP TABLE IF EXISTS situation.fs_table");
String hiveSql = "\n" +
" CREATE TABLE situation.fs_table (\n" +
" \n" +
" host STRING,\n" +
" url STRING,\n" +
" public_date STRING\n" +
" \n" +
" ) PARTITIONED BY (\n" +
" ts_date STRING,\n" +
" ts_hour STRING,\n" +
" ts_minute STRING\n" +
" ) STORED AS PARQUET\n" +
" TBLPROPERTIES (\n" +
" 'sink.partition-commit.trigger' = 'process-time',\n" +
" 'sink.partition-commit.delay' = '1 min',\n" +
" 'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
" 'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
" )\n" +
" ";
tableEnv.executeSql(hiveSql);
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql("INSERT INTO situation.fs_table SELECT host, url,public_date," +
" DATE_FORMAT(public_date,'yyyy-MM-dd') ,DATE_FORMAT(public_date,'HH') ,DATE_FORMAT(public_date,'mm') FROM situation.source_table");
}
}
*From the volunteer-compiled Flink mailing list archive
Reference answer:
Your connector properties are still in the 1.10 format; switch them to the 1.11 format and try again [1].
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options
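For reference, a sketch of the same source table rewritten with the 1.11-style options from the linked Kafka connector page (topic and broker settings copied from the code above):

CREATE TABLE situation.source_table (
  host STRING,
  url STRING,
  public_date STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'sendMessage',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'properties.group.id' = 'domain_testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);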
For more answers to this question, see the original post: https://developer.aliyun.com/ask/371528?spm=a2c6h.14164896.0.0.7193d9dfgNu6az
Question 4: Collecting data with collect() after aggregation in Flink SQL
The source table has three fields: name, color, ts. After aggregating by time window, I want to group by name and get an array of colors.
create table source_table ( name STRING, color STRING, ts TIMESTAMP(3), WATERMARK FOR ts AS ts )
create table sink_table ( name STRING, colors ARRAY<STRING> )
1. How should this SELECT be written? select name, collect(color) as colors from source_table group by name, tumble(ts, interval '5' seconds). Here collect(color) returns a MULTISET type; how can it be converted to ARRAY?
2. If the array has many elements and I only want N of them, how should the Flink SQL be written?
3. To take the N values that appear most often, how should the Flink SQL be written? select name, collect(color) as colors from ( select name, color from ( select *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY color_cnt desc) AS row_num from ( select name, color, count(*) as color_cnt from source_table group by name, color, tumble(ts, interval '5' seconds) ) ) where row_num < 5 ); Is this the right way to write it?
*From the volunteer-compiled Flink mailing list archive
Reference answer:
1 & 2. A MULTISET cannot be converted to ARRAY. Consider using LISTAGG + SPLIT_INDEX + LIMIT to achieve the desired effect; writing a UDF is of course more convenient.
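A minimal sketch of the LISTAGG route, assuming the source_table definition above:

-- Aggregate colors into one comma-separated string per name and window
SELECT name, LISTAGG(color) AS colors
FROM source_table
GROUP BY name, TUMBLE(ts, INTERVAL '5' SECOND);
-- Individual elements can then be read back with SPLIT_INDEX,
-- e.g. SPLIT_INDEX(colors, ',', 0) for the first one.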
3. For window top-N you can use the window TVF newly introduced in 1.13: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-topn/
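And a sketch of question 3 rewritten following the window top-N pattern from the linked page, again assuming the source_table above:

SELECT name, color, color_cnt
FROM (
  SELECT *, ROW_NUMBER() OVER (
    PARTITION BY window_start, window_end, name
    ORDER BY color_cnt DESC) AS row_num
  FROM (
    SELECT window_start, window_end, name, color, COUNT(*) AS color_cnt
    FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(ts), INTERVAL '5' SECOND))
    GROUP BY window_start, window_end, name, color
  )
)
WHERE row_num <= 5;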
For more answers to this question, see the original post: https://developer.aliyun.com/ask/362036?spm=a2c6h.14164896.0.0.7193d9dfgNu6az
Question 5: How to use the LAG and LEAD functions on streaming data in Flink SQL
How can the analytic functions LAG and LEAD be used on a streaming source?
The Kafka input data looks like:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}
The SQL is as follows:
INSERT INTO topic_sink SELECT t, id, speed, LAG(speed, 1) OVER w AS speed_1, LAG(speed, 2) OVER w AS speed_2 FROM topic_source WINDOW w AS ( PARTITION BY id ORDER BY t )
The result data I expect is:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null, "speed_2":null}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":1.0, "speed_2":null}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":2.0, "speed_2":1.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":3.0, "speed_2":2.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0, "speed_2":3.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0, "speed_2":4.0}
The result data I actually get is:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0, "speed_2":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0, "speed_2":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0, "speed_2":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0, "speed_2":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0, "speed_2":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0, "speed_2":6.0}
Can the LAG function in Flink SQL produce the result I expect? If so, how should the SQL be written?
For more answers to this question, see the original post: https://developer.aliyun.com/ask/306832?spm=a2c6h.14164896.0.0.68dad9df0PUuUo