
Appending columns to a newly created DataFrame by selecting columns from an old DataFrame in PySpark

I am reading JSON and have a dictionary (dictn) whose keys tell me which columns should be selected from the JSON df.

I am trying to create a new df and then append the columns whose keys from dictn exist in the JSON, but I get the error below. Since I am really new to this, any help would be much appreciated.

'Resolved attribute(s) ip#238 missing from in operator !Project [ip#238 AS ip#267].;;\n!Project [ip#238 AS ip#267]\n+- LogicalRDD false\n'

import json
from pyspark.sql.functions import lit, explode
from pyspark.sql.types import StructType

jsn={"body":[{"ip":"177.284.10.91","sg_message_id":"YcbG1IBnQ1-626TaUVg2bQ.filter1049p1las1-18982-5C868E5A-20.0","hostname":"d2214390ce89","useragent":"Mozilla/5.0 (Linux; Android 7.1.2; E6810) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.105 Mobile Safari/537.36","method_name":"mass_gifting","email":"test@aol.com","timestamp":1554076768,"url":"https://engagement.test.com/b/genghisgrill","object_id":42813,"category":["42813-3-11-19-bottomless-is-back","713","mass_gifting"],"sg_event_id":"Krfn-yDfTG-CQ-o8zhTb0w","event":"click","klass":"3-11-19-bottomless-is-back","url_offset":{"index":3,"type":"html"},"rails_env":"production","user_id":78003906,"business_id":713}],"account":"testaccount"}

dictn={'ip':'string',
       'sg_message_id':'string',
       'hostname':'string',
       'method_name':'string',
       'email':'string',
       'timestamp':'bigint',
       'smtp-id':'string',
       'object_id':'bigint',
       'response':'string',
       'sg_event_id':'string',
       'tls':'string',
       'event':'string',
       'klass':'string',
       'user_id':'string',
       'rails_env':'string',
       'business_id':'bigint'}
# an empty DataFrame to append the selected columns to
schema = StructType([])
new_df = sqlContext.createDataFrame(sc.emptyRDD(), schema)

# parse the JSON payload into a DataFrame and explode the "body" array
a = [json.dumps(jsn)]
jsonRDD = sc.parallelize(a)
df = spark.read.json(jsonRDD)
x = df.select("body")
df1 = df.withColumn("foo", explode("body")).select("foo.*")

for k1, v1 in dictn.items():
    if k1 in df1.columns:
        # this is the line that raises the AnalysisException: df1[k1]
        # belongs to a different DataFrame than new_df
        new_df = new_df.withColumn(k1, df1[k1])
    else:
        new_df = new_df.withColumn(k1, lit(10))
new_df.show()

几许相思几点泪 · 2019-12-29 20:24:19
1 Answer
  • The error occurs because you are trying to add a new column by referencing a column that lives in another DataFrame, which Spark does not support. This has already been asked and answered here: Add a column from another DataFrame
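
    As a minimal sketch (my illustration, not part of the original answer): every DataFrame has its own logical plan, and a Column such as df1["ip"] can only be resolved against the plan it came from, never against new_df's plan.

    # Unsupported: df1["ip"] cannot be resolved against new_df's plan and
    # triggers the "Resolved attribute(s) ... missing" error quoted above.
    # new_df = new_df.withColumn("ip", df1["ip"])
    # Supported: reference a column only on the DataFrame it belongs to.
    df1.select("ip").show()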

    But to get what you want here, you only need to select from df1 with a list of column expressions built from the dictionary; that gives you the new DataFrame directly.

    This should work for you:

    from pyspark.sql.functions import col, lit

    # build one select expression per dictionary key: take the column from
    # df1 when it exists, otherwise fall back to a literal 10
    select_expr = [col(c).alias(c) if c in df1.columns else lit(10).alias(c)
                   for c, _ in dictn.items()]

    new_df = df1.select(select_expr)

    new_df.show()
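
    Since dictn also records a target type for every key, the same idea extends to casting each column while selecting. A hedged sketch (the cast step is my addition, not part of the original answer; it assumes the strings in dictn, e.g. 'string' and 'bigint', are valid Spark SQL type names):

    from pyspark.sql.functions import col, lit

    # select each dictionary key, substituting lit(10) for missing columns,
    # and cast everything to the type named in dictn
    typed_expr = [(col(c) if c in df1.columns else lit(10)).cast(v).alias(c)
                  for c, v in dictn.items()]
    typed_df = df1.select(typed_expr)
    typed_df.printSchema()  # each field now carries the type from dictn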
    
    2019-12-29 20:24:36