Flink数据问题之连接mysql无数据输出如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:Flink CDC 遇到关于不发生packet造成的卡顿问题该如何解决

主要是为了实现解析自定义的schema,sink端好输出到下游。

想请教一个问题:

https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#set-a-differnet-server-id-for-each-job

看了上面这个链接关于为每个作业设置一个differnet server id的问题。我看sql可以指定不同的server id,所以有下面这三个疑惑:

1、 如果是不同的stream 任务 的它的server id是不是同一个?

2、不同的stream 任务 同步同一个数据库的不同表是不是没有问题

3、不同的stream 任务 同步同一个数据库的同一张表是不是有问题

*来自志愿者整理的flink邮件归档



参考答案:

See the docs: https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#setting-up-mysql-session-timeouts

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370018?spm=a2c6h.13066369.question.83.33bf585fwBnaSf



问题二:flink sql 连接mysql 无数据输出

   我在Idea里用flink-jdbc-connector连接mysql, 建完表后执行env.executeSql("select * from my_table").print()方法,只打印了表头,没有数据是什么原因? flink版本1.11.2

*来自志愿者整理的flink邮件归档



参考答案:

是不是没有加这一行代码,tableEnv.execute("test");

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370017?spm=a2c6h.13066369.question.84.33bf585fAxGR1X



问题三:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问

我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我是用的是flink 1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决?     我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!package join;

> >import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >import org.apache.flink.table.api.EnvironmentSettings; >import org.apache.flink.table.api.Table; >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >public class Test1 { > >    public static void main(String[] args) { >        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); >        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); > >        String lTable = "CREATE TABLE l_table (  " + >                " l_a INT,  " + >                " l_b string,  " + >                " l_rt AS localtimestamp,  " + >                " WATERMARK FOR l_rt AS l_rt  " + >                ") WITH (  " + >                " 'connector' = 'datagen',  " + >                " 'rows-per-second'='5',  " + >                " 'fields.l_a.min'='1',  " + >                " 'fields.l_a.max'='5',  " + >                " 'fields.l_b.length'='5'  " + >                ")"; >        bsTableEnv.executeSql(lTable); > >        String rTable = "CREATE TABLE r_table (  " + >                " r_a INT,  " + >                " r_b string,  " + >                " r_pt AS proctime()  " + >                ") WITH (  " + >                " 'connector' = 'datagen',  " + >                " 'rows-per-second'='5',  " + >                " 'fields.r_a.min'='1',  " + >                " 'fields.r_a.max'='5',  " + >                " 'fields.r_b.length'='5'  " + >                ")"; >        bsTableEnv.executeSql(rTable); > >        String printTable = "CREATE TABLE print (" + >                "  l_a INT,  " + >                "  l_b string,  " + >                "  l_rt timestamp(3),  " + >                "  r_a INT,  " + >                "  r_b string,  " + >                "  r_pt timestamp(3)  " + >                ") WITH (  " + >                " 'connector' = 'print' " + >                ") "; > >        bsTableEnv.executeSql(printTable); > >        // 运行成功 >//        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt"); > >        // 运行错误,提示Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. >        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND"); > >        bsTableEnv.executeSql("insert into print select * from " + joinTable); > >    } > >}

*来自志愿者整理的flink邮件归档



参考答案:

 因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。

  而在 match 到到了regular join 后,又因为 join 条件中有时间属性,故报了这个错。

  Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370016?spm=a2c6h.13066369.question.85.33bf585fSSKbxa



问题四:slot问题

          一个slot同时只能运行一个线程吗?或者1个slot可以同时并行运行多个线程?

*来自志愿者整理的flink邮件归档



参考答案:

有啊,一个slot本身就可以运行多个线程的。但是不可以运行1个算子结点的多个任务,也不可以运行多个作业中的算子结点的多个任务。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370015?spm=a2c6h.13066369.question.84.33bf585f3XoJwI



问题五:flink sql时间戳字段类型转换问题

数据源来自Jark项目 https://github.com/wuchong/flink-sql-submit

中的kafka消息,里面user_behavior消息例如

{"user_id": "470572", "item_id":"3760258", "category_id": "1299190",

"behavior": "pv", "ts": "2017-11-26T01:00:01Z"}

可以看到ts值是 '2017-11-26T01:00:00Z',现在要为它定义一张flink sql源表,如下

