Flink数据问题之连接mysql无数据输出如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:Flink CDC 遇到关于不发生packet造成的卡顿问题该如何解决

主要是为了实现解析自定义的schema,sink端好输出到下游。

想请教一个问题:

https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#set-a-differnet-server-id-for-each-job

看了上面这个链接关于为每个作业设置一个differnet server id的问题。我看sql可以指定不同的server id,所以有下面这三个疑惑:

1、 如果是不同的stream 任务 的它的server id是不是同一个?

2、不同的stream 任务 同步同一个数据库的不同表是不是没有问题

3、不同的stream 任务 同步同一个数据库的同一张表是不是有问题

*来自志愿者整理的flink邮件归档



参考答案:

See the docs: https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#setting-up-mysql-session-timeouts

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370018?spm=a2c6h.13066369.question.83.33bf585fwBnaSf



问题二:flink sql 连接mysql 无数据输出

   我在Idea里用flink-jdbc-connector连接mysql, 建完表后执行env.executeSql("select * from my_table").print()方法,只打印了表头,没有数据是什么原因? flink版本1.11.2

*来自志愿者整理的flink邮件归档



参考答案:

是不是没有加这一行代码,tableEnv.execute("test");

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370017?spm=a2c6h.13066369.question.84.33bf585fAxGR1X



问题三:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问

我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我是用的是flink 1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决?     我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!package join;

> >import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >import org.apache.flink.table.api.EnvironmentSettings; >import org.apache.flink.table.api.Table; >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >public class Test1 { > >    public static void main(String[] args) { >        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); >        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); > >        String lTable = "CREATE TABLE l_table (  " + >                " l_a INT,  " + >                " l_b string,  " + >                " l_rt AS localtimestamp,  " + >                " WATERMARK FOR l_rt AS l_rt  " + >                ") WITH (  " + >                " 'connector' = 'datagen',  " + >                " 'rows-per-second'='5',  " + >                " 'fields.l_a.min'='1',  " + >                " 'fields.l_a.max'='5',  " + >                " 'fields.l_b.length'='5'  " + >                ")"; >        bsTableEnv.executeSql(lTable); > >        String rTable = "CREATE TABLE r_table (  " + >                " r_a INT,  " + >                " r_b string,  " + >                " r_pt AS proctime()  " + >                ") WITH (  " + >                " 'connector' = 'datagen',  " + >                " 'rows-per-second'='5',  " + >                " 'fields.r_a.min'='1',  " + >                " 'fields.r_a.max'='5',  " + >                " 'fields.r_b.length'='5'  " + >                ")"; >        bsTableEnv.executeSql(rTable); > >        String printTable = "CREATE TABLE print (" + >                "  l_a INT,  " + >                "  l_b string,  " + >                "  l_rt timestamp(3),  " + >                "  r_a INT,  " + >                "  r_b string,  " + >                "  r_pt timestamp(3)  " + >                ") WITH (  " + >                " 'connector' = 'print' " + >                ") "; > >        bsTableEnv.executeSql(printTable); > >        // 运行成功 >//        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt"); > >        // 运行错误,提示Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. >        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND"); > >        bsTableEnv.executeSql("insert into print select * from " + joinTable); > >    } > >}

*来自志愿者整理的flink邮件归档



参考答案:

 因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。

  而在 match 到到了regular join 后,又因为 join 条件中有时间属性,故报了这个错。

  Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370016?spm=a2c6h.13066369.question.85.33bf585fSSKbxa



问题四:slot问题

          一个slot同时只能运行一个线程吗?或者1个slot可以同时并行运行多个线程?

*来自志愿者整理的flink邮件归档



参考答案:

有啊,一个slot本身就可以运行多个线程的。但是不可以运行1个算子结点的多个任务,也不可以运行多个作业中的算子结点的多个任务。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370015?spm=a2c6h.13066369.question.84.33bf585f3XoJwI



问题五:flink sql时间戳字段类型转换问题

数据源来自Jark项目 https://github.com/wuchong/flink-sql-submit

中的kafka消息,里面user_behavior消息例如

{"user_id": "470572", "item_id":"3760258", "category_id": "1299190",

"behavior": "pv", "ts": "2017-11-26T01:00:01Z"}

可以看到ts值是 '2017-11-26T01:00:00Z',现在要为它定义一张flink sql源表,如下

