flink 手动维护kafka offset

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: flink 手动维护kafka offset

flink 自定义管理kafka offset

我们都知道flink 程序自行设置指定offset配合checkpint 的方式,来获取便宜量。并且可以通过state和设置事件语义,达到exactly-once.但是仍然有一些场景需要自己手工管理offset.

用过spark的同学都知道,spark在读取kafka数据后,DStream(准确是InputDStream[ConsumerRecord[String, String]])中会有这几个信息:topic、partition、offset、key、value、timestamp等等信息,在维护的时候只需要对DStream进行一次foreach操作就可以了,根据场景选择保存offset的位置,再次重启的时候,读取redis中的offset就可以了。

同理,可以通过类似的方式实现。


步骤:

1:重写FlinkKafkaConsumer:创建NewKafkaDStream

2:使用外部数据库存储、更新offset.如redis

3:读取offset

完整demo

kafkaDstream类,

import org.apache.commons.lang.StringUtils
import org.slf4j.{Logger, LoggerFactory}
import redis.clients.jedis.Jedis
import java.nio.charset.StandardCharsets
import java.util._
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, KafkaDeserializationSchema}
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.collection.JavaConversions._
/**
 * Created with IntelliJ IDEA.
 *
 * @Author: mdz
 * @Param: $params$
 * @Date: 2021/10/21/15:50
 * @Description:
 * @version:
 * */
object KafkaDStream {
  private val logger = LoggerFactory.getLogger(classOf[KafkaDStream])
  /**
   * 组建kafka信息
   * @param topic
   * @param groupid
   * @return
   */
  def createKafkaSource(topic:java.util.List[String], groupid:String): FlinkKafkaConsumer[KafkaDStream] ={
    // kafka消费者配置
    //KeyedDeserializationSchema太旧了,用KafkaDeserializationSchema
    val dataStream = new FlinkKafkaConsumer[KafkaDStream](topic:java.util.List[String], new KafkaDeserializationSchema[KafkaDStream]() {
      override def getProducedType: TypeInformation[KafkaDStream] = TypeInformation.of(new TypeHint[KafkaDStream]() {})
      override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): KafkaDStream = {
        var key: String = null
        var value: String = null
        if (record.key != null) {
          key = new String(record.key())
        }
        if (record.value != null) {
          value = new String(record.value())
        }
        val kafkasource = new KafkaDStream(record.topic(), record.partition(), record.offset(), record.timestamp(),value)
        kafkasource
      }
      override def isEndOfStream(s: KafkaDStream) = false
    }, getKafkaProperties(groupid))
    //是否自动提交offset
    dataStream.setCommitOffsetsOnCheckpoints(true)
    dataStream
  }
  /**
   * kafka配置
   * @param groupId
   * @return
   */
  private def getKafkaProperties(groupId:String): Properties = {
    val kafkaProps: Properties = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "kafka.brokersxxxxxxx")
    kafkaProps.setProperty("group.id", groupId)
    kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps
  }
  /**
   * 从redis中获取kafka的offset
   * @param topic
   * @param groupId
   * @return
   */
  def getSpecificOffsets(topic:java.util.ArrayList[String]): java.util.Map[KafkaTopicPartition, java.lang.Long]  ={
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
    val specificStartOffsets: java.util.Map[KafkaTopicPartition, java.lang.Long] = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
    for(topic <- topic){
      val jedis = new Jedis("redis_host", 6379)
      val key = s"my_flink_$topic"
      val partitions = jedis.hgetAll(key).toList
      for(partition <- partitions){
        if(!StringUtils.isEmpty(topic) && !StringUtils.isEmpty(partition._1) && !StringUtils.isEmpty(partition._2)){
          logger.warn("topic:"+topic.trim, partition._1.trim.toInt, partition._2.trim.toLong)
          specificStartOffsets.put(new KafkaTopicPartition(topic.trim, partition._1.trim.toInt), partition._2.trim.toLong)
        }
      }
      jedis.close()
    }
    specificStartOffsets
  }
  def setOffset(topic:String, partition:Int, offset:Long): Unit ={
    val jedis = new Jedis("redis_port1", 3306)
    val gtKey = s"my_flink_$topic"
    jedis.hset(gtKey, partition.toString, offset.toString)
    jedis.close()
  }
}
case class KafkaDStream(topic:String, partition:Int, offset:Long, keyMessage:String, message:String){
}

测试主类ManuKafkaOffsetTest:

import KafkaDStream.{createKafkaSource, getSpecificOffsets}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
/**
 * Created with IntelliJ IDEA.
 *
 * @Author: mdz
 * @Param: $params$
 * @Date: 2021/10/21/16:07
 * @Description:
 * @version:
 * */
object ManuKafkaOffsetTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.set...
    env.setParallelism(1)
    val myTopic="test_002"
    val groupId="111111"
    val topics = new java.util.ArrayList[String]
    topics.add(myTopic)
    val consumer = createKafkaSource(topics, groupId)
    consumer.setStartFromSpecificOffsets(getSpecificOffsets(topics))
    val dataStream = env.addSource(consumer)
    dataStream.print()
    env.execute()
  }
}


相关文章
|
3月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
231 0
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
54 0
|
3月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
110 0
|
3月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
94 4
|
3月前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
78 4
|
3月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
212 0
|
3月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
62 0
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
114 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
64 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
367 9