0x00 教程内容
- 新建Storm项目
- 代码实现
- 效果校验
版本说明:
a. windows本地使用的JDK版本为1.8
0x01 新建Storm项目
1. 项目结构
a. 建Maven项目,建package:
2. 添加Maven依赖
a. 我的Storm版本为:1.2.2
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.2.2</version> </dependency>
0x02 代码实现
1. SentenceSpout
package com.shaonaiyi; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.util.Map; import java.util.concurrent.TimeUnit; /** * @Auther: 邵奈一 * @Date: 2019/05/20 下午 6:24 * @Description: 语句Spout */ /** * 语句数据源spout * 将每一行语句转化为key-value的Tuple,输出给下一个语句分割bolt * 比如,语句为:i like shaonaiyi * 则输出为:Tuple("sentence" -> "i like shaonaiyi") */ public class SentenceSpout extends BaseRichSpout { private SpoutOutputCollector collector; // 定义随机生成的模拟数据 private String[] sentences = { "i like shaonaiyi", "shao naiyi shao naiyi", "i like shaonaiyi", "hello shaonaiyi ", "i am a superman", "i can do it" }; // 随机发送的语句的index private int index = 0; public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { // Tuple输出器 this.collector = spoutOutputCollector; } public void nextTuple() { // 转化当前语句为Tuple中的value,并发送数据到下一个bolt中 this.collector.emit(new Values(sentences[index])); index++; if (index >= sentences.length) { index = 0; } // 模拟生成语句的速度 try { TimeUnit.MILLISECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { // 申明输出的Tuple的key outputFieldsDeclarer.declare(new Fields("sentence")); } }
2. SplitSentenceBolt
package com.shaonaiyi; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Map; /** * @Auther: 邵奈一 * @Date: 2019/05/20 下午 6:37 * @Description: 语句切割bolt */ /** * 语句分割bolt * 如,接收的Tuple是:Tuple("sentence" -> "i like shaonaiyi") * 则,输出的Tuple为: * Tuple("word" -> "i") * Tuple("word" -> "like") * Tuple("word" -> "shaonaiyi") */ public class SplitSentenceBolt extends BaseRichBolt { private OutputCollector collector; public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } public void execute(Tuple tuple) { // 实时接收SentenceSpout中输出的Tuple流 String sentence = tuple.getStringByField("sentence"); // 根据key获取Tuple中的语句 String[] words = sentence.split(" "); // 按照空格进行切割 for (String word: words) { this.collector.emit(new Values(word)); // 将切割后的每一个单词作为Tuple的value输出到下一个bolt中 } } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); // 输出Tuple的key } }
3. WordCountBolt
package com.shaonaiyi; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.HashMap; import java.util.Map; /** * @Auther: 邵奈一 * @Date: 2019/05/20 下午 6:40 * @Description: 词频统计bolt */ /** * 单词计数bolt * 比如,接收到的Tuple为:Tuple("word" -> "shaonaiyi") * 则,输出的Tuple为:Tuple("word" -> "shaonaiyi", "count" -> 97) */ public class WordCountBolt extends BaseRichBolt { private OutputCollector collector; private HashMap<String, Long> counts = null; // 统计每个单词的计数 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; this.counts = new HashMap<String, Long>(); } public void execute(Tuple tuple) { String word = tuple.getStringByField("word"); // 根据key获取Tuple中的单词 // 统计每一个单词总共出现的次数 Long count = counts.getOrDefault(word, 0L); count++; this.counts.put(word, count); // 将每一个单词以及这个单词出现的次数作为Tuple中的value输出到下一个bolt中 this.collector.emit(new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { // 输出Tuple的key,有两个key,是因为每次输出的value也有两个 outputFieldsDeclarer.declare(new Fields("word", "count")); } }
4. ReportBolt
package com.shaonaiyi; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import java.util.Map; /** * @Auther: 邵奈一 * @Date: 2019/05/20 下午 6:42 * @Description: 报告bolt */ public class ReportBolt extends BaseRichBolt { @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { } @Override public void execute(Tuple tuple) { // 实时接收WordCountBolt中输出的Tuple流 String word = tuple.getStringByField("word"); // 根据word拿到对应的单词 Long count = tuple.getLongByField("count"); // 拿到对应单词出现的次数 System.out.println(word + " -> " + count); // 输出统计结果到到控制台 } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { // 数据末端的bolt,不需要再对接,所以不再声明 } }
5. WordCountTopology
package com.shaonaiyi; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; /** * @Auther: 邵奈一 * @Date: 2019/05/20 下午 6:44 * @Description: 程序Topology */ public class WordCountTopology { private static final String SENTENCE_SPOUT_ID = "sentence-spout"; private static final String SPLIT_BOLT_ID = "split-bolt"; private static final String COUNT_BOLT_ID = "count-bolt"; private static final String REPORT_BOLT_ID = "report-bolt"; private static final String TOPOLOGY_NAME = "word-count-topology"; public static void main(String[] args) throws InterruptedException { // 1、实例化所有的spout和bolt SentenceSpout sentenceSpout = new SentenceSpout(); SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt(); WordCountBolt wordCountBolt = new WordCountBolt(); ReportBolt reportBolt = new ReportBolt(); // 2、实例化Topology构造器 TopologyBuilder builder = new TopologyBuilder(); // 2.1、设置spout,并且为这个spout设置一个唯一的id builder.setSpout(SENTENCE_SPOUT_ID, sentenceSpout); // 2.2、设置第一个bolt,并给它设置一个唯一的id,然后指定这个bolt要消费哪一个组件的消息 // shuffleGrouping(SENTENCE_SPOUT_ID)方法是告诉Storm, // 要将SentenceSpout输出的Tuple随机均匀的的分发给SplitSentenceBolt builder.setBolt(SPLIT_BOLT_ID, splitSentenceBolt).shuffleGrouping(SENTENCE_SPOUT_ID); // 2.3、将WordCountBolt和SplitSentenceBolt连接起来,WordCountBolt将消费SplitSentenceBolt输出的Tuple数据 // 但是我们需要将相同的单词发送到同一个bolt中,所以我们需要用到fieldsGrouping // fieldsGrouping()就是保证所有的"word"字段值相同的Tuple都会被路由到同一个WordCountBolt实例中 builder.setBolt(COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")); // 2.4、定义数据流的最后一步是将WordCountBolt的数据输出到ReportBolt中 // 我们希望WordCountBolt输出的所有的Tuple路由到唯一的ReportBolt实例任务中,所以我们使用globalGrouping builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID); // 3、提交Topology Config config = new Config(); // 用来配置Topology运行时行为,对Topology所有组件全局生效的配置参数集合 LocalCluster cluster = new LocalCluster(); // 使用LocalCluster类在本地模拟一个完成的Storm集群,从而可以来进行方便的本地开发和调试 cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); // 提交Topology // 4、20秒后停止本地集群 Thread.sleep(20000); cluster.killTopology(TOPOLOGY_NAME); cluster.shutdown(); } }
0x03 效果校验与代码讲解
1. 本地执行
a. 数据会源源不断地统计出来
2. 代码讲解
a. WordCountTopology类
builder.setBolt(SPLIT_BOLT_ID, splitSentenceBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
设置splitSentenceBolt接受的数据来源于SentenceSpout,shuffleGrouping表示是均匀地接收,因为如果是集群的话,可能有几个Spout、几个Bolt。
builder.setBolt(COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
我们也可能有几个Bolt,但是,单词相同的,要分配到同一个Bolt里面去,然后才能累加,通过fieldsGrouping来实现。
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
我们希望wordCountBolt所有的Bolt的数据都发送到reportBolt中,可以通过globalGrouping来实现。
Thread.sleep(20000);
设置了20秒后程序会停止。
0xFF 总结
- 后期教程会部署Storm集群,已经打包项目到集群执行。
- 请自行尝试去设置一下日志级别!