Storm第一个入门例子之Wordcount(windows本地)

简介: Storm第一个入门例子之Wordcount(windows本地)

0x00 教程内容


  1. 新建Storm项目
  2. 代码实现
  3. 效果校验

版本说明:

a. windows本地使用的JDK版本为1.8


0x01 新建Storm项目


1. 项目结构

a. 建Maven项目,建package:

image.png


2. 添加Maven依赖

a. 我的Storm版本为:1.2.2

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
</dependency>


0x02 代码实现


1. SentenceSpout
package com.shaonaiyi;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * @Auther: 邵奈一
 * @Date: 2019/05/20 下午 6:24
 * @Description: 语句Spout
 */
/**
 *  语句数据源spout
 *  将每一行语句转化为key-value的Tuple,输出给下一个语句分割bolt
 *  比如,语句为:i like shaonaiyi
 *  则输出为:Tuple("sentence" -> "i like shaonaiyi")
 */
public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    // 定义随机生成的模拟数据
    private String[] sentences = {
            "i like shaonaiyi",
            "shao naiyi shao naiyi",
            "i like shaonaiyi",
            "hello shaonaiyi ",
            "i am a superman",
            "i can do it"
    };
    // 随机发送的语句的index
    private int index = 0;
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        // Tuple输出器
        this.collector = spoutOutputCollector;
    }
    public void nextTuple() {
        // 转化当前语句为Tuple中的value,并发送数据到下一个bolt中
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        // 模拟生成语句的速度
        try {
            TimeUnit.MILLISECONDS.sleep(30);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // 申明输出的Tuple的key
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}


2. SplitSentenceBolt
package com.shaonaiyi;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
 * @Auther: 邵奈一
 * @Date: 2019/05/20 下午 6:37
 * @Description: 语句切割bolt
 */
/**
 * 语句分割bolt
 * 如,接收的Tuple是:Tuple("sentence" -> "i like shaonaiyi")
 * 则,输出的Tuple为:
 *      Tuple("word" -> "i")
 *      Tuple("word" -> "like")
 *      Tuple("word" -> "shaonaiyi")
 */
public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }
    public void execute(Tuple tuple) { // 实时接收SentenceSpout中输出的Tuple流
        String sentence = tuple.getStringByField("sentence"); // 根据key获取Tuple中的语句
        String[] words = sentence.split(" "); // 按照空格进行切割
        for (String word: words) {
            this.collector.emit(new Values(word)); // 将切割后的每一个单词作为Tuple的value输出到下一个bolt中
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word")); // 输出Tuple的key
    }
}


3. WordCountBolt
package com.shaonaiyi;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
/**
 * @Auther: 邵奈一
 * @Date: 2019/05/20 下午 6:40
 * @Description: 词频统计bolt
 */
/**
 * 单词计数bolt
 * 比如,接收到的Tuple为:Tuple("word" -> "shaonaiyi")
 * 则,输出的Tuple为:Tuple("word" -> "shaonaiyi", "count" -> 97)
 */
public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null; // 统计每个单词的计数
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word"); // 根据key获取Tuple中的单词
        // 统计每一个单词总共出现的次数
        Long count = counts.getOrDefault(word, 0L);
        count++;
        this.counts.put(word, count);
        // 将每一个单词以及这个单词出现的次数作为Tuple中的value输出到下一个bolt中
        this.collector.emit(new Values(word, count));
    }
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // 输出Tuple的key,有两个key,是因为每次输出的value也有两个
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}


4. ReportBolt
package com.shaonaiyi;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import java.util.Map;
/**
 * @Auther: 邵奈一
 * @Date: 2019/05/20 下午 6:42
 * @Description: 报告bolt
 */
public class ReportBolt extends BaseRichBolt {
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    }
    @Override
    public void execute(Tuple tuple) { // 实时接收WordCountBolt中输出的Tuple流
        String word = tuple.getStringByField("word"); // 根据word拿到对应的单词
        Long count = tuple.getLongByField("count"); // 拿到对应单词出现的次数
        System.out.println(word + " -> " + count); // 输出统计结果到到控制台
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // 数据末端的bolt,不需要再对接,所以不再声明
    }
}


