Strom的trident小例子

简介:

上代码:

复制代码
 1 public class TridentFunc {
 2     
 3     /**
 4      * 类似于普通的bolt
 5      */
 6     public static class MyFunction extends BaseFunction{
 7         @Override
 8         public void execute(TridentTuple tuple, TridentCollector collector) {
 9             Integer value = tuple.getIntegerByField("sentence");
10             System.out.println(value);
11         }
12     }
13     
14     public static void main(String[] args) {
15         @SuppressWarnings("unchecked")
16         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values(1),new Values(2));
17         spout.setCycle(true);//让spout循环发送数据
18         
19         TridentTopology tridentTopology = new TridentTopology();
20         tridentTopology.newStream("spoutid",spout)
21             .each(new Fields("sentence"), new MyFunction(), new Fields(""));
22         
23         LocalCluster localCluster = new LocalCluster();
24         String simpleName = TridentFunc.class.getSimpleName();
25         localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());
26         //运行结果就是  一直循环打印 1 2 1 2  
27     }
28 }
复制代码

 多数据源

复制代码
 1 public class TridentMeger {
 2     
 3     /**
 4      * 类似于普通的bolt
 5      */
 6     public static class MyFunction extends BaseFunction{
 7         @Override
 8         public void execute(TridentTuple tuple, TridentCollector collector) {
 9             Integer value = tuple.getIntegerByField("sentence");
10             System.out.println(value);
11         }
12     }
13     
14     public static void main(String[] args) {
15         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values(1),new Values(2));
16         //spout.setCycle(true);//让spout循环发送数据
17         
18         TridentTopology tridentTopology = new TridentTopology();
19         //指定多个数据源,流连接
20         Stream newStream1 = tridentTopology.newStream("spoutid1",spout);
21         Stream newStream2 = tridentTopology.newStream("spoutid2",spout);
22         
23         //tridentTopology.newStream("spoutid",spout) 之前是这种  但是只能有 一个数据源  
24         tridentTopology.merge(newStream1,newStream2)//使用这种就可以有多个数据源.
25             .each(new Fields("sentence"), new MyFunction(), new Fields(""));
26         
27         LocalCluster localCluster = new LocalCluster();
28         String simpleName = TridentMeger.class.getSimpleName();
29         localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());
30     }
31 }
复制代码

 增加过滤器

复制代码
 1 public class TridentFilter {
 2     
 3     /**
 4      * 类似于普通的bolt
 5      */
 6     public static class MyFunction extends BaseFunction{
 7         @Override
 8         public void execute(TridentTuple tuple, TridentCollector collector) {
 9             Integer value = tuple.getIntegerByField("sentence");
10             System.out.println(value);
11         }
12     }
13     
14     public static class MyFilter extends BaseFilter{//专门封装了一个Filter功能.
15         //对数据进行过滤  如果过滤出的数据不要了就false 保留就ture
16         @Override
17         public boolean isKeep(TridentTuple tuple) {
18             Integer value = tuple.getIntegerByField("sentence");
19             return value%2==0?true:false; //只要偶数不要奇数
20         }
21     }
22     
23     public static void main(String[] args) {
24         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values(1),new Values(2));
25         spout.setCycle(true);//让spout循环发送数据
26         
27         TridentTopology tridentTopology = new TridentTopology();
28         tridentTopology.newStream("spoutid",spout)     //这个地方只能指定一个数据源,如果想指定多个数据源Spout 看TridentMeger.java
29             .each(new Fields("sentence"), new MyFilter())
30             .each(new Fields("sentence"), new MyFunction(), new Fields(""));
31         
32         LocalCluster localCluster = new LocalCluster();
33         String simpleName = TridentFilter.class.getSimpleName();
34         localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());
35     }
36 }
复制代码

 


本文转自SummerChill博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/6675991.html,如需转载请自行联系原作者

相关文章
|
分布式计算 API 流计算
22MyCat - Spark/Storm 对join扩展(简略)
22MyCat - Spark/Storm 对join扩展(简略)
55 0
|
消息中间件 分布式计算 监控
五十二、 Storm 组件安装部署(MINI版)
五十二、 Storm 组件安装部署(MINI版)
五十二、 Storm 组件安装部署(MINI版)
|
分布式计算 API 数据安全/隐私保护
Spark3.0分布,Structured Streaming UI登场
Spark3.0分布,Structured Streaming UI登场
540 0
Spark3.0分布,Structured Streaming UI登场
|
存储 自然语言处理 分布式计算
|
消息中间件 存储 并行计算
|
存储 消息中间件 缓存
storm笔记:Trident状态
在storm笔记:Trident应用中说了下Trident的使用,这里说下Trident几种状态的变化及其对应API的使用。
100 0
storm笔记:Trident状态
|
存储 SQL 运维
storm笔记:Trident应用
Trident是基于Storm的实时计算模型的高级抽象。它可以实现高吞吐(每秒数百万条消息)的有状态流处理和低延迟分布式查询。
201 0
storm笔记:Trident应用
|
Java 分布式数据库 Hbase
HBase使用例子(中文翻译)
通过编码(java)的形式对HBase进行一系列的管理涉及到对表的管理、数据的操作等。 1、 对表的创建、删除、显示以及修改等,可以用HBaseAdmin,一旦创建了表,那么可以通过HTable的实例来访问表,每次可以往表里增加数据。 2、 插入数据 创建一个Put对象,在这个Put对象里可以指定要给哪个列增加数据,以及当前的时间戳等值,然后通过调用HTable.put(Put)来提交操作,子猴在这里提请注意的是:在创建Put对象的时候,你必须指定一个行(Row)值,在构造Put对象的时候作为参数传入。 3、 获取数据 要获取数据,使用Get对象,Get对象同Put对象一样有好
174 0
|
存储 Hbase 分布式数据库
带你读《Apache Kylin权威指南》之三:Cube优化
从最早使用大数据技术来做批量处理,到现在越来越多的人要求大数据平台也能够如传统数据仓库技术一样支持交互式分析,随着数据量的不断膨胀、数据平民化的不断推进,低延迟、高并发地在Hadoop之上提供标准SQL查询能力成为必须攻破的技术难题。而Apache Kylin的诞生正是基于这个背景,并成功地完成了很多人认为不可能实现的突破。