pyspark有条件地解析固定宽度的文本文件-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

pyspark有条件地解析固定宽度的文本文件

2018-12-19 16:00:18 1508 1

我有一个固定宽度的文件,我不知道它的格式,直到它中的某个变量检查​​某个变量是'01'还是'02'。所以我想创建这样的东西:

myreport= spark.read.text("/mnt/path/mydata")
myreport= myreport.select(myreport.value.substr(1,3).alias('client'),
myreport.value.substr(4,2).alias('rptnum'),
if rptnum = '01', then
myreport.value.substr(6,2).cast('integer').alias('mo1'),
myreport.value.substr(8,2).cast('integer').alias('mo2'),
myreport.value.substr(12,2).cast('integer').alias('mo3'),
Else
myreport.value.substr(6,2).cast('integer').alias('mo1'),
myreport.value.substr(8,2).cast('integer').alias('mo2'),
myreport.value.substr(12,2).cast('integer').alias('mo3'),
myreport.value.substr(14,2).cast('integer').alias('mo4'),
myreport.value.substr(16,2).cast('integer').alias('mo5'),
myreport.value.substr(18,2).cast('integer').alias('mo6'),
如果rpt编号不是01,基本上列的数量会翻倍。不确定如何在pyspark中执行此操作

取消 提交回答
全部回答(1)
  • 社区小助手
    2019-07-17 23:23:00

    编写从df.rdd.map()中调用的函数并转换/解析每一行。您可以创建相同数量的列,但在一种情况下,某些列将为null。filter()在rptnum上使用,您可以分离行并选择相应的列。

    from pyspark.sql.functions import *
    from pyspark.sql import *

    def transformRow(row):

    value = row['value']
    client = value[1:4]
    rptnum = value[4:6]
    rowDict = {'client': client, 'rptnum': rptnum,'mo1': None,'mo2': None,'mo3': None,'mo4': None,'mo5': None,'mo6': None}
    rowDict['mo1'] = value[6:8]
    rowDict['mo2'] = value[8:10]
    rowDict['mo3'] = value[10:12]
    
    if rptnum != '01' :
        rowDict['mo4'] = value[12:14]
        rowDict['mo5'] = value[14:16]
        rowDict['mo6'] = value[16:18]
    return Row(**rowDict)
    

    myreport= spark.read.text("/mnt/path/mydata")
    myreport = myreport.rdd.map(transformRow).toDF()

    rpt1 = myreport.filter(col("rptnum") == '01').select("mo1","mo2","mo3")
    rpt2 = myreport.filter(col("rptnum") != '01')

    0 0
相关问答

1

回答

在string.h的一个文件当中,这里面所有的函数都是些什么呢?

2022-03-29 22:42:18 115浏览量 回答数 1

1

回答

Lambda架构-Apach Cassandra,Spark,and Pulsar如何直观理解?

2021-12-13 14:48:56 58浏览量 回答数 1

1

回答

Lambda架构-Apach Cassandra,Spark,and Pulsar工作原理是什么?

2021-12-13 14:49:18 96浏览量 回答数 1

1

回答

hadoop中创建本地版web页面,放在/tmp/hadoop-site的返回代码是什么?

2021-12-06 08:58:17 111浏览量 回答数 1

1

回答

怎样才能在hadoop下创建文件夹?

2021-12-04 19:40:02 110浏览量 回答数 1

0

回答

免费体验搭建Hadoop环境,在ssh这步连接不上,提示连接到主机106.xx.xx端口22超时

2020-09-30 11:35:30 369浏览量 回答数 0

1

回答

请问一下 我 flink yarn session 指定了 , 两个 t#Flink

2020-07-06 17:10:42 604浏览量 回答数 1

1

回答

如何在MySQL中查看日志文件?

2020-05-11 16:59:42 419浏览量 回答数 1

2

回答

windows server 2008 r2 搭建ftp外网访问遇到问题

2018-08-02 20:09:13 6372浏览量 回答数 2

1

回答

PC端下载文件jsapi 返回512error Code

2017-12-13 13:11:35 1884浏览量 回答数 1
+关注
社区小助手
社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。
12
文章
824
问答
问答排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载