Flink数据问题之连接mysql无数据输出如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:Flink CDC 遇到关于不发生packet造成的卡顿问题该如何解决

主要是为了实现解析自定义的schema,sink端好输出到下游。

想请教一个问题:

https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#set-a-differnet-server-id-for-each-job

看了上面这个链接关于为每个作业设置一个differnet server id的问题。我看sql可以指定不同的server id,所以有下面这三个疑惑:

1、 如果是不同的stream 任务 的它的server id是不是同一个?

2、不同的stream 任务 同步同一个数据库的不同表是不是没有问题

3、不同的stream 任务 同步同一个数据库的同一张表是不是有问题

*来自志愿者整理的flink邮件归档



参考答案:

See the docs: https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#setting-up-mysql-session-timeouts

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370018?spm=a2c6h.13066369.question.83.33bf585fwBnaSf



问题二:flink sql 连接mysql 无数据输出

   我在Idea里用flink-jdbc-connector连接mysql, 建完表后执行env.executeSql("select * from my_table").print()方法,只打印了表头,没有数据是什么原因? flink版本1.11.2

*来自志愿者整理的flink邮件归档



参考答案:

是不是没有加这一行代码,tableEnv.execute("test");

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370017?spm=a2c6h.13066369.question.84.33bf585fAxGR1X



问题三:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问

我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我是用的是flink 1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决?     我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!package join;

> >import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >import org.apache.flink.table.api.EnvironmentSettings; >import org.apache.flink.table.api.Table; >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >public class Test1 { > >    public static void main(String[] args) { >        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); >        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); > >        String lTable = "CREATE TABLE l_table (  " + >                " l_a INT,  " + >                " l_b string,  " + >                " l_rt AS localtimestamp,  " + >                " WATERMARK FOR l_rt AS l_rt  " + >                ") WITH (  " + >                " 'connector' = 'datagen',  " + >                " 'rows-per-second'='5',  " + >                " 'fields.l_a.min'='1',  " + >                " 'fields.l_a.max'='5',  " + >                " 'fields.l_b.length'='5'  " + >                ")"; >        bsTableEnv.executeSql(lTable); > >        String rTable = "CREATE TABLE r_table (  " + >                " r_a INT,  " + >                " r_b string,  " + >                " r_pt AS proctime()  " + >                ") WITH (  " + >                " 'connector' = 'datagen',  " + >                " 'rows-per-second'='5',  " + >                " 'fields.r_a.min'='1',  " + >                " 'fields.r_a.max'='5',  " + >                " 'fields.r_b.length'='5'  " + >                ")"; >        bsTableEnv.executeSql(rTable); > >        String printTable = "CREATE TABLE print (" + >                "  l_a INT,  " + >                "  l_b string,  " + >                "  l_rt timestamp(3),  " + >                "  r_a INT,  " + >                "  r_b string,  " + >                "  r_pt timestamp(3)  " + >                ") WITH (  " + >                " 'connector' = 'print' " + >                ") "; > >        bsTableEnv.executeSql(printTable); > >        // 运行成功 >//        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt"); > >        // 运行错误,提示Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. >        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND"); > >        bsTableEnv.executeSql("insert into print select * from " + joinTable); > >    } > >}

*来自志愿者整理的flink邮件归档



参考答案:

 因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。

  而在 match 到到了regular join 后,又因为 join 条件中有时间属性,故报了这个错。

  Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370016?spm=a2c6h.13066369.question.85.33bf585fSSKbxa



问题四:slot问题

          一个slot同时只能运行一个线程吗?或者1个slot可以同时并行运行多个线程?

*来自志愿者整理的flink邮件归档



参考答案:

有啊,一个slot本身就可以运行多个线程的。但是不可以运行1个算子结点的多个任务,也不可以运行多个作业中的算子结点的多个任务。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370015?spm=a2c6h.13066369.question.84.33bf585f3XoJwI



问题五:flink sql时间戳字段类型转换问题

数据源来自Jark项目 https://github.com/wuchong/flink-sql-submit

中的kafka消息,里面user_behavior消息例如

{"user_id": "470572", "item_id":"3760258", "category_id": "1299190",

"behavior": "pv", "ts": "2017-11-26T01:00:01Z"}

可以看到ts值是 '2017-11-26T01:00:00Z',现在要为它定义一张flink sql源表,如下

