Flink查询问题之每秒入库到mysql数量很少如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink sql 读取mysql

版本:flink 1.10 mysql 5.7.24

需求场景是: 使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作?

现在本地测试时,维表的DDL是:

但是去mysql修改了数据后,join操作还是旧数据.

望大神们指点方向,提前谢谢了.*来自志愿者整理的flink邮件归档



参考答案:

这个报错一般是sql格式错误,比如中英文逗号等,你可以检查下你的SQL语句*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371510?spm=a2c6h.12873639.article-detail.22.29d04378ApxdqJ



问题二:Apache Flink常见问题汇总【精品问答】

hi

我这面在使用sql api解析kafka

json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储

json.ignore-parse-errors

son.fail-on-missing-field*来自志愿者整理的flink邮件归档



参考答案:

我理解应该做不到,因为这两个format参数在format里就做的。

json.ignore-parse-errors 是在 format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)

这两个不能同时为ture,语义上就是互斥的。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371508?spm=a2c6h.12873639.article-detail.23.29d04378ApxdqJ



问题三:https://developer.aliyun.com/ask/371504

请问实现了 MetricReporter, CharacterFilter,Scheduled, Reporter 的自定义

customerReporter 如何能在 代码env里面注册并实现metric上报,要求不在flink conf.xml 文件里面配置

该customerReporter的信息?

需求:在自定义的source 和sink等算子里面计算处理成功,失败的数据条数并通过自定义reporter上报,并且该reporter需要是通用型的即

*适用于多个flink

任务*从而避开重复造轮子。*来自志愿者整理的flink邮件归档



参考答案:

尝试理解一下你的需求,你自己实现了一个 report,也希望在 source 和 sink 中计算一些 metric,希望把 source 和sink 的这些 metric 通过自定义的 report 上报到你指定的地方。然后不希望在 env 里面配置 report 的信息,是这样吗?能否解释下为什么不希望在 flink-conf 中进行配置,而是希望在 env 中进行配置吗*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371506?spm=a2c6h.12873639.article-detail.24.29d04378ApxdqJ



问题四:flink1.11查询结果每秒入库到mysql数量很少

各位大佬好,请教一个问题,在使用flink1.11消费kafka数据,查询结果写入到mysql库表时,发现读取kafka的速度很快(300条/秒),但是查询结果每秒写入mysql的条数只有6条左右,请问这是怎么回事,以及优化的点在哪里?下面是我的代码。

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode

from pyflink.table import StreamTableEnvironment, EnvironmentSettings

source_Kafka = """

CREATE TABLE kafka_source (

id VARCHAR,

alarm_id VARCHAR,

trck_id VARCHAR

) WITH (

'connector' = 'kafka',

'topic' = 'test',  

'properties.bootstrap.servers' = '*',

'properties.group.id' = 'flink_grouper',

'scan.startup.mode' = 'earliest-offset',    

'format' = 'json',

'json.fail-on-missing-field' = 'false',

'json.ignore-parse-errors' = 'true'

)

"""

source_W_detail_ddl = """

CREATE TABLE source_W_detail (

id VARCHAR,    

alarm_id VARCHAR,    

trck_id VARCHAR    

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false',

'driver' = 'com.mysql.cj.jdbc.Driver',

'table-name' = 'detail',

'username' = 'root',

'password' = 'root',

'sink.buffer-flush.max-rows' = '1000',

'sink.buffer-flush.interval' = '2s'

"""

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

env.set_parallelism(1)

env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()

t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

t_env.execute_sql(source_Kafka)

t_env.execute_sql(source_W_detail_ddl)

table_result1=t_env.execute_sql('''insert into source_W_detail select id,alarm_id,trck_id from kafka_source''')

table_result1.get_job_client().get_job_execution_result().result()*来自志愿者整理的flink邮件归档



参考答案:

你可以尝试改写url,加上rewritebatchedstatements=true,如下: jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewritebatchedstatements=true

