Storm's official Trident word-count example

Introduction:

This is the word-count example that ships with Storm's Trident API. A fixed batch spout cycles through a handful of sentences; the topology splits each sentence into words, keeps a running per-word count in an in-memory state, and exposes that state through a DRPC stream that, given a query sentence, sums the counts of the words it contains.

The code:

// Imports for Storm 1.x and later (older releases use the backtype.storm packages instead)
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.operation.builtin.FilterNull;
import org.apache.storm.trident.operation.builtin.MapGet;
import org.apache.storm.trident.operation.builtin.Sum;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TridentWordCount {
  public static class Split extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
      String sentence = tuple.getString(0);
      for (String word : sentence.split(" ")) {
        collector.emit(new Values(word));
      }
    }
  }

  public static StormTopology buildTopology(LocalDRPC drpc) {
    // A batch spout: emits the fixed sentences in batches of up to 3 tuples
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
        new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
        new Values("how many apples can you eat"), new Values("to be or not to be the person"));
    spout.setCycle(true); // cycle through the sentences indefinitely

    TridentTopology topology = new TridentTopology();
    TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16) // analogous to setSpout
            .each(new Fields("sentence"), new Split(), new Fields("word")) // analogous to setBolt
            .groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")).parallelismHint(16);

    topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields(
        "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"),
        new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));
    return topology.build();
  }

  public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setMaxSpoutPending(20);
    if (args.length == 0) {
      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
      for (int i = 0; i < 100; i++) {
        System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
        Thread.sleep(1000);
      }
    }
    else {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
    }
  }
}
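To make the data flow concrete, the pipeline above can be sketched in plain Java with no Storm dependency. This is an illustration only (the class and method names below are made up for the sketch, not Storm API): `process` plays the role of Split + groupBy + persistentAggregate(Count) against the MemoryMapState, and `query` plays the role of the DRPC stream (Split, then MapGet, then FilterNull, then Sum).

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // Stands in for MemoryMapState: word -> running count
    private final Map<String, Long> counts = new HashMap<>();

    // Stands in for Split + groupBy + persistentAggregate(Count)
    void process(String sentence) {
        for (String word : sentence.split(" ")) {
            counts.merge(word, 1L, Long::sum);
        }
    }

    // Stands in for the DRPC stream: split the query, look up each
    // word's count (MapGet), drop unseen words (FilterNull), sum (Sum)
    long query(String args) {
        long sum = 0;
        for (String word : args.split(" ")) {
            Long c = counts.get(word); // null for words never counted
            if (c != null) sum += c;
        }
        return sum;
    }

    public static void main(String[] a) {
        WordCountSketch s = new WordCountSketch();
        s.process("the cow jumped over the moon");
        s.process("the man went to the store and bought some candy");
        // "the" was seen 4 times, "jumped" once; "cat" and "dog" never
        System.out.println(s.query("cat the dog jumped")); // prints 5
    }
}
```

This also explains the output of the DRPC loop in main: because the spout cycles forever, the counts keep growing, so repeated `drpc.execute("words", "cat the dog jumped")` calls return ever larger sums.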

 


This article was reposted from the SummerChill cnblogs blog. Original link: http://www.cnblogs.com/DreamDrive/p/6675985.html. Please contact the original author before reprinting.
