实时计算 Flink版咋能把es 的数据实时推入到 kafka? 进一条推一条

实时计算 Flink版咋能把es 的数据实时推入到 kafka? 进一条推一条

展开
收起
wenti 2023-02-27 19:27:58 674 发布于辽宁 分享
分享
版权
举报
1 条回答
写回答
取消 提交回答
  • 存在即是合理

    阿里云实时计算 Flink版可以通过Flink的 Elasticsearch connector 将 Elasticsearch 中的数据实时推入到 Kafka 中。具体的实现方法如下:

    导入 Elasticsearch 和 Kafka 的 Flink connector。

        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.13.2</version>
    </dependency>
    
    

    在 Flink 任务中创建 Elasticsearch source。

        .setHosts("http://localhost:9200")
        .setIndex("my_index")
        .setDocumentType("_doc")
        .setScrollTimeout(Duration.ofMinutes(1))
        .setBulkFlushMaxActions(10)
        .setBulkFlushInterval(Duration.ofSeconds(5))
        .setDeserializationSchema(new JsonRowDataDeserializationSchema(schema))
        .build();
    
    

    创建一个 Kafka sink。

        "my_topic",
        new SimpleStringSchema(),
        properties
    );
    
    

    将 Elasticsearch source 和 Kafka sink 连接起来。

    stream.addSink(kafkaSink);
    
    

    以上就是将 Elasticsearch 中的数据实时推入到 Kafka 中的完整代码示例。需要注意的是,在实际使用中,您需要根据自己的需求进行适当的调整和修改。同时,为了保证推送数据的正确性和一致性,您还需要在 Flink 任务中设置适当的 checkpoint 和保存点等机制。

    2023-03-02 16:49:13 举报
    赞同 评论

    评论

    全部评论 (0)

    登录后可评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

收录在圈子:
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
还有其他疑问?
咨询AI助理