Flink查询问题之每秒入库到mysql数量很少如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink sql 读取mysql

版本:flink 1.10 mysql 5.7.24

需求场景是: 使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作?

现在本地测试时,维表的DDL是:

但是去mysql修改了数据后,join操作还是旧数据.

望大神们指点方向,提前谢谢了.*来自志愿者整理的flink邮件归档



参考答案:

这个报错一般是sql格式错误,比如中英文逗号等,你可以检查下你的SQL语句*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371510?spm=a2c6h.12873639.article-detail.22.29d04378ApxdqJ



问题二:Apache Flink常见问题汇总【精品问答】

hi

我这面在使用sql api解析kafka

json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储

json.ignore-parse-errors

son.fail-on-missing-field*来自志愿者整理的flink邮件归档



参考答案:

我理解应该做不到,因为这两个format参数在format里就做的。

json.ignore-parse-errors 是在 format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)

这两个不能同时为ture,语义上就是互斥的。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371508?spm=a2c6h.12873639.article-detail.23.29d04378ApxdqJ



问题三:https://developer.aliyun.com/ask/371504

请问实现了 MetricReporter, CharacterFilter,Scheduled, Reporter 的自定义

customerReporter 如何能在 代码env里面注册并实现metric上报,要求不在flink conf.xml 文件里面配置

该customerReporter的信息?

需求:在自定义的source 和sink等算子里面计算处理成功,失败的数据条数并通过自定义reporter上报,并且该reporter需要是通用型的即

*适用于多个flink

任务*从而避开重复造轮子。*来自志愿者整理的flink邮件归档



参考答案:

尝试理解一下你的需求,你自己实现了一个 report,也希望在 source 和 sink 中计算一些 metric,希望把 source 和sink 的这些 metric 通过自定义的 report 上报到你指定的地方。然后不希望在 env 里面配置 report 的信息,是这样吗?能否解释下为什么不希望在 flink-conf 中进行配置,而是希望在 env 中进行配置吗*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371506?spm=a2c6h.12873639.article-detail.24.29d04378ApxdqJ



问题四:flink1.11查询结果每秒入库到mysql数量很少

各位大佬好,请教一个问题,在使用flink1.11消费kafka数据,查询结果写入到mysql库表时,发现读取kafka的速度很快(300条/秒),但是查询结果每秒写入mysql的条数只有6条左右,请问这是怎么回事,以及优化的点在哪里?下面是我的代码。

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode

from pyflink.table import StreamTableEnvironment, EnvironmentSettings

source_Kafka = """

CREATE TABLE kafka_source (

id VARCHAR,

alarm_id VARCHAR,

trck_id VARCHAR

) WITH (

'connector' = 'kafka',

'topic' = 'test',  

'properties.bootstrap.servers' = '*',

'properties.group.id' = 'flink_grouper',

'scan.startup.mode' = 'earliest-offset',    

'format' = 'json',

'json.fail-on-missing-field' = 'false',

'json.ignore-parse-errors' = 'true'

)

"""

source_W_detail_ddl = """

CREATE TABLE source_W_detail (

id VARCHAR,    

alarm_id VARCHAR,    

trck_id VARCHAR    

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false',

'driver' = 'com.mysql.cj.jdbc.Driver',

'table-name' = 'detail',

'username' = 'root',

'password' = 'root',

'sink.buffer-flush.max-rows' = '1000',

'sink.buffer-flush.interval' = '2s'

"""

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

env.set_parallelism(1)

env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()

t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

t_env.execute_sql(source_Kafka)

t_env.execute_sql(source_W_detail_ddl)

table_result1=t_env.execute_sql('''insert into source_W_detail select id,alarm_id,trck_id from kafka_source''')

table_result1.get_job_client().get_job_execution_result().result()*来自志愿者整理的flink邮件归档



参考答案:

你可以尝试改写url,加上rewritebatchedstatements=true,如下: jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewritebatchedstatements=true

MySQL Jdbc驱动在默认情况下会无视executeBatch()语句,把期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,直接造成较低的性能。把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371504?spm=a2c6h.12873639.article-detail.26.29d04378ApxdqJ

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
10天前
|
关系型数据库 MySQL 数据库
轻松入门MySQL:精准查询,巧用WHERE与HAVING,数据库查询如虎添翼(7)
轻松入门MySQL:精准查询,巧用WHERE与HAVING,数据库查询如虎添翼(7)
|
10天前
|
缓存 关系型数据库 MySQL
MySQL查询优化:提速查询效率的13大秘籍(合理使用索引合并、优化配置参数、使用分区优化性能、避免不必要的排序和group by操作)(下)
MySQL查询优化:提速查询效率的13大秘籍(合理使用索引合并、优化配置参数、使用分区优化性能、避免不必要的排序和group by操作)(下)
|
10天前
|
缓存 关系型数据库 MySQL
MySQL 查询优化:提速查询效率的13大秘籍(索引设计、查询优化、缓存策略、子查询优化以及定期表分析和优化)(中)
MySQL 查询优化:提速查询效率的13大秘籍(索引设计、查询优化、缓存策略、子查询优化以及定期表分析和优化)(中)
|
5天前
|
SQL 关系型数据库 MySQL
mysql多表查询、函数查询
mysql多表查询、函数查询
|
5天前
|
SQL 关系型数据库 MySQL
mysql基本查询、运算符、排序和分页
mysql基本查询、运算符、排序和分页
|
10天前
|
存储 关系型数据库 MySQL
MySQL 查询优化:提速查询效率的13大秘籍(避免使用SELECT *、分页查询的优化、合理使用连接、子查询的优化)(上)
MySQL 查询优化:提速查询效率的13大秘籍(避免使用SELECT *、分页查询的优化、合理使用连接、子查询的优化)(上)
|
11天前
|
关系型数据库 MySQL 数据库
【MySQL】7. 基本查询(create / retrieve)
【MySQL】7. 基本查询(create / retrieve)
37 0
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
476 5
|
26天前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1038 1
官宣|Apache Flink 1.19 发布公告
|
29天前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
131 3

相关产品

  • 实时计算 Flink版