Flume+Kafka+Storm实战:一、Kakfa与Storm整合(上)

简介: Flume+Kafka+Storm实战:一、Kakfa与Storm整合(上)

0x00 文章内容


  1. Kafka准备
  2. Storm准备
  3. 校验结果


0x01 Kafka准备


1. 启动Kafka

a. 后台启动Kafka(三台都要启动)

nohup ~/bigdata/kafka_2.11-1.0.0/bin/kafka-server-start.sh ~/bigdata/kafka_2.11-1.0.0/config/server.properties >~/bigdata/kafka_2.11-1.0.0/logs/server.log 2>&1 &


2. 创建Topic

a. 创建Topic:word-count-input


~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic word-count-input


b. 创建Topic:word-count-output


~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic word-count-output


3. 启动消费者与消费者

a. 启动一个producer,向word-count-input发送消息

进入到$KAFKA_HOME路径:

cd ~/bigdata/kafka_2.11-1.0.0

启动:

bin/kafka-console-producer.sh --broker-list master:9092 --topic word-count-input


image.png


b. 启动一个consumer,消费word-count-output的消息

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic word-count-output --property print.key=true


image.png


0x02 Storm准备


1. 构建Maven项目

a. 引入Storm依赖

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>


b. 引入Kafka依赖

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.2.2</version>
</dependency>


c. 引入额外打包插件

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.1</version>
    <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <testExcludes>
            <testExclude>/src/test/**</testExclude>
        </testExcludes>
        <encoding>utf-8</encoding>
    </configuration>
</plugin>
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id> <!-- this is used for inheritance merges -->
            <phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 -->
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>


d. 完整的pom.xml文件


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.shaonaiyi</groupId>
    <artifactId>stormlearning</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.2.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.2.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <testExcludes>
                        <testExclude>/src/test/**</testExclude>
                    </testExcludes>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>


2. 编写代码

a. 项目代码结构

image.png


b. KafkaSpoutBuilder

package com.shaonaiyi.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import java.util.List;
/**
 * @author: shaonaiyi
 * @createTime: 2019/07/14 13:32
 * @description: KafkaSpout构建器
 */
public class KafkaSpoutBuilder {
    private List<String> brokers;
    private String topic;
    public KafkaSpoutBuilder brokers(List<String> v) {
        brokers = v;
        return this;
    }
    public KafkaSpoutBuilder topic(String v) {
        topic = v;
        return this;
    }
    public KafkaSpout build() {
        /** 配置kafka
         * 1. 需要设置consumer group(注意一个partition中的消息只能被同一group中的一个consumer消费)
         * 2. 起始消费策略:根据业务需要配置
         */
        String allBrokers = String.join(",", brokers);
        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
                .builder(allBrokers, topic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "word-count-storm")
                //消费最新的数据
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                .build();
        return new KafkaSpout(conf);
    }
}


c. KafkaSplitSentenceBolt

package com.shaonaiyi.kafka;
/**
 * @author: shaonaiyi
 * @createTime: 2019/07/14 13:38
 * @description: 语句分割bolt
 */
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
 * 如,接收的Tuple是:Tuple("sentence" -> "I love teacher shao")
 * 则,输出的Tuple为:
 *      Tuple("word" -> "I")
 *      Tuple("word" -> "love")
 *      Tuple("word" -> "teacher")
 *      Tuple("word" -> "shao")
 */
public class KafkaSplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }
    @Override
    public void execute(Tuple tuple) { // 实时接收SentenceSpout中输出的Tuple流
        String sentence = tuple.getStringByField("value"); // 根据key获取Tuple中的语句,"value"是Kafka中固定了的
        String[] words = sentence.split(" "); // 将语句按照空格进行切割
        for (String word: words) {
            this.collector.emit(new Values(word)); // 将切割之后的每一个单词作为Tuple的value输出到下一个bolt中
        }
        this.collector.ack(tuple); // 表示成功处理kafka-spout输出的消息,需要应答,要不然,kafka-spout会不断的重复发送消息
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word")); // 输出Tuple的key
    }
}
相关文章
|
7月前
|
数据采集 消息中间件 监控
Flume数据采集系统设计与配置实战:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入探讨Apache Flume的数据采集系统设计,涵盖Flume Agent、Source、Channel、Sink的核心概念及其配置实战。通过实例展示了文件日志收集、网络数据接收、命令行实时数据捕获等场景。此外,还讨论了Flume与同类工具的对比、实际项目挑战及解决方案,以及未来发展趋势。提供配置示例帮助理解Flume在数据集成、日志收集中的应用,为面试准备提供扎实的理论与实践支持。
312 1
|
7月前
|
消息中间件 存储 监控
Flume+Kafka整合案例实现
Flume+Kafka整合案例实现
150 1
|
7月前
|
消息中间件 缓存 Java
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
83 0
|
7月前
|
存储 SQL Shell
bigdata-13-Flume实战
bigdata-13-Flume实战
50 0
|
1月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
83 5
|
2月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
49 3
|
4月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
87 8
|
6月前
|
消息中间件 存储 Kafka
go语言并发实战——日志收集系统(二) Kafka简介
go语言并发实战——日志收集系统(二) Kafka简介
132 1
|
6月前
|
消息中间件 数据挖掘 Kafka
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
153 2
下一篇
DataWorks