Flink数据源问题之无法写入数据如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。

问题一:linksql 如何保证我的维度表是新的维度表呢?就是比如我的数据源是 hive, 每天处理一次,flinkstreaming join 的表 就是 T-1 的维度表,运行到第T+1 天怎么确保,我join 的维度表是 T 天的维度表


请教个技术问题,flinksql 如何保证我的维度表是新的维度表呢?就是比如我的数据源是 hive, 每天处理一次,flinkstreaming join 的表 就是 T-1 的维度表,运行到第T+1 天怎么确保,我join 的维度表是 T 天的维度表 #Flink


参考回答:

维度表放到hbase、mysql这些存储中,实时etl,相当于只存维度最新数据。需要历史数据的场景,每日同步一份到hive保持;不需要不做


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/296129?spm=a2c6h.14164896.0.0.3088d9dfv2rd2u


问题二:flink使用hive作为维表,kafka作为数据源,join时候报错怎么办?


select .... FROM jdqTableSources AS a JOIN tmmmp FOR SYSTEM_TIME AS OF a.proctime AS b

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is: FlinkLogicalJoin(condition=[AND(=(0,0, 4), __INITIAL_TEMPORAL_JOIN_CONDITION(3, __TEMPORAL_JOIN_LEFT_KEY(3, __TEMPORAL_JOIN_LEFT_KEY(3, __TEMPORAL_JOIN_LEFT_KEY(0), __TEMPORAL_JOIN_RIGHT_KEY(4)))],joinType=[inner])FlinkLogicalCalc(select=[opt,src,cur,PROCTIME()ASproctime])FlinkLogicalTableSourceScan(table=[[defaultcatalog,defaultdatabase,jdqTableSources]],fields=[mid,db,sch,tab,opt,ts,ddl,err,src,cur,cus])FlinkLogicalSnapshot(period=[4)))], joinType=[inner]) FlinkLogicalCalc(select=[opt, src, cur, PROCTIME() AS proctime]) FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, jdqTableSources]], fields=[mid, db, sch, tab, opt, ts, ddl, err, src, cur, cus]) FlinkLogicalSnapshot(period=[cor0.proctime]) FlinkLogicalCalc(select=[item_sku_id, premium, cate_lev, type, borc]) FlinkLogicalTableSourceScan(table=[[myhive, dev, dev_brokenscreen_insurance_sku_info]], fields=[item_sku_id, item_sku_name, premium, cate_lev, type, borc, plan_code, subjection_b, product_name, lev_low_price, lev_upp_price, jd_price, shelves_tm, item_first_cate_name, item_second_cate_name, item_third_cate_name, sure_cate_lev, flag])*来自志愿者整理的flink邮件归档


参考回答:

你看异常信息,提示时态表join的时候需要主键,但是你没有定义。而且你join的时候不需要on吗?


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/361214?spm=a2c6h.14164896.0.0.275fd9dfCWXRB5


问题三:flink-1.11 hive-1.2.1 ddl 无法写入数据


确认数据源有数据,全部代码如下,但是hive就是没有数据

package com.hive;

import org.apache.flink.runtime.state.filesystem.FsStateBackend;

import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.TimeCharacteristic;

import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.SqlDialect;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.time.Duration;

public class HiveTest {

private static final String path = "hdfs_path";

public static void main(String []args) {

System.setProperty("HADOOP_USER_NAME", "work");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// 同一时间只允许进行一个检查点

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

env.setStateBackend(new FsStateBackend(path));

EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()

.useBlinkPlanner()

.inStreamingMode()

.build();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,tableEnvSettings);

tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);

tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));

String name = "myhive";

String defaultDatabase = "situation";

String hiveConfDir = "/load/data/hive/hive-conf"; // a local path

String version = "1.2.1";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);

tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session

tableEnv.useCatalog("myhive");

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS situation");

tableEnv.executeSql("DROP TABLE IF EXISTS situation.source_table");

tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +

"\thost STRING,\n" +

"\turl STRING,\n" +

"\tpublic_date STRING\n" +

") WITH (\n" +

"\t'connector.type' = 'kafka',\n" +

"\t'connector.version' = 'universal',\n" +

"\t'connector.startup-mode' = 'latest-offset',\n" +

"\t'connector.topic' = 'sendMessage',\n" +

"\t'connector.properties.group.id' = 'domain_testGroup',\n" +

"\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +

"\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +

"\t'update-mode' = 'append',\n" +

"\t'format.type' = 'json',\n" +

"\t'format.derive-schema' = 'true'\n" +

")");

tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

tableEnv.executeSql("DROP TABLE IF EXISTS situation.fs_table");

String hiveSql = "\n" +

" CREATE TABLE situation.fs_table (\n" +

" \n" +

" host STRING,\n" +

