《Storm企业级应用:实战、运维和调优》——2.4 创建Topology并向集群提交任务

简介:

本节书摘来自华章计算机《Storm企业级应用:实战、运维和调优》一书中的第2章,第2.4节,作者:马延辉 陈书美 雷葆华著, 更多章节内容可以访问云栖社区“华章计算机”公众号查看。

2.4 创建Topology并向集群提交任务

Topology是Storm的核心概念之一,是将Spout与Bolt融合在一起的纽带,在Storm集群中运行,完成实时计算的任务。在Storm集群中,Topology的定义是一个Thrift结构,并且Nimbus就是一个Thrift服务,可以提交由任何语言创建的Topology。下面使用Java语言讲解Topology的使用。首先了解如何创建Topology。
2.4.1 创建Topology
在创建一个Topology之前,设计一个Topology来统计词频。在创建Topology之前,要准备Spout(数据源)和Bolt来组成Topology。这里简单介绍创建的Spout和Bolt,第3章会详细介绍这两个概念。
下面梳理Topology的大致结构。
1.?Spout
创建一个WordSpout数据源,负责发送语句。WordSpout的代码如下:

public class WordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private static f?inal String[] msgs = new String[] {
             "I have a dream",
             "my dream is to be a data analyst",
             "you kan do what you are dreaming",
             "don't give up your dreams",
             "it's just so so ",
             "We need change the traditional ideas and practice boldly",
             "Storm enterprise real time calculation of actual combat",
             "you kan be what you want be",
             };
    private static f?inal Random random = new Random();   
    public void open(Map conf, TopologyContext context,
             SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void nextTuple() {
        String sentence= msgs[random.nextInt(8)];
        collector.emit(new Values(sentence));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
  }

2.?Bolt
两个Bolt,一个负责将语句切分,即SplitSentenceBolt,另一个是对切分的单词进行词频累加的Bolt,即WordCountBolt。下面是这两个Bolt的具体代码。

public class SplitSentenceBolt implements IBasicBolt{ 
      public void prepare(Map conf, TopologyContext context) { 
       } 
      public void execute(Tuple tuple, BasicOutputCollector collector) { 
            String sentence = tuple.getString(0); 
             for(String word: sentence.split(" ")) { 
                     collector.emit(new Values(word)); 
               } 
         } 
      public void cleanup() { 
      } 
      public void declareOutputFields(OutputFieldsDeclarer declarer) { 
            declarer.declare(new Fields("word")); 
          } 
 } 
public class WordCountBolt implements IBasicBolt { 
      private Map<String, Integer> _counts = new HashMap<String, Integer>(); 
      public void prepare(Map conf, TopologyContext context) { 
      } 
      public void execute(Tuple tuple, BasicOutputCollector collector) { 
            String word = tuple.getString(0); 
            int count; 
            if(_counts.containsKey(word)) { 
                count = _counts.get(word); 
            } else { 
                count = 0; 
} 
            count++; 
            _counts.put(word, count); 
            collector.emit(new Values(word, count)); 
      } 
      public void cleanup() { 
      } 
      public void declareOutputFields(OutputFieldsDeclarer declarer) { 
            declarer.declare(new Fields("word", "count")); 
      }
}

3.?Topology
要创建的Topology的Spout从句子队列中随机生成一个句子,Spout用setSpout方法插入一个独特的ID到Topology。必须给予Topology中的每个节点一个ID,ID是由其他Bolt用于订阅该节点的输出流,其中WordSpout在Topology中的ID为1。
setBolt用于在Topology中插入Bolt,在Topology中定义的第一个Bolt是切分句子的SplitSentenceBolt,该Bolt将句子流转成单词流,第二个Bolt统计单词。Topology的代码如下:

TopologyBuilder builder = new TopologyBuilder(); 
bulider.setSpout(1,new WordSpout(),2);
builder.setBolt(2, new SplitSentenceBolt(), 10).shuffleGrouping(1); 
builder.setBolt(3, new WordCountBolt(), 20).f?ieldsGrouping(2, new Fields("word"));

这样就创建了简单的Topology结构,下面介绍如何使用Topology。
2.4.2 向集群提交任务
向Storm集群提交Topology任务,类似提交MapReduce作业到Hadoop集群中,只需要运行JAR包中的Topology即可。而使用kill命令可以杀掉任务,类似杀掉MapReduce作业。下面详细介绍这两部分内容。
1.?启动Topology
在Storm的安装主目录下,执行下面的命令提交任务:

bin/storm jar testTopolgoy.jar org.me.MyTopology arg1 arg2 arg3

其中,jar命令专门负责提交任务,testTopolgoy.jar是包含Topology实现代码的JAR包,org.me.MyTopology的main方法是Topology的入口,arg1、arg2和arg3为org.me.MyTopology执行时需要传入的参数。
2.?停止Topology
在Storm主目录下,执行kill命令停止之前已经提交的Topology:

bin/Storm kill {toponame}

其中,{toponame}为Topology提交到Storm集群时指定的Topology任务名称,该名称可以在代码中指定,也可以作为参数传入Topology中。

相关文章
|
9月前
|
运维 Cloud Native Go
Ansible自动化:简化你的运维任务
Ansible自动化:简化你的运维任务
75 0
|
16天前
|
运维 监控 API
自动化运维实践指南:Python脚本优化服务器管理任务
本文探讨了Python在自动化运维中的应用,介绍了使用Python脚本优化服务器管理的四个关键步骤:1) 安装必备库如paramiko、psutil和requests;2) 使用paramiko进行远程命令执行;3) 利用psutil监控系统资源;4) 结合requests自动化软件部署。这些示例展示了Python如何提升运维效率和系统稳定性。
32 8
|
2天前
|
运维 分布式计算 DataWorks
DataWorks产品使用合集之如何访问周期任务运维
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
6 0
|
2月前
|
运维 DataWorks 关系型数据库
DataWorks产品使用合集之DataWorks中如何提升运维中心实时同步任务的运行速度
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
66 1
|
2月前
|
运维 监控 Linux
linux脚本自动化运维任务
Linux自动化运维通过脚本提升效率,涵盖服务管理(启停服务、异常恢复)、系统监控(资源警报)、日志管理(清理分析)、备份恢复、补丁更新、自动化部署(如Ansible)、网络管理、定时任务(cron)和故障排查。结合shell、Python及工具,形成高效运维体系。
54 3
|
2月前
|
消息中间件 运维 应用服务中间件
容器化运维:构建高可用RabbitMQ集群的Docker Compose指南
容器化运维:构建高可用RabbitMQ集群的Docker Compose指南
426 0
|
2月前
|
运维 DataWorks 调度
DataWorks运维中心提供了下线节点、优雅下线和下线任务三种下线方式
DataWorks运维中心提供了下线节点、优雅下线和下线任务三种下线方式
126 2
|
10月前
|
运维 DataWorks
DataWorks运维中心中的实时任务
DataWorks运维中心中的实时任务
84 1
|
9月前
|
运维 数据挖掘 BI
【Dataphin运维】解放双手,支持补数据任务定时调度和手动运行,轻松实现回刷历史数据
Datatphin V3.11版本全新上线补数据任务功能,支持将单次补数据保存为补数据任务,保存补数据节点范围及运行规则;支持补数据任务定时调度,自动定期回刷历史数据;支持手动运行补数据任务。满足企业复杂多样的回刷历史数据的需求,减少人工操作成本。
174 0
|
11月前
|
域名解析 运维 负载均衡
【运维知识进阶篇】Tomcat集群实战之部署zrlog博客(Tomcat服务安装+静态资源挂载NFS+Nginx负载均衡+HTTPS证书+Redis会话保持)
【运维知识进阶篇】Tomcat集群实战之部署zrlog博客(Tomcat服务安装+静态资源挂载NFS+Nginx负载均衡+HTTPS证书+Redis会话保持)
278 1

热门文章

最新文章