开发者社区> 问答> 正文

使用PySpark计算每个窗口的用户数

社区小助手 2019-01-02 15:13:41 545

我正在使用Kafka流式传输JSON文件,将每一行作为消息发送。其中一个关键是用户email。

然后我使用PySpark计算每个窗口的唯一用户数,使用他们的电子邮件来识别它们。命令

def print_users_count(count):

print 'The number of unique users is:', count

print_users_count((lambda message: message['email']).distinct().count())
给我下面的错误。我怎样才能解决这个问题?

AttributeError Traceback (most recent call last)
in ()

  2     print 'The number of unique users is:', count
  3 

----> 4 print_users_count((lambda message: message['email']).distinct().count())

AttributeError: 'function' object has no attribute 'distinct'
这是我的PySpark代码:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

try:

sc.stop()

except:

pass  

sc = SparkContext(appName="KafkaStreaming")
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 60)

Define the PySpark consumer.
kafkaStream = KafkaUtils.createStream(ssc, bootstrap_servers, 'spark-streaming2', {topicName:1})

Parse the incoming data as JSON.
parsed = kafkaStream.map(lambda v: json.loads(v[1]))

Count the number of messages per batch.
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()

消息中间件 JSON Kafka 数据格式
分享到
取消 提交回答
全部回答(1)
  • 社区小助手
    2019-07-17 23:24:25

    你没有将lambda函数应用于任何东西。什么是message参考?对吧lambda函数就是一个函数。那就是为什么你的得到AttributeError: 'function' object has no attribute 'distinct'。它没有应用于任何数据,因此它不返回任何数据。您需要引用密钥email所在的数据框。

    请参阅pyspark docs pyspark.sql.functions.countDistinct(col, *cols)和pyspark.sql.functions.approx_count_distinct pyspark文档。这应该是获得唯一计数的更简单的解决方案。

    0 0
+ 订阅

构建可靠、高效、易扩展的技术基石

推荐文章
相似问题
推荐课程