目标: 对于具有架构的数据框
id:string
Cold:string
Medium:string
Hot:string
IsNull:string
annual_sales_c:string
average_check_c:string
credit_rating_c:string
cuisine_c:string
dayparts_c:string
location_name_c:string
market_category_c:string
market_segment_list_c:string
menu_items_c:string
msa_name_c:string
name:string
number_of_employees_c:string
number_of_rooms_c:string
Months In Role:integer
Tenured Status:string
IsCustomer:integer
units_c:string
years_in_business_c:string
medium_interactions_c:string
hot_interactions_c:string
cold_interactions_c:string
is_null_interactions_c:string
我想添加一个新列,它是列的所有键和值的JSON字符串。我在这篇文章PySpark中使用了这种方法- 逐行转换为JSON和相关问题。我的代码
df = df.withColumn("JSON",func.to_json(func.struct([df[x] for x in small_df.columns])))
我有一个问题:
问题: 当任何行的列具有空值(并且我的数据有许多...)时,Json字符串不包含该键。即如果27列中只有9列具有值,那么JSON字符串只有9个键...我想要做的是维护所有键但是对于空值只传递一个空字符串“”
请考虑以下示例DataFrame:
data = [
('one', 1, 10),
(None, 2, 20),
('three', None, 30),
(None, None, 40)
]
sdf = spark.createDataFrame(data, ["A", "B", "C"])
sdf.printSchema()
使用when来实现IF-THEN-ELSE逻辑。如果列不为null,请使用该列。否则返回一个空字符串。
from pyspark.sql.functions import col, to_json, struct, when, lit
sdf = sdf.withColumn(
"JSON",
to_json(
struct(
[
when(
col(x).isNotNull(),
col(x)
).otherwise(lit("")).alias(x)
for x in sdf.columns
]
)
)
)
sdf.show()
另一种选择是使用pyspark.sql.functions.coalesce而不是when:
from pyspark.sql.functions import coalesce
sdf.withColumn(
"JSON",
to_json(
struct(
[coalesce(col(x), lit("")).alias(x) for x in sdf.columns]
)
)
).show(truncate=False)
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。