我试图在Spark Streaming中将RDD转换为DataFrame。我正在关注以下流程。
socket_stream = ssc.socketTextStream("localhost", 9999)
def convert_to_df(rdd):
schema = StructType([StructField("text", StringType(), True)])
df =spark.createDataFrame(rdd, schema = schema)
df.show(10)
socket_stream.foreachRDD(convert_to_df)
我通过套接字提供输入 nc -lk 9999
如果我将“hello world”作为我的输入,它会向我显示以下错误
StructType can not accept object 'hello world' in type
预期产出
+-------=-+
因为你使用RDD[str]你应该提供匹配的类型。对于原子值,它是相应的AtomicType
from pyspark.sql.types import StringType, StructField, StructType
rdd = sc.parallelize(["hello world"])
spark.createDataFrame(rdd, StringType())
或其字符串描述:
spark.createDataFrame(rdd, "string")
如果要首先使用StructType 转换数据tuples:
schema = StructType([StructField("text", StringType(), True)])
spark.createDataFrame(rdd.map(lambda x: (x, )), schema)
当然,如果您要将每个批次转换为DataFrame它,那么使用Structured Streaming就更有意义了:
lines = (spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load())
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。