在Storm的Toplogy中设置多数据源Spout

简介:

上代码:主要看main方法中的设置.   如下代码是一般情况下的设置方法...Trident中设置多数据源看对应的博客总结

复制代码
  1 /**
  2  * 指定多个数据源
  3  * 数字累加求和
  4  * 先添加storm依赖
  5  */
  6 public class LocalTopologyMeger {
  7     /**
  8      * spout需要继承baserichspout,实现未实现的方法
  9      * @author Administrator
 10      *
 11      */
 12     public static class MySpout extends BaseRichSpout{
 13         private Map conf;
 14         private TopologyContext context;
 15         private SpoutOutputCollector collector;
 16         
 17         /**
 18          * 初始化方法,只会执行一次
 19          * 在这里面可以写一个初始化的代码
 20          * Map conf:其实里面保存的是topology的一些配置信息
 21          * TopologyContext context:topology的上下文,类似于servletcontext
 22          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
 23          */
 24         @Override
 25         public void open(Map conf, TopologyContext context,
 26                 SpoutOutputCollector collector) {
 27             this.conf = conf;
 28             this.context = context;
 29             this.collector = collector;
 30         }
 31 
 32         int num = 1;
 33         /**
 34          * 这个方法是spout中最重要的方法,
 35          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
 36          * 每调用一次,会向外发射一条数据
 37          */
 38         @Override
 39         public void nextTuple() {
 40             System.out.println("spout发射:"+num);
 41             //把数据封装到values中,称为一个tuple,发射出去
 42             this.collector.emit(new Values(num++));
 43             Utils.sleep(1000);
 44         }
 45         
 46         /**
 47          * 声明输出字段
 48          */
 49         @Override
 50         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 51             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
 52             //fields中定义的参数和values中传递的数值是一一对应的
 53             declarer.declare(new Fields("num"));
 54         }
 55         
 56     }
 57     
 58     
 59     /**
 60      * 自定义bolt需要实现baserichbolt
 61      * @author Administrator
 62      *
 63      */
 64     public static class MyBolt extends BaseRichBolt{
 65         private Map stormConf; 
 66         private TopologyContext context;
 67         private OutputCollector collector;
 68         
 69         /**
 70          * 和spout中的open方法意义一样
 71          */
 72         @Override
 73         public void prepare(Map stormConf, TopologyContext context,
 74                 OutputCollector collector) {
 75             this.stormConf = stormConf;
 76             this.context = context;
 77             this.collector = collector;
 78         }
 79 
 80         int sum = 0;
 81         /**
 82          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
 83          */
 84         @Override
 85         public void execute(Tuple input) {
 86             //input.getInteger(0);//也可以根据角标获取tuple中的数据
 87             Integer value = input.getIntegerByField("num");
 88             sum+=value;
 89             System.out.println("和:"+sum);
 90         }
 91         
 92         /**
 93          * 声明输出字段
 94          */
 95         @Override
 96         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 97             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
 98             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
 99         }
100         
101     }
102     /**
103      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
104      * @param args
105      */
106     public static void main(String[] args) {
107         //组装topology
108         TopologyBuilder topologyBuilder = new TopologyBuilder();
109         topologyBuilder.setSpout("spout1", new MySpout());
110         topologyBuilder.setSpout("spout2", new MySpout());
111         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
112         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1").shuffleGrouping("spout2");
113         
114         //创建本地storm集群
115         LocalCluster localCluster = new LocalCluster();
116         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
117     }
118 }
复制代码

 


本文转自SummerChill博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/6675974.html,如需转载请自行联系原作者

相关文章
Java Exception异常信息怎么打印、记录,几种方式自己选
Java Exception异常信息怎么打印、记录,几种方式自己选
1015 0
Java Exception异常信息怎么打印、记录,几种方式自己选
|
中间件 数据库
MQ如何保障发送消息可靠
简易做法包括开启生产者重试及确认机制。更可靠但复杂的方案则涉及将消息存入数据库,通过状态码管理发送状态,结合定时任务检查并重发未成功发送的消息,同时利用确认回调确保消息发送成功。
326 4
|
消息中间件 负载均衡 监控
【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
【4月更文挑战第13天】【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
|
存储 NoSQL 关系型数据库
【赵渝强老师】MongoDB的存储结构
MongoDB 是一个可移植的 NoSQL 数据库,支持跨平台运行。其逻辑存储结构包括数据库、集合和文档,而物理存储结构则由命名空间文件、数据文件和日志文件组成。视频讲解和示意图进一步解释了这些概念。
397 5
|
并行计算 Ubuntu PyTorch
Ubuntu下CUDA、Conda、Pytorch联合教程
本文是一份Ubuntu系统下安装和配置CUDA、Conda和Pytorch的教程,涵盖了查看显卡驱动、下载安装CUDA、添加环境变量、卸载CUDA、Anaconda的下载安装、环境管理以及Pytorch的安装和验证等步骤。
3174 1
Ubuntu下CUDA、Conda、Pytorch联合教程
|
机器学习/深度学习 监控 数据可视化
关于运动员伤病预测数据集的探索(下)
关于运动员伤病预测数据集的探索(下)
234 2
|
Web App开发 移动开发 小程序
看我如何让手机秒变扫码枪
为解决无扫码枪问题,作者受到微信小程序“超级扫码枪”启发,决定自制手机扫码到电脑的应用。项目需求是手机扫描条形码或二维码后实时传送到电脑。实现步骤包括:电脑端用Java Swing和Robot模拟键盘输入,手机端H5调用摄像头扫码(借助html5-qrcode库),并通过WebSocket服务将结果发送至电脑。项目源码及演示视频链接提供。
2831 5
|
JavaScript 前端开发 数据格式
Jquery前端分页插件pagination使用
Jquery前端分页插件pagination使用
392 1
|
存储 安全 Java
一文解读类的加载过程(类的生命周期)(上)
一文解读类的加载过程(类的生命周期)
|
消息中间件 负载均衡 Kafka
Kafka - 3.x 分区分配策略及再平衡不完全指北
Kafka - 3.x 分区分配策略及再平衡不完全指北
702 0