Storm编程入门API系列之Storm的Topology多个Workers数目控制实现

简介:

  继续编写

StormTopologyMoreWorker.java

 

 

 

复制代码
package zhouls.bigdata.stormDemo;

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;


public class StormTopologyMoreWorker {
    
    public static class MySpout extends BaseRichSpout{
        private Map conf;
        private TopologyContext context;
        private SpoutOutputCollector collector;
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.conf = conf;
            this.collector = collector;
            this.context = context;
        }

        int num = 0; 
        public void nextTuple() {
            num++;
            System.out.println("spout:"+num);
            this.collector.emit(new Values(num));
            Utils.sleep(1000);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }
        
    }
    
    
    
    public static class MyBolt extends BaseRichBolt{
        
        private Map stormConf;
        private TopologyContext context;
        private OutputCollector collector;
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.stormConf = stormConf;
            this.context = context;
            this.collector = collector;
        }
        
        int sum = 0;
        public void execute(Tuple input) {
            Integer num = input.getIntegerByField("num");
            sum += num;
            System.out.println("sum="+sum);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            
        }
        
    }
    
    
    
    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        String spout_id = MySpout.class.getSimpleName();
        String bolt_id = MyBolt.class.getSimpleName();
        
        topologyBuilder.setSpout(spout_id, new MySpout());
        topologyBuilder.setBolt(bolt_id, new MyBolt()).shuffleGrouping(spout_id);
        
        
        Config config = new Config();
        config.setNumWorkers(2);
        String topology_name = StormTopologyMoreWorker.class.getSimpleName();
        if(args.length==0){
            //在本地运行
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
        }else{
            //在集群运行
            try {
                StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        }
        
    }

}
复制代码

 

 

 

  打jar包

 

 

 

 

 

 

 

复制代码
[hadoop@master jar]$ pwd
/home/hadoop/app/apache-storm-1.0.2/jar
[hadoop@master jar]$ ll
total 8
-rw-r--r-- 1 hadoop hadoop 4869 Jul 27 22:17 StormTopology.jar
[hadoop@master jar]$ rz

[hadoop@master jar]$ ll
total 16
-rw-r--r-- 1 hadoop hadoop 4869 Jul 27 22:17 StormTopology.jar
-rw-r--r-- 1 hadoop hadoop 4992 Jul 27 22:39 StormTopologyMoreWorker.jar
复制代码

 

 

 

 

 提交作业之前

 

 

 

 

  

