日志分析实战之清洗日志小实例6:获取uri点击量排序并得到最高的url

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 日志分析实战之清洗日志小实例6:获取uri点击量排序并得到最高的url

下面我们开始统计链接的点击量,并做排序。

我们统计记录的时候,为了防止空记录等异常的情况,我们创建一条空记录

val nullObject = AccessLogRecord("", "", "", "", "GET /foo HTTP/1.1", "", "", "", "")

下面我们开始找点击量最高的链接。


首先获取我们想要的uri


val uriCounts = log.map(p.parseRecord(_).getOrElse(nullObject).request)
                  .map(_.split(" ")(1))
                  .filter(_ != "/foo")

上面的代码做一个简单解释:

p.parseRecord(_)解析记录

p.parseRecord(_).getOrElse(nullObject)如何没有取到值,则使用nullObject,也就是我们上面定义的对象

p.parseRecord(_).getOrElse(nullObject).request也就是我们取到uri

.map(_.split(" ")(1))是取到我们过滤的url,过滤掉不想要的版本等信息

 .filter(_ != "/foo")则是再次过滤掉/foo[也就是空记录]

这样就获取了uri,然后我们输出

uriCounts.collect.foreach(print)

c6adf927781e2d36359a90c9ddbef6f3.jpg

下面我们统计点击量



val uriCounts = log.map(p.parseRecord(_).getOrElse(nullObject).request)
                  .map(_.split(" ")(1))
                  .map(uri => (uri, 1))
                  .reduceByKey((a, b) => a + b)


rdd转换为数组




val uriToCount = uriCounts.collect


数组转换为序列并排序



import scala.collection.immutable.ListMap
val uriHitCount = ListMap(uriToCount.toSeq.sortWith(_._2 > _._2):_*)

12db63d9f311b7e591c2d0982b203945.jpg

#############################

这里留下一个问题,如果上面元素不是2,而是为sortWith(_._1 > _._1)是对什么排序

import scala.collection.immutable.ListMap
val uriHitCount = ListMap(uriToCount.toSeq.sortWith(_._1 > _._1):_*)

#############################

3d215861311bc5e15689c4a0f89d9bf5.jpg

输出

uriHitCount.take(10).foreach(println)

90a982b316ddbdf4e37a4b73d552881c.jpg

上面便是排序的结果

点击最高的uri
如果想得出点击最高的uri

uriHitCount.take(1).foreach(println)

563df5644efa45cd8d8a3ec05c4785c2.jpg

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

知识补充:

Scala代码看上去很少,但是内容却是很丰富的。上面用到的相关知识,这里补充,供大家能看懂上面代码


getOrElse:

println(a.get("k1").getOrElse("default")) //根据key读取元素,不存在就替换成默认值


在Spark中写法是:persons.getOrElse("Spark",1000) //如果persons这个Map中包含有Spark,取出它的值,如果没有,值就是1000。


reduce、reduceByKey


reduce(binary_function)

reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。


val c = sc.parallelize(1 to 10)
c.reduce((x, y) => x + y)//结果55


具体过程,RDD有1 2 3 4 5 6 7 8 9 10个元素,

1+2=3

3+3=6

6+4=10

10+5=15

15+6=21

21+7=28

28+8=36

36+9=45

45+10=55


reduceByKey(binary_function)

reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行binary_function的reduce操作,因此,Key相同


的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

val a = sc.parallelize(List((1,2),(1,3),(3,4),(3,6)))

a.reduceByKey((x,y) => x + y).collect

//结果 Array((1,5), (3,10))



Seq


Sequence都有一个预定义的顺序。

scala> Seq(1, 1, 2)

res3: Seq[Int] = List(1, 1, 2)

(注意返回的结果是一个List。Seq是一个trait;List是它的一个实现类。Seq对象是一个工厂对象,正如你所看到


的,它会创建一个List。)


集合之间可以相互进行转换。

def toArray : Array[A]

def toArray [B >: A] (implicit arg0: ClassManifest[B]) : Array[B]

def toBuffer [B >: A] : Buffer[B]

def toIndexedSeq [B >: A] : IndexedSeq[B]

def toIterable : Iterable[A]

def toIterator : Iterator[A]

def toList : List[A]

def toMap [T, U] (implicit ev: <:<[A, (T, U)]) : Map[T, U]

def toSeq : Seq[A]

def toSet [B >: A] : Set[B]

def toStream : Stream[A]

def toString () : String

def toTraversable : Traversable[A]


我们可以把一个Map转换成一个数组,然后得到一个键值对数组。


scala> Map(1 -> 2).toArray

res41: Array[(Int, Int)] = Array((1,2))


sortWith

排序操作(sorted, sortWith, sortBy)根据不同的条件对序列元素进行排序。更多排序内容推荐参考

Scala的map实现key和value排序及各种排序比较等知识讨论

http://www.aboutyun.com/forum.php?mod=viewthread&tid=22942

相关实践学习
日志服务之数据清洗与入湖
本教程介绍如何使用日志服务接入NGINX模拟数据,通过数据加工对数据进行清洗并归档至OSS中进行存储。
目录
相关文章
|
12天前
|
存储 算法 Go
go语言并发实战——日志收集系统(七) etcd的介绍与简单使用
go语言并发实战——日志收集系统(七) etcd的介绍与简单使用
|
12天前
|
监控 Go
go语言并发实战——日志收集系统(八) go语言操作etcd以及利用watch实现对键值的监控
go语言并发实战——日志收集系统(八) go语言操作etcd以及利用watch实现对键值的监控
go语言并发实战——日志收集系统(八) go语言操作etcd以及利用watch实现对键值的监控
|
12天前
|
监控 Go
go语言并发实战——日志收集系统(十一)基于etcd来监视配置文件的变化
go语言并发实战——日志收集系统(十一)基于etcd来监视配置文件的变化
|
12天前
|
监控 Go
go语言并发实战——日志收集系统(十) 重构tailfile模块实现同时监控多个日志文件
go语言并发实战——日志收集系统(十) 重构tailfile模块实现同时监控多个日志文件
|
12天前
|
存储 JSON 监控
go语言并发实战——日志收集系统(九) 基于etcd的代码重构思考与初步实现
go语言并发实战——日志收集系统(九) 基于etcd的代码重构思考与初步实现
|
5天前
|
存储 关系型数据库 MySQL
|
23天前
|
SQL 数据采集 DataWorks
DataWorks产品使用合集之pyodps的线程限制是什么意思
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
23天前
|
DataWorks 数据可视化 安全
DataWorks产品使用合集之SLS日志中新增了存在iotId这个字段,同步的时候怎么手动增加
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5天前
|
SQL 运维 关系型数据库
|
5天前
|
存储 关系型数据库 MySQL