上代码:
1 public class TridentWordCount { 2 public static class Split extends BaseFunction { 3 @Override 4 public void execute(TridentTuple tuple, TridentCollector collector) { 5 String sentence = tuple.getString(0); 6 for (String word : sentence.split(" ")) { 7 collector.emit(new Values(word)); 8 } 9 } 10 } 11 12 public static StormTopology buildTopology(LocalDRPC drpc) { 13 //这个是一个batch Spout 一次发3个.. 14 FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), 15 new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), 16 new Values("how many apples can you eat"), new Values("to be or not to be the person")); 17 spout.setCycle(true);//Spout是否循环发送 18 19 TridentTopology topology = new TridentTopology(); 20 TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16)//类似于setSpout 21 .each(new Fields("sentence"),new Split(), new Fields("word"))//setbolt 22 .groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),new Count(), new Fields("count")).parallelismHint(16); 23 24 topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields( 25 "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), 26 new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum")); 27 return topology.build(); 28 } 29 30 public static void main(String[] args) throws Exception { 31 Config conf = new Config(); 32 conf.setMaxSpoutPending(20); 33 if (args.length == 0) { 34 LocalDRPC drpc = new LocalDRPC(); 35 LocalCluster cluster = new LocalCluster(); 36 cluster.submitTopology("wordCounter", conf, buildTopology(drpc)); 37 for (int i = 0; i < 100; i++) { 38 System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped")); 39 Thread.sleep(1000); 40 } 41 } 42 else { 43 conf.setNumWorkers(3); 44 
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null)); 45 } 46 } 47 }
本文转自 SummerChill 的博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/6675985.html 。如需转载,请自行联系原作者。