【大数据】Linux下Storm(0.9版本以上)的环境配置和小Demo

本文涉及的产品
云原生网关 MSE Higress,422元/月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 在storm发布到0.9.x以后,配置storm将会变得简单很多,也就是只需要配置zookeeper和storm即可,而不再需要配置zeromq和jzmq,由于网上面的storm配置绝大部分都是0.9以前的storm版本,所以有很多工作是不需要进行的,下面就storm的0.9.5版本在linux环境下进行配置进行详细解析。  由于配置storm只需要两个步骤,大大简化了配置,也是storm团队做了很大的努力,让程序员们专注于程序,让storm配置进行异常简单,好了,废话说了不少,下面正式开始讲解。

一、引言:


  在storm发布到0.9.x以后,配置storm将会变得简单很多,也就是只需要配置zookeeper和storm即可,而不再需要配置zeromq和jzmq,由于网上面的storm配置绝大部分都是0.9以前的storm版本,所以有很多工作是不需要进行的,下面就storm的0.9.5版本在linux环境下进行配置进行详细解析。


  由于配置storm只需要两个步骤,大大简化了配置,也是storm团队做了很大的努力,让程序员们专注于程序,让storm配置进行异常简单,好了,废话说了不少,下面正式开始讲解。

 

二、配置zookeeper


  1.打开shell,可以根据自身的习惯设置下载文件的位置信息,使用如下命令进行下载(下载3.4.6版本,此版本位稳定版):


  wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz


  2.下载完成后,使用如下命令进行解压缩:


  tar -zxvf zookeeper-3.4.6.tar.gz


  会出现一个名为zookeeper-3.4.6的文件夹


  3.进入zookeeper-3.4.6的conf文件夹,复制zoo_sample.cfg,重命名为zoo.cfg

  4.修改zoo.cfg的内容,添加的内容如下:

 

  dataDir=/home/leesf/program/zookeeper/data    //(注释:放置数据信息)

  dataLogDir=/home/leesf/program/zookeeper/log    //(注释:放置日志信息)

    server.1=127.0.0.1:2888:3888    //(注释:使用本地模式,如果有多个机器,可以进行配置(server.1=xxx.xxx.xxxx:xxxx:xxxx    

                

                    //server.2=xxx.xxx.xxx:xxxx:xxxx ....))

  5.在shell命令行里进入zookeeper-3.4.6/bin目录,使用如下命令可以开启、测试、停止zookeeper服务


  ./zkServer.sh start    //(注释:开启服务)

  ./zkServer.sh status    //(注释:查看状态)

  ./zkServer.sh stop    //(停止服务)


  截图如下:


image.png


三、配置storm

  1.下载storm,使用如下命令下载storm文件

  wget http://mirrors.hust.edu.cn/apache/storm/apache-storm-0.9.5/apache-storm-0.9.5.tar.gz

  2.进行解压缩,使用如下命令

  tar -zxvf apache-storm-0.9.5.tar.gz

  解压缩后出现文件夹apache-storm-0.9.5

  3.修改apache-storm-0.9.5/conf目录中的storm.yaml文件

  添加的内容如下: 

  # storm.zookeeper.servers:

  # - "127.0.0.1"

  #

  # nimbus.host: "127.0.0.1"

  #

  # storm.zookeeper.port:2181

  #

  # storm.local.dir: "/home/leesf/program/storm/data"

  #

  # supervisor.slots.ports:

  # -6700

  # -6701

  # -6702

  # -6703

  4.进入到apache-storm-0.9.5/bin目录下,启动nimbus、supervisor、ui,使用如下命令进行启动:

  ./storm nimbus

  ./storm supervisor

  ./storm ui

  截图如下:


image.png

image.png

image.png


5.在浏览器中查看storm ui信息,打开浏览器输入127.0.0.1:8080即可查看


  截图如下:


image.png


至此,storm的配置就完成了。


下面使用storm的本地模式来运行一个小的Demo,方便各位园友查看storm的运行效果

 

四、Storm Demo示例


  storm demo的目录结构如下

    1.spout包,数据发射源

    2.bolt包,数据处理节点

    3.main包,程序执行入口

    4.words.txt,程序资源文件


  分为如下几个步骤:


  1.添加源代码:


    1.spout包中包含一个java文件,WordReader.java,具体代码如下: 

  

