开发者社区> 问答> 正文

flink 1.11运算结果存mysql出错

各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。代码如下,是在linux下,直接python *.py执行的。完整代码如下

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode

from pyflink.table import StreamTableEnvironment, EnvironmentSettings

source="""

CREATE TABLE kafka_source_tab (

 id VARCHAR,   

 alarm_id VARCHAR,   

 trck_id VARCHAR

) WITH (

 'connector' = 'kafka',

 'topic' = 'alarm_test_g',   

 'scan.startup.mode' = 'earliest-offset',

 'properties.bootstrap.servers' = '10.2.2.73:2181',

 'properties.bootstrap.servers' = '10.2.2.73:9092',

 'format' = 'json' 

)

"""

sink="""

CREATE TABLE g_source_tab (

 id VARCHAR,   

 alarm_id VARCHAR,     

 trck_id VARCHAR

) WITH (

 'connector' = 'jdbc',

 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 

 'table-name' = 'g',   

 'username' = 'root',

 'password' = '123456t',

 'sink.buffer-flush.interval' = '1s'

)

"""

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

env.set_parallelism(1)

env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()

t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

t_env.execute_sql(source)

t_env.execute_sql(sink)

source = t_env.from_path("kafka_source_tab")\

        .select("id,alarm_id,trck_id")

source.execute_insert("g_source_tab")

*来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-06 15:45:55 852 0
1 条回答
写回答
取消 提交回答
  • 简单看了下代码应该没啥问题,alarm_test_g 这个kafka topic里有数据吗?可以检查下是否有脏数据,直接用./bin/kafka-console-consumer.sh 检查下?我有点怀疑这点

    *来自志愿者整理的flink邮件归档

    2021-12-06 18:27:31
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载

相关镜像