基于Spark对某移动APP流量访问日志分析(Java版)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 笔记

需求分析


我们来根据移动设备唯一标识deviceID来计算来自客户端用户访问日志请求和响应的上行流量、下行流量的记录。


上行流量:指的是手机app向服务器发送的请求数据的流量

下行流量:指的是服务器端给手机app返回的数据(比如说图片、文字、json)的流量

1.计算每个设备(deviceID)总上行流量之和与下行流量之和(取时间戳取最小的deviceID)

eg: 
时间戳 设备号 上行流量    下行流量
1   001   79976   11496
2   001   95291   89092
3   002   57029   93467   -> LogInfo(1, 001, 79976+95291+20428, 11496+89092+57706)
4   001   20428   57706
5   003   5291    9092

2.根据上行流量和下行流量进行排序

优先根据上行流量进行排序,如果上行流量相等,那么根据下行流量排序。如果上行流量和下行流量都相当,那么就根据最早时间戳类排序,即需要二次排序)

3.获取流量最大的前10个设备


数据原型

时间戳(timeStamp)  设备号(deviceID)           上行流量  下行流量
1454307391161 77e3c9e1811d4fb291d0d9bbd456bb4b  79976 11496
1454315971161 f92ecf8e076d44b89f2d070fb1df7197  95291 89092
1454304331161 3de7d6514f1d4ac790c630fa63d8d0be  57029 50228
1454303131161 dd382d2a20464a74bbb7414e429ae452  20428 93467
1454319991161 bb2956150d6741df875fbcca76ae9e7c  51994 57706
...

设计思路


1.自定义数据类型LogInfo(timeStamp,upTraffic,downTraffic)

2.将rdd映射成key-value方式<diviceId,LogInfo>

3.根据diviceId进行聚合,timeStamp取最小值,upTraffic为上行流量总和,downTraffic为下行流量总和

4.自定义一个键值对的比较类来实现比较,要实现Ordered接口和Serializable接口,在key中实现自己对多个列的排序算法。

5.将<diviceId, LogInfo(timeStamp,upTraffic,downTraffic)>映射成<LogSort(timeStamp,upTraffic,downTraffic),diviceId>

6.使用sortByKey算子按照自定义的key进行排序

7.使用take算子取出前n名

8.将排序过的value值打印输出

数据模型及演化过程

时间戳 设备号 上行流量    下行流量  <diviceId, LogInfo(timeStamp,upTraffic,downTraffic)>  <diviceId, LogInfo(timeStamp,upTraffic,downTraffic)>  <LogSort(timeStamp,upTraffic,downTraffic),diviceId>
1   001   10        20         <001,LogInfo(1,10,20)>
2   001   20        15         <001,LogInfo(2,20,15)>                    <001,LogInfo(1,70,55)>                <LogSort(1,70,55),001>
3   002   25        10    map() -> <002,LogInfo(3,25,10)>           reduceByKey() -> <002,LogInfo(3,25,10)>           map() -> <LogSort(3,25,10),002>           sortByKey(false) -> take(n) 
4   001   30        20         <001,LogInfo(4,30,20)>                    <003,LogInfo(5,10,20)>                <LogSort(5,10,20),003>
5   003   10        20         <003,LogInfo(5,10,20)>

1.png



实施过程


首先将SparkConf分装在一个类中

package com.kfk.spark.common;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 6:18 下午
 */
public class CommSparkContext {
    public static JavaSparkContext getsc(){
        SparkConf sparkConf = new SparkConf().setAppName("CommSparkContext").setMaster("local");
        return new JavaSparkContext(sparkConf);
    }
}

自定义数据类型LogInfo

package com.kfk.spark.traffic_analysis_project;
import java.io.Serializable;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 6:40 下午
 */
