开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问一下。怎么从kafka获取一 批数据,然后批量写入es或数据库呢?

请问一下。怎么从kafka获取一 批数据,然后批量写入es或数据库呢。现在kafka消费都是一条一条的。怎么转化成批量呢。?

展开
收起
圆葱猪肉包 2023-03-27 13:40:46 364 0
3 条回答
写回答
取消 提交回答
  • 定义批插入sink或者使用flink 提供的sink函数,我记得flink好几个数据库sink函数都提供批量插入。此回答整理自钉群“【③群】Apache Flink China社区”

    2023-03-28 10:53:57
    赞同 展开评论 打赏
  • Flink实时消费kafka数据,数据经过处理,富化、清洗等操作,写入ES

    2023-03-27 15:22:57
    赞同 展开评论 打赏
  • 坚持这件事孤独又漫长。
    • 可以使用Flink的DataStream API中提供的 window 和 batch API,来进行批量处理。以下是基于 Flink 的批处理实例代码:
    val env = ExecutionEnvironment.getExecutionEnvironment
    
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "test")
    
    val consumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), props)
    consumer.setStartFromEarliest()
    
    val stream: DataSet[String] = env
      .addSource(consumer)
    
    val batchSize = 1000L // 批处理大小
    stream
      .map(parseRecord) // 对每个数据进行解析
      .groupingKey(data => data.id) // 根据 ID 进行分组
      .reduceGroup { (values, out: Collector[Seq[Data]]) =>
        val batch = scala.collection.mutable.ArrayBuffer[Data]()
        values.foreach { data =>
          batch += data
          if (batch.size >= batchSize) {
            out.collect(batch)
            batch.clear()
          }
        }
        if (batch.nonEmpty) {
          out.collect(batch)
          batch.clear()
        }
      }
      .flatMap { data =>
        writeToEs(data) // 将处理后的数据写入 ES 中
      }
    
    env.execute("Batch processing from Kafka to ES example")
    

    在此示例中,我们首先使用FlinkKafkaConsumer从 Kafka 中消费数据,然后定义一个处理函数,在函数内我们对数据进行解析,并根据 ID 进行分组。然后我们将同一个数据 ID 的数据打包成一个批次,批次大小是向上取整为 batchSize 的最小值。最后我们将批次数据批量写入到 ES 中。

    2023-03-27 14:48:36
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载