Flink: managing Kafka offsets yourself
As we all know, a Flink job normally lets the Kafka connector track offsets itself in combination with checkpoints, and with state plus the right processing semantics it can achieve exactly-once. Even so, there are scenarios where you need to manage the offsets manually.
Anyone who has used Spark knows that after reading from Kafka, the DStream (more precisely an InputDStream[ConsumerRecord[String, String]]) already carries topic, partition, offset, key, value, timestamp and so on. Maintaining offsets then only takes one foreach over the DStream: pick a place to store them according to your scenario, and on restart simply read the offsets back from Redis.
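As a reference point, here is a minimal sketch of that Spark pattern, assuming the spark-streaming-kafka-0-10 connector and the same Redis hash layout (my_flink_<topic> -> {partition: offset}) that the Flink demo below uses; the Redis host/port are placeholders.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import redis.clients.jedis.Jedis

// After processing each micro-batch, persist the end offset of every partition into Redis.
def saveOffsets(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    val jedis = new Jedis("redis_host", 6379) // placeholder, same as the Flink demo below
    offsetRanges.foreach { o =>
      jedis.hset(s"my_flink_${o.topic}", o.partition.toString, o.untilOffset.toString)
    }
    jedis.close()
  }
}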
Flink can do the same thing in much the same way.
Steps:
1. Build a FlinkKafkaConsumer with a custom deserialization schema that produces a KafkaDStream carrying topic, partition, offset, key and value.
2. Store and update the offsets in an external store such as Redis.
3. Read the offsets back on startup and start the consumer from them.
Complete demo
The KafkaDStream class:
import org.apache.commons.lang.StringUtils
import org.slf4j.{Logger, LoggerFactory}
import redis.clients.jedis.Jedis

import java.nio.charset.StandardCharsets
import java.util._

import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord

import scala.collection.JavaConversions._

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: mdz
 * @Date: 2021/10/21/15:50
 */
object KafkaDStream {

  private val logger = LoggerFactory.getLogger(classOf[KafkaDStream])

  /**
   * Build the Kafka consumer.
   * @param topic   topics to subscribe to
   * @param groupid consumer group id
   */
  def createKafkaSource(topic: java.util.List[String], groupid: String): FlinkKafkaConsumer[KafkaDStream] = {
    // Kafka consumer configuration.
    // KeyedDeserializationSchema is deprecated; use KafkaDeserializationSchema instead.
    val dataStream = new FlinkKafkaConsumer[KafkaDStream](topic, new KafkaDeserializationSchema[KafkaDStream]() {

      override def getProducedType: TypeInformation[KafkaDStream] =
        TypeInformation.of(new TypeHint[KafkaDStream]() {})

      override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): KafkaDStream = {
        var key: String = null
        var value: String = null
        if (record.key != null) {
          key = new String(record.key(), StandardCharsets.UTF_8)
        }
        if (record.value != null) {
          value = new String(record.value(), StandardCharsets.UTF_8)
        }
        // Carry topic/partition/offset along with the payload so the offset
        // can be written back to Redis downstream.
        KafkaDStream(record.topic(), record.partition(), record.offset(), key, value)
      }

      override def isEndOfStream(s: KafkaDStream) = false
    }, getKafkaProperties(groupid))

    // Also commit offsets back to Kafka on checkpoint.
    dataStream.setCommitOffsetsOnCheckpoints(true)
    dataStream
  }

  /**
   * Kafka properties.
   */
  private def getKafkaProperties(groupId: String): Properties = {
    val kafkaProps: Properties = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "kafka.brokersxxxxxxx")
    kafkaProps.setProperty("group.id", groupId)
    kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps
  }

  /**
   * Read the saved Kafka offsets from Redis.
   */
  def getSpecificOffsets(topics: java.util.ArrayList[String]): java.util.Map[KafkaTopicPartition, java.lang.Long] = {
    val specificStartOffsets: java.util.Map[KafkaTopicPartition, java.lang.Long] =
      new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
    for (topic <- topics) {
      val jedis = new Jedis("redis_host", 6379)
      val key = s"my_flink_$topic"
      // Hash layout: field = partition, value = offset.
      val partitions = jedis.hgetAll(key).toList
      for (partition <- partitions) {
        if (!StringUtils.isEmpty(topic) && !StringUtils.isEmpty(partition._1) && !StringUtils.isEmpty(partition._2)) {
          logger.warn("topic: {}, partition: {}, offset: {}", topic.trim, partition._1.trim, partition._2.trim)
          specificStartOffsets.put(new KafkaTopicPartition(topic.trim, partition._1.trim.toInt), partition._2.trim.toLong)
        }
      }
      jedis.close()
    }
    specificStartOffsets
  }

  /**
   * Save the offset of one topic/partition to Redis.
   */
  def setOffset(topic: String, partition: Int, offset: Long): Unit = {
    val jedis = new Jedis("redis_host", 6379)
    val gtKey = s"my_flink_$topic"
    jedis.hset(gtKey, partition.toString, offset.toString)
    jedis.close()
  }
}

case class KafkaDStream(topic: String, partition: Int, offset: Long, keyMessage: String, message: String)
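getSpecificOffsets expects a Redis hash named my_flink_<topic> whose fields are partition numbers and whose values are offsets. For the very first run you can seed it by hand; a minimal sketch, where host, port, topic and offset are all placeholders:

import redis.clients.jedis.Jedis

// Seed the hash that getSpecificOffsets reads: field = partition, value = offset.
val jedis = new Jedis("redis_host", 6379)
jedis.hset("my_flink_test_002", "0", "0") // partition 0 -> start from offset 0
jedis.close()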
The test main class ManuKafkaOffsetTest:
import KafkaDStream.{createKafkaSource, getSpecificOffsets}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: mdz
 * @Date: 2021/10/21/16:07
 */
object ManuKafkaOffsetTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // env.set...  (further environment settings omitted in the original)
    env.setParallelism(1)

    val myTopic = "test_002"
    val groupId = "111111"
    val topics = new java.util.ArrayList[String]
    topics.add(myTopic)

    // Build the consumer and start it from the offsets stored in Redis.
    val consumer = createKafkaSource(topics, groupId)
    consumer.setStartFromSpecificOffsets(getSpecificOffsets(topics))

    val dataStream = env.addSource(consumer)
    dataStream.print()

    env.execute()
  }
}
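The demo only reads offsets at startup; to keep Redis up to date while the job runs, something still has to call setOffset (step 2 above). A minimal per-record sketch that could replace dataStream.print() in the main method above (a checkpoint hook would be a more robust place to do this):

// Write every record's offset back to Redis as it passes through.
// Note: the offset is saved before downstream processing finishes, so on
// restart some records may be re-read (at-least-once behaviour).
val tracked = dataStream.map { record =>
  KafkaDStream.setOffset(record.topic, record.partition, record.offset)
  record
}
tracked.print()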