【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)

需要源码请点赞关注收藏后评论区留言私信~~~

Flume、Kafka区别和侧重点

1)Kafka 是一个非常通用的系统,你可以有许多生产者和消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase等发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。如果数据被多个系统消费的话,使用kafka;如果数据有多个生产者场景,或者有写入Hbase、HDFS操作,使用Flume。

2)Flume可以使用拦截器实时处理数据。而Kafka需要外部的流处理系统才能做到。

3)Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果你需要一个高可靠行的管道,那么使用Kafka是个更好的选择。

Spark Streaming与Flume、Kafka整合与开发

此开发示例的功能是商品实时交易数据统计分析,通过Flume实时收集交易订单,将数据分发Kafka,Kafka将数据传输到Spark Streaming,Spark Streaming统计商品的销售量。实现主要有以下几个步骤:

1)通过LOG日志模拟产生实时交易数据

2)Flume收集模拟产生实时交易数据

3)Flume将数据发送给Kafka消息队列

4)Spark Streaming接收Kafka消息队列的消息,每5秒进行数据统计

具体实现如下:

1)新建MAVEN项目,名称为RealtimeAnalysis,新建过程请见第9章。在POM.XML文件中加入依赖包

2)在工程的resource目录下新建log4j.properties文件,其中注意的是log4j.appender.flume.Hostname的配置,要配置成你安装flume的服务器

3)在工程的test目录下新建java类LoggerGenerator,此类用于不断模拟产生订单交易数据,在此类中每6秒调用一次PaymentInfo交易的实体类的random方法是模拟产生订单交易数据方法,数据以JSON格式返回。其中PaymentInfo是交易的实体类,用三个成员变量,分别是订单编号、商品编号、商品价格,LoggerGenerator为模拟日志生成类

4)在安装Flume服务器的conf目录下新建文件log4j_flume.properties,其中注意的是sinks.kafka_sink.brokerList配置的是连接Kafka集群的地址和端口号

5)启动flume,命令如下:

./kafka-server-start.sh /hadoop/kafka_2.11-2.4.1/config/server.properties &

6)新建topic,名称为 logtoflume,命令如下:

kafka-topics.sh
  --zookeeper 172.16.106.69:2181,172.16.106.70:2181,172.16.106.71:2181
 --topic logtoflume --replication-factor 1 --partitions 1  --create

7)新建scala类KafkaConsumerMsg,接收kafka下的topic队列,名称为logtoflume的数据,并做统计

8)启动LoggerGenerator不断模拟产生订单交易数据,运行效果如下:

9)启动KafkaConsumerMsg接收kafka下的topic队列的数据,并做统计,运行效果如下:

部分代码如下

import com.alibaba.fastjson.JSONObject;
import java.util.Random;
import java.util.UUID;
public class PaymentInfo {
    private static final long serialVersionUID = 1L;
    private String orderId;//订单编号
    private String productId;//商品编号
    private long productPrice;//商品价格
    public PaymentInfo() {
    }
    public static long getSerialVersionUID() {
        return serialVersionUID;
    }
    public String getOrderId() {
        return orderId;
    }
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }
    public String getProductId() {
        return productId;
    }
    public void setProductId(String productId) {
        this.productId = productId;
    }
    public long getProductPrice() {
        return productPrice;
    }
    public void setProductPrice(long productPrice) {
        this.productPrice = productPrice;
    }
    @Override
    public String toString() {
        return "PaymentInfo{" +
                "orderId='" + orderId + '\'' +
                ", productId='" + productId + '\'' +
                ", productPrice=" + productPrice +
                '}';
    }
    //模拟订单数据
    public String random() {
        Random r = new Random();
        this.orderId = UUID.randomUUID().toString().replaceAll( "-", "" );
        this.productPrice = r.nextInt( 1000 );
        this.productId = r.nextInt( 10 ) + "";
        JSONObject obj = new JSONObject();
        String jsonString = obj.toJSONString( this );
        return jsonString;
    }
}

创作不易 觉得有帮助请点赞关注收藏~~~

相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
130 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
110 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
75 1
|
1月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
69 1
|
7月前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析
|
7月前
|
存储 运维 监控
【Flume】flume 日志管理中的应用
【4月更文挑战第4天】【Flume】flume 日志管理中的应用
|
消息中间件 数据采集 SQL
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
|
4月前
|
存储 数据采集 数据处理
【Flume拓扑揭秘】掌握Flume的四大常用结构,构建强大的日志收集系统!
【8月更文挑战第24天】Apache Flume是一个强大的工具,专为大规模日志数据的收集、聚合及传输设计。其核心架构包括源(Source)、通道(Channel)与接收器(Sink)。Flume支持多样化的拓扑结构以适应不同需求,包括单层、扇入(Fan-in)、扇出(Fan-out)及复杂多层拓扑。单层拓扑简单直观,适用于单一数据流场景;扇入结构集中处理多源头数据;扇出结构则实现数据多目的地分发;复杂多层拓扑提供高度灵活性,适合多层次数据处理。通过灵活配置,Flume能够高效构建各种规模的数据收集系统。
96 0
|
2月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
53 2
下一篇
DataWorks