在Spark Streaming Python中将RDD转换为Dataframe-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

在Spark Streaming Python中将RDD转换为Dataframe

社区小助手 2018-12-21 13:36:36 1318

我试图在Spark Streaming中将RDD转换为DataFrame。我正在关注以下流程。

socket_stream = ssc.socketTextStream("localhost", 9999)
def convert_to_df(rdd):

schema = StructType([StructField("text", StringType(), True)])
df =spark.createDataFrame(rdd, schema = schema)
df.show(10)

socket_stream.foreachRDD(convert_to_df)
我通过套接字提供输入 nc -lk 9999

如果我将“hello world”作为我的输入,它会向我显示以下错误

StructType can not accept object 'hello world' in type
预期产出

+-------=-+

分布式计算 流计算 Spark Python
分享到
取消 提交回答
全部回答(1)
  • 社区小助手
    2019-07-17 23:23:23

    因为你使用RDD[str]你应该提供匹配的类型。对于原子值,它是相应的AtomicType

    from pyspark.sql.types import StringType, StructField, StructType

    rdd = sc.parallelize(["hello world"])
    spark.createDataFrame(rdd, StringType())
    或其字符串描述:

    spark.createDataFrame(rdd, "string")
    如果要首先使用StructType 转换数据tuples:

    schema = StructType([StructField("text", StringType(), True)])

    spark.createDataFrame(rdd.map(lambda x: (x, )), schema)
    当然,如果您要将每个批次转换为DataFrame它,那么使用Structured Streaming就更有意义了:

    lines = (spark

    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load())
    0 0
大数据
使用钉钉扫一扫加入圈子
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

推荐文章
相似问题
推荐课程