Storm容错机制Acker

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 笔记

(1)基于tuple树的Acker详解


Storm中有个特殊的Executor叫acker,他们负责跟踪spout发出的每一个Tuple的Tuple树。当acker发现一个Tuple树 已经处理完成了,它会告诉框架回调Spout的ack(),否则回调Spout的fail()。


Acker的跟踪算法是Storm的主要突破之一,对任意大的一个Tuple树,它只需要恒定的20字节就可以进行跟踪。


我们期望的是,如果某个Tuple被Bolt执行失败了,则Spout端可以重新发送该Tuple。但很遗憾的是,框架不会自动 重新发送,需要我们自己手工编码实现。


(2)如何开启或者关闭Acker


(1)如何开启Acker

Spout类中emit(java.util.List<java.lang.Object> tuple, java.lang.Object messageId)


这个messageId需要是唯一标识(可以用UUID),如此就可以追踪该tuple。 Bolt类里进行collector.ack(tuple),当成功执行tuple后,会自动回调Spout类的ack()函数。


要实现ack机制,必须在spout发射tuple的时候指定messageId。并且需要在spout中对tuple进行缓存,对于ack的 tuple则从缓存队列中删除,对于fail的tuple可以选择重发。


(2)如何关闭Acker

Spout类中emit(java.util.List<java.lang.Object> tuple),无messageId则自动 不追踪。


或通过如下关闭Ack:

Config conf = new Config() ; 
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);

(3)Acker机制的编程模型


3.png


(4)Storm可靠性实现的设计思想


(1)需要实现可靠性的场景

什么情况下失败处理的Tuple期望能重发处理?

数据完整性要求高,比如银行系统,对电商来讲的话如订单统计。

该类数据通常规范很严格,不会出现异常格式的数据,一旦Tuple处理失败,通常是集群、网络等原因。


海量日志统计的场景呢?不需要开启Acker。


(2)可靠性设计 — Acker失败重发编码实现思路

Spout里需要做什么?


在Spout emit时添加一个MsgID,那么ack和fail方法将会被调用当Tuple被正确地处理了或发生了错误。

collector.emit(new Values("field1", "field2") ,msgID);

缓存emit过的tuple,调用ack()时从缓存里删掉成功执行的tuple;调用fail()时缓存失败处理的tuple及失败次数 ,如 果不大于3次则重发,否则从缓存里删除。

Bolt里需要做什么?


Bolt里显性回调:this.collector.ack(input);或 this.collector.fail(input);

Tuple数据缓存方案:


如何缓存Tuple?Spout里进行缓存,本课程将数据缓存到内存。 实际项目中,一定要将数据缓存到外部存储系统,比如比如redis或hbase

如何控制重发次数为3次?建一个Map1,缓存发送过的Tuple,成功执行的Tuple从里删除;Map2 存发送失败的Tuple及失败次数,当超过3次 时从Map2里删除。

(5)Storm可靠性及Acker失败重发编码实现

order.log数据文件

0 60.0  2017-03-21 17:33:40 6 334
1 60.0  2017-03-21 17:33:40 6 856
2 20.10 2017-03-21 17:33:40 1 2132
3 60.0  2017-03-21 17:33:40 4 4456
4 80.1  2017-03-21 17:33:40 7 678
5 10.10 2017-03-21 17:33:40 5 2132
6 80.1  2017-03-21 17:33:40 1 678
7 60.0  2017-03-21 17:33:40 6 334
8 80.1  2017-03-21 17:33:40 4 678
9 10.10 2017-03-21 17:33:40 5 798
0 60.0  2017-03-21 17:33:40 6 334
1 60.0  2017-03-21 17:33:40 6 856
2 20.10 2017-03-21 17:33:40 1 2132
3 60.0  2017-03-21 17:33:40 4 4456
4 80.1  2017-03-21 17:33:40 7 678
5 10.10 2017-03-21 17:33:40 5 2132
6 80.1  2017-03-21 17:33:40 1 678
7 60.0  2017-03-21 17:33:40 6 334
8 80.1  2017-03-21 17:33:40 4 678
9 10.10 2017-03-21 17:33:40 5 798
7 60.0  2017-03-21 17:33:40 6 334
8 80.1  2017-03-21 17:33:40 4 678
9 10.10 2017-03-21 17:33:40 5 798
8 80.1  2017-03-21 17:33:40 4 678
9 10.10 2017-03-21 17:33:40 5 798
8 80.1  2017-03-21 17:33:40 4 678
9 10.10 2017-03-21 17:33:40 5 798
8 80.1  2017-03-21 17:33:40 4 678
9 10.10 2017-03-21 17:33:40 5 798
9 10.10 2017-03-21 17:33:40 5 798
9 10.10 2017-03-21 17:33:40 5 798
9 10.10 2017-03-21 17:33:40 5 798

