spark groupByKey 也是可以filter的

简介:
复制代码
>>> v=sc.parallelize(["one", "two", "two", "three", "three", "three"])
>>> v2=v.map(lambda x: (x,1))
>>> v2.collect()
[('one', 1), ('two', 1), ('two', 1), ('three', 1), ('three', 1), ('three', 1)]  
>>> v3=v2.groupByKey()
>>> v3.collect()
[('one', <pyspark.resultiterable.ResultIterable object at 0x7fd3c7850e90>), ('two', <pyspark.resultiterable.ResultIterable object at 0x7fd3c7850f10>), ('three', <pyspark.resultiterable.ResultIterable object at 0x7fd3c6dc83d0>)]
>>> v4=v3.filter(lambda x:len(x[1].data)>2)
>>> v4.collect()
[('three', <pyspark.resultiterable.ResultIterable object at 0x7fd3c6dc8510>)]
复制代码

过滤了出现次数大于2的结果。










本文转自张昺华-sky博客园博客,原文链接:http://www.cnblogs.com/bonelee/p/7764934.html,如需转载请自行联系原作者





相关文章
|
6月前
|
机器学习/深度学习 分布式计算 数据库连接
[Spark精进]必须掌握的4个RDD算子之filter算子
[Spark精进]必须掌握的4个RDD算子之filter算子
144 2
|
分布式计算 Spark
教材P164操作题。编写Spark Steaming程序,使用leftOuterJoin操作及filter方法过滤掉黑名单的数据
教材P164操作题。编写Spark Steaming程序,使用leftOuterJoin操作及filter方法过滤掉黑名单的数据
|
SQL 存储 分布式计算
spark outer join push down filter rule(spark 外连接中的下推规则)
spark outer join push down filter rule(spark 外连接中的下推规则)
265 0
spark outer join push down filter rule(spark 外连接中的下推规则)
EMR Spark Runtime Filter性能优化 | 7月5号云栖夜读
今天的首篇文章,讲述了:Join是一个非常耗费资源耗费时间的操作,特别是数据量很大的情况下。一般流程上会涉及底层表的扫描/shuffle/Join等过程, 如果我们能够尽可能的在靠近源头上减少参与计算的数据,一方面可以提高查询性能,另一方面也可以减少资源的消耗(网络/IO/CPU等),在同样的资源的情况下可以支撑更多的查询。
3913 0
|
存储 SQL 分布式计算
EMR Spark Runtime Filter性能优化
Join是一个非常耗费资源耗费时间的操作,特别是数据量很大的情况下。一般流程上会涉及底层表的扫描/shuffle/Join等过程, 如果我们能够尽可能的在靠近源头上减少参与计算的数据,一方面可以提高查询性能,另一方面也可以减少资源的消耗(网络/IO/CPU等),在同样的资源的情况下可以支撑更多的查询。
|
存储 SQL 分布式计算
EMR Spark Runtime Filter性能优化
Join是一个非常耗费资源耗费时间的操作,特别是数据量很大的情况下。一般流程上会涉及底层表的扫描/shuffle/Join等过程, 如果我们能够尽可能的在靠近源头上减少参与计算的数据,一方面可以提高查询性能,另一方面也可以减少资源的消耗(网络/IO/CPU等),在同样的资源的情况下可以支撑更多的查询。
5248 0
|
分布式计算
Spark PruneDependency 依赖关系 Filter
Spark PruneDependency 依赖关系 Filter Represents a dependency between the PartitionPruningRDD and its parent.
938 0
|
分布式计算 Spark
Spark基本的RDD算子之groupBy,groupByKey,mapValues
1. groupby def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] groupBy算子接收一个函数,这个函数返回的值作为key,然后通过这个key来对里面的元素进行分组。
13242 0