(1) The Acker in Detail: Tracking the Tuple Tree
Storm runs a special kind of executor called the acker, which tracks the tuple tree of every tuple emitted by a spout. When an acker determines that a tuple tree has been fully processed, it tells the framework to call back the spout's ack(); if the tree fails or times out (message timeout, 30 seconds by default), the spout's fail() is called back instead.
The acker's tracking algorithm is one of Storm's key breakthroughs: no matter how large a tuple tree grows, tracking it takes only a constant 20 bytes or so, as the sketch below illustrates.
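The idea behind that constant-size state: for each spout tuple, the acker keeps a single 64-bit "ack val" and XORs into it the id of every tuple created in the tree, and XORs that id in again when the tuple is acked. Since x ^ x = 0, the ack val returns to zero exactly when every tuple has been both created and acked. The following is a minimal sketch of the idea only; the class and method names are illustrative, not Storm's internals:

import java.util.Random;

// Simplified illustration of the acker's XOR trick: per spout tuple it stores
// only one long, regardless of how large the tuple tree grows.
public class AckerSketch {

    private long ackVal = 0L; // constant-size state for one tuple tree

    // Called when a tuple in the tree is created (anchored) with a random id
    public void anchored(long tupleId) { ackVal ^= tupleId; }

    // Called when that tuple is acked; XORing the same id twice cancels it out
    public void acked(long tupleId) { ackVal ^= tupleId; }

    public boolean treeComplete() { return ackVal == 0L; }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        Random rnd = new Random();
        long[] ids = new long[1000];

        // Grow an arbitrarily large tree ...
        for (int i = 0; i < ids.length; i++) {
            ids[i] = rnd.nextLong();
            acker.anchored(ids[i]);
        }
        // ... then ack every tuple in it (order does not matter)
        for (long id : ids) {
            acker.acked(id);
        }
        System.out.println("tree complete: " + acker.treeComplete()); // true
    }
}

The essential point is that the per-tree memory cost stays constant however large the tree becomes; Storm pairs roughly this much state with each spout tuple id, which is where the ~20 bytes come from.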
What we would like is that when a bolt fails to process a tuple, the spout resends it. Unfortunately, the framework does not resend automatically; we have to implement the retry logic ourselves.
(2) How to Enable or Disable the Acker
(1) Enabling the acker
In the spout, emit with a message id: emit(java.util.List<java.lang.Object> tuple, java.lang.Object messageId).
The messageId must uniquely identify the tuple (a UUID works well); with it, the framework can track the tuple. In the bolt, call collector.ack(tuple); once the tuple tree completes successfully, the spout's ack() is called back automatically.
To implement the ack mechanism, then, the spout must attach a messageId to every tuple it emits and must cache the emitted tuples: acked tuples are removed from the cache, while failed tuples can optionally be resent.
(2) Disabling the acker
In the spout, emit without a message id: emit(java.util.List<java.lang.Object> tuple); such a tuple is simply not tracked.
Alternatively, disable acking for the whole topology by configuring zero acker executors (Storm then acks every spout tuple immediately; by default it runs one acker per worker):
Config conf = new Config();
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
(3) The Acker Programming Model
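In code, the model boils down to the skeleton below: the spout attaches a messageId and receives ack()/fail() callbacks, while each bolt anchors its output tuples to the input tuple and then acks it. This is a minimal sketch with illustrative names (ModelSpout, ModelBolt); the complete working implementation follows in section (5).

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.UUID;

// Spout side of the model: every emit carries a unique messageId,
// and ack()/fail() are later called back with that same id.
class ModelSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // A real spout would read from a source and throttle its emits
        UUID msgId = UUID.randomUUID();
        collector.emit(new Values("some line"), msgId); // tracked emit
    }

    @Override
    public void ack(Object msgId)  { /* tuple tree fully processed */ }

    @Override
    public void fail(Object msgId) { /* tuple tree failed or timed out */ }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

// Bolt side of the model: anchor derived tuples to the input, then ack it.
class ModelBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            collector.emit(input, new Values(input.getString(0))); // anchored emit extends the tree
            collector.ack(input);   // this branch of the tree is done
        } catch (Exception e) {
            collector.fail(input);  // triggers the spout's fail() for the root tuple
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

Anchoring, i.e. passing the input tuple as the first argument to emit, is what links a bolt's output into the spout tuple's tree; emitting without the anchor detaches the new tuple, and the acker would consider the tree complete too early.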
(4) The Design Thinking Behind Storm's Reliability
(1) Scenarios that need reliability
In which situations do we want failed tuples to be reprocessed?
When data integrity requirements are high: banking systems, for example, or order statistics in e-commerce.
Such data usually follows a strict format and rarely contains malformed records, so when a tuple does fail, the cause is typically the cluster, the network, and so on.
What about massive-scale log statistics? There is no need to enable the acker: losing the occasional record barely affects the counts.
(2) Reliability design: how to implement acker-based failure retry
What does the spout need to do?
Attach a msgId when emitting; ack() will then be called back when the tuple is processed correctly, and fail() when an error occurs:
collector.emit(new Values("field1", "field2"), msgID);
Cache every emitted tuple. When ack() fires, delete the successfully processed tuple from the cache; when fail() fires, record the failed tuple and its failure count, resending it while the count is no more than 3 and deleting it from the cache otherwise.
What does the bolt need to do?
Call back explicitly in the bolt: this.collector.ack(input); or this.collector.fail(input);
Tuple caching strategy:
Where to cache tuples? In the spout. This course caches the data in memory; in a real project, always cache it in an external storage system such as Redis or HBase (see the sketch after this list).
How to cap retries at 3? Keep two maps: Map1 caches every emitted tuple, and successfully processed tuples are deleted from it; Map2 stores failed tuples together with their failure counts, and a tuple is deleted from Map2 once its count exceeds 3.
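For reference, a production-style cache might look like the following sketch, which keeps both maps in Redis via the Jedis client instead of in memory. The key names, serialization, and the RedisPendingCache class are illustrative assumptions, not part of this course's code:

import redis.clients.jedis.Jedis;

// Hypothetical sketch: the pending-tuple cache backed by Redis rather than
// a ConcurrentHashMap, so the state survives a spout restart.
public class RedisPendingCache {

    private static final String PENDING_KEY = "storm:pending";   // msgId -> serialized tuple
    private static final String FAILCNT_KEY = "storm:failcount"; // msgId -> failure count

    private final Jedis jedis = new Jedis("localhost", 6379);

    // Called right before emit: remember the tuple under its msgId
    public void put(String msgId, String serializedValues) {
        jedis.hset(PENDING_KEY, msgId, serializedValues);
    }

    // Called from the spout's ack(): the tuple tree completed, drop the entry
    public void ack(String msgId) {
        jedis.hdel(PENDING_KEY, msgId);
        jedis.hdel(FAILCNT_KEY, msgId);
    }

    // Called from the spout's fail(): returns the cached tuple to re-emit,
    // or null once the tuple has already failed 3 times
    public String failAndMaybeRetry(String msgId) {
        long failures = jedis.hincrBy(FAILCNT_KEY, msgId, 1);
        if (failures >= 3) {
            jedis.hdel(PENDING_KEY, msgId);
            jedis.hdel(FAILCNT_KEY, msgId);
            return null;
        }
        return jedis.hget(PENDING_KEY, msgId);
    }
}

Unlike the in-memory maps used in the code below, a cache like this survives a worker crash or restart, which is exactly when replay matters most.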
(5) Implementing Storm Reliability and Acker-Based Retry in Code
The order.log data file (tab-separated; the columns appear to be an order id, the order amount, a timestamp, and two further fields, of which the spout below uses only the amount and the date):
0	60.0	2017-03-21 17:33:40	6	334
1	60.0	2017-03-21 17:33:40	6	856
2	20.10	2017-03-21 17:33:40	1	2132
3	60.0	2017-03-21 17:33:40	4	4456
4	80.1	2017-03-21 17:33:40	7	678
5	10.10	2017-03-21 17:33:40	5	2132
6	80.1	2017-03-21 17:33:40	1	678
7	60.0	2017-03-21 17:33:40	6	334
8	80.1	2017-03-21 17:33:40	4	678
9	10.10	2017-03-21 17:33:40	5	798
0	60.0	2017-03-21 17:33:40	6	334
1	60.0	2017-03-21 17:33:40	6	856
2	20.10	2017-03-21 17:33:40	1	2132
3	60.0	2017-03-21 17:33:40	4	4456
4	80.1	2017-03-21 17:33:40	7	678
5	10.10	2017-03-21 17:33:40	5	2132
6	80.1	2017-03-21 17:33:40	1	678
7	60.0	2017-03-21 17:33:40	6	334
8	80.1	2017-03-21 17:33:40	4	678
9	10.10	2017-03-21 17:33:40	5	798
MainTopology:
package com.kfk.ackFail;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 10:28 PM
 */
public class MainTopology {

    public static void main(String[] args) {
        // Build the topology: spout -> bolt1 -> bolt2
        TopologyBuilder builder = new TopologyBuilder();

        // Set the spout
        builder.setSpout("spout", new AckSpout(), 1);

        // Set the bolts
        builder.setBolt("bolt1", new AckBolt1(), 1).shuffleGrouping("spout");
        builder.setBolt("bolt2", new AckBolt2(), 1).shuffleGrouping("bolt1");

        Config conf = new Config();
        conf.setDebug(false);

        try {
            if (args != null && args.length > 0) {
                // Submit to a real cluster
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            } else {
                // Run in local mode
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("mytopology", conf, builder.createTopology());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
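To try it out, place order.log in the process's working directory (the spout below opens it by that relative path) and run MainTopology.main() with no arguments, which starts the topology in a LocalCluster; passing a topology name as args[0] submits it to a real cluster instead.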
AckSpout:
package com.kfk.ackFail;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 10:36 PM
 */
public class AckSpout implements IRichSpout {

    private FileInputStream fileInputStream = null;
    private InputStreamReader inputStreamReader = null;
    private BufferedReader bufferedReader = null;

    /**
     * Thread-safe map caching every emitted tuple, keyed by its msgId (Map1)
     */
    private ConcurrentHashMap<Object, Values> pending;

    /**
     * Failed tuples and their failure counts (Map2)
     */
    private ConcurrentHashMap<Object, Integer> failpending;

    private SpoutOutputCollector spoutOutputCollector = null;
    private String str = null;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.spoutOutputCollector = collector;
            this.fileInputStream = new FileInputStream("order.log");
            this.inputStreamReader = new InputStreamReader(fileInputStream, "UTF-8");
            this.bufferedReader = new BufferedReader(inputStreamReader);
            this.pending = new ConcurrentHashMap<Object, Values>();
            this.failpending = new ConcurrentHashMap<Object, Integer>();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
        try {
            bufferedReader.close();
            inputStreamReader.close();
            fileInputStream.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void nextTuple() {
        try {
            // For simplicity this demo drains the whole file in one call;
            // a production spout would emit one tuple per nextTuple() call
            while ((str = this.bufferedReader.readLine()) != null) {
                // Parse the line: keep the date (yyyy-MM-dd) and the order amount
                UUID msgId = UUID.randomUUID();
                String[] arr = str.split("\t");
                String date = arr[2].substring(0, 10);
                String orderAmt = arr[1];
                Values val = new Values(date, orderAmt);

                // Cache the tuple before emitting so fail() can resend it later
                this.pending.put(msgId, val);
                spoutOutputCollector.emit(val, msgId);
                System.out.println("pending.size()=" + pending.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("pending size total: " + pending.size());
        System.out.println("spout ack:" + msgId.toString() + "---" + msgId.getClass());
        // The tuple tree completed successfully: drop the tuple from the cache
        this.pending.remove(msgId);
        System.out.println("pending size remaining: " + pending.size());
    }

    @Override
    public void fail(Object msgId) {
        System.out.println("spout fail:" + msgId.toString());

        // Look up how many times this tuple has failed already
        Integer failcount = failpending.get(msgId);
        if (failcount == null) {
            failcount = 0;
        }
        failcount++;

        if (failcount >= 3) {
            // Retry limit reached: stop resending and drop the tuple from both caches
            failpending.remove(msgId);
            pending.remove(msgId);
        } else {
            // Record the failure count and resend the cached tuple
            failpending.put(msgId, failcount);
            this.spoutOutputCollector.emit(this.pending.get(msgId), msgId);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "orderAmt"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
AckBolt1:
package com.kfk.ackFail;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 11:00 PM
 */
public class AckBolt1 implements IRichBolt {

    private OutputCollector outputCollector = null;
    private TopologyContext topologyContext = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
        this.topologyContext = context;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String date = input.getStringByField("date");
            double orderAmt = Double.parseDouble(input.getStringByField("orderAmt"));

            // Note the first argument: anchoring the new tuple to the input
            // tuple keeps it in the same tuple tree
            outputCollector.emit(input, new Values(date, String.valueOf(orderAmt)));

            // Mark the input tuple as successfully processed
            outputCollector.ack(input);
        } catch (Exception e) {
            // Any failure here triggers the spout's fail() for the root tuple
            outputCollector.fail(input);
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "orderAmt"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
AckBolt2:
package com.kfk.ackFail;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/29
 * @time : 11:28 PM
 */
public class AckBolt2 implements IRichBolt {

    private OutputCollector outputCollector = null;
    private TopologyContext topologyContext = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
        this.topologyContext = context;
    }

    @Override
    public void execute(Tuple input) {
        try {
            // Parse to validate the record; a malformed amount triggers fail()
            String date = input.getStringByField("date");
            double orderAmt = Double.parseDouble(input.getStringByField("orderAmt"));

            // Terminal bolt: ack the input so the whole tuple tree can complete
            outputCollector.ack(input);
        } catch (Exception e) {
            outputCollector.fail(input);
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This terminal bolt emits no tuples, so it declares no output fields
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Run results: