我有一个代码实际上创建了一个功能,它分割CSV文件并返回两个字段。
然后有map函数,我知道它是如何工作的,但是我对代码的下一部分感到困惑(在totalsByAge变量上发生了操作),mapValues和reduceByKey正在应用。请告诉我reduceByKey和mapValues如何在这里工作?
def parseLine(line):
fields = line.split(',')
age = int(fields[2])
numFriends = int(fields[3])
return (age,numFriends)
line = sparkCont.textFile("D:\ResearchInMotion\ml-100k\fakefriends.csv")
rdd = line.map(parseLine)
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])
results = averagesByAge.collect()
for result in results:
print(result)
我需要在totalsByAge变量处理中需要帮助。最后能详细说明在averagesByAge上完成的操作 。如果有任何遗漏,请告诉我。
在rdd = line.map(parseLine)你的行中你有一对格式为(age, numFriends)like 的值(a_1, n_1), (a_2, n_2), ..., (a_m, n_m)。在rdd.mapValues(lambda x: (x, 1))你会得到(a_1, (n_1, 1)), (a_2, (n_2, 1)), ..., (a_m, (n_m, 1))。
在reduceByKey,首先按键分组,它意味着所有相同的age组在一个组中,你将有一些喜欢(a_i, iterator over pairs of (n_j, 1) which all n_j has the same age),然后应用减少的功能。减少部分意味着numFriends每个年龄1彼此相加,并且s彼此相加,1s的总和表示列表中的数字项。
因此,之后reduceByKey,我们将拥有(a_i, (sum of all numFriends in the list, number of items in the list))。换句话说,外部对的第一个值是age,第二个值是内部对,其第一个值是all的numFriends和,第二个值是项目数。因此,totalsByAge.mapValues(lambda x: x[0] / x[1])给出numFriends了每个人的平均值age。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。