Storm starter - SingleJoinExample

简介:

Storm常见模式——流聚合

Topology

1.定义两个spout, 分别是genderSpout, ageSpout 
  Fields, ("id", "gender"), ("id", "age"), 最终join的结果应该是("id", "gender", "age")

2. 在设置SingleJoinBolt需要将outFields作为参数, 即告诉bolt, join完的结果应该包含哪些fields 
   并且对于两个spout都是以Fields("id")进行fieldsGrouping, 保证相同id都会发到同一个task

public class SingleJoinExample {
    public static void main(String[] args) {
        FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
        FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
        
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("gender", genderSpout);
        builder.setSpout("age", ageSpout);
        builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age")))
                .fieldsGrouping("gender", new Fields("id"))
                .fieldsGrouping("age", new Fields("id"));
}

SingleJoinBolt

由于不能保证bolt可以同时收到某个id的所有tuple, 所以必须把收到的tuple都先在memory里面cache, 至到收到某id的所有的tuples, 再做join. 
做完join后, 这些tuple就可以从cache里面删除, 但是如果某id的某些tuple丢失, 就会导致该id的其他tuples被一直cache. 
解决这个问题, 对cache数据设置timeout, 过期后就删除, 并发送这些tuples的fail通知.

可见这个场景, 使用TimeCacheMap正合适,

TimeCacheMap<List<Object>, , Map,>

List<Object>, 被join的field, 对于上面的例子就是"id”, 之所以是List, 应该是为了支持多fields join 
Map<GlobalStreamId, Tuple>,记录tuple和stream的关系

对于这个例子, 从TimeCacheMap的bucket里面取出下面两个k,v, 然后进行join 
{id, {agestream, (id, age)}} 
{id, {genderstream, (id, gender)}}

 

1. prepare 
一般的prepare的逻辑都很简单, 而这里确很复杂... 
a, 设置Timeout和ExpireCallback 
timeout 设的是, Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 默认是30s, 这个可以根据场景自己调整 
应该设法保证不同spout中tuple的发送顺序, 以保证相同id的tuple以较短时间间隔被收到, 比如这个例子应该按id排序然后emit 
否则如果出现, ("id", "gender")被第一个emit, 而 ("id", "age")被最后一个emit, 会导致不断的timeout 
设置ExpireCallback, 对于所有timeout的tuples, 发送fail通知

    private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
        @Override
        public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
            for(Tuple tuple: tuples.values()) {
                _collector.fail(tuple);
            }
        }        
    }

b. 找出_idFields(哪些field是相同的, 可以用作join) 和_fieldLocations (outfield和spout stream的关系, 比如gender属于genderstream) 
通过context.getThisSources()取出spout sources列表, 并通过getComponentOutputFields取到fields列表 
_idFields, 逻辑很简单, 每次都拿新的fields和idFields做retainAll(取出set共同部分), 最终会得到所有spout fields的相同部分 
_fieldLocations, 拿_outFields和spout fields进行匹配, 找到后记录下关系

其实, 我觉得这部分准备工作, 在调用的时候用参数指明就可以了, 犯不着那么麻烦的来做 
比如参数变为("id", {"gender", genderstream}, {"age", agestream})

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _fieldLocations = new HashMap<String, GlobalStreamId>();
        _collector = collector;
        int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
        _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
        _numSources = context.getThisSources().size();
        Set<String> idFields = null;
        for(GlobalStreamId source: context.getThisSources().keySet()) {
            Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
            Set<String> setFields = new HashSet<String>(fields.toList());
            if(idFields==null) idFields = setFields;
            else idFields.retainAll(setFields);
            
            for(String outfield: _outFields) {
                for(String sourcefield: fields) {
                    if(outfield.equals(sourcefield)) {
                        _fieldLocations.put(outfield, source);
                    }
                }
            }
        }
        _idFields = new Fields(new ArrayList<String>(idFields));
        
        if(_fieldLocations.size()!=_outFields.size()) {
            throw new RuntimeException("Cannot find all outfields among sources");
        }
    }

2, execute

a, 从tuple中取出_idFields和streamid  
   如果在_pending(TimeCacheMap)中没有此_idFields, 为这个_idFields创新新的hashmap并put到bucket 
b, 取出该_idFields所对应的所有Map<GlobalStreamId, Tuple> parts, 并检测当前收到的是否是无效tuple(从同一个stream emit的具有相同id的tuple) 
   将新的tuple, put到该_idFields所对应的map. parts.put(streamId, tuple); 