MySQL Jdbc驱动在默认情况下会无视executeBatch()语句,把期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,直接造成较低的性能。把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371504?spm=a2c6h.12873639.article-detail.26.29d04378ApxdqJ

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5天前
|
存储 关系型数据库 MySQL
MySQL索引失效及避免策略:优化查询性能的关键
MySQL索引失效及避免策略:优化查询性能的关键
23 3
|
8天前
|
关系型数据库 MySQL 数据库
MySQL 表的CRUD与复合查询
【9月更文挑战第26天】本文介绍了数据库操作中的 CRUD(创建、读取、更新、删除)基本操作及复合查询。创建操作使用 `INSERT INTO` 语句插入数据,支持单条和批量插入;读取操作使用 `SELECT` 语句查询数据,可进行基本查询、条件查询和排序查询;更新操作使用 `UPDATE` 语句修改数据;删除操作使用 `DELETE FROM` 语句删除数据。此外,还介绍了复合查询,包括连接查询(如内连接、左连接)和子查询,以及聚合函数与分组查询,并提供了示例代码。
|
11天前
|
关系型数据库 MySQL 数据库
Python MySQL查询返回字典类型数据的方法
通过使用 `mysql-connector-python`库并选择 `MySQLCursorDict`作为游标类型,您可以轻松地将MySQL查询结果以字典类型返回。这种方式提高了代码的可读性,使得数据操作更加直观和方便。上述步骤和示例代码展示了如何实现这一功能,希望对您的项目开发有所帮助。
37 4
|
11天前
|
关系型数据库 MySQL
mysql查询速度慢怎么解决?
mysql查询速度慢怎么解决?
32 2
|
10天前
|
关系型数据库 MySQL
mysql & clinkhouse之查询 行拼接
mysql & clinkhouse之查询 行拼接
mysql & clinkhouse之查询 行拼接
|
2天前
|
存储 关系型数据库 MySQL
MySQL中利用FIND_IN_SET进行包含查询的技巧
`FIND_IN_SET`提供了一种简便的方法来执行包含查询,尤其是当数据以逗号分隔的字符串形式存储时。虽然这个方法的性能可能不如使用专门的关系表,但在某些场景下,它提供了快速简便的解决方案。开发者应该根据具体的应用场景和性能要求,权衡其使用。
7 0
|
10天前
|
关系型数据库 MySQL Java
mysql & clinkhouse之查询添加比率列
mysql & clinkhouse之查询添加比率列
|
18天前
|
NoSQL 关系型数据库 MySQL
微服务架构下的数据库选择:MySQL、PostgreSQL 还是 NoSQL?
在微服务架构中,数据库的选择至关重要。不同类型的数据库适用于不同的需求和场景。在本文章中,我们将深入探讨传统的关系型数据库(如 MySQL 和 PostgreSQL)与现代 NoSQL 数据库的优劣势,并分析在微服务架构下的最佳实践。
|
20天前
|
存储 SQL 关系型数据库
使用MySQL Workbench进行数据库备份
【9月更文挑战第13天】以下是使用MySQL Workbench进行数据库备份的步骤:启动软件后,通过“Database”菜单中的“管理连接”选项配置并选择要备份的数据库。随后,选择“数据导出”,确认导出的数据库及格式(推荐SQL格式),设置存储路径,点击“开始导出”。完成后,可在指定路径找到备份文件,建议定期备份并存储于安全位置。
160 11
|
16天前
|
存储 SQL 关系型数据库
MySQL的安装&数据库的简单操作
本文介绍了数据库的基本概念及MySQL的安装配置。首先解释了数据库、数据库管理系统和SQL的概念,接着详细描述了MySQL的安装步骤及其全局配置文件my.ini的调整方法。文章还介绍了如何启动MySQL服务,包括配置环境变量和使用命令行的方法。最后,详细说明了数据库的各种操作,如创建、选择和删除数据库的SQL语句,并提供了实际操作示例。
58 13
MySQL的安装&数据库的简单操作

相关产品

  • 实时计算 Flink版
  • 下一篇
    无影云桌面