下面我们开始统计链接的点击量,并做排序。
我们统计记录的时候,为了防止空记录等异常的情况,我们创建一条空记录
val nullObject = AccessLogRecord("", "", "", "", "GET /foo HTTP/1.1", "", "", "", "")
下面我们开始找点击量最高的链接。
首先获取我们想要的uri
val uriCounts = log.map(p.parseRecord(_).getOrElse(nullObject).request) .map(_.split(" ")(1)) .filter(_ != "/foo")
上面的代码做一个简单解释:
p.parseRecord(_)解析记录
p.parseRecord(_).getOrElse(nullObject)如何没有取到值,则使用nullObject,也就是我们上面定义的对象
p.parseRecord(_).getOrElse(nullObject).request也就是我们取到uri
.map(_.split(" ")(1))是取到我们过滤的url,过滤掉不想要的版本等信息
.filter(_ != "/foo")则是再次过滤掉/foo[也就是空记录]
这样就获取了uri,然后我们输出
uriCounts.collect.foreach(print)
下面我们统计点击量
val uriCounts = log.map(p.parseRecord(_).getOrElse(nullObject).request) .map(_.split(" ")(1)) .map(uri => (uri, 1)) .reduceByKey((a, b) => a + b)
rdd转换为数组
val uriToCount = uriCounts.collect
数组转换为序列并排序
import scala.collection.immutable.ListMap val uriHitCount = ListMap(uriToCount.toSeq.sortWith(_._2 > _._2):_*)
#############################
这里留下一个问题,如果上面元素不是2,而是为sortWith(_._1 > _._1)是对什么排序
import scala.collection.immutable.ListMap val uriHitCount = ListMap(uriToCount.toSeq.sortWith(_._1 > _._1):_*)
#############################
输出
uriHitCount.take(10).foreach(println)
上面便是排序的结果
点击最高的uri
如果想得出点击最高的uri
uriHitCount.take(1).foreach(println)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
知识补充:
Scala代码看上去很少,但是内容却是很丰富的。上面用到的相关知识,这里补充,供大家能看懂上面代码
getOrElse:
println(a.get("k1").getOrElse("default")) //根据key读取元素,不存在就替换成默认值
在Spark中写法是:persons.getOrElse("Spark",1000) //如果persons这个Map中包含有Spark,取出它的值,如果没有,值就是1000。
reduce、reduceByKey
reduce(binary_function)
reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。
val c = sc.parallelize(1 to 10) c.reduce((x, y) => x + y)//结果55
具体过程,RDD有1 2 3 4 5 6 7 8 9 10个元素,
1+2=3
3+3=6
6+4=10
10+5=15
15+6=21
21+7=28
28+8=36
36+9=45
45+10=55
reduceByKey(binary_function)
reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行binary_function的reduce操作,因此,Key相同
的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。
val a = sc.parallelize(List((1,2),(1,3),(3,4),(3,6)))
a.reduceByKey((x,y) => x + y).collect
//结果 Array((1,5), (3,10))
Seq
Sequence都有一个预定义的顺序。
scala> Seq(1, 1, 2)
res3: Seq[Int] = List(1, 1, 2)
(注意返回的结果是一个List。Seq是一个trait;List是它的一个实现类。Seq对象是一个工厂对象,正如你所看到
的,它会创建一个List。)
集合之间可以相互进行转换。
def toArray : Array[A]
def toArray [B >: A] (implicit arg0: ClassManifest[B]) : Array[B]
def toBuffer [B >: A] : Buffer[B]
def toIndexedSeq [B >: A] : IndexedSeq[B]
def toIterable : Iterable[A]
def toIterator : Iterator[A]
def toList : List[A]
def toMap [T, U] (implicit ev: <:<[A, (T, U)]) : Map[T, U]
def toSeq : Seq[A]
def toSet [B >: A] : Set[B]
def toStream : Stream[A]
def toString () : String
def toTraversable : Traversable[A]
我们可以把一个Map转换成一个数组,然后得到一个键值对数组。
scala> Map(1 -> 2).toArray
res41: Array[(Int, Int)] = Array((1,2))
sortWith
排序操作(sorted, sortWith, sortBy)根据不同的条件对序列元素进行排序。更多排序内容推荐参考
Scala的map实现key和value排序及各种排序比较等知识讨论