Flink查询问题之每秒入库到mysql数量很少如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink sql 读取mysql

版本:flink 1.10 mysql 5.7.24

需求场景是: 使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作?

现在本地测试时,维表的DDL是:

但是去mysql修改了数据后,join操作还是旧数据.

望大神们指点方向,提前谢谢了.*来自志愿者整理的flink邮件归档



参考答案:

这个报错一般是sql格式错误,比如中英文逗号等,你可以检查下你的SQL语句*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371510?spm=a2c6h.12873639.article-detail.22.29d04378ApxdqJ



问题二:Apache Flink常见问题汇总【精品问答】

hi

我这面在使用sql api解析kafka

json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储

json.ignore-parse-errors

son.fail-on-missing-field*来自志愿者整理的flink邮件归档



参考答案:

我理解应该做不到,因为这两个format参数在format里就做的。

json.ignore-parse-errors 是在 format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)

这两个不能同时为ture,语义上就是互斥的。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371508?spm=a2c6h.12873639.article-detail.23.29d04378ApxdqJ



问题三:https://developer.aliyun.com/ask/371504

请问实现了 MetricReporter, CharacterFilter,Scheduled, Reporter 的自定义

customerReporter 如何能在 代码env里面注册并实现metric上报,要求不在flink conf.xml 文件里面配置

该customerReporter的信息?

需求:在自定义的source 和sink等算子里面计算处理成功,失败的数据条数并通过自定义reporter上报,并且该reporter需要是通用型的即

*适用于多个flink

任务*从而避开重复造轮子。*来自志愿者整理的flink邮件归档



参考答案:

尝试理解一下你的需求,你自己实现了一个 report,也希望在 source 和 sink 中计算一些 metric,希望把 source 和sink 的这些 metric 通过自定义的 report 上报到你指定的地方。然后不希望在 env 里面配置 report 的信息,是这样吗?能否解释下为什么不希望在 flink-conf 中进行配置,而是希望在 env 中进行配置吗*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371506?spm=a2c6h.12873639.article-detail.24.29d04378ApxdqJ



问题四:flink1.11查询结果每秒入库到mysql数量很少

各位大佬好,请教一个问题,在使用flink1.11消费kafka数据,查询结果写入到mysql库表时,发现读取kafka的速度很快(300条/秒),但是查询结果每秒写入mysql的条数只有6条左右,请问这是怎么回事,以及优化的点在哪里?下面是我的代码。

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode

from pyflink.table import StreamTableEnvironment, EnvironmentSettings

source_Kafka = """

CREATE TABLE kafka_source (

id VARCHAR,

alarm_id VARCHAR,

trck_id VARCHAR

) WITH (

'connector' = 'kafka',

'topic' = 'test',  

'properties.bootstrap.servers' = '*',

'properties.group.id' = 'flink_grouper',

'scan.startup.mode' = 'earliest-offset',    

'format' = 'json',

'json.fail-on-missing-field' = 'false',

'json.ignore-parse-errors' = 'true'

)

"""

source_W_detail_ddl = """

CREATE TABLE source_W_detail (

id VARCHAR,    

alarm_id VARCHAR,    

trck_id VARCHAR    

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false',

'driver' = 'com.mysql.cj.jdbc.Driver',

'table-name' = 'detail',

'username' = 'root',

'password' = 'root',

'sink.buffer-flush.max-rows' = '1000',

'sink.buffer-flush.interval' = '2s'

"""

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

env.set_parallelism(1)

env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()

t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

t_env.execute_sql(source_Kafka)

t_env.execute_sql(source_W_detail_ddl)

table_result1=t_env.execute_sql('''insert into source_W_detail select id,alarm_id,trck_id from kafka_source''')

table_result1.get_job_client().get_job_execution_result().result()*来自志愿者整理的flink邮件归档



参考答案:

你可以尝试改写url,加上rewritebatchedstatements=true,如下: jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewritebatchedstatements=true

