Apache Storm技术实战

简介: “源码走读系列”从代码层面分析了storm的具体实现,接下来通过具体的实例来说明storm的使用。因为目前storm已经正式迁移到Apache。WordCountTopology 使用storm来统计文件中的每个单词的出现次数。通过BasicDRPCTopology的实例来分析DRPCTopolo

 WordCountTopology

“源码走读系列”从代码层面分析了storm的具体实现,接下来通过具体的实例来说明storm的使用。因为目前storm已经正式迁移到Apache,文章系列也由twitter storm转为apache storm.

WordCountTopology 使用storm来统计文件中的每个单词的出现次数。

通过该例子来说明tuple发送时的几个要素

  1. source component   发送源
  2. destination component 接收者
  3. stream 消息通道
  4. tuple    消息本身

本文涉及到的开发环境搭建可以参考前面的两篇博文。

  1. arch linux简明安装指南
  2. 在archlinux上搭建storm cluster

awk实现

其实对文件中的单词进行统计是Linux下一个很常见的任务,用awk就可以轻松的解决(如果文件不是太大的话),下面是进行word counting的awk脚本,将其保存为名为wordcount.awk文件。

wordcount.awk

复制代码
{
  for (i = 1; i<=NF; i++)
    freq[$i]++
}
END{
  for (word in freq)
    printf "%s\t%d\n",word,freq[word]
}
复制代码

运行该脚本,对文件中的单词进行统计

gawk -f wordcount.awk filename

原始版本

从github上复制内容

git clone https://github.com/nathanmarz/storm-starter.git

编译运行

lein deps
lein compile
java -cp $(lein classpath) WordCountTopology

main函数

main函数的主要内容

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

注意:grouping操作的时候,如果没有显示指定stream id,则使用的是default stream. 如shuffleGrouping("spout")表示从名为spout的component中接收从default stream发送过来的tuple.

改进版本

在原始版本中,spout不停的向split bolt随机发送句子,Count bolt统计每个单词出现的次数。

那么能不能让Spout在读取完文件之后,通知下游的bolt显示最柊的统计结果呢?

要想达到上述的改进目标,采用如上图所示的结构即可。改变的地方如下,

  1. 在Spout中添加一个SUCCESS_STREAM
  2. 添加只有一个运行实例的statistics bolt
  3. 当spout读取完文件内容之后,通过SUCCESS_STREAM告诉statistics bolt,文件已经处理完毕,可以打印当前的统计结果

RandomSentenceSpout.java

declareOutputFields

添加SUCCESS_STREAM

@Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
    declarer.declareStream("SUCCESS_STREAM",new Fields("word"));
  }

nextTuple

使用SUCCESS_STREAM通知下游,文件处理完毕

复制代码
@Override
  public void nextTuple() {
    Utils.sleep(100);
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    if ( count == sentences.length ) 
    {
      System.out.println(count+" try to emit tuple by success_stream");
      _collector.emit("SUCCESS_STREAM",new Values(sentences[0]));
      count++;
    }else if ( count < sentences.length ){
      _collector.emit(new Values(sentences[count]));
      count++;
    }
  }
复制代码

WordCountTopology.java

添加静态类WordCount2

复制代码
public static class WordCount2 extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      if ( tuple.getSourceStreamId() == "SUCCESS_STREAM" ) {
    System.out.println("prepare to print the statistics");
    for (String key : counts.keySet()) {
      System.out.println(key+"\t"+counts.get(key));
    }
    System.out.println("finish printing");
      }else {

    String word = tuple.getString(0);
    Integer count = counts.get(word);
    if (count == null)
      count = 0;
    count++;
    counts.put(word, count);
      }
    }
复制代码

 

main函数

将spout的并行数由5改为1

 builder.setSpout("spout", new RandomSentenceSpout(), 1);

在原有的Topology中添加WordCount2 Bolt

 builder.setBolt("count2", new WordCount2(), 1).globalGrouping("count").globalGrouping("spout","SUCCESS_STREAM");

WordCount2 Bolt会接收从Count Bolt通过default stream发送的tuple,同时接收Spout通过SUCCESS_STREAM发送的tuple,也就是说wordcount2会接收从两个stream来的数据。

