Flink查询问题之每秒入库到mysql数量很少如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink sql 读取mysql

版本:flink 1.10 mysql 5.7.24

需求场景是: 使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作?

现在本地测试时,维表的DDL是:

但是去mysql修改了数据后,join操作还是旧数据.

望大神们指点方向,提前谢谢了.*来自志愿者整理的flink邮件归档



参考答案:

这个报错一般是sql格式错误,比如中英文逗号等,你可以检查下你的SQL语句*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371510?spm=a2c6h.12873639.article-detail.22.29d04378ApxdqJ



问题二:Apache Flink常见问题汇总【精品问答】

hi

我这面在使用sql api解析kafka

json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储

json.ignore-parse-errors

son.fail-on-missing-field*来自志愿者整理的flink邮件归档



参考答案:

我理解应该做不到,因为这两个format参数在format里就做的。

json.ignore-parse-errors 是在 format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)

这两个不能同时为ture,语义上就是互斥的。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371508?spm=a2c6h.12873639.article-detail.23.29d04378ApxdqJ



问题三:https://developer.aliyun.com/ask/371504

请问实现了 MetricReporter, CharacterFilter,Scheduled, Reporter 的自定义

customerReporter 如何能在 代码env里面注册并实现metric上报,要求不在flink conf.xml 文件里面配置

该customerReporter的信息?

需求:在自定义的source 和sink等算子里面计算处理成功,失败的数据条数并通过自定义reporter上报,并且该reporter需要是通用型的即

*适用于多个flink

任务*从而避开重复造轮子。*来自志愿者整理的flink邮件归档



参考答案:

尝试理解一下你的需求,你自己实现了一个 report,也希望在 source 和 sink 中计算一些 metric,希望把 source 和sink 的这些 metric 通过自定义的 report 上报到你指定的地方。然后不希望在 env 里面配置 report 的信息,是这样吗?能否解释下为什么不希望在 flink-conf 中进行配置,而是希望在 env 中进行配置吗*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371506?spm=a2c6h.12873639.article-detail.24.29d04378ApxdqJ



问题四:flink1.11查询结果每秒入库到mysql数量很少

各位大佬好,请教一个问题,在使用flink1.11消费kafka数据,查询结果写入到mysql库表时,发现读取kafka的速度很快(300条/秒),但是查询结果每秒写入mysql的条数只有6条左右,请问这是怎么回事,以及优化的点在哪里?下面是我的代码。

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode

from pyflink.table import StreamTableEnvironment, EnvironmentSettings

source_Kafka = """

CREATE TABLE kafka_source (

id VARCHAR,

alarm_id VARCHAR,

trck_id VARCHAR

) WITH (

'connector' = 'kafka',

'topic' = 'test',  

'properties.bootstrap.servers' = '*',

'properties.group.id' = 'flink_grouper',

'scan.startup.mode' = 'earliest-offset',    

'format' = 'json',

'json.fail-on-missing-field' = 'false',

'json.ignore-parse-errors' = 'true'

)

"""

source_W_detail_ddl = """

CREATE TABLE source_W_detail (

id VARCHAR,    

alarm_id VARCHAR,    

trck_id VARCHAR    

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false',

'driver' = 'com.mysql.cj.jdbc.Driver',

'table-name' = 'detail',

'username' = 'root',

'password' = 'root',

'sink.buffer-flush.max-rows' = '1000',

'sink.buffer-flush.interval' = '2s'

"""

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

env.set_parallelism(1)

env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()

t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

t_env.execute_sql(source_Kafka)

t_env.execute_sql(source_W_detail_ddl)

table_result1=t_env.execute_sql('''insert into source_W_detail select id,alarm_id,trck_id from kafka_source''')

table_result1.get_job_client().get_job_execution_result().result()*来自志愿者整理的flink邮件归档



参考答案:

你可以尝试改写url,加上rewritebatchedstatements=true,如下: jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewritebatchedstatements=true

MySQL Jdbc驱动在默认情况下会无视executeBatch()语句,把期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,直接造成较低的性能。把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371504?spm=a2c6h.12873639.article-detail.26.29d04378ApxdqJ

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
2月前
|
SQL 人工智能 关系型数据库
如何实现MySQL百万级数据的查询?
本文探讨了在MySQL中对百万级数据进行排序分页查询的优化策略。面对五百万条数据,传统的浅分页和深分页查询效率较低,尤其深分页因偏移量大导致性能显著下降。通过为排序字段添加索引、使用联合索引、手动回表等方法,有效提升了查询速度。最终建议根据业务需求选择合适方案:浅分页可加单列索引,深分页推荐联合索引或子查询优化,同时结合前端传递最后一条数据ID的方式实现高效翻页。
131 0
|
25天前
|
存储 关系型数据库 MySQL
使用命令行cmd查询MySQL表结构信息技巧分享。
掌握了这些命令和技巧,您就能快速并有效地从命令行中查询MySQL表的结构信息,进而支持数据库维护、架构审查和优化等工作。
156 9
|
6月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
628 0
|
3月前
|
关系型数据库 MySQL 数据库
MySQL报错:未知系统变量'tx_isolation'及隔离级别查询
记住,选择合适的隔离级别,就像是在风平浪静的湖面上找到适合的划船速度——既要快到能赶上午饭(性能),又不至于翻船(数据一致性问题)。
187 3
|
3月前
|
SQL 关系型数据库 MySQL
MySQL 进行 select 查询时 where 条件中 in 的value数过多却导致无记录返回
MySQL 进行 select 查询时 where 条件中 in 的value数过多返回不符合预期怎么办?会不会遇到bug了?
200 0
|
4月前
|
缓存 JSON 关系型数据库
MySQL 查询优化分析 - 常用分析方法
本文介绍了MySQL查询优化分析的常用方法EXPLAIN、Optimizer Trace、Profiling和常用监控指标。
|
7月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
578 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
7月前
|
算法 关系型数据库 MySQL
join查询可以⽆限叠加吗?MySQL对join查询有什么限制吗?
大家好,我是 V 哥。本文详细探讨了 MySQL 中 `JOIN` 查询的限制及其优化方法。首先,`JOIN` 查询不能无限叠加,存在资源(CPU、内存、磁盘 I/O)、性能和语法等方面的限制。过多的 `JOIN` 操作会导致数据库性能急剧下降。其次,介绍了三种常见的 `JOIN` 查询算法:嵌套循环连接(NLJ)、索引嵌套连接(INL)和基于块的嵌套循环连接(BNL),并分析了它们的触发条件和性能特点。最后,分享了优化 `JOIN` 查询的方法,包括 SQL 语句优化、索引优化、数据库配置调整等。关注 V 哥,了解更多技术干货,点赞👍支持,一起进步!
149 3
|
3月前
|
人工智能 运维 关系型数据库
数据库运维:mysql 数据库迁移方法-mysqldump
本文介绍了MySQL数据库迁移的方法与技巧,重点探讨了数据量大小对迁移方式的影响。对于10GB以下的小型数据库,推荐使用mysqldump进行逻辑导出和source导入;10GB以上可考虑mydumper与myloader工具;100GB以上则建议物理迁移。文中还提供了统计数据库及表空间大小的SQL语句,并讲解了如何使用mysqldump导出存储过程、函数和数据结构。通过结合实际应用场景选择合适的工具与方法,可实现高效的数据迁移。
661 1
|
4月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多