Twitter Storm - DRPC

简介:

DRPC ,Distributed Remote Procedure Call 
RPC本身是个成熟和古老的概念, Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算

DRPC, 只是storm应用的一个场景, 并且storm提供相应的编程框架, 以方便程序员

提供DRPC server的实现, 并提供对DRPC场景经行封装的Topology  
对于storm, Topology内部其实是没有任何区别的, 主要就是实现和封装Topology和DRPC Server之间交互的spout和bolt  
具体交互过程如下图,

image

 

LinearDRPCTopologyBuilder

Storm封装了这样一个Topology实现LinearDRPCTopologyBuilder的topology builder

会使用DRPCSpout从DRPC服务器接收函数调用流, 每个函数调用被DRPC服务器标记了一个唯一的id. 
这个topology然后计算结果, 在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器, 并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识). 
DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它.

public static class ExclaimBolt implements IBasicBolt {
    public void prepare(Map conf, TopologyContext context) {
    }
 
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String input = tuple.getString(1);
        collector.emit(new Values(tuple.getValue(0), input + "!"));
    }
 
    public void cleanup() {
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "result"));
    }
 
}
 
public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder
        = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);
    // ...
}

使用LinearDRPCTopologyBuilder

首先, 使用builder创建topology的时候, topology name要等于function name(PRC call) 
其次, 这里不需要指定spout, 因为已经做了封装, 会自动从DRPC server读取数据 
再者, 所有bolt的输出的第一个field必须是request-id, 最后一般bolt的输出一定是(request-id, result) 
最后, builder会自动将结果发给DRPC server

 

A more complex example, ReachTopology

前面的例子, 无法说明为什么要使用storm来实现RPC, 那个操作直接在RPC server上就可以完成 
当只有对并行计算和数据量有一定要求时, 才能体现出价值... 
ReachTopology, 每个人发送的URL都会被所有follower收到, 所以要计算某URL的reach, 只需要如下几步, 
找出所有发送该URL的tweeter, 取出他们的follower, 去重复, 计数

Let's look at a more complex example which really needs the parallelism a Storm cluster provides for computing the DRPC function. 
The example we'll look at is computing the reach of a URL on Twitter.

The reach of a URL is the number of unique people exposed to a URL on Twitter. To compute reach, you need to:

  1. Get all the people who tweeted the URL
  2. Get all the followers of all those people
  3. Unique the set of followers
  4. Count the unique set of followers
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 3);
builder.addBolt(new GetFollowers(), 12)
        .shuffleGrouping();
builder.addBolt(new PartialUniquer(), 6)
        .fieldsGrouping(new Fields("id", "follower"));
builder.addBolt(new CountAggregator(), 2)
        .fieldsGrouping(new Fields("id"));

首先对于LinearDRPCTopologyBuilder中, 没有各个spout和bolt的id, 可能因为是线性的, 所以没有必要显式指明关系, 按加入顺序就可以 
GetTweeters(), 输入(rpc-id, url), 输出(rpc-id, tweeter) 
GetFollowers(), 输入(rpc-id, tweeter), 输出(rpc-id, follower)

对于PartialUniquer仔细分析一下, 
首先是基于id和follower的grouping, fieldsGrouping(new Fields("id", "follower")), 因为可能由多个RPC同时执行, 所以只需要保证同一个RPC的followers被发到同一个task即可. 
接着继承BaseBatchBolt, 可以在finishBatch中emit结果 
使用HashSet来存放follower, 保证不重复

    public static class PartialUniquer extends BaseBatchBolt {
        BatchOutputCollector _collector;
        Object _id;
        Set<String> _followers = new HashSet<String>();
        
        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            _collector = collector;
            _id = id;
        }

        @Override
        public void execute(Tuple tuple) {
            _followers.add(tuple.getString(1));
        }
        
        @Override
        public void finishBatch() {
            _collector.emit(new Values(_id, _followers.size()));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "partial-count"));
        }
    }

 

Non-linear DRPC topologies

当前LinearDRPCTopologyBuilder只能handle线性的bolt workflow 

LinearDRPCTopologyBuilder only handles "linear" DRPC topologies, where the computation is expressed as a sequence of steps (like reach). It's not hard to imagine functions that would require a more complicated topology with branching and merging of the bolts. For now, to do this you'll need to drop down into using CoordinatedBolt directly. Be sure to talk about your use case for non-linear DRPC topologies on the mailing list to inform the construction of more general abstractions for DRPC topologies.


本文章摘自博客园,原文发布日期:2013-05-08

目录
相关文章
|
开发框架 分布式计算 Java
storm 介绍|学习笔记
快速学习 storm 介绍
155 0
storm 介绍|学习笔记
|
存储 监控 安全
storm笔记:storm集群
Strom集群结构是有一个主节点(nimbus)和多个工作节点(supervisor)组成的主从结构,主节点通过配置静态指定(还有一种主从结构是在运行时动态选举,比如zookeeper)。通常这种主从结构存在出现单点故障的风险,Storm通过特殊处理规避这种风险,后面将解释Storm的半容错结构。
430 0
storm笔记:storm集群
|
存储 消息中间件 分布式计算
Storm 简介
场景 伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高。举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了。再举一个推荐的例子,如果用户昨天在淘宝上买了一双袜子,今天想买一副泳镜去游泳,但是却发现系统在不遗余力地给他推荐袜子、鞋子,根本对他今天寻找泳镜的行为视而不见,估计这哥们心里就会想推荐你妹呀。其实稍微了解点背景知识的码农们都知道,这是因为后台系统做的是每天一次的全量处理
158 0
|
大数据 测试技术 Apache
Apache Cassandra 在 Facebook 的应用
在 Instagram (Instagram是Facebook公司旗下一款免费提供在线图片及视频分享的社交应用软件,于2010年10月发布。)上,我们拥有世界上最大的 Apache Cassandra 数据库部署。
3896 0
|
存储 分布式计算 Java
|
消息中间件 Java 开发工具
|
Java 流计算
storm从入门到放弃(一),storm介绍
背景:目前就职于国内最大的IT咨询公司,恰巧又是毕业季,所在部门招了20多个应届毕业生,本人要跟部门新人进行为期一个月的大数据入职培训,特此将整理的文档分享出来。 原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/7274361.html 微信:intsmaze 避免微信回复重复咨询问题,技术咨询请博客留言。
1518 0

热门文章

最新文章