开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段:
DataFrame 介绍_操作】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/690/detail/12048
DataFrame 介绍_操作
刚才我们了解了如何创建 DataFrame,创建方式大概有三种,我们就不再赘述,创建完了以后,我们应该给 frame 做一些处理和操作,我们接下来,通过一个案例来去了解一下 DataFrame 。
我们要去做案例的话,在做案例的过程当中,第一步应该先取名字,明确需求,然后第二步处理数据集,然后第三步,进行代码编写,这是整个的步骤,代码编写当中,一般情况下是 ETL 码,第一步,读取数据集,第二步处理和操作,第三步,得出结论。这个结论无论你是写在一个地方,还是直接展示出来都可以。这个时候,我们就先去明确我们的需求,接下来再去处理一下这个数据集。
我们的需求是什么呢?打开笔记,我们现在的需求就是想去看一看北京的这个 PM 值的一个 CSV 文件当中数据集当中每个月统计了多少次,比如说三月份统计了100次,四月份统计了1000次,所以我们现在想做一个,要做一个这样的一个需求,如果是这样的话,我们就应该先去看一看我们的数据集,对应的就是这个北京 PM 值的数据集。
我们打开 IDEA,这个数据集当中,我们前面也读取过了,读取出来的时候是有一点点问题的,他没有列名,但这好像不行,我觉得我们需要在这个数据集当中为其添加一个 header,之前我们是专门的把这个数学题当中的 header给去掉,在接下来我们就应该再重新拷贝一下,原来的数据集,如果是这样的话,打开我们 spark的这个整个目录,在整个目录当中有一个fail这样的一个文件夹,点开它其中有一个 Dataset,这个 Dataset当中,有一个北京 PM 值这样的一个 CSV,有两个是一样的,一个是后面跟上的 header,就是去掉 header。
一个是有 header,我们把这个有 header拷贝过来,拷贝过来以后放在这个位置。
我们这个 no header he 有 header 的最大区别就是第一行是否是一个 heather 头,我们现在把这个头也拿过来了,我们就可以进行具体的代码编写,但是我们在进行代码编写之前要先去梳理一下整个的一个流程和设计,首先我们来看到我们的这个数据集,在这个数据集当中,统计每个月这个东四的这个统计次数,
比如说一月,我们这个东四有直的有100个,二月有直的有200个对不对,其实我就想看一看每个月统计了多少次,很简单的需求,第一步,应该先拿出年和月,按照年和月来进行分组,在分组之前应该先去掉 NA 的数据,大致就是这样的一个思路,最终我分组完以后,求一个 count 就可以了,这就是我们的步骤。
直接打开我们的代码拿去进行我们的这个操作,我们来 test,命名一个方法,叫做DataFrame。
然后,这个得FRAME3当中第一步应该创建我们的这个 spark session,创建好这个spark session,然后第三部处理,然后,第四步得出结论。
接下来,再来看第一步,创建 spark session 怎么做呢?直接 va 出来一个 spark, spark直接通过 spark session 创建,然后点 builder 。
我们有两个参数需要设置,第一个叫做 master,然后给一个 local,给六个县城,再给他一个 appName,然后,我们就做一个 PM analysis。
要去 DataFrame 来获取的对象,我们要进行数据集的读取,数据集读取的时候,首先 spark.read,然后读取的是 CSV,CSV 的位置在哪?
我们复制一下这个数据集的名字,写上 Dataset,这样读取出来以后,SourceDF,读出这样的一个东西以后,这能直接用吗?因为我们打开这个数据,你会发现第一条数据就是我们的一个 header,如果是这样的话,会去考虑你这个头信息吗、默认情况下是不会的,如果不会的话,我们就需要去看一下它生成的这个东西是什么,每一步最好都要看一下这个数据是怎么样的。
所以,每一步都要进行一个验收,运行这样的一个方法,大家也发现了,我们已经运行出结果,但是这个结果对吗?明显是不对的,大家有没有发现这个结果的头信息,还是什么 column0、column1、column2,然后,第一行数据变成了 member year month,这明显是不合理不科学的,我们该怎么办呢?办法也比较简单,如果是这样的话,我们就可以去指定一个 option,这个 option 指定为 header,然后呢,我们指定这样的一个配置的含义是什么呢?就是告诉 spark 的,在读取 CSV 的时候,把第一行理解为是 header,是有害的,就是这样的一个意思。
如果是这样的话,这个地方有一个黄线,大家不要放过任何一个航线,这个航线的意思,要去给这个布尔型的值设置参数名,给他设置一下叫做 value,我们这个整个数据已经读过了,但是我们还是要再去测验一下,如果我们非常笃定的说,一定把这个结构信息放在了别的费用,不会再有这样的列名了,我们就可以打印一下 scheme,这个 Pro 的 scheme 就是去查看查看我们的这个 game 的 scheme 信息。这也是我们经常要去做的一件事情,通过这样的一个查看,希望大家意识到一件事情,这个 dataframe 中是有结构信息的。
我们再去运行一下我们的这个代码。现在,打印出来的就是我们的密码信息。
我们这个 source frame 当中,首先有这么多列,第一列,叫做 membe r什么类型,是否为空,可以为空。这就是 schema 信息,通过 schema 信息,我们也基本上能够判定我们的这个读取是没有问题的,我们就要进行一些处理,怎么去处理?
第一步应该先列值裁剪一下,我们只能用到只会用到年月和 Dongsi 的偏执,我们把这三列先取出来,第一步,我们应该先去选择列,再去使用这个 data frame 的时候,思路就和 RDD 完全不同,我们再去思考的时候,直接针对于这个数据集里面断裂,我们要去思考,这个字符串怎么处理,对整条数据该怎么拆开,所以,这也是思考上的一些不同,第二步,我们列选择过了以后,我们就可以去过滤掉一些东西,过滤掉 NA 的一些 PM 值,然后过滤,就相当于我们清洗的部分。
清洗过了以后,我们就应该进行分组,就是我们要按月何年来进行分组,我们写的这个 SQL 语句,很像 select,第一个是 year,第二个是 month然后,第三个是 PM,然后接下来,我们 where 之前,我们先是 from,然后 where,我们where什么是 PM,Dongsi 不能等于 NA,然后最终我们是不是应该来进行 group,by year 和 month。
我们按照年和月来进行统计,统计完了以后,我们是不是应该去求一个 come,这就是我们所表达出来的这个含义使用 SQL 去写,我们继续向下,第四步,就应该去聚合,就是这个 count,然后第五步得出结论。
这是我们的一个大体的步骤,我们就来一起去做一做,首先通过 source df,我们就可以第一步选择列 select,我们点一列 year,然后,第二列,我们再点一下 boss,然后第三列,我们再点一下 PM 的 Dongsi,我们把这三列选出来了以后,就继续可以向下进行了,我们是不是可以 where一下去过滤掉的,这个 PM 记录,那怎么去过滤呢?
我们需要去简单做一个判断,在这里面,我们通过 PM 下划线 Dongsi,这样的一个方式来去取得其中 Dongsi 的 PM 值,我们可以直接不等于,如果仅仅是这样的话,其实不行,如果要使用这种形式列支来去判断他是否,你应该使用=!=这样的一个符号来判断。
但是,这个地方出了很多问题,包括前面也出红线,后面也出了这样的一个红线,那是为什么呢?原因很简单,我们在使用这个点儿的时候,本质上还是通过类似于字符串的形式生成了一个 column 对象,也是转换的一个差不多的一个方式。
如果要使用隐式转换,就必须得去导入什么,我们 spark 中的 in places 第二个,这个时候就可以了,我们过滤掉了以后,就可以直接进,我们就去掰两列。第一列叫做 year,先按照 year来进行分组,分完了以后,在这个分组的结果当中,month 再进行分组,分组完以后,我们可以直接使用 count 去求得这一组的这个数,这个 count 就相当于我们 SQL 语句。
这个 count 其实他并不是一个,如果你前面有 group,这个 group会生成一个特殊的叫做 relation group 的生成,
这样的一个对象在其中调用 count,这个 count 不是 action,但是如果在这个 where 后面调用,这就是 X,这个时候,我们看到过以后,其实是一个聚合操作,聚合过以后,我们使用 action 来进行求值,这就是我们整个的这个过程,第四步得出结论,也整合在第三步里面去做,这是我们整个的过程,做完了以后,使用spark来去进行 stop。
这就是整个的这个办法,运行一下,看看结果,这个时候我们已经获取到结果了,获取到结果以后,大家很明显的发现一四年的二月,靠的是632,然后,一四年的11月,是712。
如果大家感兴趣或者有必要的话,可以按照这个 month 的升序来进行一个排列,但是,这个地方就没有必要做的那么复杂,整个案例就结束了,大家不要忘了这一小节要讲的是 data fame 支持什么操作,首先支持 select,其次支持 where,其次支持 group,再其次支持 CT。
这是我们这个这个整个的 frame 支持的东西,关于更多的支持,我们接下来一一再讲。
最后一个步骤,我先把这个代码给注射,我们得到这样的一个 source df,是否能使用 SQL 语句这项查询,前面也还是使用类似于 SQL 的这个操作符,但是其实还是算子操作,命令式的操作,每一个步骤得出的数据集是可以控制的,我们能否直接使用 SQL?可以。
如果直接使用 SQL 话,大概两个步骤,第一个步骤,你要将这个 data frame 注册好临表,第二步,执行,我们就分别来看看这两部怎么做,Source df,然后我们就可以直接 create Or replace,执行查询的时候,注意不是从 source d f 来的,而是从 spark 这个对象当中来的,从我们那个 spark session 对象来的,来一个 SQL,在其中执行,我们前面已经写过一个这个 SQL 语句。我们就复制这个 SQL 语句来给他拷贝到底下进行一个简单的修改。
首先还是 select year month count,然后 FromPM,然后 where,我们应该也加上我们的这个引号范围,然后接下来,应该判断他,这就是我们整个的这个执行查询的步骤执行,查询完了以后,我们会生成一个 result df。
我们可以直接通过这个 result df 来进行这个数据的查询,我们直接通过这个 show去 action 展示我们的这个数据结果,我们整个过程,其实也很简单,读学数学题,然后这个查看一下可以把信息,直接把读取出来的这个 data frame 注册为临表执行查询,然后显示结果,最终关闭 spark session。
这是我们整个的这个过程,我们再来运行一下,这时,运行结果我们已经获取到了,大概就是这样的一个结果,这时,我们做一个简单的
总结,我们现在所讲的这些所有东西,大家只需要有一个印象,我们会再跟大家进行这个更详细的扩展。
最后,先回答大家几个问题,首先 get frame 支持什么样的操作呢?支持 where,然后 group,还有 select 等一系列,这个我们这个就是类似于 SQL 的这种操作方式的命令式 API,然后,还支持 data frame。
这就是我们的 frame,它支持的操作还是比较方便的,甚至可以先使用好,甚至可以先使用 SQL 进行查询,再通过查询到的 dataframe 来进行相应命令式的编写好。可以先使用 SQL 查询过后将返回的结果,data frame 在此进行,这就是我们的这个 Spark 当中的所支持的操作。