问题一:flink sql 读取mysql
版本:flink 1.10 mysql 5.7.24
需求场景是: 使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作?
现在本地测试时,维表的DDL是:
但是去mysql修改了数据后,join操作还是旧数据.
望大神们指点方向,提前谢谢了.*来自志愿者整理的flink邮件归档
参考答案:
这个报错一般是sql格式错误,比如中英文逗号等,你可以检查下你的SQL语句*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371510?spm=a2c6h.12873639.article-detail.22.29d04378ApxdqJ
问题二:Apache Flink常见问题汇总【精品问答】
hi
我这面在使用sql api解析kafka
json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储
json.ignore-parse-errors
son.fail-on-missing-field*来自志愿者整理的flink邮件归档
参考答案:
我理解应该做不到,因为这两个format参数在format里就做的。
json.ignore-parse-errors 是在 format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)
这两个不能同时为ture,语义上就是互斥的。*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371508?spm=a2c6h.12873639.article-detail.23.29d04378ApxdqJ
问题三:https://developer.aliyun.com/ask/371504
请问实现了 MetricReporter, CharacterFilter,Scheduled, Reporter 的自定义
customerReporter 如何能在 代码env里面注册并实现metric上报,要求不在flink conf.xml 文件里面配置
该customerReporter的信息?
需求:在自定义的source 和sink等算子里面计算处理成功,失败的数据条数并通过自定义reporter上报,并且该reporter需要是通用型的即
*适用于多个flink
任务*从而避开重复造轮子。*来自志愿者整理的flink邮件归档
参考答案:
尝试理解一下你的需求,你自己实现了一个 report,也希望在 source 和 sink 中计算一些 metric,希望把 source 和sink 的这些 metric 通过自定义的 report 上报到你指定的地方。然后不希望在 env 里面配置 report 的信息,是这样吗?能否解释下为什么不希望在 flink-conf 中进行配置,而是希望在 env 中进行配置吗*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371506?spm=a2c6h.12873639.article-detail.24.29d04378ApxdqJ
问题四:flink1.11查询结果每秒入库到mysql数量很少
各位大佬好,请教一个问题,在使用flink1.11消费kafka数据,查询结果写入到mysql库表时,发现读取kafka的速度很快(300条/秒),但是查询结果每秒写入mysql的条数只有6条左右,请问这是怎么回事,以及优化的点在哪里?下面是我的代码。
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source_Kafka = """
CREATE TABLE kafka_source (
id VARCHAR,
alarm_id VARCHAR,
trck_id VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'test',
'properties.bootstrap.servers' = '*',
'properties.group.id' = 'flink_grouper',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
"""
source_W_detail_ddl = """
CREATE TABLE source_W_detail (
id VARCHAR,
alarm_id VARCHAR,
trck_id VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false',
'driver' = 'com.mysql.cj.jdbc.Driver',
'table-name' = 'detail',
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.max-rows' = '1000',
'sink.buffer-flush.interval' = '2s'
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(source_Kafka)
t_env.execute_sql(source_W_detail_ddl)
table_result1=t_env.execute_sql('''insert into source_W_detail select id,alarm_id,trck_id from kafka_source''')
table_result1.get_job_client().get_job_execution_result().result()*来自志愿者整理的flink邮件归档
参考答案:
你可以尝试改写url,加上rewritebatchedstatements=true,如下: jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewritebatchedstatements=true
MySQL Jdbc驱动在默认情况下会无视executeBatch()语句,把期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,直接造成较低的性能。把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL。*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371504?spm=a2c6h.12873639.article-detail.26.29d04378ApxdqJ