编译

编译修改后的源文件

cd $STROM_STARTER
lein compile storm.starter

可能会出现以下异常信息,该异常可以忽略。

Exception in thread "main" java.io.FileNotFoundException: Could not locate storm/starter/WordCountTopology__init.class or storm/starter/WordCountTopology.clj on classpath:

运行

在local模式下运行修改后的WordCountTopology

java -cp $(lein classpath) storm.starter.WordCountTopology

如果一切正常,日志如下所示,线程的名字可能会有所不同。

复制代码
moon    1
score    1
cow    1
doctor    1
over    1
nature    1
snow    1
four    1
keeps    1
with    1
a    1
white    1
dwarfs    1
at    1
the    4
and    2
i    1
two    1
away    1
seven    2
apple    1
am    1
an    1
jumped    1
day    1
years    1
ago    1
复制代码

 结果验证

可以将WordCountTopology的运行结果和awk脚本的运行结果相比对,结果应该是一致的。

小技巧

  1. awk脚本的执行结果存为一个文件result1.log, WordCountTopology的输出中单词统计部分存为result2.log
  2. 用vim打开result1.log,进行sorting,保存结果;用vim打开result2.log,进行sorting,保存。
  3. 然后用vimdiff来进行比较 vimdiff result1.log result2.log

BasicDRPCTopology

本文通过BasicDRPCTopology的实例来分析DRPCTopology在提交的时候, Topology中究竟含有哪些内容?

BasicDRPCTopology

main函数

DRPC 分布式远程调用(这个说法有意思,远程调用本来就是分布的,何须再加个D, <头文字D>看多了, :)

 

复制代码
public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);

    Config conf = new Config();

    if (args == null || args.length == 0) {
      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();

      cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));

      for (String word : new String[]{ "hello", "goodbye" }) {
        System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
      }

      cluster.shutdown();
      drpc.shutdown();
    }
    else {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
    }
  }
复制代码

 

问题: 上面的代码中只是添加了一个bolt,并没有设定Spout. 我们知道一个topology中最起码得有一个Spout,那么这里的Spout又隐身于何处呢?

关键的地方就在builder.createLocalTopology, 调用关系如下

  • LinearDRPCTopologyBuilder::createLocalTopology
    •   LinearDRPCTopologyBuilder::createTopology()
      •   LinearDRPCTopologyBuilder::createTopology(new DRPCSpout(_function))

       

原来DRPCTopology中使用的Spout是DRPCSpout.

LinearDRPCTopology::createTopology

既然代码已经读到此处,何不再进一步看看createTopology的实现.

简要说明一下该段代码的处理逻辑:

  1. 设置DRPCSpout
  2. 以bolt为入参,创建CoordinatedBolt
  3. 添加JoinResult Bolt
  4. 添加ReturnResult Bolt: ReturnResultBolt连接到DRPCServer,并返回结果
复制代码
    private StormTopology createTopology(DRPCSpout spout) {
        final String SPOUT_ID = "spout";
        final String PREPARE_ID = "prepare-request";
        
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, spout);
        builder.setBolt(PREPARE_ID, new PrepareRequest())
                .noneGrouping(SPOUT_ID);
        int i=0;
        for(; i<_components.size();i++) {
            Component component = _components.get(i);
            
            Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();
            if (i==1) {
                source.put(boltId(i-1), SourceArgs.single());
            } else if (i>=2) {
                source.put(boltId(i-1), SourceArgs.all());
            }
            IdStreamSpec idSpec = null;
            if(i==_components.size()-1 && component.bolt instanceof FinishedCallback) {
                idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM);
            }
            BoltDeclarer declarer = builder.setBolt(
                    boltId(i),
                    new CoordinatedBolt(component.bolt, source, idSpec),
                    component.parallelism);
            
            for(Map conf: component.componentConfs) {
                declarer.addConfigurations(conf);
            }
            
            if(idSpec!=null) {
                declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request"));
            }
            if(i==0 && component.declarations.isEmpty()) {
                declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
            } else {
                String prevId;
                if(i==0) {
                    prevId = PREPARE_ID;
                } else {
                    prevId = boltId(i-1);
                }
                for(InputDeclaration declaration: component.declarations) {
                    declaration.declare(prevId, declarer);
                }
            }
            if(i>0) {
                declarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID); 
            }
        }
        
        IRichBolt lastBolt = _components.get(_components.size()-1).bolt;
        OutputFieldsGetter getter = new OutputFieldsGetter();
        lastBolt.declareOutputFields(getter);
        Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
        if(streams.size()!=1) {
            throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
        }
        String outputStream = streams.keySet().iterator().next();
        List<String> fields = streams.get(outputStream).get_output_fields();
        if(fields.size()!=2) {
            throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result.");
        }

        builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))
                .fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0)))
                .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));
        i++;
        builder.setBolt(boltId(i), new ReturnResults())
                .noneGrouping(boltId(i-1));
        return builder.createTopology();
    }
