Strom的trident单词计数代码

简介:

 

复制代码
  1 /**
  2  * 单词计数
  3  */
  4 public class LocalTridentCount {
  5     
  6     public static class MyBatchSpout implements IBatchSpout {
  7 
  8         Fields fields;
  9         HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
 10         
 11         public MyBatchSpout(Fields fields) {
 12             this.fields = fields;
 13         }
 14         @Override
 15         public void open(Map conf, TopologyContext context) {
 16         }
 17 
 18         @Override
 19         public void emitBatch(long batchId, TridentCollector collector) {
 20             List<List<Object>> batch = this.batches.get(batchId);
 21             if(batch == null){
 22                 batch = new ArrayList<List<Object>>();
 23                 Collection<File> listFiles = FileUtils.listFiles(new File("d:\\stormtest"), new String[]{"txt"}, true);
 24                 for (File file : listFiles) {
 25                     List<String> readLines;
 26                     try {
 27                         readLines = FileUtils.readLines(file);
 28                         for (String line : readLines) {
 29                             batch.add(new Values(line));
 30                         }
 31                         FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
 32                     } catch (IOException e) {
 33                         e.printStackTrace();
 34                     }
 35                     
 36                 }
 37                 if(batch.size()>0){
 38                     this.batches.put(batchId, batch);
 39                 }
 40             }
 41             for(List<Object> list : batch){
 42                 collector.emit(list);
 43             }
 44         }
 45 
 46         @Override
 47         public void ack(long batchId) {
 48             this.batches.remove(batchId);
 49         }
 50 
 51         @Override
 52         public void close() {
 53         }
 54 
 55         @Override
 56         public Map getComponentConfiguration() {
 57             Config conf = new Config();
 58             conf.setMaxTaskParallelism(1);
 59             return conf;
 60         }
 61 
 62         @Override
 63         public Fields getOutputFields() {
 64             return fields;
 65         }
 66         
 67     }
 68     
 69     /**
 70      * 对一行行的数据进行切割成一个个单词
 71      */
 72     public static class MySplit extends BaseFunction{
 73 
 74         @Override
 75         public void execute(TridentTuple tuple, TridentCollector collector) {
 76             String line = tuple.getStringByField("lines");
 77             String[] words = line.split("\t");
 78             for (String word : words) {
 79                 collector.emit(new Values(word));
 80             }
 81         }
 82         
 83     }
 84     
 85     public static class MyWordAgge extends BaseAggregator<Map<String, Integer>>{
 86 
 87         @Override
 88         public Map<String, Integer> init(Object batchId,
 89                 TridentCollector collector) {
 90             return new HashMap<String, Integer>();
 91         }
 92 
 93         @Override
 94         public void aggregate(Map<String, Integer> val, TridentTuple tuple,
 95                 TridentCollector collector) {
 96             String key = tuple.getString(0);
 97             /*Integer integer = val.get(key);
 98             if(integer==null){
 99                 integer=0;
100             }
101             integer++;
102             val.put(key, integer);*/
103             val.put(key, MapUtils.getInteger(val, key, 0)+1);
104         }
105 
106         @Override
107         public void complete(Map<String, Integer> val,
108                 TridentCollector collector) {
109             collector.emit(new Values(val));
110         }
111         
112     }
113     
114     /**
115      * 汇总局部的map,并且打印结果
116      *
117      */
118     public static class MyCountPrint extends BaseFunction{
119 
120         HashMap<String, Integer> hashMap = new HashMap<String, Integer>();
121         @Override
122         public void execute(TridentTuple tuple, TridentCollector collector) {
123             Map<String, Integer> map = (Map<String, Integer>)tuple.get(0);
124             for (Entry<String, Integer> entry : map.entrySet()) {
125                 String key = entry.getKey();
126                 Integer value = entry.getValue();
127                 Integer integer = hashMap.get(key);
128                 if(integer==null){
129                     integer=0;
130                 }
131                 hashMap.put(key, integer+value);
132             }
133             
134             Utils.sleep(1000);
135             System.out.println("==================================");
136             for (Entry<String, Integer> entry : hashMap.entrySet()) {
137                 System.out.println(entry);
138             }
139         }
140         
141     }
142     
143     
144     public static void main(String[] args) {
145         //大体流程:首先设置一个数据源MyBatchSpout,会监控指定目录下文件的变化,当发现有新文件的时候把文件中的数据取出来,
146         //然后封装到一个batch中发射出来.就会对tuple中的数据进行处理,把每个tuple中的数据都取出来,然后切割..切割成一个个的单词.
147         //单词发射出来之后,会对单词进行分组,会对一批假设有10个tuple,会对这10个tuple分完词之后的单词进行分组, 相同的单词分一块  
148         //分完之后聚合 把相同的单词使用同一个聚合器聚合  然后出结果  每个单词出现多少次...
149         //进行汇总  先每一批数据局部汇总  最后全局汇总....
150         //这个代码也不是很简单...挺多....就是使用批处理的方式.
151         
152         TridentTopology tridentTopology = new TridentTopology();
153         
154         tridentTopology.newStream("spoutid", new MyBatchSpout(new Fields("lines")))
155             .each(new Fields("lines"), new MySplit(), new Fields("word"))
156             .groupBy(new Fields("word"))//用到了分组 对一批tuple中的单词进行分组..
157             .aggregate(new Fields("word"), new MyWordAgge(), new Fields("wwwww"))//用到了聚合
158             .each(new Fields("wwwww"), new MyCountPrint(), new Fields(""));
159         
160         LocalCluster localCluster = new LocalCluster();
161         String simpleName = TridentMeger.class.getSimpleName();
162         localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());
163     }
164 }
复制代码

