开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第五阶段):报表统计_执行框架_框架编写】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/680/detail/11817
报表统计_执行框架_框架编写
内容介绍:
一、框架设计
二、编写 ReportProcessor
三、编写 DailyReportRunner
一、框架设计
框架设计分为两个部分,第一个是表示可变点的 processor,就是个性点的 processor,还有一个表示公共流程的 DailReportRunner,也就是这个报表每天执行一次。两个类是需要进行编写的,下面就编写一下这两个类。
二、编写 ReportProcessor
1、进入 IDEA 中,在 report 下写第一个东西,叫做 DailyReportRunner,这个时候就创建了一个新的类,这个类改变为 object
在其中写一个 main 方法,关闭其他的类,这个时候 DailyReportRunner 就是公共流程所在的类。然后要为每一个个性化的点再创建一个类,这个类叫做 ReportProcessor。Runner 一般提供给要执行的代码去命名,Possessor 一般情况下提供给数据处理的类命名,ReportProcessor 是公共的一个基类。所有的 Report 生成,都继承自这个类,这是一个公共的父类,是一个基类,基类一般情况下可以是 class,但是对于这个 Processor,没有必要提供出任何公共的代码,所以可以把它改为 trait。
2、这个时候要为它提供几个方法,首先,第一个方法,要提供源表的表名,叫做 sourceTableName,它什么都不用接收,但需要返回东西出去,返回一个 string, sourceTableName 的作用就是对外提供源表的表名。第二部分是数据处理,数据处理命名为 process,process 当中一般情况下,让它接收 DataFrame,然后针对 DataFrame进行操作,操作完以后再返回一个 DataFrame。为了表示这是一个处理过程,所以接收一个 DataFrame。再写一个注释,这个方法对外提供数据处理的过程,拿到一个 DataFrame,经过处理以后,返回一个新的 DataFrame。
3、第三个方法和目标表有关,提供一个 targetTableName,targetTableName 提供的是目标表的表名,所以只需要返回一个 string 出去,不需要接收任何参数。提供一个目标表名出去,还要提供目标表的 schema 信息,接下来还需要有一个方法提供目标表的分区键。提供目标表的 scheme 信息要写一个方法,叫做 targetTableSchema,targetTableSchema 当中返回一个数据,返回 Kudo 的 schema,然后提供目标表的分区键,def targetTableKeys,返回一个 List,List 当中是 string 类型,这样的话 trait 就做好了。
package cn. itcast. Report
import org. apache. spark. sql.DataFrame
/**
*所有的 Report 生成,都继承自这个类
*这是一个公共的父类,是一个基类
*/
trait ReportProcessor {
/**
*对外提供源表的表名
/*
def sourceTableName(): String
/**
*对外提供数据处理的过程
*拿到一个 DataFrame,经过处理以后,返回一个新的 DataFrame
*/
def process(dataFrame: DataFrame): DataFrame
/**
*提供一个目标表名出去
*/
def targetTableName(): String
/**
*提供目标表的 Schema 信息
*/
def targetTableSchema(): Schema
/**
*提供标表的分区键
*/
def targetTableKeys(): List[String]
}
三、编写 DailyReportRunner
1、对于不同的报表处理过程来说,他们只需要继承 report possessor,接下来去编写 DailyReportRunner。第一步,创建 SparkSession;第二步,创建一个容器,放置所有的 processor;第三步,循环容器,拿到每一个 processor,每一个processor 都代表一个报表的统计过程;第四步应该写在循环里,读取源表;第五步,进行数据处理;第六步进行数据落地。
2、第一步创建一个 sparksession ,new sparksession.builder,master (master = “local【6】”),接下来再给一个 appName,叫做 daily report runner,接下来进行 load config,但是在 load config 之前,应该先导入编写的工具类的隐式转换。
import cn. itcast. utils.SparkConfigHelper._
import cn. itcast. utils.KuduHelper._
再接下来 getOrCreate,创建 sparksession,创建 sparksession 以后,创建一个容器,放置所有的 processor。直接 processors,new List,这个 List 当中放所有的 processor,但是现在还没有这个东西。所以给个 List ,里面给 ReportProcessor,这个时候这份代码就搞定了,processes 类型也确定了。
3、确定了以后要循环 processors,因为现在还没有编写任何的 processe,所以直接进行循环。for processor,从 processors 里面取出,这个时候 for 循环就并写好了,4、5、6三个步骤应该放在循环里面做。接下来应该读取源表,spark.readKudoTable,从 processor 当中拿到表名 sourceTableName,读取出来源表以后,给它命名,叫做 source。下面进行数据处理,数据处理的结果叫做 result,拿到 processor,processor 当中有一个方法叫做 process,这个 process 方法当中要接受一个 source data frame,直接把 source 传入,传入以后就拿到了 result。第四步的 source 是一个 option,option 是不能进行这种处理的,所以option 要先进行一个非空性的判断。
4、接下来 if (source.isDefined),如果它里面是有内容的,才继续往下走,如果没有内容就结束了。代码要写好看,否则在后期维护的时候会带来很大的麻烦。接下来拿到一个新的 source,叫做 sourceDF,从 source 当中获取 get。因为前面判断过已经 defined 过,已经有了 data frame,所以可以直接 get。继续往下走,这个 process 当中传入的就是 processDF,这个时候已经对数据处理过了,处理过以后可以进行数据的落地。spark.createKudoTable,KudoTable 应该是 processor 当中获取 targetTableName,这是第一个参数。第二个参数在 processor 中获取到 targetTableSchema,第三个参数 processor.targetTableKeys,这样数据落地的这张表就完成了。接下来,通过已经处理过的数据 saveToKudo,把它保存到 Kudo 当中。通过processor 当中取得 targetTableName,来获取到目标表的表名,这个时候整体上的流程就处理完了。
package cn.itcast. report
import org. apache. spark. sql.SparkSession
object DailyReportRunner {
def main(args:Array[String]):Unit={
import cn. itcast. utils.SparkConfigHelper._
import cn. itcast. utils.KuduHelper._
//1、创建 SparkgessEph
val spark=SparkSession. builder()
. master(master="local[6]")
. appName(name="daily report runner")
. loadConfig()
. getOrCreate()
//2.创建一个容器,放置所有的 Processor
val processors= List[ReportProcessor](
)
//3.循环容器,拿到每一个 Processor,每一个 Processor 都代表了一个报表的统计过程
for(processor<-processors){
//4.(读取源表
val source= spark.readKuduTable(processor.
sourceTableName())
if (source.isDefined) {
val sourceDF=source. get
//5.数据处理
val result=processor. process(sourceDF)
// 6. 数据落地
spark. createKuduTable( -
processor.targetTableName(),
processor.targetTableSchema(),
processor.targetTableKeys()
)
result.saveToKuduo(processor.targetTableName())
5、整个代码的逻辑从上层来看,从俯视视角来看,大致分为两部分。第一部分叫做 DailyReportRunner,这是整体上的一个类,整体流程的一个类,第二部分叫做 reportpossessor,reportpossessor 是具体的每一个报表处理器要继承的公共父类。所以整体的处理器当中要写一堆流程代码,每一个流程代码会有一些个性化的东西,比如说源表表名、目标表表名等,这些东西都从 report processor 当中取得,取到以后进行相应的处理,大致的逻辑就是这样。