复制代码

 

Bolt

处理逻辑: 在接收到的每一个单词后面添加'!'.

复制代码
 public static class ExclaimBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String input = tuple.getString(1);
      collector.emit(new Values(tuple.getValue(0), input + "!"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "result"));
    }
  }
复制代码

运行

java -cp $(lein classpath) storm.starter.BasicDRPCTopology

 TridentWordCount

介绍TridentTopology的使用,重点分析newDRPCStream和stateQuery的实现机理。 

使用TridentTopology进行数据处理的时候,经常会使用State来保存一些状态,这些保存起来的State通过stateQuery来进行查询。问题恰恰在这里产生,即对state进行更新的Stream和尔后进行stateQuery的Stream并非同一个,那么它们之间是如何关联起来的呢。

在TridentTopology中,有一些Processor可能会同处于一个Bolt中,这些Processor形成一个processing chain, 那么Tuple又是如何在这些Processor之间进行传递的呢。

TridentWordCount

编译和运行

lein compile storm.starter.trident.TridentWordCount
java -cp $(lein classpath) storm.starter.trident.TridentWordCount 

main函数

复制代码
public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setMaxSpoutPending(20);
    if (args.length == 0) {
      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
      for (int i = 0; i < 100; i++) {
        System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
        Thread.sleep(1000);
      }
    }
    else {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
    }
  } 
复制代码

buildTopology

复制代码
 public static StormTopology buildTopology(LocalDRPC drpc) {
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
        new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
        new Values("how many apples can you eat"), new Values("to be or not to be the person"));
    spout.setCycle(true);

    TridentTopology topology = new TridentTopology();
    TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
        new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
        new Count(), new Fields("count")).parallelismHint(16);

    topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields(
        "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"),
        new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));
    return topology.build();
  } 
复制代码

示意图

 

在整个topology中,有两个不同的spout。

运行结果该如何理解

此图有好几个问题

  1. PartitionPersistProcessor和StateQueryProcessor同处于一个bolt,该bolt为SubtopologyBolt
  2. SubtopologyBolt有来自多个不同Stream的输入,根据不同的Streamid找到对应的InitialReceiver
  3. drpcspout在执行的时候,是一直不停的emit消息到SubtopologyBolt,还是发送完一次消息就停止发送

不同的tuple,其sourcestream不一样,根据SourceStream,找到对应的InitialReceiver

    Map<String, InitialReceiver> _roots = new HashMap(); 

 

状态更新

进行状态更新的Processor名为PartitionPersistProcessor

execute

记录哪些tuple需要进行状态更新

finishBatch

状态真正更新是发生在finishBatch阶段

persistentAggregate

PartitionPersistProcessor

  • SubtopologyBolt::execute
    • PartitionPersistProcessor::finishBatch
      •   _updater::updateState
        • Snapshottable::update

当状态更新的时候,状态查询是否会发生?

状态查询

进行状态查询的Processor名为StateQueryProcessor

execute

finishBatch

查询的时候,首先调用batchRetrieive来获得最新的状态更新结果,再对每个最新的结果使用_function来进行处理。