public class LogInfo implements Serializable {
    private long timeStamp;
    private long upTraffic;
    private long downTraffic;
    public long getTimeStamp() {
        return timeStamp;
    }
    public void setTimeStame(long timeStame) {
        this.timeStamp = timeStame;
    }
    public long getUpTraffic() {
        return upTraffic;
    }
    public void setUpTraffic(long upTraffic) {
        this.upTraffic = upTraffic;
    }
    public long getDownTraffic() {
        return downTraffic;
    }
    public void setDownTraffic(long downTraffic) {
        this.downTraffic = downTraffic;
    }
    public LogInfo(){
    }
    public LogInfo(long timeStame, long upTraffic, long downTraffic) {
        this.timeStamp = timeStame;
        this.upTraffic = upTraffic;
        this.downTraffic = downTraffic;
    }
}

自定义key排序类LogSort

package com.kfk.spark.traffic_analysis_project;
import scala.Serializable;
import scala.math.Ordered;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 7:39 下午
 */
public class LogSort extends LogInfo implements Ordered<LogSort> , Serializable {
    private long timeStamp;
    private long upTraffic;
    private long downTraffic;
    @Override
    public long getTimeStamp() {
        return timeStamp;
    }
    public void setTimeStamp(long timeStamp) {
        this.timeStamp = timeStamp;
    }
    @Override
    public long getUpTraffic() {
        return upTraffic;
    }
    @Override
    public void setUpTraffic(long upTraffic) {
        this.upTraffic = upTraffic;
    }
    @Override
    public long getDownTraffic() {
        return downTraffic;
    }
    @Override
    public void setDownTraffic(long downTraffic) {
        this.downTraffic = downTraffic;
    }
    public LogSort(){
    }
    public LogSort(long timeStamp, long upTraffic, long downTraffic) {
        this.timeStamp = timeStamp;
        this.upTraffic = upTraffic;
        this.downTraffic = downTraffic;
    }
    public int compare(LogSort that) {
        int comp = Long.valueOf(this.getUpTraffic()).compareTo(that.getUpTraffic());
        if (comp == 0){
            comp = Long.valueOf(this.getDownTraffic()).compareTo(that.getDownTraffic());
        }
        if (comp == 0){
            comp = Long.valueOf(this.getTimeStamp()).compareTo(that.getTimeStamp());
        }
        return comp;
    }
    public boolean $less(LogSort that) {
        return false;
    }
    public boolean $greater(LogSort that) {
        return false;
    }
    public boolean $less$eq(LogSort that) {
        return false;
    }
    public boolean $greater$eq(LogSort that) {
        return false;
    }
    public int compareTo(LogSort that) {
        int comp = Long.valueOf(this.getUpTraffic()).compareTo(that.getUpTraffic());
        if (comp == 0){
            comp = Long.valueOf(this.getDownTraffic()).compareTo(that.getDownTraffic());
        }
        if (comp == 0){
            comp = Long.valueOf(this.getTimeStamp()).compareTo(that.getTimeStamp());
        }
        return comp;
    }
}

编写主类LogApp

package com.kfk.spark.traffic_analysis_project;
import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 6:36 下午
 */