CREATE TABLE user_log (

user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'json', -- 'json.timestamp-format.standard' = 'ISO-8601', // 不加这一行默认是'SQL' 'scan.startup.mode' = 'earliest-offset' );

程序运行会抛错

Caused by: java.time.format.DateTimeParseException: Text '2017-11-26T01:00:00Z' could not be parsed at index 10

我查了一下flink json官方文档 https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/connectors/formats/json.html#json-timestamp-format-standard

目前只支持两种格式:SQL 和 ISO-8601

其中SQL支持的格式是 'yyyy-MM-dd HH:mm:ss',

而ISO-8601支持的格式是 'yyyy-MM-ddTHH:mm:ss.s{precision}'

确实不支持上面的 'yyyy-MM-ddTHH:mm:ssZ' (注意末尾的Z)

请问:

  1. 像上述时间格式字段在Flink SQL中应该解析成什么类型?
  2. 如果不能直接支持的话是不是得先用VARCHAR类型接收,再利用 UNIX_TIMESTAMP(ts_string,

pattern_string) 函数转成 支持的时间格式?可问题是 pattern_string

里面如果包含单引号要如何转义?UNIX_TIMESTAMP('2017-11-26T01:00:00Z',

'yyyy-MM-dd'T'HH:mm:ss'Z'')?

  1. TIMESTAMP WITH TIME ZONE和TIMESTAMP WITH LOCAL TIME

ZONE这两种类型在什么情况下会用到?有例子吗?

谢谢!

*来自志愿者整理的flink邮件归档



参考答案:

你可以用这篇文章中的 docker: https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html https://raw.githubusercontent.com/wuchong/flink-sql-demo/v1.11-EN/docker-compose.yml

这个容器里面的 ts 数据格式是 SQL 格式的。

  1. 像上述时间格式字段在Flink SQL中应该解析成什么类型? TIMESTAMP WITH LOCAL TIME ZONE, 1.12 的 json formart 才支持。
  2. 是的
  3. Flink 目前还不支持 TIMESTAMP WITH TIME ZONE。 'yyyy-MM-dd HH:mm:ss' 这种,对应的是 TIMESTAMP,代表无时区 timestamp long 值,或者 'yyyy-MM-dd HH:mm:ssZ' 这种是TIMESTAMP WITH LOCAL TIME ZONE ,代表session 时区的 timestamp

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370014?spm=a2c6h.13066369.question.87.33bf585fd5RWZf

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7天前
|
SQL 关系型数据库 MySQL
DQL语言之连接查询(mysql)
DQL语言之连接查询(mysql)
|
10天前
|
关系型数据库 MySQL 数据安全/隐私保护
MySQL 安装及连接
MySQL 安装及连接
30 0
|
12天前
|
缓存 NoSQL 关系型数据库
13- Redis和Mysql如何保证数据⼀致?
该内容讨论了保证Redis和MySQL数据一致性的几种策略。首先提到的两种方法存在不一致风险:先更新MySQL再更新Redis,或先删Redis再更新MySQL。第三种方案是通过MQ异步同步以达到最终一致性,适用于一致性要求较高的场景。项目中根据不同业务需求选择不同方案,如对一致性要求不高的情况不做处理,时效性数据设置过期时间,高一致性需求则使用MQ确保同步,最严格的情况可能涉及分布式事务(如Seata的TCC模式)。
36 6
|
13天前
|
关系型数据库 MySQL
如何解决cmd命令窗口无法运行mysql命令的问题
如何解决cmd命令窗口无法运行mysql命令的问题
8 0
|
19天前
|
SQL 关系型数据库 MySQL
轻松入门MySQL:保障数据完整性,MySQL事务在进销存管理系统中的应用(12)
轻松入门MySQL:保障数据完整性,MySQL事务在进销存管理系统中的应用(12)
|
19天前
|
存储 关系型数据库 MySQL
MySQL 查询优化:提速查询效率的13大秘籍(避免使用SELECT *、分页查询的优化、合理使用连接、子查询的优化)(上)
MySQL 查询优化:提速查询效率的13大秘籍(避免使用SELECT *、分页查询的优化、合理使用连接、子查询的优化)(上)
|
21天前
|
关系型数据库 MySQL 数据安全/隐私保护
MySQL连接ERROR 2059 (HY000): Authentication plugin ‘caching_sha2_password‘ cannot be loaded
MySQL连接ERROR 2059 (HY000): Authentication plugin ‘caching_sha2_password‘ cannot be loaded
25 0
|
25天前
|
关系型数据库 MySQL
MySQL查询当天昨天明天本月上月今年等数据
MySQL查询当天昨天明天本月上月今年等数据
19 2
|
25天前
|
关系型数据库 MySQL 开发工具
MySQL分组后,组内排序,然后取每组的第一条数据
MySQL分组后,组内排序,然后取每组的第一条数据
15 1
|
26天前
|
canal SQL 关系型数据库
MySQL数据直接实时同步到ES
MySQL数据直接实时同步到ES
32 0

相关产品

  • 实时计算 Flink版