调用层次

  • SubtopologyBolt::finishBatch
    •   StateQueryProcessor::finishBatch
      • _function.batchRetrieve
      • _function.execute   将处理过的结果发送给下一跳进行处理

         

消息的传递

TridentTuple

如何决定bolt内部的哪个processor来处理接收到的消息,这个是根据不同的Stream来判断InitialReceiver完成。

当SubtopologyBolt接收到最原始的tuple时,根据streamid找到InitialReceiver后,InitialReceiver在receive函数中作的第一件事情就是根据tuple来创建一个tridenttuple,tridenttuple会被处在同一个SubtopologyBolt中的processor一一处理,处理的结果是保存在tridenttuple和processorcontext中。

ProcessorContext

ProcessorContext记录两个重要的信息,即当前的batchId和batchState.

复制代码
public class ProcessorContext {
    public Object batchId;
    public Object[] state;
    
    public ProcessorContext(Object batchId, Object[] state) {
        this.batchId = batchId;
        this.state = state;
    }
}
复制代码

TridentCollector

tridentcollector在emit的时候将消息由各个TupleReceiver进行处理。目前仅有BridgeReceiver实现了该接口。

BridgeReceiver负责将消息发送给另外的Bolt进行处理。这里说的“另外的Bolt”是指Vanilla Topology中的Bolt.

目录
相关文章
|
2月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
101 5
|
4月前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
440 31
Apache Flink 流批融合技术介绍
|
3月前
|
存储 分布式计算 druid
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
78 1
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
|
3月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
56 3
|
4月前
|
分布式计算 Java Apache
Apache Spark Streaming技术深度解析
【9月更文挑战第4天】Apache Spark Streaming是Apache Spark生态系统中用于处理实时数据流的一个重要组件。它将输入数据分成小批次(micro-batch),然后利用Spark的批处理引擎进行处理,从而结合了批处理和流处理的优点。这种处理方式使得Spark Streaming既能够保持高吞吐量,又能够处理实时数据流。
80 0
|
5月前
|
关系型数据库 Linux 网络安全
"Linux系统实战:从零开始部署Apache+PHP Web项目,轻松搭建您的在线应用"
【8月更文挑战第9天】Linux作为服务器操作系统,凭借其稳定性和安全性成为部署Web项目的优选平台。本文以Apache Web服务器和PHP项目为例,介绍部署流程。首先,通过包管理器安装Apache与PHP;接着创建项目目录,并上传项目文件至该目录;根据需要配置Apache虚拟主机;最后重启Apache服务并测试项目。确保防火墙允许HTTP流量,正确配置数据库连接,并定期更新系统以维持安全。随着项目复杂度提升,进一步学习高级配置将变得必要。
403 0
|
7月前
|
Java 数据库连接 Apache
深入理解Apache Commons Pool2池化技术
深入理解Apache Commons Pool2池化技术
|
7月前
|
存储 Apache 文件存储
在Apache环境下为Web网站增设访问控制:实战指南
在Apache服务器上保护网站资源涉及启用访问控制模块(`mod_authz_core`和`mod_auth_basic`),在`.htaccess`或`httpd.conf`中设定权限,如限制对特定目录的访问。创建`.htpasswd`文件存储用户名和密码,并使用`htpasswd`工具管理用户。完成配置后重启Apache服务,访问受限目录时需提供有效的用户名和密码。对于高安全性需求,可考虑更复杂的认证方法。【6月更文挑战第20天】
432 4
|
7月前
|
弹性计算 应用服务中间件 Linux
双剑合璧:在同一ECS服务器上共存Apache与Nginx的实战攻略
在ECS服务器上同时部署Apache和Nginx的实战:安装更新系统,Ubuntu用`sudo apt install apache2 nginx`,CentOS用`sudo yum install httpd nginx`。配置Nginx作为反向代理,处理静态内容及转发动态请求到Apache(监听8080端口)。调整Apache的`ports.conf`监听8080。重启服务测试,实现两者高效协同,提升Web服务性能。记得根据流量和需求优化配置。【6月更文挑战第21天】
610 1
|
6月前
|
分布式计算 Apache Spark

相关实验场景

更多

推荐镜像

更多