开发者社区> 问答> 正文

在Spark Streaming Python中将RDD转换为Dataframe

我试图在Spark Streaming中将RDD转换为DataFrame。我正在关注以下流程。

socket_stream = ssc.socketTextStream("localhost", 9999)
def convert_to_df(rdd):

schema = StructType([StructField("text", StringType(), True)])
df =spark.createDataFrame(rdd, schema = schema)
df.show(10)

socket_stream.foreachRDD(convert_to_df)
我通过套接字提供输入 nc -lk 9999

如果我将“hello world”作为我的输入,它会向我显示以下错误

StructType can not accept object 'hello world' in type
预期产出

+-------=-+

展开
收起
社区小助手 2018-12-21 13:36:36 2864 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    因为你使用RDD[str]你应该提供匹配的类型。对于原子值,它是相应的AtomicType

    from pyspark.sql.types import StringType, StructField, StructType

    rdd = sc.parallelize(["hello world"])
    spark.createDataFrame(rdd, StringType())
    或其字符串描述:

    spark.createDataFrame(rdd, "string")
    如果要首先使用StructType 转换数据tuples:

    schema = StructType([StructField("text", StringType(), True)])

    spark.createDataFrame(rdd.map(lambda x: (x, )), schema)
    当然,如果您要将每个批次转换为DataFrame它,那么使用Structured Streaming就更有意义了:

    lines = (spark

    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load())
    2019-07-17 23:23:23
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
From Python Scikit-Learn to Sc 立即下载
Data Pre-Processing in Python: 立即下载
双剑合璧-Python和大数据计算平台的结合 立即下载