CREATE TABLE user_log (

user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'json', -- 'json.timestamp-format.standard' = 'ISO-8601', // 不加这一行默认是'SQL' 'scan.startup.mode' = 'earliest-offset' );

程序运行会抛错

Caused by: java.time.format.DateTimeParseException: Text '2017-11-26T01:00:00Z' could not be parsed at index 10

我查了一下flink json官方文档 https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/connectors/formats/json.html#json-timestamp-format-standard

目前只支持两种格式:SQL 和 ISO-8601

其中SQL支持的格式是 'yyyy-MM-dd HH:mm:ss',

而ISO-8601支持的格式是 'yyyy-MM-ddTHH:mm:ss.s{precision}'

确实不支持上面的 'yyyy-MM-ddTHH:mm:ssZ' (注意末尾的Z)

请问:

  1. 像上述时间格式字段在Flink SQL中应该解析成什么类型?
  2. 如果不能直接支持的话是不是得先用VARCHAR类型接收,再利用 UNIX_TIMESTAMP(ts_string,

pattern_string) 函数转成 支持的时间格式?可问题是 pattern_string

里面如果包含单引号要如何转义?UNIX_TIMESTAMP('2017-11-26T01:00:00Z',

'yyyy-MM-dd'T'HH:mm:ss'Z'')?

  1. TIMESTAMP WITH TIME ZONE和TIMESTAMP WITH LOCAL TIME

ZONE这两种类型在什么情况下会用到?有例子吗?

谢谢!

*来自志愿者整理的flink邮件归档



参考答案:

你可以用这篇文章中的 docker: https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html https://raw.githubusercontent.com/wuchong/flink-sql-demo/v1.11-EN/docker-compose.yml

这个容器里面的 ts 数据格式是 SQL 格式的。

  1. 像上述时间格式字段在Flink SQL中应该解析成什么类型? TIMESTAMP WITH LOCAL TIME ZONE, 1.12 的 json formart 才支持。
  2. 是的
  3. Flink 目前还不支持 TIMESTAMP WITH TIME ZONE。 'yyyy-MM-dd HH:mm:ss' 这种,对应的是 TIMESTAMP,代表无时区 timestamp long 值,或者 'yyyy-MM-dd HH:mm:ssZ' 这种是TIMESTAMP WITH LOCAL TIME ZONE ,代表session 时区的 timestamp

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370014?spm=a2c6h.13066369.question.87.33bf585fd5RWZf

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
1月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
488 43
|
1月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
166 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
2月前
|
SQL 人工智能 关系型数据库
如何实现MySQL百万级数据的查询?
本文探讨了在MySQL中对百万级数据进行排序分页查询的优化策略。面对五百万条数据,传统的浅分页和深分页查询效率较低,尤其深分页因偏移量大导致性能显著下降。通过为排序字段添加索引、使用联合索引、手动回表等方法,有效提升了查询速度。最终建议根据业务需求选择合适方案:浅分页可加单列索引,深分页推荐联合索引或子查询优化,同时结合前端传递最后一条数据ID的方式实现高效翻页。
141 0
|
1月前
|
存储 关系型数据库 MySQL
在CentOS 8.x上安装Percona Xtrabackup工具备份MySQL数据步骤。
以上就是在CentOS8.x上通过Perconaxtabbackup工具对Mysql进行高效率、高可靠性、无锁定影响地实现在线快速全量及增加式数据库资料保存与恢复流程。通过以上流程可以有效地将Mysql相关资料按需求完成定期或不定期地保存与灾难恢复需求。
151 10
|
1月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
986 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
2月前
|
存储 消息中间件 搜索推荐
京东零售基于Flink的推荐系统智能数据体系
摘要:本文整理自京东零售技术专家张颖老师,在 Flink Forward Asia 2024 生产实践(二)专场中的分享,介绍了基于Flink构建的推荐系统数据,以及Flink智能体系带来的智能服务功能。内容分为以下六个部分: 推荐系统架构 索引 样本 特征 可解释 指标 Tips:关注「公众号」回复 FFA 2024 查看会后资料~
222 1
京东零售基于Flink的推荐系统智能数据体系
|
2月前
|
SQL 存储 缓存
MySQL 如何高效可靠处理持久化数据
本文详细解析了 MySQL 的 SQL 执行流程、crash-safe 机制及性能优化策略。内容涵盖连接器、分析器、优化器、执行器与存储引擎的工作原理,深入探讨 redolog 与 binlog 的两阶段提交机制,并分析日志策略、组提交、脏页刷盘等关键性能优化手段,帮助提升数据库稳定性与执行效率。
|
27天前
|
安全 关系型数据库 MySQL
MySQL安全最佳实践:保护你的数据库
本文深入探讨了MySQL数据库的安全防护体系,涵盖认证安全、访问控制、网络安全、数据加密、审计监控、备份恢复、操作系统安全、应急响应等多个方面。通过具体配置示例,为企业提供了一套全面的安全实践方案,帮助强化数据库安全,防止数据泄露和未授权访问,保障企业数据资产安全。
|
12天前
|
缓存 关系型数据库 BI
使用MYSQL Report分析数据库性能(下)
使用MYSQL Report分析数据库性能
51 3
|
18天前
|
关系型数据库 MySQL 数据库
自建数据库如何迁移至RDS MySQL实例
数据库迁移是一项复杂且耗时的工程,需考虑数据安全、完整性及业务中断影响。使用阿里云数据传输服务DTS,可快速、平滑完成迁移任务,将应用停机时间降至分钟级。您还可通过全量备份自建数据库并恢复至RDS MySQL实例,实现间接迁移上云。

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多