请问一下。怎么从kafka获取一 批数据,然后批量写入es或数据库呢?

请问一下。怎么从kafka获取一 批数据,然后批量写入es或数据库呢。现在kafka消费都是一条一条的。怎么转化成批量呢。?

展开
收起
圆葱猪肉包 2023-03-27 13:40:46 388 发布于辽宁 分享
分享
版权
举报
3 条回答
写回答
取消 提交回答
  • 定义批插入sink或者使用flink 提供的sink函数,我记得flink好几个数据库sink函数都提供批量插入。此回答整理自钉群“【③群】Apache Flink China社区”

    2023-03-28 10:53:57 发布于辽宁 举报
    赞同 评论

    评论

    全部评论 (0)

    登录后可评论
  • Flink实时消费kafka数据,数据经过处理,富化、清洗等操作,写入ES

    2023-03-27 15:22:57 发布于陕西 举报
    赞同 评论

    评论

    全部评论 (0)

    登录后可评论
  • 坚持这件事孤独又漫长。
    • 可以使用Flink的DataStream API中提供的 window 和 batch API,来进行批量处理。以下是基于 Flink 的批处理实例代码:
    val env = ExecutionEnvironment.getExecutionEnvironment
    
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "test")
    
    val consumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), props)
    consumer.setStartFromEarliest()
    
    val stream: DataSet[String] = env
      .addSource(consumer)
    
    val batchSize = 1000L // 批处理大小
    stream
      .map(parseRecord) // 对每个数据进行解析
      .groupingKey(data => data.id) // 根据 ID 进行分组
      .reduceGroup { (values, out: Collector[Seq[Data]]) =>
        val batch = scala.collection.mutable.ArrayBuffer[Data]()
        values.foreach { data =>
          batch += data
          if (batch.size >= batchSize) {
            out.collect(batch)
            batch.clear()
          }
        }
        if (batch.nonEmpty) {
          out.collect(batch)
          batch.clear()
        }
      }
      .flatMap { data =>
        writeToEs(data) // 将处理后的数据写入 ES 中
      }
    
    env.execute("Batch processing from Kafka to ES example")
    

    在此示例中,我们首先使用FlinkKafkaConsumer从 Kafka 中消费数据,然后定义一个处理函数,在函数内我们对数据进行解析,并根据 ID 进行分组。然后我们将同一个数据 ID 的数据打包成一个批次,批次大小是向上取整为 batchSize 的最小值。最后我们将批次数据批量写入到 ES 中。

    2023-03-27 14:48:36 发布于北京 举报
    赞同 评论

    评论

    全部评论 (0)

    登录后可评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

收录在圈子:
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
还有其他疑问?
咨询AI助理
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等