开发者社区> 问答> 正文

flink1.11查询结果每秒入库到mysql数量很少

各位大佬好,请教一个问题,在使用flink1.11消费kafka数据,查询结果写入到mysql库表时,发现读取kafka的速度很快(300条/秒),但是查询结果每秒写入mysql的条数只有6条左右,请问这是怎么回事,以及优化的点在哪里?下面是我的代码。

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode

from pyflink.table import StreamTableEnvironment, EnvironmentSettings

source_Kafka = """

CREATE TABLE kafka_source (

 id VARCHAR, 

 alarm_id VARCHAR,

 trck_id VARCHAR

) WITH (

 'connector' = 'kafka',

 'topic' = 'test',  

 'properties.bootstrap.servers' = '*',

 'properties.group.id' = 'flink_grouper',

 'scan.startup.mode' = 'earliest-offset',    

 'format' = 'json',

 'json.fail-on-missing-field' = 'false',

 'json.ignore-parse-errors' = 'true'

)

"""

source_W_detail_ddl = """

CREATE TABLE source_W_detail (

 id VARCHAR,    

 alarm_id VARCHAR,     

 trck_id VARCHAR    

) WITH (

 'connector' = 'jdbc',

 'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false',

 'driver' = 'com.mysql.cj.jdbc.Driver',

 'table-name' = 'detail',

 'username' = 'root',

 'password' = 'root',

 'sink.buffer-flush.max-rows' = '1000',

 'sink.buffer-flush.interval' = '2s'

"""

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

env.set_parallelism(1)

env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()

t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

t_env.execute_sql(source_Kafka)

t_env.execute_sql(source_W_detail_ddl)

table_result1=t_env.execute_sql('''insert into source_W_detail select id,alarm_id,trck_id from kafka_source''')

table_result1.get_job_client().get_job_execution_result().result()*来自志愿者整理的flink邮件归档

展开
收起
说了是一只鲳鱼 2021-12-07 10:11:45 914 0
1 条回答
写回答
取消 提交回答
  • 你可以尝试改写url,加上rewritebatchedstatements=true,如下: jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewritebatchedstatements=true

    MySQL Jdbc驱动在默认情况下会无视executeBatch()语句,把期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,直接造成较低的性能。把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL。*来自志愿者整理的flink邮件归档

    2021-12-07 12:14:03
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载

相关镜像