c, 判断如果parts的size等于spout sources的数目, 对于这个例子为2, 意思是当从genderstream和agestream过来的tuple都已经收到时 
       从_pending(TimeCacheMap)删除该_idFields的cache数据, 因为已经可以join, 不需要继续等待了 
       并根据_outFields以及_fieldLocations, 去各个stream的tuple中取出值 
       最终emit结果, (((id, age), (id, gender)),            (age, gender)) 
                       ArrayList<Tuple>(parts.values()),   joinResult 
       Ack所有的tuple

    @Override
    public void execute(Tuple tuple) {
        List<Object> id = tuple.select(_idFields);
        GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
        if(!_pending.containsKey(id)) {
            _pending.put(id, new HashMap<GlobalStreamId, Tuple>());            
        }
        Map<GlobalStreamId, Tuple> parts = _pending.get(id);
        if(parts.containsKey(streamId)) throw new RuntimeException("Received same side of single join twice");
        parts.put(streamId, tuple);
        if(parts.size()==_numSources) {
            _pending.remove(id);
            List<Object> joinResult = new ArrayList<Object>();
            for(String outField: _outFields) {
                GlobalStreamId loc = _fieldLocations.get(outField);
                joinResult.add(parts.get(loc).getValueByField(outField));
            }
            _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
            
            for(Tuple part: parts.values()) {
                _collector.ack(part);
            }
        }
    }

 

TimeCacheMap

Storm常见模式——TimeCacheMap

解决什么问题? 
常常需要在memory里面cache key-value, 比如实现快速查找表 
但是memeory是有限的, 所以希望只保留最新的cache的, 过期的key-value可以被删除. 所以TimeCacheMap就是用来解决这个问题的, 在一定time内cache map(kv set)

1. 构造参数

TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback)

首先需要expirationSecs, 表示多久过期 
然后, numBuckets, 表示时间粒度, 比如expirationSecs = 60s, 而numBuckets=10, 那么一个bucket就代表6s的时间窗, 并且6s会发生一次过期数据删除 
最后, ExpiredCallback<K, V> callback, 当发生超时的时候, 需要对超时的K,V做些操作的话, 可以定义这个callback, 比如发送fail通知

2. 数据成员

核心结构, 使用linkedlist来实现bucket list, 用HashMap<K, V>来实现每个bucket

private LinkedList<HashMap<K, V>> _buckets; 

辅助成员, lock对象和定期的cleaner thread

private final Object _lock = new Object();
private Thread _cleaner;

3. 构造函数

其实核心就是启动_cleaner Daemon线程 
_cleaner的逻辑其实很简单, 
定期的把最后一个bucket删除, 在bucket list开头加上新的bucket, 并且如果有定义callback, 对所有timeout的kv调用callback 
同时这里考虑线程安全, 会对操作过程加锁synchronized(_lock)

唯一需要讨论的是, sleepTime 
即如果保证数据在定义的expirationSecs时间后, 被删除 
定义, sleepTime = expirationMillis / (numBuckets-1) 
a, 如果cleaner刚刚完成删除last, 添加first bucket, 这时put的K,V的过期时间为, 
   expirationSecs / (numBuckets-1) * numBuckets = expirationSecs * (1 + 1 / (numBuckets-1)) 
   需要等待完整的numBuckets个sleepTime, 所以时间会略大于expirationSecs

b, 如果反之, 刚完成put k,v操作后, cleaner开始clean操作, 那么k,v的过期时间为, 
   expirationSecs / (numBuckets-1) * numBuckets - expirationSecs / (numBuckets-1) = expirationSecs 
   这种case会比a少等一个sleepTime, 时间恰恰是expirationSecs

所以这个方法保证, 数据会在[b,a]的时间区间内被删除

    public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
        if(numBuckets<2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        _buckets = new LinkedList<HashMap<K, V>>();
        for(int i=0; i<numBuckets; i++) {
            _buckets.add(new HashMap<K, V>());
        }

        _callback = callback;
        final long expirationMillis = expirationSecs * 1000L;
        final long sleepTime = expirationMillis / (numBuckets-1);
        _cleaner = new Thread(new Runnable() {
            public void run() {
                try {
                    while(true) {
                        Map<K, V> dead = null;
                        Time.sleep(sleepTime);
                        synchronized(_lock) {
                            dead = _buckets.removeLast();
                            _buckets.addFirst(new HashMap<K, V>());
                        }
                        if(_callback!=null) {
                            for(Entry<K, V> entry: dead.entrySet()) {
                                _callback.expire(entry.getKey(), entry.getValue());
                            }
                        }
                    }
                } catch (InterruptedException ex) {

                }
            }
        });
        _cleaner.setDaemon(true);
        _cleaner.start();
    }