复制代码
[hadoop@master apache-storm-1.0.2]$ pwd
/home/hadoop/app/apache-storm-1.0.2
[hadoop@master apache-storm-1.0.2]$ ll
total 208
drwxrwxr-x  2 hadoop hadoop  4096 May 21 17:18 bin
-rw-r--r--  1 hadoop hadoop 82317 Jul 27  2016 CHANGELOG.md
drwxrwxr-x  2 hadoop hadoop  4096 Jul 27 20:12 conf
drwxrwxr-x  3 hadoop hadoop  4096 Jul 27  2016 examples
drwxrwxr-x 17 hadoop hadoop  4096 May 21 17:18 external
drwxrwxr-x  2 hadoop hadoop  4096 Jul 27  2016 extlib
drwxrwxr-x  2 hadoop hadoop  4096 Jul 27  2016 extlib-daemon
drwxrwxr-x  2 hadoop hadoop  4096 Jul 27 22:39 jar
drwxrwxr-x  2 hadoop hadoop  4096 May 21 17:18 lib
-rw-r--r--  1 hadoop hadoop 32101 Jul 27  2016 LICENSE
drwxrwxr-x  2 hadoop hadoop  4096 May 21 17:18 log4j2
drwxrwxr-x  2 hadoop hadoop  4096 May 21 19:05 logs
-rw-r--r--  1 hadoop hadoop   981 Jul 27  2016 NOTICE
drwxrwxr-x  6 hadoop hadoop  4096 May 21 17:18 public
-rw-r--r--  1 hadoop hadoop 15287 Jul 27  2016 README.markdown
-rw-r--r--  1 hadoop hadoop     6 Jul 27  2016 RELEASE
-rw-r--r--  1 hadoop hadoop 23774 Jul 27  2016 SECURITY.md
[hadoop@master apache-storm-1.0.2]$ bin/storm jar jar/StormTopologyMoreWorker.jar zhouls.bigdata.stormDemo.StormTopologyMoreWorker aaa
Running: /home/hadoop/app/jdk/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/hadoop/app/apache-storm-1.0.2 -Dstorm.log.dir=/home/hadoop/app/apache-storm-1.0.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/hadoop/app/apache-storm-1.0.2/lib/log4j-api-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/kryo-3.0.3.jar:/home/hadoop/app/apache-storm-1.0.2/lib/storm-rename-hack-1.0.2.jar:/home/hadoop/app/apache-storm-1.0.2/lib/log4j-core-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/slf4j-api-1.7.7.jar:/home/hadoop/app/apache-storm-1.0.2/lib/minlog-1.3.0.jar:/home/hadoop/app/apache-storm-1.0.2/lib/objenesis-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/clojure-1.7.0.jar:/home/hadoop/app/apache-storm-1.0.2/lib/servlet-api-2.5.jar:/home/hadoop/app/apache-storm-1.0.2/lib/log4j-slf4j-impl-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/log4j-over-slf4j-1.6.6.jar:/home/hadoop/app/apache-storm-1.0.2/lib/storm-core-1.0.2.jar:/home/hadoop/app/apache-storm-1.0.2/lib/disruptor-3.3.2.jar:/home/hadoop/app/apache-storm-1.0.2/lib/asm-5.0.3.jar:/home/hadoop/app/apache-storm-1.0.2/lib/reflectasm-1.10.1.jar:jar/StormTopologyMoreWorker.jar:/home/hadoop/app/apache-storm-1.0.2/conf:/home/hadoop/app/apache-storm-1.0.2/bin -Dstorm.jar=jar/StormTopologyMoreWorker.jar zhouls.bigdata.stormDemo.StormTopologyMoreWorker aaa
6687 [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -5319119555480935017:-9144362215090188990
7204 [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
7785 [main] INFO  o.a.s.StormSubmitter - Uploading topology jar jar/StormTopologyMoreWorker.jar to assigned location: /home/hadoop/data/storm/nimbus/inbox/stormjar-ea1d6383-ca31-4033-b784-e06
856000894.jar
7875 [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/hadoop/data/storm/nimbus/inbox/stormjar-ea1d6383-ca31-4033-b784-e06856000894.jar
7876 [main] INFO  o.a.s.StormSubmitter - Submitting topology StormTopologyMoreWorker in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-5319119555480935017:-9144362215090188990","topology.workers":2}
8962 [main] INFO  o.a.s.StormSubmitter - Finished submitting topology: StormTopologyMoreWorker
[hadoop@master apache-storm-1.0.2]$ 
复制代码

 

 

 

  提交之后

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

  

  为什么,会是如上的数字呢?大家要学,就要深入去学和理解。

   

  因为,我之前运行的StormTopology没有停掉

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

  现在呢,我将之前运行的StormTopology给停掉,然后,再来看。非常重要

 

 

 

 

 

 

 

 

 

   即提示,我们,30秒之后,kill掉。如果大家等不及,可以设置时间短些

 

 

 

 

 

  为什么,会是如上的数字呢?大家要学,就要深入去学和理解。


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/7247860.html,如需转载请自行联系原作者


相关文章
|
2月前
|
Java API
掌握Java 8 Stream API的艺术:详解流式编程(三)
掌握Java 8 Stream API的艺术:详解流式编程
25 2
|
4天前
|
JavaScript API 开发者
GraphQL API开发入门:比RESTful更高效的数据查询方式
**GraphQL API开发入门摘要** GraphQL是一种更高效的数据查询方式,解决RESTful API的过度或不足获取数据问题。它允许客户端按需获取数据,减少网络传输,支持一次请求获取多资源。强类型和自描述特性方便了开发。文章通过一个简单的Node.js示例,展示如何使用`apollo-server-express`搭建GraphQL服务器,包括定义Schema、实现Resolver和创建服务器。通过测试,显示了GraphQL如何提供精确数据和优化查询效率。对于复杂数据需求,GraphQL是现代API设计的有效选择。
14 0
|
7天前
|
Linux API 数据安全/隐私保护
一文搞懂:【零基础】易盛9.0API入门二:登陆
一文搞懂:【零基础】易盛9.0API入门二:登陆
14 1
|
2月前
|
安全 Java API
Java 8中的Stream API:简介与实用指南深入理解Java并发编程:线程安全与锁优化
【5月更文挑战第29天】本文旨在介绍Java 8中引入的Stream API,这是一种用于处理集合的新方法。我们将探讨Stream API的基本概念,以及如何使用它来简化集合操作,提高代码的可读性和效率。 【5月更文挑战第29天】 在Java并发编程中,线程安全和性能优化是两个核心议题。本文将深入探讨如何通过不同的锁机制和同步策略来保证多线程环境下的数据一致性,同时避免常见的并发问题如死锁和竞态条件。文章还将介绍现代Java虚拟机(JVM)针对锁的优化技术,包括锁粗化、锁消除以及轻量级锁等概念,并指导开发者如何合理选择和使用这些技术以提升应用的性能。
|
21天前
|
分布式计算 自然语言处理 大数据
【大数据】MapReduce JAVA API编程实践及适用场景介绍
【大数据】MapReduce JAVA API编程实践及适用场景介绍
33 0
|
26天前
|
存储 Java API
JavaSE——常用API(3/3)-ArrayList入门、ArratList使用、ArrayList综合案例
JavaSE——常用API(3/3)-ArrayList入门、ArratList使用、ArrayList综合案例
23 0
|
2月前
|
Linux API
Linux系统编程之文件编程常用API回顾和文件编程一般步骤
Linux系统编程之文件编程常用API回顾和文件编程一般步骤
Linux系统编程之文件编程常用API回顾和文件编程一般步骤
|
2月前
|
存储 SQL Java
Java8 Stream API 详解:流式编程进行数据处理
Java8 Stream API 详解:流式编程进行数据处理
|
5天前
|
JSON 安全 API
如何高效编写API接口:以Python与Flask为例
构建RESTful API的简明教程:使用Python的Flask框架,从环境准备(安装Python,设置虚拟环境,安装Flask)到编写首个API(包括获取用户列表和单个用户信息的路由)。运行API服务器并测试在`http://127.0.0.1:5000/users`。进阶话题包括安全、数据库集成、API文档生成和性能优化。【6月更文挑战第27天】
27 7
|
2天前
|
Java API PHP
【亲测有效,官方提供】php版本企查查api接口请求示例代码,php请求企查查api接口,thinkphp请求企查查api接口
【亲测有效,官方提供】php版本企查查api接口请求示例代码,php请求企查查api接口,thinkphp请求企查查api接口
7 1