0x00 文章内容
- Kafka准备
- Storm准备
- 校验结果
0x01 Kafka准备
1. 启动Kafka
a. 后台启动Kafka(三台都要启动)
nohup ~/bigdata/kafka_2.11-1.0.0/bin/kafka-server-start.sh ~/bigdata/kafka_2.11-1.0.0/config/server.properties >~/bigdata/kafka_2.11-1.0.0/logs/server.log 2>&1 &
2. 创建Topic
a. 创建Topic:word-count-input
~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic word-count-input
b. 创建Topic:word-count-output
~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic word-count-output
3. 启动生产者与消费者
a. 启动一个producer,向word-count-input
发送消息
进入到$KAFKA_HOME
路径:
cd ~/bigdata/kafka_2.11-1.0.0
启动:
bin/kafka-console-producer.sh --broker-list master:9092 --topic word-count-input
b. 启动一个consumer,消费word-count-output
的消息
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic word-count-output --property print.key=true
0x02 Storm准备
1. 构建Maven项目
a. 引入Storm依赖
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.2.2</version> <scope>provided</scope> </dependency>
b. 引入Kafka依赖
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka-client</artifactId> <version>1.2.2</version> </dependency>
c. 引入额外打包插件
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <testExcludes> <testExclude>/src/test/**</testExclude> </testExcludes> <encoding>utf-8</encoding> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <!-- this is used for inheritance merges --> <phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 --> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin>
d. 完整的pom.xml
文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.shaonaiyi</groupId> <artifactId>stormlearning</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.2.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka-client</artifactId> <version>1.2.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <testExcludes> <testExclude>/src/test/**</testExclude> </testExcludes> <encoding>utf-8</encoding> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <!-- this is used for inheritance merges --> <phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 --> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
2. 编写代码
a. 项目代码结构
b. KafkaSpoutBuilder
package com.shaonaiyi.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

import java.util.List;
import java.util.Objects;

/**
 * Fluent builder for a {@link KafkaSpout} that consumes {@code String} keys and values.
 *
 * <p>Usage: {@code new KafkaSpoutBuilder().brokers(...).topic(...).build()}.
 *
 * @author shaonaiyi
 * @createTime 2019/07/14 13:32
 * @description KafkaSpout builder
 */
public class KafkaSpoutBuilder {

    private List<String> brokers;
    private String topic;

    /**
     * Sets the Kafka broker addresses ({@code host:port} strings).
     *
     * @param v broker address list
     * @return this builder, for chaining
     */
    public KafkaSpoutBuilder brokers(List<String> v) {
        brokers = v;
        return this;
    }

    /**
     * Sets the topic to consume from.
     *
     * @param v topic name
     * @return this builder, for chaining
     */
    public KafkaSpoutBuilder topic(String v) {
        topic = v;
        return this;
    }

    /**
     * Builds the spout from the configured brokers and topic.
     *
     * <p>Kafka configuration notes:
     * <ol>
     *   <li>A consumer group must be set (a message in a partition is consumed by
     *       only one consumer within the same group).</li>
     *   <li>The first-poll offset strategy should be chosen per business need;
     *       here {@code LATEST} is used so only newly arriving data is consumed.</li>
     * </ol>
     *
     * @return a configured {@code KafkaSpout<String, String>}
     * @throws NullPointerException if {@link #brokers} or {@link #topic} was not set
     */
    public KafkaSpout<String, String> build() {
        // Fail fast with a clear message instead of an opaque NPE inside String.join.
        Objects.requireNonNull(brokers, "brokers must be set before build()");
        Objects.requireNonNull(topic, "topic must be set before build()");
        String allBrokers = String.join(",", brokers);
        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
                .builder(allBrokers, topic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "word-count-storm")
                // Consume only the latest data.
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                .build();
        // Parameterized (was a raw KafkaSpout) so callers get type-safe tuples.
        return new KafkaSpout<>(conf);
    }
}
c. KafkaSplitSentenceBolt
package com.shaonaiyi.kafka;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Sentence-splitting bolt: splits each incoming sentence on single spaces and
 * emits one {@code ("word" -> token)} tuple per token.
 *
 * <p>Example — for the input {@code Tuple("sentence" -> "I love teacher shao")}
 * the bolt emits:
 * <pre>
 *   Tuple("word" -> "I")
 *   Tuple("word" -> "love")
 *   Tuple("word" -> "teacher")
 *   Tuple("word" -> "shao")
 * </pre>
 *
 * @author shaonaiyi
 * @createTime 2019/07/14 13:38
 * @description sentence-splitting bolt
 */
public class KafkaSplitSentenceBolt extends BaseRichBolt {

    private OutputCollector outputCollector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple input) {
        // "value" is the fixed field name the Kafka spout uses for the message payload.
        final String sentence = input.getStringByField("value");
        for (final String token : sentence.split(" ")) {
            // Forward each token as its own tuple to the downstream bolt.
            outputCollector.emit(new Values(token));
        }
        // Ack the source tuple; otherwise the kafka-spout keeps re-delivering it.
        outputCollector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare the single output field ("word") of the emitted tuples.
        declarer.declare(new Fields("word"));
    }
}