5. WordCountTopology
package com.shaonaiyi;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
 * @Auther: 邵奈一
 * @Date: 2019/05/20 下午 6:44
 * @Description: 程序Topology
 */
public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";
    public static void main(String[] args) throws InterruptedException {
        // 1、实例化所有的spout和bolt
        SentenceSpout sentenceSpout = new SentenceSpout();
        SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt();
        WordCountBolt wordCountBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();
        // 2、实例化Topology构造器
        TopologyBuilder builder = new TopologyBuilder();
        // 2.1、设置spout,并且为这个spout设置一个唯一的id
        builder.setSpout(SENTENCE_SPOUT_ID, sentenceSpout);
        // 2.2、设置第一个bolt,并给它设置一个唯一的id,然后指定这个bolt要消费哪一个组件的消息
        // shuffleGrouping(SENTENCE_SPOUT_ID)方法是告诉Storm,
        // 要将SentenceSpout输出的Tuple随机均匀的的分发给SplitSentenceBolt
        builder.setBolt(SPLIT_BOLT_ID, splitSentenceBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
        // 2.3、将WordCountBolt和SplitSentenceBolt连接起来,WordCountBolt将消费SplitSentenceBolt输出的Tuple数据
        // 但是我们需要将相同的单词发送到同一个bolt中,所以我们需要用到fieldsGrouping
        // fieldsGrouping()就是保证所有的"word"字段值相同的Tuple都会被路由到同一个WordCountBolt实例中
        builder.setBolt(COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // 2.4、定义数据流的最后一步是将WordCountBolt的数据输出到ReportBolt中
        // 我们希望WordCountBolt输出的所有的Tuple路由到唯一的ReportBolt实例任务中,所以我们使用globalGrouping
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
        // 3、提交Topology
        Config config = new Config(); // 用来配置Topology运行时行为,对Topology所有组件全局生效的配置参数集合
        LocalCluster cluster = new LocalCluster(); // 使用LocalCluster类在本地模拟一个完成的Storm集群,从而可以来进行方便的本地开发和调试
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); // 提交Topology
        // 4、20秒后停止本地集群
        Thread.sleep(20000);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}


0x03 效果校验与代码讲解


1. 本地执行

a. 数据会源源不断地统计出来

image.png


2. 代码讲解

a. WordCountTopology类

builder.setBolt(SPLIT_BOLT_ID, splitSentenceBolt).shuffleGrouping(SENTENCE_SPOUT_ID);

设置splitSentenceBolt接受的数据来源于SentenceSpout,shuffleGrouping表示是均匀地接收,因为如果是集群的话,可能有几个Spout、几个Bolt。


builder.setBolt(COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

我们也可能有几个Bolt,但是,单词相同的,要分配到同一个Bolt里面去,然后才能累加,通过fieldsGrouping来实现。


builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

我们希望wordCountBolt所有的Bolt的数据都发送到reportBolt中,可以通过globalGrouping来实现。


Thread.sleep(20000);

设置了20秒后程序会停止。


0xFF 总结


  1. 后期教程会部署Storm集群,已经打包项目到集群执行。
  2. 请自行尝试去设置一下日志级别!
相关文章
|
8月前
|
Shell Linux 开发工具
Git入门(windows系统)
Git入门(windows系统)
78 1
|
3月前
|
存储 NoSQL MongoDB
MongoDB入门级别教程全(Windows版,保姆级教程)
一份全面的MongoDB入门级教程,包括在Windows系统上安装MongoDB、使用MongoDB Shell和Compass GUI进行数据库操作,以及MongoDB的基本数据类型和查询技巧。
126 2
MongoDB入门级别教程全(Windows版,保姆级教程)
|
5月前
|
数据库 Windows
超详细步骤解析:从零开始,手把手教你使用 Visual Studio 打造你的第一个 Windows Forms 应用程序,菜鸟也能轻松上手的编程入门指南来了!
【8月更文挑战第31天】创建你的第一个Windows Forms (WinForms) 应用程序是一个激动人心的过程,尤其适合编程新手。本指南将带你逐步完成一个简单WinForms 应用的开发。首先,在Visual Studio 中创建一个“Windows Forms App (.NET)”项目,命名为“我的第一个WinForms 应用”。接着,在空白窗体中添加一个按钮和一个标签控件,并设置按钮文本为“点击我”。然后,为按钮添加点击事件处理程序`button1_Click`,实现点击按钮后更新标签文本为“你好,你刚刚点击了按钮!”。
380 0
|
5月前
|
开发者 iOS开发 C#
Uno Platform 入门超详细指南:从零开始教你打造兼容 Web、Windows、iOS 和 Android 的跨平台应用,轻松掌握 XAML 与 C# 开发技巧,快速上手示例代码助你迈出第一步
【8月更文挑战第31天】Uno Platform 是一个基于 Microsoft .NET 的开源框架,支持使用 C# 和 XAML 构建跨平台应用,适用于 Web(WebAssembly)、Windows、Linux、macOS、iOS 和 Android。它允许开发者共享几乎全部的业务逻辑和 UI 代码,同时保持原生性能。选择 Uno Platform 可以统一开发体验,减少代码重复,降低开发成本。安装时需先配置好 Visual Studio 或 Visual Studio for Mac,并通过 NuGet 或官网下载工具包。
471 0
|
5月前
|
Kubernetes Cloud Native 开发者
探索云原生技术:Kubernetes入门与实践探索Windows操作系统的隐藏功能
【8月更文挑战第31天】在数字化转型的浪潮中,云原生技术成为企业提升敏捷性、效率和可靠性的关键。本文将带你了解云原生的核心组件之一——Kubernetes(K8s),通过浅显易懂的语言和实际代码示例,引导你步入这一强大工具的世界。无论你是初学者还是有经验的开发者,本篇都将为你打开一扇通向高效资源管理与自动化部署的大门。
|
8月前
|
API C++ Windows
windows编程入门_链接错误的配置
windows编程入门_链接错误的配置
62 0
|
8月前
|
机器人 Linux 数据安全/隐私保护
Python办公自动化【Windows中定时任务、OS/linux 系统定时任务 、Python 钉钉发送消息、Python 钉钉发送图片】(九)-全面详解(学习总结---从入门到深化)
Python办公自动化【Windows中定时任务、OS/linux 系统定时任务 、Python 钉钉发送消息、Python 钉钉发送图片】(九)-全面详解(学习总结---从入门到深化)
274 0
|
8月前
|
机器人 Linux 数据安全/隐私保护
Python办公自动化【Windows中定时任务、OS/linux 系统定时任务 、Python 钉钉发送消息、Python 钉钉发送图片】(九)-全面详解(学习总结---从入门到深化)(下)
Python办公自动化【Windows中定时任务、OS/linux 系统定时任务 、Python 钉钉发送消息、Python 钉钉发送图片】(九)-全面详解(学习总结---从入门到深化)
135 0
|
8月前
|
Linux Python Windows
Python办公自动化【Windows中定时任务、OS/linux 系统定时任务 、Python 钉钉发送消息、Python 钉钉发送图片】(九)-全面详解(学习总结---从入门到深化)(上)
Python办公自动化【Windows中定时任务、OS/linux 系统定时任务 、Python 钉钉发送消息、Python 钉钉发送图片】(九)-全面详解(学习总结---从入门到深化)
90 0
|
8月前
|
SQL Shell 数据库
七天.NET 8操作SQLite入门到实战 - 第二天 在 Windows 上配置 SQLite环境
七天.NET 8操作SQLite入门到实战 - 第二天 在 Windows 上配置 SQLite环境