开发者社区> 问答> 正文

flink sql 不同job消费同一个kafka表(指定了groupId)时输出相同数据?

Hi,all 使用flink版本1.10.0,在hive catalog下建了映射kafka的表: CREATE TABLE x.log.yanfa_log ( dt TIMESTAMP(3), conn_id STRING, sequence STRING, trace_id STRING, span_info STRING, service_id STRING, msg_id STRING, servicename STRING, ret_code STRING, duration STRING, req_body MAP<String,String>, res_body MAP<STRING,STRING>, extra_info MAP<STRING,STRING>, WATERMARK FOR dt AS dt - INTERVAL '60' SECOND ) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'x-log-yanfa_log', 'connector.properties.bootstrap.servers' = '******:9092', 'connector.properties.zookeeper.connect' = '******:2181', 'connector.properties.group.id' = 'testGroup', 'connector.startup-mode' = 'group-offsets', 'update-mode' = 'append', 'format.type' = 'json', 'format.fail-on-missing-field' = 'true' ); 消费表x.log.yanfa_log程序如下: Catalog myCatalog = new HiveCatalog("x", "default", "D:\conf", "1.1.0"); tEnv.registerCatalog("x", myCatalog); Table rs = tEnv.sqlQuery("select * from x.log.yanfa_log"); tEnv.toAppendStream(rs, Row.class).print();

然后针对同一个程序启动了2个job,结果都输出了相同的结果。我的疑问是kafka topic的同一个partition不是只能被group下至多一个consumer消费吗?为什么2个job会输出相同结果呢?

来自志愿者整理的flink邮件归档来自志愿者整理的FLINK邮件归档

展开
收起
小阿怪 2021-12-04 19:26:04 1738 0
1 条回答
写回答
取消 提交回答
  • Flink的Kafka Connector的实现是用的Kafka lower api,也就是会自己去获取当前的partition信息,自己来分配那些subtask读取那个partition。 所以如果有两个任务,他们互相之间是没有关系的,也不会相互感知到。(只有一点,就是如果你配置了相同的group id,他们提交offset可能会互相覆盖。) 你说的那个模式是Kafka high-level api。

    来自志愿者整理的flink邮件归档来自志愿者整理的FLINK邮件归档

    2021-12-04 22:34:59
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载