在Storm的Toplogy中设置多数据源Spout

简介:

上代码:主要看main方法中的设置.   如下代码是一般情况下的设置方法...Trident中设置多数据源看对应的博客总结

复制代码
  1 /**
  2  * 指定多个数据源
  3  * 数字累加求和
  4  * 先添加storm依赖
  5  */
  6 public class LocalTopologyMeger {
  7     /**
  8      * spout需要继承baserichspout,实现未实现的方法
  9      * @author Administrator
 10      *
 11      */
 12     public static class MySpout extends BaseRichSpout{
 13         private Map conf;
 14         private TopologyContext context;
 15         private SpoutOutputCollector collector;
 16         
 17         /**
 18          * 初始化方法,只会执行一次
 19          * 在这里面可以写一个初始化的代码
 20          * Map conf:其实里面保存的是topology的一些配置信息
 21          * TopologyContext context:topology的上下文,类似于servletcontext
 22          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
 23          */
 24         @Override
 25         public void open(Map conf, TopologyContext context,
 26                 SpoutOutputCollector collector) {
 27             this.conf = conf;
 28             this.context = context;
 29             this.collector = collector;
 30         }
 31 
 32         int num = 1;
 33         /**
 34          * 这个方法是spout中最重要的方法,
 35          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
 36          * 每调用一次,会向外发射一条数据
 37          */
 38         @Override
 39         public void nextTuple() {
 40             System.out.println("spout发射:"+num);
 41             //把数据封装到values中,称为一个tuple,发射出去
 42             this.collector.emit(new Values(num++));
 43             Utils.sleep(1000);
 44         }
 45         
 46         /**
 47          * 声明输出字段
 48          */
 49         @Override
 50         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 51             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
 52             //fields中定义的参数和values中传递的数值是一一对应的
 53             declarer.declare(new Fields("num"));
 54         }
 55         
 56     }
 57     
 58     
 59     /**
 60      * 自定义bolt需要实现baserichbolt
 61      * @author Administrator
 62      *
 63      */
 64     public static class MyBolt extends BaseRichBolt{
 65         private Map stormConf; 
 66         private TopologyContext context;
 67         private OutputCollector collector;
 68         
 69         /**
 70          * 和spout中的open方法意义一样
 71          */
 72         @Override
 73         public void prepare(Map stormConf, TopologyContext context,
 74                 OutputCollector collector) {
 75             this.stormConf = stormConf;
 76             this.context = context;
 77             this.collector = collector;
 78         }
 79 
 80         int sum = 0;
 81         /**
 82          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
 83          */
 84         @Override
 85         public void execute(Tuple input) {
 86             //input.getInteger(0);//也可以根据角标获取tuple中的数据
 87             Integer value = input.getIntegerByField("num");
 88             sum+=value;
 89             System.out.println("和:"+sum);
 90         }
 91         
 92         /**
 93          * 声明输出字段
 94          */
 95         @Override
 96         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 97             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
 98             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
 99         }
100         
101     }
102     /**
103      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
104      * @param args
105      */
106     public static void main(String[] args) {
107         //组装topology
108         TopologyBuilder topologyBuilder = new TopologyBuilder();
109         topologyBuilder.setSpout("spout1", new MySpout());
110         topologyBuilder.setSpout("spout2", new MySpout());
111         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
112         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1").shuffleGrouping("spout2");
113         
114         //创建本地storm集群
115         LocalCluster localCluster = new LocalCluster();
116         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
117     }
118 }
复制代码

 


本文转自SummerChill博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/6675974.html,如需转载请自行联系原作者

目录
打赏
0
0
0
0
64
分享
相关文章
|
9月前
|
Storm详细配置
Storm详细配置
117 0
Storm vs. Kafka Streams vs. Spark Streaming vs. Flink ,流式处理框架一网打尽!2
Storm vs. Kafka Streams vs. Spark Streaming vs. Flink ,流式处理框架一网打尽!2
489 0
Storm vs. Kafka Streams vs. Spark Streaming vs. Flink ,流式处理框架一网打尽!2
Storm vs. Kafka Streams vs. Spark Streaming vs. Flink ,流式处理框架一网打尽!1
Storm vs. Kafka Streams vs. Spark Streaming vs. Flink ,流式处理框架一网打尽!1
328 0
Storm vs. Kafka Streams vs. Spark Streaming vs. Flink ,流式处理框架一网打尽!1
Spark Streaming,Flink,Storm,Kafka Streams,Samza:如何选择流处理框架
根据最新的统计显示,仅在过去的两年中,当今世界上90%的数据都是在新产生的,每天创建2.5万亿字节的数据,并且随着新设备,传感器和技术的出现,数据增长速度可能会进一步加快。从技术上讲,这意味着我们的大数据处理将变得更加复杂且更具挑战性。而且,许多用例(例如,移动应用广告,欺诈检测,出租车预订,病人监护等)都需要在数据到达时进行实时数据处理,以便做出快速可行的决策。这就是为什么分布式流处理在大数据世界中变得非常流行的原因。
185 0
Spark Streaming,Flink,Storm,Kafka Streams,Samza:如何选择流处理框架
【Kafka】(十五)流式计算 Kafka Streams 架构深入2
【Kafka】(十五)流式计算 Kafka Streams 架构深入2
786 0
【Kafka】(十五)流式计算 Kafka Streams 架构深入1
【Kafka】(十五)流式计算 Kafka Streams 架构深入1
769 0
Airbnb 是如何通过 balanced Kafka reader 来扩展 Spark streaming 实时流处理能力的
得益于 balanced Kafka reader,从 Kafka 消费的 Spark 应用程序现在可以横向扩展,并具有任意并行度。平衡分区算法很简单,并且已被证明非常有效。由于这些改进,用于摄取日志记录事件的 Spark streaming 作业可以处理比以前多一个数量级的事件。
storm 读取不到对应的kafka数据
坑一:pom文件主要内容:注意里面 需要 使用 “exclusion”排除相关的依赖 UTF-8 1.
1847 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等