Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

简介:

 StormTopologyMoreTask.java

 

 

 

 

复制代码
package zhouls.bigdata.stormDemo;

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;


public class StormTopologyMoreTask {
    
    public static class MySpout extends BaseRichSpout{
        private Map conf;
        private TopologyContext context;
        private SpoutOutputCollector collector;
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.conf = conf;
            this.collector = collector;
            this.context = context;
        }

        int num = 0; 
        public void nextTuple() {
            num++;
            System.out.println("spout:"+num);
            this.collector.emit(new Values(num));
            Utils.sleep(1000);
        }

    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }
        
    }
    
    
    
    public static class MyBolt extends BaseRichBolt{
        
        private Map stormConf;
        private TopologyContext context;
        private OutputCollector collector;
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.stormConf = stormConf;
            this.context = context;
            this.collector = collector;
        }
        
        
        public void execute(Tuple input) {
            Integer num = input.getIntegerByField("num");
            System.out.println("线程id:"+Thread.currentThread().getId()+",接收的值为:"+num);
        }

        
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            
        }
        
    }
    
    
    
    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        String spout_id = MySpout.class.getSimpleName();
        String bolt_id = MyBolt.class.getSimpleName();
        
        topologyBuilder.setSpout(spout_id, new MySpout());
        topologyBuilder.setBolt(bolt_id, new MyBolt()).setNumTasks(3).shuffleGrouping(spout_id);
        
        
        Config config = new Config();
        String topology_name = StormTopologyMoreTask.class.getSimpleName();
        if(args.length==0){
            //在本地运行
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
        }else{
            //在集群运行
            try {
                StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        }
        
    }

}
复制代码

 

 

 

 

 

 

 打jar包

 

 

 

 

 

 

 

 

 

 

 

复制代码
[hadoop@master jar]$ pwd
/home/hadoop/app/apache-storm-1.0.2/jar
[hadoop@master jar]$ ll
total 24
-rw-r--r-- 1 hadoop hadoop 4869 Jul 27 22:17 StormTopology.jar
-rw-r--r-- 1 hadoop hadoop 5091 Jul 27 23:00 StormTopologyMoreExecutor.jar
-rw-r--r-- 1 hadoop hadoop 4992 Jul 27 22:39 StormTopologyMoreWorker.jar
[hadoop@master jar]$ rz

[hadoop@master jar]$ ll
total 32
-rw-r--r-- 1 hadoop hadoop 4869 Jul 27 22:17 StormTopology.jar
-rw-r--r-- 1 hadoop hadoop 5091 Jul 27 23:00 StormTopologyMoreExecutor.jar
-rw-r--r-- 1 hadoop hadoop 5105 Jul 27 23:20 StormTopologyMoreTask.jar
-rw-r--r-- 1 hadoop hadoop 4992 Jul 27 22:39 StormTopologyMoreWorker.jar
[hadoop@master jar]$ 
复制代码

 

 

 

 

 

提交作业之前

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

  

 

为什么,会是如上的数字呢?大家要学,就要深入去学和理解。

 

   因为,我之前运行的StormTopologyMoreExecutor没有停掉

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

  

为什么,会是如上的数字呢?大家要学,就要深入去学和理解。

 

 

 

 

 

 

 

 

 

 

 

 

 

 



本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/7247940.html,如需转载请自行联系原作者


相关文章
|
2月前
|
JSON 安全 API
电商API入门问答:开发者必知的10个基础问题
本文详解电商API的10个基础知识,涵盖定义、用途、认证、安全等内容,帮助开发者快速入门并提升开发效率。
96 0
|
2月前
|
缓存 监控 安全
电商API集成入门:从零开始搭建高效接口
在数字化电商时代,API集成成为企业提升效率、实现系统互联的关键。本文从零开始,逐步讲解如何搭建高效、可靠的电商API接口,适合初学者学习。内容涵盖API基础、认证安全、请求处理、性能优化等核心步骤,并提供Python代码示例与数学公式辅助理解。通过实践,读者可掌握构建优质电商API的技巧,提升用户体验与系统性能。
107 0
|
5月前
|
JSON 算法 API
一文掌握 1688 商品详情 API 接口:从入门到实战
1688是国内领先的综合电商批发平台,提供海量商品资源。其商品详情API助力开发者与企业获取商品的详细信息(如属性、价格、库存等),广泛应用于电商数据分析、比价系统及采购场景。API支持GET/POST请求,需传入通用参数(app_key、timestamp等)与业务参数(如product_id)。返回JSON格式数据,包含商品标题、价格、图片链接等详情,提升业务效率与决策精准度。
|
4月前
|
JSON API 开发工具
电商API接口入门指南
本文介绍了API的基础知识及其在电商领域的实际应用。首先,阐释了API的概念、运作机制及参数与返回值的作用,帮助读者理解如何通过API实现软件间的交互。接着,以获取电商商品列表为例,详细讲解了从选择平台、引入SDK到编写代码调用API的全流程。示例代码采用Python语言,利用requests库发送请求并解析JSON数据,为开发者提供了清晰的实践指导。
|
5月前
|
搜索推荐 API 开发者
京东商品列表 API 接口全解析:从入门到精通
京东商品列表API是京东开放平台为开发者提供的核心数据接口,支持批量获取商品基础信息、价格、库存状态等多维度数据。它具备数据丰富性、灵活筛选与分页查询、稳定高效等特点,可满足市场分析、选品优化、比价工具及推荐系统开发等需求,为电商业务创新提供坚实支撑。通过标准化通道,助力第三方高效、合法地利用京东海量商品数据。
|
4月前
|
数据挖掘 API 开发者
京东商品详情 API 接口全攻略:从入门到精通
京东商品详情API接口是京东开放平台为开发者提供的服务,用于获取商品详细信息。通过调用接口,开发者可获得商品属性、价格、库存、促销信息等数据,适用于电商应用、价格比较工具及数据分析平台等场景。支持GET/POST请求方式,参数包括API版本、密钥等。示例代码展示了如何使用Python的requests库调用该接口,并获取JSON格式的返回数据,包含商品基本信息、价格、库存和用户评价等内容。
199 16
|
6月前
|
机器学习/深度学习 设计模式 API
Python 高级编程与实战:构建 RESTful API
本文深入探讨了使用 Python 构建 RESTful API 的方法,涵盖 Flask、Django REST Framework 和 FastAPI 三个主流框架。通过实战项目示例,详细讲解了如何处理 GET、POST 请求,并返回相应数据。学习这些技术将帮助你掌握构建高效、可靠的 Web API。
|
6月前
|
机器学习/深度学习 开发框架 API
Python 高级编程与实战:深入理解 Web 开发与 API 设计
在前几篇文章中,我们探讨了 Python 的基础语法、面向对象编程、函数式编程、元编程、性能优化、调试技巧以及数据科学和机器学习。本文将深入探讨 Python 在 Web 开发和 API 设计中的应用,并通过实战项目帮助你掌握这些技术。
|
API 流计算
|
22天前
|
JSON API 数据格式
淘宝/天猫图片搜索API接口,json返回数据。
淘宝/天猫平台虽未开放直接的图片搜索API,但可通过阿里妈妈淘宝联盟或天猫开放平台接口实现类似功能。本文提供基于淘宝联盟的图片关联商品搜索Curl示例及JSON响应说明,适用于已获权限的开发者。如需更高精度搜索,可选用阿里云视觉智能API。