public class LogApp {
    /**
     * rdd映射成key-value方式<diviceId,LogInfo>
     * rdd map() -> <diviceId,LogInfo(timeStamp,upTraffic,downTraffic)>
     * @param rdd
     * @return
     */
    public static JavaPairRDD<String,LogInfo> mapToPairValues(JavaRDD<String> rdd){
        JavaPairRDD<String,LogInfo> mapToPairRdd =  rdd.mapToPair(new PairFunction<String, String, LogInfo>() {
            public Tuple2<String, LogInfo> call(String line) throws Exception {
                long timeStamp = Long.parseLong(line.split("\t")[0]);
                String diviceId = String.valueOf(line.split("\t")[1]);
                long upTraffic = Long.parseLong(line.split("\t")[2]);
                long downTraffic = Long.parseLong(line.split("\t")[3]);
                LogInfo logInfo = new LogInfo(timeStamp,upTraffic,downTraffic);
                return new Tuple2<String, LogInfo>(diviceId,logInfo);
            }
        });
        return mapToPairRdd;
    }
    /**
     * 根据diviceId进行聚合
     * mapToPairRdd reduceByKey() -> <diviceId,LogInfo(timeStamp,upTraffic,downTraffic)>
     * @param mapPairRdd
     * @return
     */
    public static JavaPairRDD<String,LogInfo> reduceByKeyValues(JavaPairRDD<String,LogInfo> mapPairRdd){
        JavaPairRDD<String,LogInfo> reduceByKeyRdd = mapPairRdd.reduceByKey(new Function2<LogInfo, LogInfo, LogInfo>() {
            public LogInfo call(LogInfo v1, LogInfo v2) throws Exception {
                long timeStamp = Math.min(v1.getTimeStamp(), v2.getTimeStamp());
                long upTraffic = v1.getUpTraffic() + v2.getUpTraffic();
                long downTraffic = v1.getDownTraffic() + v2.getDownTraffic();
                LogInfo logInfo = new LogInfo();
                logInfo.setTimeStame(timeStamp);
                logInfo.setUpTraffic(upTraffic);
                logInfo.setDownTraffic(downTraffic);
                return logInfo;
            }
        });
        return reduceByKeyRdd;
    }
    /**
     * reduceByKeyRdd map() -> <LogSort(timeStamp,upTraffic,downTraffic),diviceId>
     * @param aggregateByKeyRdd
     * @return
     */
    public static JavaPairRDD<LogSort,String> mapToPairSortValues(JavaPairRDD<String,LogInfo> aggregateByKeyRdd){
        JavaPairRDD<LogSort,String> mapToPairSortRdd = aggregateByKeyRdd.mapToPair(new PairFunction<Tuple2<String, LogInfo>, LogSort, String>() {
            public Tuple2<LogSort, String> call(Tuple2<String, LogInfo> stringLogInfoTuple2) throws Exception {
                String diviceId = stringLogInfoTuple2._1;
                long timeStamp = stringLogInfoTuple2._2.getTimeStamp();
                long upTraffic = stringLogInfoTuple2._2.getUpTraffic();
                long downTraffic = stringLogInfoTuple2._2.getDownTraffic();
                LogSort logSort = new LogSort(timeStamp,upTraffic,downTraffic);
                return new Tuple2<LogSort, String>(logSort,diviceId);
            }
        });
        return mapToPairSortRdd;
    }
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        JavaRDD<String> rdd = sc.textFile("/Users/caizhengjie/IdeaProjects/spark_study01/src/main/java/com/kfk/spark/datas/access.log");
        // rdd map() -> <diviceId,LogInfo(timeStamp,upTraffic,downTraffic)>
        JavaPairRDD<String,LogInfo> mapToPairRdd = mapToPairValues(rdd);
        // mapToPairRdd reduceByKey() -> <diviceId,LogInfo(timeStamp,upTraffic,downTraffic)>
        JavaPairRDD<String,LogInfo> reduceByKeyRdd = reduceByKeyValues(mapToPairRdd);
        // reduceByKeyRdd map() -> <LogSort(timeStamp,upTraffic,downTraffic),diviceId>
        JavaPairRDD<LogSort, String> mapToPairSortRdd = mapToPairSortValues(reduceByKeyRdd);
        // sortByKey
        JavaPairRDD<LogSort,String> sortByKeyValues = mapToPairSortRdd.sortByKey(false);
        // TopN
        List<Tuple2<LogSort,String>> sortKeyList = sortByKeyValues.take(10);
        for (Tuple2<LogSort,String> logSortStringTuple2 : sortKeyList){
            System.out.println(logSortStringTuple2._2 + " : " + logSortStringTuple2._1.getUpTraffic() + " : " + logSortStringTuple2._1.getDownTraffic());
        }
    }
}

运行结果:

