使用PySpark计算每个窗口的用户数-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

使用PySpark计算每个窗口的用户数

2019-01-02 15:13:41 2626 1

我正在使用Kafka流式传输JSON文件,将每一行作为消息发送。其中一个关键是用户email。

然后我使用PySpark计算每个窗口的唯一用户数,使用他们的电子邮件来识别它们。命令

def print_users_count(count):

print 'The number of unique users is:', count

print_users_count((lambda message: message['email']).distinct().count())
给我下面的错误。我怎样才能解决这个问题?

AttributeError Traceback (most recent call last)
in ()

  2     print 'The number of unique users is:', count
  3 

----> 4 print_users_count((lambda message: message['email']).distinct().count())

AttributeError: 'function' object has no attribute 'distinct'
这是我的PySpark代码:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

try:

sc.stop()

except:

pass  

sc = SparkContext(appName="KafkaStreaming")
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 60)

Define the PySpark consumer.
kafkaStream = KafkaUtils.createStream(ssc, bootstrap_servers, 'spark-streaming2', {topicName:1})

Parse the incoming data as JSON.
parsed = kafkaStream.map(lambda v: json.loads(v[1]))

Count the number of messages per batch.
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()

取消 提交回答
全部回答(1)
  • 社区小助手
    2019-07-17 23:24:25

    你没有将lambda函数应用于任何东西。什么是message参考?对吧lambda函数就是一个函数。那就是为什么你的得到AttributeError: 'function' object has no attribute 'distinct'。它没有应用于任何数据,因此它不返回任何数据。您需要引用密钥email所在的数据框。

    请参阅pyspark docs pyspark.sql.functions.countDistinct(col, *cols)和pyspark.sql.functions.approx_count_distinct pyspark文档。这应该是获得唯一计数的更简单的解决方案。

    0 0
相关问答

1

回答

Spark是基于什么来计算的机制呢?

2022-08-03 13:03:56 210浏览量 回答数 1

1

回答

Spark计算框架的官网地址是什么?

2022-01-13 18:25:48 724浏览量 回答数 1

1

回答

怎么使用Spark对接Kafka进行实时计算?

2021-12-12 12:04:15 176浏览量 回答数 1

1

回答

Spark使用JindoFS计算加速读取parquet数据的前提是什么?

2021-12-09 21:14:30 206浏览量 回答数 1

1

回答

spark的五大优势是什么?

2021-12-07 07:42:01 324浏览量 回答数 1

1

回答

Spark如何实现交互式计算?

2021-12-10 14:05:02 103浏览量 回答数 1

1

回答

Spark计算框架可以分为几层?

2021-12-07 13:16:20 80浏览量 回答数 1

1

回答

什么是Spark?它的特点是什么啊?

2021-12-06 16:55:33 123浏览量 回答数 1

1

回答

Hive、Spark无法直接使用官方SDK,哪它们将可以怎么使用呢?

2021-12-09 19:08:21 365浏览量 回答数 1

2

回答

请问开放存储服务OSS支持上传的视频在线播放吗

2014-05-20 07:03:33 6141浏览量 回答数 2
+关注
社区小助手
社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。
12
文章
824
问答
问答排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载