kafkaspot在ack机制下如何保证内存不溢

简介: 新浪微博:intsmaze刘洋洋哥。   storm框架中的kafkaspout类实现的是BaseRichSpout,它里面已经重写了fail和ack方法,所以我们的bolt必须实现ack机制,就可以保证消息的重新发送;如果不实现ack机制,那么kafkaspout就无法得到消息的处理响应,就会在超时以后再次发送消息,导致消息的重复发送。
新浪微博:intsmaze刘洋洋哥。
 
storm框架中的kafkaspout类实现的是BaseRichSpout,它里面已经重写了fail和ack方法,所以我们的bolt必须实现ack机制,就可以保证消息的重新发送;如果不实现ack机制,那么kafkaspout就无法得到消息的处理响应,就会在超时以后再次发送消息,导致消息的重复发送。
 
但是回想一下我们自己写一个spout类实现BaseRichSpout并让他具备消息重发,那么我们是会在我们的spout类里面定义一个map集合,并以msgId作为key。
public class MySpout extends BaseRichSpout {
    private static final long serialVersionUID = 5028304756439810609L;
    // key:messageId,Data
    private HashMap<String, String> waitAck = new HashMap<String, String>();
    private SpoutOutputCollector collector;
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
    public void nextTuple() {
        String sentence = "the cow jumped over the moon";
        String messageId = UUID.randomUUID().toString().replaceAll("-", "");
        waitAck.put(messageId, sentence);
        //指定messageId,开启ackfail机制
        collector.emit(new Values(sentence), messageId);
    }
    @Override
    public void ack(Object msgId) {
        System.out.println("消息处理成功:" + msgId);
        System.out.println("删除缓存中的数据...");
        waitAck.remove(msgId);
    }
    @Override
    public void fail(Object msgId) {
        System.out.println("消息处理失败:" + msgId);
        System.out.println("重新发送失败的信息...");
        //重发如果不开启ackfail机制,那么spout的map对象中的该数据不会被删除的,而且下游
        collector.emit(new Values(waitAck.get(msgId)),msgId);
    }
}
 
那么kafkaspout会不会也是这样还保存这已发送未收到bolt响应的消息呢?如果这样,如果消息处理不断失败,不断重发,消息不断积累在kafkaspout节点上,kafkaspout端会不就会出现内存溢出?
 
其实并没有,回想kafka的原理,Kafka会为每一个consumergroup保留一些metadata信息–当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。也就是说, kafkaspot在消费kafka的数据是,通过offset读取到消息并发送给bolt后,kafkaspot只是保存者当前的offset值。
当失败或成功根据msgId查询offset值,然后再去kafka消费该数据来确保消息的重新发送。
 
那么虽然offset数据小,但是当offset的数据量上去了还是会内存溢出的?
其实并没有,kafkaspout发现缓存的数据超过限制了,会把某端的数据清理掉的。
 
 
kafkaspot中发送数据的代码
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
可以看到msgID里面包装了offset参数。
它不缓存已经发送出去的数据信息。
 
当他接收到来至bolt的响应后,会从接收到的msgId中得到offset。以下是从源码中折取的关键代码:
public void ack(Object msgId) {
     KafkaMessageId id = (KafkaMessageId) msgId;
     PartitionManager m = _coordinator.getManager(id.partition);
     if (m != null) {
          m.ack(id.offset);
     }
 }
 m.ack(id.offset);
 public void ack(Long offset) {
     _pending.remove(offset);//处理成功移除offset
     numberAcked++;
 }

public void fail(Object msgId) { KafkaMessageId id = (KafkaMessageId) msgId; PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.fail(id.offset); } } m.fail(id.offset); public void fail(Long offset) {     failed.add(offset);//处理失败添加offset numberFailed++; } SortedSet<Long> _pending = new TreeSet<Long>(); SortedSet<Long> failed = new TreeSet<Long>();

 

关于kafkaspot的源码解析大家可以看这边博客:http://www.cnblogs.com/cruze/p/4241181.html

源码解析中涉及了很多kafka的概念,所以仅仅理解kafka的概念想完全理解kafkaspot源码是很难的,如果不理解kafka概念,那么就只需要在理解storm的ack机制上明白kafkaspot做了上面的两件事就可以了。

作者: intsmaze(刘洋)
老铁,你的--->推荐,--->关注,--->评论--->是我继续写作的动力。
微信公众号号:Apache技术研究院
由于博主能力有限,文中可能存在描述不正确,欢迎指正、补充!
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
相关文章
|
3月前
|
存储 监控 算法
Java中的内存管理:理解Garbage Collection机制
本文将深入探讨Java编程语言中的内存管理,着重介绍垃圾回收(Garbage Collection, GC)机制。通过阐述GC的工作原理、常见算法及其在Java中的应用,帮助读者提高程序的性能和稳定性。我们将从基本原理出发,逐步深入到调优实践,为开发者提供一套系统的理解和优化Java应用中内存管理的方法。
|
4月前
|
监控 算法 Java
Java中的内存管理:理解Garbage Collection机制
本文将深入探讨Java编程语言中的内存管理,特别是垃圾回收(Garbage Collection, GC)机制。我们将从基础概念开始,逐步解析垃圾回收的工作原理、不同类型的垃圾回收器以及它们在实际项目中的应用。通过实际案例,读者将能更好地理解Java应用的性能调优技巧及最佳实践。
114 0
|
2月前
|
弹性计算 Kubernetes Perl
k8s 设置pod 的cpu 和内存
在 Kubernetes (k8s) 中,设置 Pod 的 CPU 和内存资源限制和请求是非常重要的,因为这有助于确保集群资源的合理分配和有效利用。你可以通过定义 Pod 的 `resources` 字段来设置这些限制。 以下是一个示例 YAML 文件,展示了如何为一个 Pod 设置 CPU 和内存资源请求(requests)和限制(limits): ```yaml apiVersion: v1 kind: Pod metadata: name: example-pod spec: containers: - name: example-container image:
398 1
|
2月前
|
存储 算法 Java
Go语言的内存管理机制
【10月更文挑战第25天】Go语言的内存管理机制
44 2
|
2月前
|
存储 运维 Java
💻Java零基础:深入了解Java内存机制
【10月更文挑战第18天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
41 1
|
3月前
|
存储 安全 NoSQL
driftingblues9 - 溢出ASLR(内存地址随机化机制)
driftingblues9 - 溢出ASLR(内存地址随机化机制)
48 1
|
4月前
|
消息中间件
共享内存和信号量的配合机制
【9月更文挑战第16天】本文介绍了进程间通过共享内存通信的机制及其同步保护方法。共享内存可让多个进程像访问本地内存一样进行数据交换,但需解决并发读写问题,通常借助信号量实现同步。文章详细描述了共享内存的创建、映射、解除映射等操作,并展示了如何利用信号量保护共享数据,确保其正确访问。此外,还提供了具体代码示例与步骤说明。
|
3月前
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
279 0
|
3月前
|
程序员 编译器 数据处理
【C语言】深度解析:动态内存管理的机制与实践
【C语言】深度解析:动态内存管理的机制与实践
|
5月前
|
JavaScript 前端开发 算法
js 内存回收机制
【8月更文挑战第23天】js 内存回收机制
51 3