c. 新建一个项目入口类SessionCutETL
:
package com.shaonaiyi.session import org.apache.spark.{SparkConf, SparkContext} /** * @Auther: shaonaiyi@163.com * @Date: 2019/9/12 10:09 * @Description: 会话切割的程序主入口 */ object SessionCutETL { def main(args: Array[String]): Unit = { var conf = new SparkConf() conf.setAppName("SessionCutETL") conf.setMaster("local") var sc = new SparkContext(conf) val rawRDD: RDD[String] = sc.textFile("data/rawdata/visit_log.txt") rawRDD.collect().foreach(println) sc.stop() } }
执行后,可以看到已经加载到了原始日志的数据
PS:因为没有配置Hadoop,所以前面会报此错误,可以忽略:
4. 解析日志源数据
a. 继续在SessionCutETL
中添加解析日志源数据代码
val parsedLogRDD: RDD[Option[TrackerLog]] = rawRDD.map( line => RawLogParserUtil.parse(line)) parsedLogRDD.collect().foreach(println)
现在去执行的话会报错:
因为Driver与executor通讯的时候需要进行序列号,而Avro插件给我们生成的类中并没有给我们进行序列化,所以还需要给相应的类进行序列号,序列号很简单,只需要在类的后面实现序列化接口 Serializable 即可。
b. 实现TrackerLog
类的序列化:
c. 重新执行就可以看到结果了,到此已经实现了我们的日志的解析:
d. 使用flatmap API调整输出格式
查看上面的代码:
val parsedLogRDD: RDD[Option[TrackerLog]] = rawRDD.map( line => RawLogParserUtil.parse(line))
我们返回的类型是RDD[Option[TrackerLog]],对于返回结果,我们的格式是None、Some(),这种类型,不是我们想要的结果,我们的结果应该更加简洁。所以可以修改此行代码为:
val parsedLogRDD: RDD[TrackerLog] = rawRDD.flatMap( line => RawLogParserUtil.parse(line))
运算结果如下:
此处需要清楚map与flatMap的区别,简而言之:map是一对一的关系,一行就是一个具体的类,所以多少行就有多少个类,然后每个类都切,切了后,这些类还存在;而flatMap则会对所有的行打平成一个类,然后进行切,最后是只有一个类。
5. 日志清洗操作
a. 非法类型的日志说明
仔细关系我们前面的日志,可以看到我们最后有两条数据的类型是hhhh和3333ss,显然不是合法的日志,这里我们需要将其过滤掉,在实际的日志里面脏数据可能各种各样,但处理流程都类似。
hhhh|2017-09-04 12:45:03|cookie1|127.0.0.3|https://tieba.baidu.com/index.html 3333ss|2017-09-04 12:45:03|cookie1|127.0.0.3|https://tieba.baidu.com/index.html
b. 定义合法类型(与main方法同级)
private val logTypeSet = Set("pageview", "click")
c. 修改解析代码,加上过滤条件
val parsedLogRDD: RDD[TrackerLog] = rawRDD.flatMap(RawLogParserUtil.parse(_)) .filter(trackerLog => logTypeSet.contains(trackerLog.getLogType.toString))
d. 重新执行,会发现后面的那两条脏数据已经没有显示出来了
0x03 思路回顾
1、首先,我们需要清楚,我们的大项目其实是网站用户行为分析,而会话切割只是项目里面的小环节,至于网站用户行为分析可以达到什么结果。我们这里边可以简单列举几个,比如说,我们可以通过分析一个人在浏览网站时点击的内容、打开的内容、停留的时间等等,初步判断此人是否对此内容特别感兴趣,得出这个结论之后,我们就可以对此人做定制化的推荐等等,当然这只是一个小小的例子。刚刚我们是从项目开发流程出发,而实际上,我们应该是已经知道我们可以获取到什么信息什么数据,然后,由我们的信息和数据,挖掘我们想要的功能。比如说,你现在有一个产品,需要找到你的商城网站里面、或者其他网站里面某些人对你同类型的产品特别感兴趣,你只需要筛选出这些人,那么我们就可以给这些人推荐你的产品,这些匹配度度高的人就是你的高级客户,你推销产品肯定也会更加顺利。所以呢,我们就要努力朝着这个方向去实现我们的目标,我们目标就是要筛选出这些与你产品匹配度极高的人。
2、所以在此过程中,我们可以根据各种途径,拿到你需要的数据,当然,这些数据也可能是之前已经存在了的,也可能是你确定了目标之后才确定你是否想要的,缺什么数据就想方法去获取,然后整理一下存放到该存放的地方,像这些网页行为,数据量肯定是很大的,点击、打开、滑动等等都是一条数据,每个人都可以产生很多很多条,对于这些量大的数据,我们就可以存储到HDFS上,关于量少的,就可以存到其他地方,不固定,灵活进行存储选型。
3、这里教程没有提供收集数据的教程,如果有浏览过本博客的读者应该会清楚,我们的数据是怎么得到的,比如说Flume就可以实现,还有很多种方式。我们这里已经省略了那些过程,分析完成之后我们就可以进行编码去实现了,因为存储的数据都只是文本文件,而我们要做的,肯定是要先进行数据清洗,把不合法的数据过滤掉,那么根据什么条件进行过滤呢? 在学习传统数据库的时候,如MySQL,可以用where来确定过滤的条件,比如检索出年龄大于18岁的学生,那么年龄就是一个很关键的因素,而我们的文本文件中,其实是还没有真正去规定好字段的,只是在第一行确定了Schema信息而已,如果不进行字段的切分,那么每一行都要自定义一次,这样显然很麻烦。那么我们就可以统一将文本文件切分成一段一段的,什么字段对应什么意思、对应什么字段。每个字段都属于一行竖行,很显然,我们可以通过构建一个对象来对应上,一个类相当于一行数据,然后一行里面的每个字段,相当于一个对象里面的属性。类->行,属性->字段,所以我们需要在前面定义一个类。
4、我们的Spark作业在进行数据传输的时候,要先进行序列化,所以还要给我们的类进行序列化,Java里面的序列化类是Serializable,搞定之后就可以去获取我们的数据源了,然后清洗、过滤等,过程比较繁多,后期教程会继续完成。
0xFF 总结
其实Spark里面有内置的Kryo序列化接口,性能更好,而且更适合我们当前的应用场景,这个优化点留到项目后期再进行升级。
我们在测试的时候,看到很多启动时的提示信息,很影响我们查看,我们可以通过设置将其屏蔽掉,以方便我们开发,也一样在后期教程再写上。
俗话说得好:“台上一分钟,台下十年功。” 可是写教程不一样,操作半个钟,思路半天,写教程一天。这就是台下半个钟,台上十年功啊!感谢各位读者支持,感谢老汤,关注,评论,加油。
网站用户行为分析项目系列:
网站用户行为分析项目之会话切割(一)
网站用户行为分析项目之会话切割(二)
网站用户行为分析项目之会话切割(三)