4. 其他操作

首先, 所有操作都会使用synchronized(_lock)保证线程互斥 
其次, 所有操作的复杂度都是O(numBuckets), 因为每个item都是hashmap, 都是O(1)操作

 

最重要的是Put, 只会将新的k,v, put到第一个(即最新的)bucket, 并且将之前旧bucket里面的相同key的cache数据删除

public void put(K key, V value) {
        synchronized(_lock) {
            Iterator<HashMap<K, V>> it = _buckets.iterator();
            HashMap<K, V> bucket = it.next();
            bucket.put(key, value);
            while(it.hasNext()) {
                bucket = it.next();
                bucket.remove(key);
            }
        }
    }

其他还支持如下操作,

public boolean containsKey(K key) 
public V get(K key) 
public Object remove(K key) 
public int size() //将所有bucket的HashMap的size累加

本文章摘自博客园,原文发布日期: 2013-05-24

目录
相关文章
|
12天前
|
存储 关系型数据库 分布式数据库
PostgreSQL 18 发布,快来 PolarDB 尝鲜!
PostgreSQL 18 发布,PolarDB for PostgreSQL 全面兼容。新版本支持异步I/O、UUIDv7、虚拟生成列、逻辑复制增强及OAuth认证,显著提升性能与安全。PolarDB-PG 18 支持存算分离架构,融合海量弹性存储与极致计算性能,搭配丰富插件生态,为企业提供高效、稳定、灵活的云数据库解决方案,助力企业数字化转型如虎添翼!
|
11天前
|
存储 人工智能 搜索推荐
终身学习型智能体
当前人工智能前沿研究的一个重要方向:构建能够自主学习、调用工具、积累经验的小型智能体(Agent)。 我们可以称这种系统为“终身学习型智能体”或“自适应认知代理”。它的设计理念就是: 不靠庞大的内置知识取胜,而是依靠高效的推理能力 + 动态获取知识的能力 + 经验积累机制。
379 133
|
11天前
|
存储 人工智能 Java
AI 超级智能体全栈项目阶段二:Prompt 优化技巧与学术分析 AI 应用开发实现上下文联系多轮对话
本文讲解 Prompt 基本概念与 10 个优化技巧,结合学术分析 AI 应用的需求分析、设计方案,介绍 Spring AI 中 ChatClient 及 Advisors 的使用。
474 131
AI 超级智能体全栈项目阶段二:Prompt 优化技巧与学术分析 AI 应用开发实现上下文联系多轮对话
|
5天前
|
存储 安全 前端开发
如何将加密和解密函数应用到实际项目中?
如何将加密和解密函数应用到实际项目中?
212 138
|
11天前
|
人工智能 Java API
AI 超级智能体全栈项目阶段一:AI大模型概述、选型、项目初始化以及基于阿里云灵积模型 Qwen-Plus实现模型接入四种方式(SDK/HTTP/SpringAI/langchain4j)
本文介绍AI大模型的核心概念、分类及开发者学习路径,重点讲解如何选择与接入大模型。项目基于Spring Boot,使用阿里云灵积模型(Qwen-Plus),对比SDK、HTTP、Spring AI和LangChain4j四种接入方式,助力开发者高效构建AI应用。
455 122
AI 超级智能体全栈项目阶段一:AI大模型概述、选型、项目初始化以及基于阿里云灵积模型 Qwen-Plus实现模型接入四种方式(SDK/HTTP/SpringAI/langchain4j)
|
5天前
|
存储 JSON 安全
加密和解密函数的具体实现代码
加密和解密函数的具体实现代码
230 136
|
22天前
|
机器学习/深度学习 人工智能 前端开发
通义DeepResearch全面开源!同步分享可落地的高阶Agent构建方法论
通义研究团队开源发布通义 DeepResearch —— 首个在性能上可与 OpenAI DeepResearch 相媲美、并在多项权威基准测试中取得领先表现的全开源 Web Agent。
1548 87
|
23天前
|
弹性计算 关系型数据库 微服务
基于 Docker 与 Kubernetes(K3s)的微服务:阿里云生产环境扩容实践
在微服务架构中,如何实现“稳定扩容”与“成本可控”是企业面临的核心挑战。本文结合 Python FastAPI 微服务实战,详解如何基于阿里云基础设施,利用 Docker 封装服务、K3s 实现容器编排,构建生产级微服务架构。内容涵盖容器构建、集群部署、自动扩缩容、可观测性等关键环节,适配阿里云资源特性与服务生态,助力企业打造低成本、高可靠、易扩展的微服务解决方案。
1368 8