【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)

需要源码请点赞关注收藏后评论区留言私信~~~

Flume、Kafka区别和侧重点

1)Kafka 是一个非常通用的系统,你可以有许多生产者和消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase等发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。如果数据被多个系统消费的话,使用kafka;如果数据有多个生产者场景,或者有写入Hbase、HDFS操作,使用Flume。

2)Flume可以使用拦截器实时处理数据。而Kafka需要外部的流处理系统才能做到。

3)Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果你需要一个高可靠行的管道,那么使用Kafka是个更好的选择。

Spark Streaming与Flume、Kafka整合与开发

此开发示例的功能是商品实时交易数据统计分析,通过Flume实时收集交易订单,将数据分发Kafka,Kafka将数据传输到Spark Streaming,Spark Streaming统计商品的销售量。实现主要有以下几个步骤:

1)通过LOG日志模拟产生实时交易数据

2)Flume收集模拟产生实时交易数据

3)Flume将数据发送给Kafka消息队列

4)Spark Streaming接收Kafka消息队列的消息,每5秒进行数据统计

具体实现如下:

1)新建MAVEN项目,名称为RealtimeAnalysis,新建过程请见第9章。在POM.XML文件中加入依赖包

2)在工程的resource目录下新建log4j.properties文件,其中注意的是log4j.appender.flume.Hostname的配置,要配置成你安装flume的服务器

3)在工程的test目录下新建java类LoggerGenerator,此类用于不断模拟产生订单交易数据,在此类中每6秒调用一次PaymentInfo交易的实体类的random方法是模拟产生订单交易数据方法,数据以JSON格式返回。其中PaymentInfo是交易的实体类,用三个成员变量,分别是订单编号、商品编号、商品价格,LoggerGenerator为模拟日志生成类

4)在安装Flume服务器的conf目录下新建文件log4j_flume.properties,其中注意的是sinks.kafka_sink.brokerList配置的是连接Kafka集群的地址和端口号

5)启动flume,命令如下:

./kafka-server-start.sh /hadoop/kafka_2.11-2.4.1/config/server.properties &

6)新建topic,名称为 logtoflume,命令如下:

kafka-topics.sh
  --zookeeper 172.16.106.69:2181,172.16.106.70:2181,172.16.106.71:2181
 --topic logtoflume --replication-factor 1 --partitions 1  --create

7)新建scala类KafkaConsumerMsg,接收kafka下的topic队列,名称为logtoflume的数据,并做统计

8)启动LoggerGenerator不断模拟产生订单交易数据,运行效果如下:

9)启动KafkaConsumerMsg接收kafka下的topic队列的数据,并做统计,运行效果如下:

部分代码如下

import com.alibaba.fastjson.JSONObject;
import java.util.Random;
import java.util.UUID;
public class PaymentInfo {
    private static final long serialVersionUID = 1L;
    private String orderId;//订单编号
    private String productId;//商品编号
    private long productPrice;//商品价格
    public PaymentInfo() {
    }
    public static long getSerialVersionUID() {
        return serialVersionUID;
    }
    public String getOrderId() {
        return orderId;
    }
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }
    public String getProductId() {
        return productId;
    }
    public void setProductId(String productId) {
        this.productId = productId;
    }
    public long getProductPrice() {
        return productPrice;
    }
    public void setProductPrice(long productPrice) {
        this.productPrice = productPrice;
    }
    @Override
    public String toString() {
        return "PaymentInfo{" +
                "orderId='" + orderId + '\'' +
                ", productId='" + productId + '\'' +
                ", productPrice=" + productPrice +
                '}';
    }
    //模拟订单数据
    public String random() {
        Random r = new Random();
        this.orderId = UUID.randomUUID().toString().replaceAll( "-", "" );
        this.productPrice = r.nextInt( 1000 );
        this.productId = r.nextInt( 10 ) + "";
        JSONObject obj = new JSONObject();
        String jsonString = obj.toJSONString( this );
        return jsonString;
    }
}

创作不易 觉得有帮助请点赞关注收藏~~~

相关文章
|
1月前
|
消息中间件 存储 Cloud Native
云消息队列 Kafka 版 V3 系列荣获信通院“云原生技术创新标杆案例”
2024 年 12 月 24 日,由中国信息通信研究院(以下简称“中国信通院”)主办的“2025 中国信通院深度观察报告会:算力互联网分论坛”,在北京隆重召开。本次论坛以“算力互联网 新质生产力”为主题,全面展示中国信通院在算力互联网产业领域的研究、实践与业界共识,与产业先行者共同探索算力互联网产业未来发展的方向。会议公布了“2024 年度云原生与应用现代化标杆案例”评选结果,“云消息队列 Kafka 版 V3 系列”荣获“云原生技术创新标杆案例”。
|
4月前
|
分布式计算 Java Hadoop
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
56 1
|
4月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
75 0
|
4月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
124 0
|
4月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
83 0
|
6月前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
5月前
|
消息中间件 安全 Kafka
Kafka支持SSL/TLS协议技术深度解析
SSL(Secure Socket Layer,安全套接层)及其继任者TLS(Transport Layer Security,传输层安全)是为网络通信提供安全及数据完整性的一种安全协议。这些协议在传输层对网络连接进行加密,确保数据在传输过程中不被窃取或篡改。
411 0
|
6月前
|
消息中间件 存储 算法
时间轮在Kafka的实践:技术深度剖析
【8月更文挑战第13天】在分布式消息系统Kafka中,时间轮(Timing Wheel)作为一种高效的时间调度机制,被广泛应用于处理各种延时操作,如延时生产、延时拉取和延时删除等。本文将深入探讨时间轮在Kafka中的实践应用,解析其技术原理、优势及具体实现方式。
199 2
|
6月前
|
消息中间件 存储 NoSQL
深度解密Kafka:从内部存储结构到关键技术的全景透视
深度解密Kafka:从内部存储结构到关键技术的全景透视
|
8月前
|
消息中间件 Java Kafka
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)