pyspark - 在json流数据中找到max和min usign createDataFrame-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

pyspark - 在json流数据中找到max和min usign createDataFrame

2019-01-02 15:24:56 2368 1

我有一组由Kafka流式传输的json消息,每个消息都描述一个网站用户。使用pyspark,我需要计算每个国家/地区每个流媒体窗口的用户数,并返回具有最大和最小用户数的国家/地区。

以下是流式json消息的示例:

{"id":1,"first_name":"Barthel","last_name":"Kittel","email":"bkittel0@printfriendly.com","gender":"Male","ip_address":"130.187.82.195","date":"06/05/2018","country":"France"}
这是我的代码:

from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import Row
from pyspark import SparkContext
from pyspark.sql import SQLContext

fields = ['id', 'first_name', 'last_name', 'email', 'gender', 'ip_address', 'date', 'country']
schema = StructType([
StructField(field, StringType(), True) for field in fields
])

def parse(s, fields):

try:
    d = json.loads(s[0])
    return [tuple(d.get(field) for field in fields)]
except:
    return []

array_of_users = parsed.SQLContext.createDataFrame(parsed.flatMap(lambda s: parse(s, fields)), schema)

rdd = sc.parallelize(array_of_users)

group by country and then substitute the list of messages for each country by its length, resulting into a rdd of (country, length) tuples
country_count = rdd.groupBy(lambda user: user['country']).mapValues(len)

identify the min and max using as comparison key the second element of the (country, length) tuple
country_min = country_count.min(key = lambda grp: grp[1])
country_max = country_count.max(key = lambda grp: grp[1])
当我运行它时,我收到消息

AttributeError Traceback (most recent call last)
in ()

 16         return []
 17 

---> 18 array_of_users = parsed.SQLContext.createDataFrame(parsed.flatMap(lambda s: parse(s, fields)), schema)

 19 
 20 rdd = sc.parallelize(array_of_users)

AttributeError: 'TransformedDStream' object has no attribute 'SQLContext'
我怎样才能解决这个问题?

取消 提交回答
全部回答(1)
  • 社区小助手
    2019-07-17 23:24:26

    如果我理解正确,您需要按国家/地区对邮件列表进行分组,然后计算每个组中的邮件数,然后选择具有最小和最大邮件数的组。

    在我的脑海中,代码将是这样的:

    assuming the array_of_users is your array of messages

    rdd = sc.parallelize(array_of_users)

    group by country and then substitute the list of messages for each country by its length, resulting into a rdd of (country, length) tuples

    country_count = rdd.groupBy(lambda user: user['country']).mapValues(len)

    identify the min and max using as comparison key the second element of the (country, length) tuple

    country_min = country_count.min(key = lambda grp: grp[1])
    country_max = country_count.max(key = lambda grp: grp[1])

    0 0
相关问答

1

回答

请问 DataFrame的join里面mapjoin是怎么用的?

2022-07-15 15:03:11 84浏览量 回答数 1

1

回答

Spark中ark.hadoop.odps.cupid.eni.enable 参数是啥作用?

2021-12-12 16:15:56 316浏览量 回答数 1

1

回答

用flink 1.11.2 查询hive表自关联(self inner join) 结果不正确

2021-12-06 11:30:50 523浏览量 回答数 1

1

回答

flink使用hive作为维表,kafka作为数据源,join时候报错怎么办?

2021-12-02 11:06:58 583浏览量 回答数 1

1

回答

dataframe spark scala取每组的(MAX-MIN)

2018-12-21 13:12:29 3695浏览量 回答数 1

1

回答

Jupyter笔记本,pyspark,hadoop-aws问题

2018-12-12 18:20:20 2200浏览量 回答数 1

1

回答

如何将Spark Dataframe列的每个值作为字符串传递给python UDF?

2018-12-12 14:09:31 2754浏览量 回答数 1

1

回答

OSS MEDIA C SDK oss_media_hls_close

2018-06-16 20:05:53 548浏览量 回答数 1

1

回答

OSS MEDIA C SDK oss_media_hls_end_m3u8

2018-03-31 00:02:58 527浏览量 回答数 1

1

回答

OSS MEDIA C SDK 在什么情况下用oss_media_hls_stream_close

2018-02-17 18:47:38 674浏览量 回答数 1
+关注
社区小助手
社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。
12
文章
824
问答
问答排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载