开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

实时计算 Flink版咋能把es 的数据实时推入到 kafka? 进一条推一条

实时计算 Flink版咋能把es 的数据实时推入到 kafka? 进一条推一条

展开
收起
wenti 2023-02-27 19:27:58 638 0
1 条回答
写回答
取消 提交回答
  • 存在即是合理

    阿里云实时计算 Flink版可以通过Flink的 Elasticsearch connector 将 Elasticsearch 中的数据实时推入到 Kafka 中。具体的实现方法如下:

    导入 Elasticsearch 和 Kafka 的 Flink connector。

        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.13.2</version>
    </dependency>
    
    

    在 Flink 任务中创建 Elasticsearch source。

        .setHosts("http://localhost:9200")
        .setIndex("my_index")
        .setDocumentType("_doc")
        .setScrollTimeout(Duration.ofMinutes(1))
        .setBulkFlushMaxActions(10)
        .setBulkFlushInterval(Duration.ofSeconds(5))
        .setDeserializationSchema(new JsonRowDataDeserializationSchema(schema))
        .build();
    
    

    创建一个 Kafka sink。

        "my_topic",
        new SimpleStringSchema(),
        properties
    );
    
    

    将 Elasticsearch source 和 Kafka sink 连接起来。

    stream.addSink(kafkaSink);
    
    

    以上就是将 Elasticsearch 中的数据实时推入到 Kafka 中的完整代码示例。需要注意的是,在实际使用中,您需要根据自己的需求进行适当的调整和修改。同时,为了保证推送数据的正确性和一致性,您还需要在 Flink 任务中设置适当的 checkpoint 和保存点等机制。

    2023-03-02 16:49:13
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
    MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
    消息队列kafka介绍 立即下载