Twitter Storm - DRPC

简介:

DRPC ,Distributed Remote Procedure Call 
RPC本身是个成熟和古老的概念, Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算

DRPC, 只是storm应用的一个场景, 并且storm提供相应的编程框架, 以方便程序员

提供DRPC server的实现, 并提供对DRPC场景经行封装的Topology  
对于storm, Topology内部其实是没有任何区别的, 主要就是实现和封装Topology和DRPC Server之间交互的spout和bolt  
具体交互过程如下图,

image

 

LinearDRPCTopologyBuilder

Storm封装了这样一个Topology实现LinearDRPCTopologyBuilder的topology builder

会使用DRPCSpout从DRPC服务器接收函数调用流, 每个函数调用被DRPC服务器标记了一个唯一的id. 
这个topology然后计算结果, 在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器, 并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识). 
DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它.

public static class ExclaimBolt implements IBasicBolt {
    public void prepare(Map conf, TopologyContext context) {
    }
 
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String input = tuple.getString(1);
        collector.emit(new Values(tuple.getValue(0), input + "!"));
    }
 
    public void cleanup() {
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "result"));
    }
 
}
 
public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder
        = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);
    // ...
}

使用LinearDRPCTopologyBuilder

首先, 使用builder创建topology的时候, topology name要等于function name(PRC call) 
其次, 这里不需要指定spout, 因为已经做了封装, 会自动从DRPC server读取数据 
再者, 所有bolt的输出的第一个field必须是request-id, 最后一般bolt的输出一定是(request-id, result) 
最后, builder会自动将结果发给DRPC server

 

A more complex example, ReachTopology

前面的例子, 无法说明为什么要使用storm来实现RPC, 那个操作直接在RPC server上就可以完成 
当只有对并行计算和数据量有一定要求时, 才能体现出价值... 
ReachTopology, 每个人发送的URL都会被所有follower收到, 所以要计算某URL的reach, 只需要如下几步, 
找出所有发送该URL的tweeter, 取出他们的follower, 去重复, 计数

Let's look at a more complex example which really needs the parallelism a Storm cluster provides for computing the DRPC function. 
The example we'll look at is computing the reach of a URL on Twitter.

The reach of a URL is the number of unique people exposed to a URL on Twitter. To compute reach, you need to:

  1. Get all the people who tweeted the URL
  2. Get all the followers of all those people
  3. Unique the set of followers
  4. Count the unique set of followers
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 3);
builder.addBolt(new GetFollowers(), 12)
        .shuffleGrouping();
builder.addBolt(new PartialUniquer(), 6)
        .fieldsGrouping(new Fields("id", "follower"));
builder.addBolt(new CountAggregator(), 2)
        .fieldsGrouping(new Fields("id"));

首先对于LinearDRPCTopologyBuilder中, 没有各个spout和bolt的id, 可能因为是线性的, 所以没有必要显式指明关系, 按加入顺序就可以 
GetTweeters(), 输入(rpc-id, url), 输出(rpc-id, tweeter) 
GetFollowers(), 输入(rpc-id, tweeter), 输出(rpc-id, follower)

对于PartialUniquer仔细分析一下, 
首先是基于id和follower的grouping, fieldsGrouping(new Fields("id", "follower")), 因为可能由多个RPC同时执行, 所以只需要保证同一个RPC的followers被发到同一个task即可. 
接着继承BaseBatchBolt, 可以在finishBatch中emit结果 
使用HashSet来存放follower, 保证不重复

    public static class PartialUniquer extends BaseBatchBolt {
        BatchOutputCollector _collector;
        Object _id;
        Set<String> _followers = new HashSet<String>();
        
        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            _collector = collector;
            _id = id;
        }

        @Override
        public void execute(Tuple tuple) {
            _followers.add(tuple.getString(1));
        }
        
        @Override
        public void finishBatch() {
            _collector.emit(new Values(_id, _followers.size()));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "partial-count"));
        }
    }

 

Non-linear DRPC topologies

当前LinearDRPCTopologyBuilder只能handle线性的bolt workflow 

LinearDRPCTopologyBuilder only handles "linear" DRPC topologies, where the computation is expressed as a sequence of steps (like reach). It's not hard to imagine functions that would require a more complicated topology with branching and merging of the bolts. For now, to do this you'll need to drop down into using CoordinatedBolt directly. Be sure to talk about your use case for non-linear DRPC topologies on the mailing list to inform the construction of more general abstractions for DRPC topologies.


本文章摘自博客园,原文发布日期:2013-05-08

目录
相关文章
|
JavaScript 网络协议 Dubbo
Facebook分布式框架—Thrift介绍。
Thrift介绍 Thrift是一个分布式RPC框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml这些编程语言间无缝结合的、高效的服务。
411 0
Facebook分布式框架—Thrift介绍。
|
存储 SQL 运维
storm笔记:Trident应用
Trident是基于Storm的实时计算模型的高级抽象。它可以实现高吞吐(每秒数百万条消息)的有状态流处理和低延迟分布式查询。
195 0
storm笔记:Trident应用
|
搜索推荐 开发工具
完全基于开源软件构建的 Twitter
Twitter 宣布其整个系统基于开源软件系统构建,同时该公司也贡献很多开源的软件,并为这些开源软件专门制作一个汇集页面 http://twitter.github.io/
154 0
完全基于开源软件构建的 Twitter
|
大数据 测试技术 Apache
Apache Cassandra 在 Facebook 的应用
在 Instagram (Instagram是Facebook公司旗下一款免费提供在线图片及视频分享的社交应用软件,于2010年10月发布。)上,我们拥有世界上最大的 Apache Cassandra 数据库部署。
3889 0
|
消息中间件 Java 开发工具
|
Java 流计算
storm从入门到放弃(一),storm介绍
背景:目前就职于国内最大的IT咨询公司,恰巧又是毕业季,所在部门招了20多个应届毕业生,本人要跟部门新人进行为期一个月的大数据入职培训,特此将整理的文档分享出来。 原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/7274361.html 微信:intsmaze 避免微信回复重复咨询问题,技术咨询请博客留言。
1510 0