指定路径下文件中的内容:

程序运行结果:


本文转自SummerChill博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/6676021.html,如需转载请自行联系原作者

相关文章
|
分布式计算 API 流计算
22MyCat - Spark/Storm 对join扩展(简略)
22MyCat - Spark/Storm 对join扩展(简略)
55 0
|
分布式计算 Hadoop 流计算
Storm集群部署与单词计数程序
Storm集群部署与单词计数程序
92 0
|
消息中间件 存储 分布式计算
5万字Spark全集之末尾Structured Streaming续集!!!!!(二)
5万字Spark全集之末尾Structured Streaming续集!!!!!
202 0
5万字Spark全集之末尾Structured Streaming续集!!!!!(二)
|
SQL 消息中间件 分布式计算
5万字Spark全集之末尾Structured Streaming续集!!!!!(一)
5万字Spark全集之末尾Structured Streaming续集!!!!!
175 0
5万字Spark全集之末尾Structured Streaming续集!!!!!(一)
|
分布式计算 API 数据安全/隐私保护
Spark3.0分布,Structured Streaming UI登场
Spark3.0分布,Structured Streaming UI登场
540 0
Spark3.0分布,Structured Streaming UI登场
|
存储 自然语言处理 分布式计算
|
存储 缓存 分布式计算
Spark—GraphX编程指南
GraphX 是新的图形和图像并行计算的Spark API。从整理上看,GraphX 通过引入 弹性分布式属性图(Resilient Distributed Property Graph)继承了Spark RDD:一个将有效信息放在顶点和边的有向多重图。为了支持图形计算,GraphX 公开了一组基本的运算(例如,subgraph,joinVertices和mapReduceTriplets),以及在一个优化后的 PregelAPI的变形。此外,GraphX 包括越来越多的图算法和 builder 构造器,以简化图形分析任务。
574 0
Spark—GraphX编程指南
|
存储 SQL 运维
storm笔记:Trident应用
Trident是基于Storm的实时计算模型的高级抽象。它可以实现高吞吐(每秒数百万条消息)的有状态流处理和低延迟分布式查询。
201 0
storm笔记:Trident应用
|
Java 分布式数据库 Hbase
HBase使用例子(中文翻译)
通过编码(java)的形式对HBase进行一系列的管理涉及到对表的管理、数据的操作等。 1、 对表的创建、删除、显示以及修改等,可以用HBaseAdmin,一旦创建了表,那么可以通过HTable的实例来访问表,每次可以往表里增加数据。 2、 插入数据 创建一个Put对象,在这个Put对象里可以指定要给哪个列增加数据,以及当前的时间戳等值,然后通过调用HTable.put(Put)来提交操作,子猴在这里提请注意的是:在创建Put对象的时候,你必须指定一个行(Row)值,在构造Put对象的时候作为参数传入。 3、 获取数据 要获取数据,使用Get对象,Get对象同Put对象一样有好
174 0