我有固定宽度文件如下
00120181120xyz12341
00220180203abc56792
00320181203pqr25483
以及JSON指定架构的相应文件:
{"Column":"id","From":"1","To":"3"}
{"Column":"date","From":"4","To":"8"}
{"Column":"name","From":"12","To":"3"}
{"Column":"salary","From":"15","To":"5"}
我使用以下方法将模式文件读入DataFrame:
SchemaFile = spark.read\
.format("json")\
.option("header","true")\
.json('C:\Temp\schemaFile\schema.json')
SchemaFile.show() | ||
---|---|---|
Column | From | To |
id | 1 | 3 |
date | 4 | 8 |
name | 12 | 3 |
salary | 15 | 5 |
同样,我正在将固定宽度文件解析为pyspark DataFrame,如下所示:
File = spark.read\
.format("csv")\
.option("header","false")\
.load("C:\Temp\samplefile.txt")
File.show()
+-------------------+
_c0 |
---|
00120181120xyz12341 |
00220180203abc56792 |
00320181203pqr25483 |
我显然可以硬编码每列的位置和长度的值来获得所需的输出:
from pyspark.sql.functions import substring
data = File.select(
substring(File._c0,1,3).alias('id'),
substring(File._c0,4,8).alias('date'),
substring(File._c0,12,3).alias('name'),
substring(File._c0,15,5).alias('salary')
)
data.show() | ||||
---|---|---|---|---|
id | date | name | salary | #+---+--------+----+------+ |
001 | 20181120 | xyz | 12341 | |
002 | 20180203 | abc | 56792 | |
003 | 20181203 | pqr | 25483 |
但是,如何使用SchemaFileDataFrame指定行的宽度和列名称,以便可以在运行时动态应用架构(无需硬编码)?
这里最简单的方法是在其行collect的内容SchemaFile和循环上提取所需的数据。
首先将模式文件作为JSON读入DataFrame。然后调用collect并将每一行映射到字典:
sfDict = map(lambda x: x.asDict(), SchemaFile.collect())
print(sfDict)
现在,您可以循环遍历行sfDict并使用值对列进行子串:
from pyspark.sql.functions import substring
File.select(
*[
substring(
str='_c0',
pos=int(row['From']),
len=int(row['To'])
).alias(row['Column'])
for row in sfDict
]
).show()
请注意,我们必须投To和From,因为他们被指定为您的字符串为整数json文件。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。