CREATE TABLE user_log (

user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'json', -- 'json.timestamp-format.standard' = 'ISO-8601', // 不加这一行默认是'SQL' 'scan.startup.mode' = 'earliest-offset' );

程序运行会抛错

Caused by: java.time.format.DateTimeParseException: Text '2017-11-26T01:00:00Z' could not be parsed at index 10

我查了一下flink json官方文档 https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/connectors/formats/json.html#json-timestamp-format-standard

目前只支持两种格式:SQL 和 ISO-8601

其中SQL支持的格式是 'yyyy-MM-dd HH:mm:ss',

而ISO-8601支持的格式是 'yyyy-MM-ddTHH:mm:ss.s{precision}'

确实不支持上面的 'yyyy-MM-ddTHH:mm:ssZ' (注意末尾的Z)

请问:

  1. 像上述时间格式字段在Flink SQL中应该解析成什么类型?
  2. 如果不能直接支持的话是不是得先用VARCHAR类型接收,再利用 UNIX_TIMESTAMP(ts_string,

pattern_string) 函数转成 支持的时间格式?可问题是 pattern_string

里面如果包含单引号要如何转义?UNIX_TIMESTAMP('2017-11-26T01:00:00Z',

'yyyy-MM-dd'T'HH:mm:ss'Z'')?

  1. TIMESTAMP WITH TIME ZONE和TIMESTAMP WITH LOCAL TIME

ZONE这两种类型在什么情况下会用到?有例子吗?

谢谢!

*来自志愿者整理的flink邮件归档



参考答案:

你可以用这篇文章中的 docker: https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html https://raw.githubusercontent.com/wuchong/flink-sql-demo/v1.11-EN/docker-compose.yml

这个容器里面的 ts 数据格式是 SQL 格式的。

  1. 像上述时间格式字段在Flink SQL中应该解析成什么类型? TIMESTAMP WITH LOCAL TIME ZONE, 1.12 的 json formart 才支持。
  2. 是的
  3. Flink 目前还不支持 TIMESTAMP WITH TIME ZONE。 'yyyy-MM-dd HH:mm:ss' 这种,对应的是 TIMESTAMP,代表无时区 timestamp long 值,或者 'yyyy-MM-dd HH:mm:ssZ' 这种是TIMESTAMP WITH LOCAL TIME ZONE ,代表session 时区的 timestamp

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370014?spm=a2c6h.13066369.question.87.33bf585fd5RWZf

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
4月前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
|
2月前
|
SQL 人工智能 关系型数据库
如何实现MySQL百万级数据的查询?
本文探讨了在MySQL中对百万级数据进行排序分页查询的优化策略。面对五百万条数据,传统的浅分页和深分页查询效率较低,尤其深分页因偏移量大导致性能显著下降。通过为排序字段添加索引、使用联合索引、手动回表等方法,有效提升了查询速度。最终建议根据业务需求选择合适方案:浅分页可加单列索引,深分页推荐联合索引或子查询优化,同时结合前端传递最后一条数据ID的方式实现高效翻页。
131 0
|
26天前
|
存储 关系型数据库 MySQL
在CentOS 8.x上安装Percona Xtrabackup工具备份MySQL数据步骤。
以上就是在CentOS8.x上通过Perconaxtabbackup工具对Mysql进行高效率、高可靠性、无锁定影响地实现在线快速全量及增加式数据库资料保存与恢复流程。通过以上流程可以有效地将Mysql相关资料按需求完成定期或不定期地保存与灾难恢复需求。
122 10
|
6月前
|
关系型数据库 MySQL Java
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
|
2月前
|
SQL 存储 缓存
MySQL 如何高效可靠处理持久化数据
本文详细解析了 MySQL 的 SQL 执行流程、crash-safe 机制及性能优化策略。内容涵盖连接器、分析器、优化器、执行器与存储引擎的工作原理,深入探讨 redolog 与 binlog 的两阶段提交机制,并分析日志策略、组提交、脏页刷盘等关键性能优化手段,帮助提升数据库稳定性与执行效率。
|
5月前
|
关系型数据库 MySQL Linux
在Linux环境下备份Docker中的MySQL数据并传输到其他服务器以实现数据级别的容灾
以上就是在Linux环境下备份Docker中的MySQL数据并传输到其他服务器以实现数据级别的容灾的步骤。这个过程就像是一场接力赛,数据从MySQL数据库中接力棒一样传递到备份文件,再从备份文件传递到其他服务器,最后再传递回MySQL数据库。这样,即使在灾难发生时,我们也可以快速恢复数据,保证业务的正常运行。
266 28
|
4月前
|
存储 SQL 缓存
mysql数据引擎有哪些
MySQL 提供了多种存储引擎,每种引擎都有其独特的特点和适用场景。以下是一些常见的 MySQL 存储引擎及其特点:
131 0
|
6月前
|
存储 SQL 关系型数据库
【YashanDB知识库】MySQL迁移至崖山char类型数据自动补空格问题
**简介**:在MySQL迁移到崖山环境时,若字段类型为char(2),而应用存储的数据仅为'0'或'1',查询时崖山会自动补空格。原因是mysql的sql_mode可能启用了PAD_CHAR_TO_FULL_LENGTH模式,导致保留CHAR类型尾随空格。解决方法是与应用确认数据需求,可将崖山环境中的char类型改为varchar类型以规避补空格问题,适用于所有版本。
|
6月前
|
SQL 关系型数据库 MySQL
【YashanDB知识库】字符集latin1的MySQL中文数据如何迁移到YashanDB
本文探讨了在使用YMP 23.2.1.3迁移MySQL Server字符集为latin1的中文数据至YashanDB时出现乱码的问题。问题根源在于MySQL latin1字符集存放的是实际utf8编码的数据,而YMP尚未支持此类场景。文章提供了两种解决方法:一是通过DBeaver直接迁移表数据;二是将MySQL表数据转换为Insert语句后手动插入YashanDB。同时指出,这两种方法适合单张表迁移,多表迁移可能存在兼容性问题,建议对问题表单独处理。
【YashanDB知识库】字符集latin1的MySQL中文数据如何迁移到YashanDB
|
6月前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
1. 先更新Mysql,再更新Redis,如果更新Redis失败,可能仍然不⼀致 2. 先删除Redis缓存数据,再更新Mysql,再次查询的时候在将数据添加到缓存中 这种⽅案能解决1 ⽅案的问题,但是在⾼并发下性能较低,⽽且仍然会出现数据不⼀致的问题,⽐如线程1删除了 Redis缓存数据,正在更新Mysql,此时另外⼀个查询再查询,那么就会把Mysql中⽼数据⼜查到 Redis中 1. 使用MQ异步同步, 保证数据的最终一致性 我们项目中会根据业务情况 , 使用不同的方案来解决Redis和Mysql的一致性问题 : 1. 对于一些一致性要求不高的场景 , 不做处理例如 : 用户行为数据 ,

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多