开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

写Flink-API代码不是可以指定消费KAFKA的数据是从最新的位置、最早的位置、当前OFFSET

写Flink-API代码不是可以指定消费KAFKA的数据是从最新的位置、最早的位置、当前OFFSET三种方式进行数据读取吗?Flink-SQL 如何指定?

展开
收起
游客3oewgrzrf6o5c 2022-07-27 10:19:03 561 0
1 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    是的,使用 Flink API 可以指定从最新的位置、最早的位置或者指定的 offset 开始消费 Kafka 数据。

    对于从最新位置开始消费 Kafka 数据,可以使用以下代码:

    java
    Copy
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "my-group");

    FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);
    consumer.setStartFromLatest();

    DataStream stream = env.addSource(consumer);
    这里使用 setStartFromLatest() 方法将消费者的起始位置设置为最新的位置。类似地,您也可以使用 setStartFromEarliest() 方法将起始位置设置为最早的位置,或者使用 setStartFromSpecificOffsets() 方法手动指定起始 offset。

    对于 Flink SQL,您可以在创建表时使用 properties 子句指定 Kafka 消费者的相关属性,例如:

    sql
    Copy
    CREATE TABLE kafka_table (
    id INT,
    name STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'my-group',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
    );
    这里使用 'scan.startup.mode' = 'latest-offset' 将消费者的起始位置设置为最新的位置。类似地,您也可以使用 'scan.startup.mode' = 'earliest-offset' 将起始位置设置为最早的位置,或者使用 'scan.startup.mode' = 'specific-offsets' 手动指定起始 offset。

    2023-07-17 13:25:09
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Spring Boot2.0实战Redis分布式缓存 立即下载
    CUDA MATH API 立即下载
    API PLAYBOOK 立即下载