我正在使用Kafka流式传输JSON文件,将每一行作为消息发送。其中一个关键是用户email。
然后我使用PySpark计算每个窗口的唯一用户数,使用他们的电子邮件来识别它们。命令
def print_users_count(count):
print 'The number of unique users is:', count
print_users_count((lambda message: message['email']).distinct().count())
给我下面的错误。我怎样才能解决这个问题?
AttributeError Traceback (most recent call last)
in ()
2 print 'The number of unique users is:', count
3
----> 4 print_users_count((lambda message: message['email']).distinct().count())
AttributeError: 'function' object has no attribute 'distinct'
这是我的PySpark代码:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
try:
sc.stop()
except:
pass
sc = SparkContext(appName="KafkaStreaming")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)
Define the PySpark consumer.
kafkaStream = KafkaUtils.createStream(ssc, bootstrap_servers, 'spark-streaming2', {topicName:1})
Parse the incoming data as JSON.
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
Count the number of messages per batch.
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()
你没有将lambda函数应用于任何东西。什么是message参考?对吧lambda函数就是一个函数。那就是为什么你的得到AttributeError: 'function' object has no attribute 'distinct'。它没有应用于任何数据,因此它不返回任何数据。您需要引用密钥email所在的数据框。
请参阅pyspark docs pyspark.sql.functions.countDistinct(col, *cols)和pyspark.sql.functions.approx_count_distinct pyspark文档。这应该是获得唯一计数的更简单的解决方案。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。