" url STRING,\n" +

" public_date STRING\n" +

" \n" +

" ) PARTITIONED BY (\n" +

" ts_date STRING,\n" +

" ts_hour STRING,\n" +

" ts_minute STRING\n" +

" ) STORED AS PARQUET\n" +

" TBLPROPERTIES (\n" +

" 'sink.partition-commit.trigger' = 'process time',\n" +

" 'sink.partition-commit.delay' = '1 min',\n" +

" 'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +

" 'partition.time-extractor.timestamp-pattern' = 'tsdatets_date ts_hour:$ts_minute:00'\n" +

" )\n" +

" ";

tableEnv.executeSql(hiveSql);

tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

tableEnv.executeSql("INSERT INTO situation.fs_table SELECT host, url,public_date," +

" DATE_FORMAT(public_date,'yyyy-MM-dd') ,DATE_FORMAT(public_date,'HH') ,DATE_FORMAT(public_date,'mm') FROM situation.source_table");

}

}*来自志愿者整理的flink邮件归档


参考回答:

看connector的properties还是1.10的格式,你换成1.11试试[1].[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371528?spm=a2c6h.14164896.0.0.7193d9dfgNu6az


问题四:flink sql聚合后collect收集数据问题


源表三个字段 name, color, ts 按时间窗口聚合后想根据name group by取colors数组

create table source_table ( name STRING, color STRING, ts TIMESTAMP, WATERMARK ts for ts )

create table sink_table ( name STRING, colors ARRAY )

  1. 请问这个select语句要怎么写? select name, collect(color) as colors from source_table group by tumble(ts, interval '5' seconds) 这里collect(color)返回的是multiset类型,怎样转成Array类型呢?
  2. 如果array元素很多,我只想取其中N个,该怎么写flink sql?

3, 若取出现次数最多的前N个,又该怎么写flink sql? select name, collect(color) as colors from ( select name, color from ( select , ROW_NUMBER() OVER (PARTITION BY name ORDER BY color_cnt desc) AS row_num from ( select name, color, count() as color_cnt group by name, color, tumble(ts, interval '5' seconds) ) ) where row_num < 5 ); 是这样写么?*来自志愿者整理的flink邮件归档


参考回答:

1 & 2. multiset 不能转成 array。可以考虑使用 listagg + split_index + limit

语句达成需要的效果。当然更方便的还是写一个 UDF。

  1. window top-n 可以使用 1.13 新引入的 window tvf: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-topn/


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/362036?spm=a2c6h.14164896.0.0.7193d9dfgNu6az


问题五:Flink SQL 如何在流式数据上使用LAG和LEAD函数


如何在流式数据源上使用分析函数LAG和EAD函数:

kafka输入数据如:

{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}

{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}

{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}

{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}

{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}

{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}

sql如下:

INSERT INTO topic_sink
SELECT
  t,
  id,
  speed,
  LAG(speed, 1) OVER w AS speed_1,
  LAG(speed, 2) OVER w AS speed_2
FROM topic_source
WINDOW w AS (
      PARTITION BY id
      ORDER BY t
)

我期望得到的结果数据是

{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null, "speed_2":null}

{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":1.0, "speed_2":null}

{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":2.0, "speed_2":1.0}

{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":3.0, "speed_2":2.0}

{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0, "speed_2":3.0}

{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0, "speed_2":4.0}

实际得到的结果数据是:

{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0, "speed_2":1.0}

{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0, "speed_2":2.0}

{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0, "speed_2":3.0}

{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0, "speed_2":4.0}

{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0, "speed_2":5.0}

{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0, "speed_2":6.0}

想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写?


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/306832?spm=a2c6h.14164896.0.0.68dad9df0PUuUo

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
791 43
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
286 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
8月前
|
存储 消息中间件 Kafka
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
815 3
基于 Flink 的中国电信星海时空数据多引擎实时改造
|
4月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
1789 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
5月前
|
存储 消息中间件 搜索推荐
京东零售基于Flink的推荐系统智能数据体系
摘要:本文整理自京东零售技术专家张颖老师,在 Flink Forward Asia 2024 生产实践(二)专场中的分享,介绍了基于Flink构建的推荐系统数据,以及Flink智能体系带来的智能服务功能。内容分为以下六个部分: 推荐系统架构 索引 样本 特征 可解释 指标 Tips:关注「公众号」回复 FFA 2024 查看会后资料~
348 1
京东零售基于Flink的推荐系统智能数据体系
|
9月前
|
Oracle 关系型数据库 Java
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
|
9月前
|
消息中间件 关系型数据库 Kafka
阿里云基于 Flink CDC 的现代数据栈云上实践
阿里云基于 Flink CDC 的现代数据栈云上实践
165 1
|
4月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
482 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

相关产品

  • 实时计算 Flink版