开发者社区> 问答> 正文

如何为consumer设置Kafka抵消?

"假设我的主题中已有10个数据,现在我开始编写消费者 Flink,消费者将使用第11个数据。

因此,我有3个问题:

如何分别获取当前主题的分区数和每个分区的偏移量?
如何手动为消费者设置每个分区的起始位置?
如果Flink消费者崩溃,几分钟后就会恢复。消费者将如何知道重新启动的位置?
示例代码(我试过FlinkKafkaConsumer08,FlinkKafkaConsumer10但都是例外。):

public class kafkaConsumer {
public static void main(String[] args) throws Exception {

// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);

Properties properties = new Properties();
properties.setProperty(""bootstrap.servers"", ""192.168.95.2:9092"");
properties.setProperty(""group.id"", ""test"");
properties.setProperty(""auto.offset.reset"", ""earliest"");

FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>(
        ""game_event"", new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(myConsumer);

stream.map(new MapFunction<String, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(String value) throws Exception {
        return ""Stream Value: "" + value;
    }
}).print();

env.execute();
}

}
和pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.6.1</version>
</dependency>"

展开
收起
flink小助手 2018-11-28 16:13:23 1967 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    "为了从特定偏移开始消耗来自分区的消息,你可以参考Flink文档 l:
    你还可以指定comsumer应从每个分区开始的确切偏移量:

    Map specificStartOffsets = new HashMap<>();
    specificStartOffsets.put(new KafkaTopicPartition(""myTopic"", 0), 23L);
    specificStartOffsets.put(new KafkaTopicPartition(""myTopic"", 1), 31L);
    specificStartOffsets.put(new KafkaTopicPartition(""myTopic"", 2), 43L);

    myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
    上面的示例将使用者配置为从主题myTopic的分区0,1和2的指定偏移量开始。偏移值应该是消费者应为每个分区读取的下一条记录。请注意,如果使用者需要读取在提供的偏移量映射中没有指定偏移量的分区,则它将回退到该特定分区的默认组偏移行为(即setStartFromGroupOffsets())。

    请注意,当作业从故障中自动恢复或使用保存点手动恢复时,这些起始位置配置方法不会影响起始位置。在恢复时,每个Kafka分区的起始位置由存储在保存点或检查点中的偏移量确定(有关检查点的信息,请参阅下一节以启用消费者的容错)。

    如果其中一个comsumer崩溃,一旦它恢复,Kafka将引用consumer_offsets主题,以便继续处理从崩溃之前留下的点的消息。consumer_offsets是一个主题,用于存储有关每个三元组(主题,分区,组)的已提交偏移量的元数据信息。它也会定期压缩,以便只有最新的偏移量可用。你还可以参考Flink的Kafka连接器度量标准:
    Flink的Kafka连接器通过Flink的度量系统提供一些指标来分析连接器的行为。生产者通过Flink的度量系统为所有支持的版本导出Kafka的内部指标。消费者从Kafka 0.9版开始导出所有指标。Kafka文档在其文档中列出了所有导出的度量标准。

    除了这些指标之外,所有消费者都会公开每个主题分区的当前偏移和承诺偏移。电流偏移是指分区中的当前偏移。这指的是我们成功检索和发出的最后一个元素的偏移量。已提交的偏移量是最后提交的偏移量。

    Flink的Kafkacomsumer将抵消权交还给Zookeeper(Kafka 0.8)或Kafka经纪人(Kafka 0.9+)。如果禁用了检查点,则会定期提交偏移量。通过检查点,一旦流拓扑中的所有运算符确认已创建其状态的检查点,就会发生提交。这为用户提供了至少一次语义,用于提交给Zookeeper或代理的偏移量。对于Flink的偏移检查点,系统提供一次保证。

    提交给ZK或经纪人的抵消也可用于跟踪Kafka comnsumer的阅读进度。提交的偏移量与每个分区中最近的偏移量之间的差异称为consumer滞后。如果Flink拓扑消耗主题的数据比添加新数据的速度慢,则滞后将增加,消费者将落后。对于大型生产部署,我们建议监控该指标以避免增加延迟。"

    2019-07-17 23:16:50
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载