MainTopology:

package com.kfk.ackFail;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 10:28 下午
 */
public class MainTopology {
    public static void main(String[] args) {
        // 创建Topology
        TopologyBuilder builder = new TopologyBuilder();
        // set Spout
        builder.setSpout("spout", new AckSpout(), 1);
        // set Bolt
        builder.setBolt("bolt1", new AckBolt1(), 1).shuffleGrouping("spout");
        builder.setBolt("bolt2", new AckBolt2(), 1).shuffleGrouping("bolt1");
        Config conf = new Config();
        conf.setDebug(false);
        try {
            if (args != null && args.length > 0) {
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            }
            else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("mytopology", conf, builder.createTopology());
            }
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}

AckSpout:

package com.kfk.ackFail;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 10:36 下午
 */
public class AckSpout implements IRichSpout {
    FileInputStream fileInputStream = null;
    InputStreamReader inputStreamReader = null;
    BufferedReader bufferedReader = null;
    /**
     * 线程安全的Map,存储emit过的tuple
     */
    private ConcurrentHashMap<Object, Values> pending;
    /**
     * 存储失败的tuple和其失败次数
     */
    private ConcurrentHashMap<Object, Integer> failpending;
    SpoutOutputCollector spoutOutputCollector = null;
    String str = null;
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.spoutOutputCollector = collector;
            this.fileInputStream = new FileInputStream("order.log");
            this.inputStreamReader = new InputStreamReader(fileInputStream,"UTF-8");
            this.bufferedReader = new BufferedReader(inputStreamReader);
            this.pending = new ConcurrentHashMap<Object, Values>();
            this.failpending = new ConcurrentHashMap<Object, Integer>();
        } catch (Exception e){
            e.printStackTrace();
        }
    }
    @Override
    public void close() {
        // TODO Auto-generated method stub
        try {
            bufferedReader.close();
            inputStreamReader.close();
            fileInputStream.close();
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        }
    }
    @Override
    public void activate() {
        // TODO Auto-generated method stub
    }
    @Override
    public void deactivate() {
        // TODO Auto-generated method stub
    }
    @Override
    public void nextTuple() {
        try {
            while ((str = this.bufferedReader.readLine()) != null){
                // 过滤动作
                UUID msgId = UUID.randomUUID();
                String[] arr = str.split("\t");
                String date = arr[2].substring(0,10);
                String orderAmt = arr[1];
                Values val = new Values(date,orderAmt);
                this.pending.put(msgId,val);
                spoutOutputCollector.emit(val,msgId);
                System.out.println("pending.size()="+pending.size());
            }
        } catch (Exception e){
            // TODO: handle exception
            e.printStackTrace();
        }
    }
    @Override
    public void ack(Object msgId) {
        // TODO Auto-generated method stub
        System.out.println("pending size 共有:"+pending.size());
        System.out.println("spout ack:"+msgId.toString()+"---"+msgId.getClass());
        this.pending.remove(msgId);
        System.out.println("pending size 剩余:"+pending.size());
    }
    @Override
    public void fail(Object msgId) {
        // TODO Auto-generated method stub
        System.out.println("spout fail:"+msgId.toString());
        // 获取该Tuple失败的次数
        Integer failcount = failpending.get(msgId);
        if (failcount == null){
            failcount = 0;
        }
        failcount++;
        if (failcount >= 3){
            // 重试次数已满,不再进行重新emit
            failpending.remove(msgId);
        } else {
            // 记录该tuple失败次数
            failpending.put(msgId, failcount);
            // 重发
            this.spoutOutputCollector.emit(this.pending.get(msgId), msgId);
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("date","orderAmt"));
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

AckBolt1:

package com.kfk.ackFail;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 11:00 下午
 */
public class AckBolt1 implements IRichBolt {
    OutputCollector outputCollector = null;
    TopologyContext topologyContext = null;
    int num = 0;
    String url = null;
    String session_id = null;
    String date = null;
    String province_id = null;
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method
        this.outputCollector = collector;
        this.topologyContext = context;
    }
    @Override
    public void execute(Tuple input) {
        try {
            date = input.getStringByField("date") ;
            Double orderAmt = Double.parseDouble(input.getStringByField("orderAmt"));
            // 注意参数,第一个参数是Tuple本身
            outputCollector.emit(input,new Values(date,String.valueOf(orderAmt)));
            outputCollector.ack(input);
        } catch (Exception e){
            outputCollector.fail(input);
            e.printStackTrace();
        }
    }
    @Override
    public void cleanup() {
        // TODO Auto-generated method stub
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("date","orderAmt")) ;
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
}