MySQL Jdbc驱动在默认情况下会无视executeBatch()语句,把期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,直接造成较低的性能。把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371504?spm=a2c6h.12873639.article-detail.26.29d04378ApxdqJ

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
缓存 关系型数据库 MySQL
MySQL索引策略与查询性能调优实战
在实际应用中,需要根据具体的业务需求和查询模式,综合运用索引策略和查询性能调优方法,不断地测试和优化,以提高MySQL数据库的查询性能。
258 66
|
1月前
|
SQL 关系型数据库 MySQL
【MySQL基础篇】多表查询(隐式/显式内连接、左/右外连接、自连接查询、联合查询、标量/列/行/表子查询)
本文详细介绍了MySQL中的多表查询,包括多表关系、隐式/显式内连接、左/右外连接、自连接查询、联合查询、标量/列/行/表子查询及其实现方式,一文全面读懂多表联查!
【MySQL基础篇】多表查询(隐式/显式内连接、左/右外连接、自连接查询、联合查询、标量/列/行/表子查询)
|
1月前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
119 16
|
5天前
|
缓存 关系型数据库 MySQL
【深入了解MySQL】优化查询性能与数据库设计的深度总结
本文详细介绍了MySQL查询优化和数据库设计技巧,涵盖基础优化、高级技巧及性能监控。
70 0
|
1月前
|
存储 Oracle 关系型数据库
索引在手,查询无忧:MySQL索引简介
MySQL 是一款广泛使用的关系型数据库管理系统,在2024年5月的DB-Engines排名中得分1084,仅次于Oracle。本文介绍MySQL索引的工作原理和类型,包括B+Tree、Hash、Full-text索引,以及主键、唯一、普通索引等,帮助开发者优化查询性能。索引类似于图书馆的分类系统,能快速定位数据行,极大提高检索效率。
63 8
|
1月前
|
SQL 关系型数据库 MySQL
MySQL 窗口函数详解:分析性查询的强大工具
MySQL 窗口函数从 8.0 版本开始支持,提供了一种灵活的方式处理 SQL 查询中的数据。无需分组即可对行集进行分析,常用于计算排名、累计和、移动平均值等。基本语法包括 `function_name([arguments]) OVER ([PARTITION BY columns] [ORDER BY columns] [frame_clause])`,常见函数有 `ROW_NUMBER()`, `RANK()`, `DENSE_RANK()`, `SUM()`, `AVG()` 等。窗口框架定义了计算聚合值时应包含的行。适用于复杂数据操作和分析报告。
87 11
|
1月前
|
存储 关系型数据库 MySQL
mysql怎么查询longblob类型数据的大小
通过本文的介绍,希望您能深入理解如何查询MySQL中 `LONG BLOB`类型数据的大小,并结合优化技术提升查询性能,以满足实际业务需求。
142 6
|
1天前
|
关系型数据库 MySQL 数据库连接
数据库连接工具连接mysql提示:“Host ‘172.23.0.1‘ is not allowed to connect to this MySQL server“
docker-compose部署mysql8服务后,连接时提示不允许连接问题解决
|
1月前
|
存储 Oracle 关系型数据库
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
63 3
|
1月前
|
安全 关系型数据库 MySQL
MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!
《MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!》介绍了MySQL中的三种关键日志:二进制日志(Binary Log)、重做日志(Redo Log)和撤销日志(Undo Log)。这些日志确保了数据库的ACID特性,即原子性、一致性、隔离性和持久性。Redo Log记录数据页的物理修改,保证事务持久性;Undo Log记录事务的逆操作,支持回滚和多版本并发控制(MVCC)。文章还详细对比了InnoDB和MyISAM存储引擎在事务支持、锁定机制、并发性等方面的差异,强调了InnoDB在高并发和事务处理中的优势。通过这些机制,MySQL能够在事务执行、崩溃和恢复过程中保持
81 3

相关产品

  • 实时计算 Flink版