package com.leesf.Spout;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReader extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    public void ack(Object msgId) {
        System.out.println("OK:"+msgId);
    }
    public void close() {}
    public void fail(Object msgId) {
        System.out.println("FAIL:"+msgId);
    }
    /**
     * The only thing that the methods will do It is emit each 
     * file line
     */
    public void nextTuple() {
        /**
         * The nextuple it is called forever, so if we have been readed the file
         * we will wait and then return
         */
        if(completed){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                //Do nothing
            }
            return;
        }
        String str;
        //Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try{
            //Read all lines
            while((str = reader.readLine()) != null){
                /**
                 * By each line emmit a new value with the line as a their
                 */
                this.collector.emit(new Values(str),str);
            }
        }catch(Exception e){
            throw new RuntimeException("Error reading tuple",e);
        }finally{
            completed = true;
        }
    }
    /**
     * We will create the file and get the collector object
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
        }
        this.collector = collector;
    }
    /**
     * Declare the output field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}


2.bolt包中包含两个java文件,WordCounter.java、WordNormalizer.java,具体代码如下:

    WordCounter.java代码如下:


package com.leesf.Bolt;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class WordCounter extends BaseBasicBolt {
    Integer id;
    String name;
    Map<String, Integer> counters;
    /**
     * At the end of the spout (when the cluster is shutdown
     * We will show the word counters
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter ["+name+"-"+id+"] --");
        for(Map.Entry<String, Integer> entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }
    /**
     * On create 
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        /**
         * If the word dosn't exist in the map we will create
         * this, if not We will add 1 
         */
        if(!counters.containsKey(str)){
            counters.put(str, 1);
        }else{
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}


    WordNormalizer.java代码如下:


package com.leesf.Bolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer extends BaseBasicBolt {
    public void cleanup() {}
    /**
     * The bolt will receive the line from the
     * words file and process it to Normalize this line
     * 
     * The normalize will be put the words in lower case
     * and split the line to get all words in this 
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }
    /**
     * The bolt will only emit the field "word" 
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}


  3.main包中包含一个java文件,Main.java,具体代码如下:


package com.leesf.Main;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import com.leesf.Bolt.*;
import com.leesf.Spout.*;
public class Main {
    public static void main(String[] args) throws InterruptedException {
        //Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader",new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
            .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(),1)
            .fieldsGrouping("word-normalizer", new Fields("word"));
        //Configuration
        Config conf = new Config();
        conf.put("wordsFile", "/home/leesf/code/eclipse/StormDemo/res/words.txt");
        conf.setDebug(false);
        //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}


    4.资源文件,words.txt,内容如下:


storm
test
are
great
is
an
storm
simple
application
but
very
powerfull
really
StOrm
is
great


words.txt可以放在任何地方,相应的程序中的路径也要进行修改,保证路径一致。


  2.添加依赖库


  将storm/lib目录下的所有文件添加到本项目中,截图如下:


image.png


3.运行程序


  运行程序,可以得到如下的结果:


image.png


至此,关于storm的所有配置就已经完成了,下面可以进行相应的storm的开发了。


总结:storm在发布了0.9b版本以后,其配置工作就变得很简单,不再需要配置zeromq和jzmq,现在网上面的配置信息绝大部分都是0.9版本以前的,所以配置显得很累赘,在此记录此次的配置过程,方便各位园友的同时也方便自己以后再去配置这样的信息。在配置的过程中有任何问题也欢迎交流,谢谢各位观看。


 

 


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1天前
|
关系型数据库 MySQL Linux
一文教会你如何在Linux系统中使用Docker安装Mysql 5.7版本 【详细过程+图解】
这篇文章提供了在Linux系统中使用Docker安装Mysql 5.7版本的详细过程和图解,包括安装指定版本、创建实例、启动、使用Navicat连接测试、文件挂载与端口映射、进入容器、配置文件修改以及重新启动容器等步骤。
一文教会你如何在Linux系统中使用Docker安装Mysql 5.7版本 【详细过程+图解】
|
4天前
|
Linux
关于linux的qt发布(linuxdeployqt)中opengl版本过高的解决
关于linux的qt发布(linuxdeployqt)中opengl版本过高的解决
|
7天前
|
安全 Linux 网络安全
Linux——OpenSSH如何升级到最新版本
Linux——OpenSSH如何升级到最新版本
28 0
Linux——OpenSSH如何升级到最新版本
|
17天前
|
Linux
Linux系统如何查看版本信息,内核、发行版、cpu、所有版本
Linux系统如何查看版本信息,内核、发行版、cpu、所有版本
|
1月前
|
Linux
查看linux内核版本
在Linux中查看内核版本可使用`uname -r`、`cat /proc/version`、`lsb_release -a`、`cat /etc/*release`、`dmesg | grep Linux`、`hostnamectl`、`kernrelease`(部分系统)、`rpm -q kernel`(RPM系统)或`dpkg -l linux-image-*`(Debian系)。
28 2
|
1月前
|
人工智能 Unix Linux
【初识Linux】Linux环境配置、Linux的基本指令 一
【初识Linux】Linux环境配置、Linux的基本指令 一
|
1月前
|
Ubuntu Linux UED
|
25天前
|
Ubuntu 机器人 Linux
Ubuntu查看ros版本-linux查看ros版本
通过上述方法,您可以轻松检查和确认您的Ubuntu或其他Linux系统上安装的ROS版本,以确保您的机器人项目能够顺利进行。
72 0
|
1月前
|
机器学习/深度学习 分布式计算 大数据
MaxCompute产品使用合集之如何从Dataphin使用界面查看版本
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。