AckBolt2:

package com.kfk.ackFail;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import java.util.Map;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 11:28 下午
 */
public class AckBolt2 implements IRichBolt {
    OutputCollector outputCollector = null;
    TopologyContext topologyContext = null;
    int num = 0;
    String url = null;
    String session_id = null;
    String date = null;
    String province_id = null;
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method
        this.outputCollector = collector;
        this.topologyContext = context;
    }
    @Override
    public void execute(Tuple input) {
        try {
            date = input.getStringByField("date") ;
            Double orderAmt = Double.parseDouble(input.getStringByField("orderAmt"));
            outputCollector.ack(input);
        } catch (Exception e){
            outputCollector.fail(input);
            e.printStackTrace();
        }
    }
    @Override
    public void cleanup() {
        // TODO Auto-generated method stub
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("")) ;
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

运行结果:

4.png




相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
10月前
|
Java 流计算
119 Storm的并发机制
119 Storm的并发机制
35 0
|
4月前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
281 0
|
3月前
|
存储 缓存 算法
Flink(十二)【容错机制】(2)
Flink(十二)【容错机制】
|
3月前
|
消息中间件 存储 NoSQL
Flink(十二)【容错机制】(4)
Flink(十二)【容错机制】
|
3月前
|
存储 消息中间件 算法
|
2月前
|
消息中间件 存储 负载均衡
Kafka高可用性指南:提高数据一致性和集群容错能力!
**Kafka高可用性概览** - 创建Topic时设置`--replication-factor 3`确保数据冗余和高可用。 - 分配角色:Leader处理读写,Follower同步数据,简化管理和客户端逻辑。 - ISR(In-Sync Replicas)保持与Leader同步的副本列表,确保数据一致性和可靠性。 - 设置`acks=all`保证消息被所有副本确认,防止数据丢失,增强一致性。 - 通过这些机制,Kafka实现了分布式环境中的数据可靠性、一致性及服务的高可用性。
244 0
|
3月前
|
存储 消息中间件 缓存
Flink(十二)【容错机制】(3)
Flink(十二)【容错机制】
|
3月前
|
消息中间件 Kafka API
深入解析Kafka消息传递的可靠性保证机制
深入解析Kafka消息传递的可靠性保证机制
58 0
|
4月前
|
消息中间件 存储 分布式计算
分布式实时消息队列Kafka(五)副本机制
分布式实时消息队列Kafka(五)副本机制
131 0
分布式实时消息队列Kafka(五)副本机制
|
10月前
|
Apache 流计算
Flink CDC的Brokers设计是为了提高容错性和可扩展性
Flink CDC的Brokers设计是为了提高容错性和可扩展性
33 2