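I have a set of JSON messages streamed through Kafka, each describing a website user. Using PySpark, I need to count the number of users per country in each streaming window and return the countries with the largest and smallest user counts.
Here is an example of a streamed JSON message: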
{"id":1,"first_name":"Barthel","last_name":"Kittel","email":"bkittel0@printfriendly.com","gender":"Male","ip_address":"130.187.82.195","date":"06/05/2018","country":"France"}
Here is my code:
import json
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import Row
from pyspark import SparkContext
from pyspark.sql import SQLContext
fields = ['id', 'first_name', 'last_name', 'email', 'gender', 'ip_address', 'date', 'country']
schema = StructType([
    StructField(field, StringType(), True) for field in fields
])
def parse(s, fields):
    try:
        d = json.loads(s[0])
        return [tuple(d.get(field) for field in fields)]
    except Exception:
        # skip records that cannot be parsed as JSON
        return []
array_of_users = parsed.SQLContext.createDataFrame(parsed.flatMap(lambda s: parse(s, fields)), schema)
rdd = sc.parallelize(array_of_users)
# group by country, then replace each country's list of messages with its length, giving an RDD of (country, length) tuples
country_count = rdd.groupBy(lambda user: user['country']).mapValues(len)
# identify the min and max, using the second element of the (country, length) tuple as the comparison key
country_min = country_count.min(key = lambda grp: grp[1])
country_max = country_count.max(key = lambda grp: grp[1])
When I run it, I get this message:
AttributeError Traceback (most recent call last)
in ()
16 return []
17
---> 18 array_of_users = parsed.SQLContext.createDataFrame(parsed.flatMap(lambda s: parse(s, fields)), schema)
19
20 rdd = sc.parallelize(array_of_users)
AttributeError: 'TransformedDStream' object has no attribute 'SQLContext'
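How can I solve this problem?
If I understand correctly, you need to group the list of messages by country, count the messages in each group, and then pick the groups with the smallest and largest counts.
Off the top of my head, the code would look something like this: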
rdd = sc.parallelize(array_of_users)
country_count = rdd.groupBy(lambda user: user['country']).mapValues(len)
country_min = country_count.min(key = lambda grp: grp[1])
country_max = country_count.max(key = lambda grp: grp[1])
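A note on the traceback itself: `parsed` is a DStream, which has no `SQLContext` attribute, and `sc.parallelize` expects a local collection rather than a stream. One possible way around both, sketched below, is to run the count on each batch's RDD through `foreachRDD`. This assumes `parsed` and `parse` are the objects from the question and that each parsed user is a tuple ordered like `fields`; `extremes`, `handle_batch` and `COUNTRY_IDX` are hypothetical names introduced only for illustration.

```python
from collections import Counter

COUNTRY_IDX = 7  # assumed position of 'country' in the question's `fields` list


def extremes(counts):
    """Return ((country, min_count), (country, max_count)) from a {country: count} mapping."""
    if not counts:
        return None  # empty batch: nothing to report
    items = list(counts.items())
    return (min(items, key=lambda kv: kv[1]), max(items, key=lambda kv: kv[1]))


def handle_batch(time, rdd):
    """Intended to be called once per batch; `rdd` holds that batch's parsed user tuples."""
    # countByValue() is an RDD action that returns {value: occurrences} to the driver
    counts = rdd.map(lambda user: user[COUNTRY_IDX]).countByValue()
    result = extremes(counts)
    if result is not None:
        print(time, "min:", result[0], "max:", result[1])


# Assumed wiring, with `parsed`, `parse` and `fields` as in the question:
# parsed.flatMap(lambda s: parse(s, fields)).foreachRDD(handle_batch)

# The driver-side part can be checked without Spark, using made-up countries:
sample = Counter(["France", "France", "Spain", "Italy", "Italy", "Italy"])
low, high = extremes(sample)
```

Collecting the per-country counts on the driver should be fine here because the number of distinct countries is small; for a key with many distinct values, `reduceByKey` followed by the RDD's own `min` and `max` would keep the work distributed.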