efde893d9c254e549f740d9613b3421c : 1036288 : 629025
84da30d2697042ca9a6835f6ccec6024 : 930018 : 737453
94055312e11c464d8bb16f21e4d607c6 : 827278 : 897382
c2a24d73d77d4984a1d88ea3330aa4c5 : 826817 : 943297
6e535645436f4926be1ee6e823dfd9d2 : 806761 : 613670
92f78b79738948bea0d27178bbcc5f3a : 761462 : 567899
1cca6591b6aa4033a190154db54a8087 : 750069 : 696854
f92ecf8e076d44b89f2d070fb1df7197 : 740234 : 779789
e6164ce7a908476a94502303328b26e8 : 722636 : 513737
537ec845bb4b405d9bf13975e4408b41 : 709045 : 642202
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
2月前
|
存储 Java
【编程基础知识】 分析学生成绩:用Java二维数组存储与输出
本文介绍如何使用Java二维数组存储和处理多个学生的各科成绩,包括成绩的输入、存储及格式化输出,适合初学者实践Java基础知识。
94 1
|
2天前
|
缓存 算法 搜索推荐
Java中的算法优化与复杂度分析
在Java开发中,理解和优化算法的时间复杂度和空间复杂度是提升程序性能的关键。通过合理选择数据结构、避免重复计算、应用分治法等策略,可以显著提高算法效率。在实际开发中,应该根据具体需求和场景,选择合适的优化方法,从而编写出高效、可靠的代码。
15 6
|
26天前
|
监控 算法 Java
jvm-48-java 变更导致压测应用性能下降,如何分析定位原因?
【11月更文挑战第17天】当JVM相关变更导致压测应用性能下降时,可通过检查变更内容(如JVM参数、Java版本、代码变更)、收集性能监控数据(使用JVM监控工具、应用性能监控工具、系统资源监控)、分析垃圾回收情况(GC日志分析、内存泄漏检查)、分析线程和锁(线程状态分析、锁竞争分析)及分析代码执行路径(使用代码性能分析工具、代码审查)等步骤来定位和解决问题。
|
1月前
|
开发框架 监控 .NET
【Azure App Service】部署在App Service上的.NET应用内存消耗不能超过2GB的情况分析
x64 dotnet runtime is not installed on the app service by default. Since we had the app service running in x64, it was proxying the request to a 32 bit dotnet process which was throwing an OutOfMemoryException with requests >100MB. It worked on the IaaS servers because we had the x64 runtime install
|
1月前
|
存储 Java 关系型数据库
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接创建、分配、复用和释放等操作,并通过电商应用实例展示了如何选择合适的连接池库(如HikariCP)和配置参数,实现高效、稳定的数据库连接管理。
67 2
|
1月前
【Azure App Service】PowerShell脚本批量添加IP地址到Web App允许访问IP列表中
Web App取消公网访问后,只允许特定IP能访问Web App。需要写一下段PowerShell脚本,批量添加IP到Web App的允许访问IP列表里!
|
1月前
|
Java 关系型数据库 数据库
面向对象设计原则在Java中的实现与案例分析
【10月更文挑战第25天】本文通过Java语言的具体实现和案例分析,详细介绍了面向对象设计的五大核心原则:单一职责原则、开闭原则、里氏替换原则、接口隔离原则和依赖倒置原则。这些原则帮助开发者构建更加灵活、可维护和可扩展的系统,不仅适用于Java,也适用于其他面向对象编程语言。
38 2
|
2月前
|
人工智能 Oracle Java
解决 Java 打印日志吞异常堆栈的问题
前几天有同学找我查一个空指针问题,Java 打印日志时,异常堆栈信息被吞了,导致定位不到出问题的地方。
45 2
|
2月前
|
Java
让星星⭐月亮告诉你,Java synchronized(*.class) synchronized 方法 synchronized(this)分析
本文通过Java代码示例,介绍了`synchronized`关键字在类和实例方法上的使用。总结了三种情况:1) 类级别的锁,多个实例对象在同一时刻只能有一个获取锁;2) 实例方法级别的锁,多个实例对象可以同时执行;3) 同一实例对象的多个线程,同一时刻只能有一个线程执行同步方法。
22 1
|
2月前
|
小程序 Oracle Java
JVM知识体系学习一:JVM了解基础、java编译后class文件的类结构详解,class分析工具 javap 和 jclasslib 的使用
这篇文章是关于JVM基础知识的介绍,包括JVM的跨平台和跨语言特性、Class文件格式的详细解析,以及如何使用javap和jclasslib工具来分析Class文件。
59 0
JVM知识体系学习一:JVM了解基础、java编译后class文件的类结构详解,class分析工具 javap 和 jclasslib 的使用