Flume+Kafka+Storm实战:一、Kakfa与Storm整合(下)

简介: Flume+Kafka+Storm实战:一、Kakfa与Storm整合(下)

d. KafkaWordCountBolt

package com.shaonaiyi.kafka;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
/**
 * @author: shaonaiyi
 * @createTime: 2019/07/14 13:42
 * @description: 单词计数bolt
 */
public class KafkaWordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null; // 用于统计每隔单词的计数
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }
    @Override
    public void execute(Tuple tuple) { // 实时接收SplitSentenceBolt中输出的Tuple流
        String word = tuple.getStringByField("word"); // 根据key获取Tuple中的单词
        // 统计每一个单词总共出现的次数
        Long count = counts.getOrDefault(word, 0L);
        count++;
        this.counts.put(word, count);
        // 将每一个单词以及这个单词出现的次数作为Tuple中的value输出到下一个bolt中
        this.collector.emit(new Values(word, count.toString()));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // 输出Tuple的key,有两个key,是因为每次输出的value也有两个
        outputFieldsDeclarer.declare(new Fields("key", "message"));
    }
}


e. WordCountKafkaTopology

package com.shaonaiyi.kafka;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import java.util.Arrays;
import java.util.Properties;
/**
 * @author: shaonaiyi
 * @createTime: 2019/07/15 22:54
 * @description: Kafka之WordCountTopology
 */
public class WordCountKafkaTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String KAFKA_BOLT_ID = "kafka-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        int workers = Integer.parseInt(args[0]);
        // 从Kafka中消费数据
        KafkaSpout kafkaSpout = new KafkaSpoutBuilder()
                .brokers(Arrays.asList("master:9092"))
                .topic("word-count-input")
                .build();
        KafkaSplitSentenceBolt splitSentenceBolt = new KafkaSplitSentenceBolt();
        KafkaWordCountBolt wordCountBolt = new KafkaWordCountBolt();
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092");
        // 此配置是表明当一次produce请求被认为完成时的确认值。
        // 特别是,多少个其他brokers必须已经提交了数据到他们的log并且向他们的leader确认了这些信息。典型的值包括:
        // 0: 表示producer从来不等待来自broker的确认信息(和0.7一样的行为)。
        // 这个选择提供了最小的时延但同时风险最大(因为当server宕机时,数据将会丢失)。
        // 1:表示获得leader replica已经接收了数据的确认信息。这个选择时延较小同时确保了server确认接收成功。
        // -1:producer会获得所有同步replicas都收到数据的确认
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaBolt kafkaBolt = new KafkaBolt()
                .withProducerProperties(props)
                .withTopicSelector(new DefaultTopicSelector("word-count-output"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, kafkaSpout);
        builder.setBolt(SPLIT_BOLT_ID, splitSentenceBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        builder.setBolt(KAFKA_BOLT_ID, kafkaBolt).shuffleGrouping(COUNT_BOLT_ID);
        // 3、提交Topology
        Config config = new Config(); // 用来配置Topology运行时行为,对Topology所有组件全局生效的配置参数集合
        config.setNumWorkers(workers);
        StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); // 提交Topology
    }
}


0x03 校验结果


1. 打包Storm代码

a. 打包

image.png


b. 上传到集群


微信图片_20220618200956.png


2. 执行ZK与Storm

此步骤与教程:实时流处理框架之Storm的安装与部署

=>

0x03 启动并校验Storm 步骤一样

即:

a. 启动集群上的三台Zookeeper(查看进程是否存在,如果Kafka已经启动,应该还有Kafka的进程)


微信图片_20220618201014.png


b. 启动Storm

在master上启动Nimbus和Web UI

cd ~/bigdata/apache-storm-1.2.2

nohup bin/storm nimbus 2>&1 &

然后回车,切换终端2,执行:

nohup bin/storm ui 2>&1 &

然后回车

在slave1和slave2上启动Supervisor

cd ~/bigdata/apache-storm-1.2.2

nohup bin/storm supervisor 2>&1 &


3. 执行Storm作业

a. 执行Storm作业


~/bigdata/apache-storm-1.2.2/bin/storm jar /home/hadoop-sny/jar/stormlearning-1.0-SNAPSHOT-jar-with-dependencies.jar com.shaonaiyi.kafka.WordCountKafkaTopology 1


image.png


b. 查看Web UI界面(master:8080


image.png


4. 校验过程

a. 目前各节点的进程情况


微信图片_20220618201056.png


b. 发送消息到Kafka


image.png


c. 查看消费者信息


image.png


d. 查看Storm的Web UI界面


微信图片_20220618201125.png


0xFF 总结


  1. 在生产者端多发送几个语句,你会发现这种统计的结果,并不是我们真正想要的结果,思考应该怎样才能想我们前面学习WordCount那种表现形式,请看后面的教程。
  2. 内容比较多,请大家认真操作。
相关文章
|
6月前
|
数据采集 消息中间件 监控
Flume数据采集系统设计与配置实战:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入探讨Apache Flume的数据采集系统设计,涵盖Flume Agent、Source、Channel、Sink的核心概念及其配置实战。通过实例展示了文件日志收集、网络数据接收、命令行实时数据捕获等场景。此外,还讨论了Flume与同类工具的对比、实际项目挑战及解决方案,以及未来发展趋势。提供配置示例帮助理解Flume在数据集成、日志收集中的应用,为面试准备提供扎实的理论与实践支持。
265 1
|
6月前
|
消息中间件 存储 监控
Flume+Kafka整合案例实现
Flume+Kafka整合案例实现
120 1
|
6月前
|
消息中间件 缓存 Java
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
76 0
|
6月前
|
存储 SQL Shell
bigdata-13-Flume实战
bigdata-13-Flume实战
47 0
|
18天前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
64 5
|
1月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
40 3
|
3月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
71 8
|
5月前
|
消息中间件 存储 Kafka
go语言并发实战——日志收集系统(二) Kafka简介
go语言并发实战——日志收集系统(二) Kafka简介
117 1
|
5月前
|
消息中间件 数据挖掘 Kafka
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
126 2