报表统计_执行框架_框架编写 | 学习笔记

简介: 快速学习报表统计_执行框架_框架编写

开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第五阶段)报表统计_执行框架_框架编写】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/680/detail/11817


报表统计_执行框架_框架编写

 

内容介绍:

一、框架设计

二、编写 ReportProcessor

三、编写 DailyReportRunner

 

一、框架设计

框架设计分为两个部分,第一个是表示可变点的 processor,就是个性点的 processor,还有一个表示公共流程的 DailReportRunner,也就是这个报表每天执行一次。两个类是需要进行编写的,下面就编写一下这两个类。

 

二、编写 ReportProcessor

1、进入 IDEA 中,在 report 下写第一个东西,叫做 DailyReportRunner,这个时候就创建了一个新的类,这个类改变为 object

在其中写一个 main 方法,关闭其他的类,这个时候 DailyReportRunner 就是公共流程所在的类。然后要为每一个个性化的点再创建一个类,这个类叫做 ReportProcessor。Runner 一般提供给要执行的代码去命名,Possessor 一般情况下提供给数据处理的类命名,ReportProcessor 是公共的一个基类。所有的 Report 生成,都继承自这个类,这是一个公共的父类,是一个基类,基类一般情况下可以是 class,但是对于这个 Processor,没有必要提供出任何公共的代码,所以可以把它改为 trait。

2、这个时候要为它提供几个方法,首先,第一个方法,要提供源表的表名,叫做 sourceTableName,它什么都不用接收,但需要返回东西出去,返回一个 string, sourceTableName 的作用就是对外提供源表的表名。第二部分是数据处理,数据处理命名为 process,process 当中一般情况下,让它接收 DataFrame,然后针对 DataFrame进行操作,操作完以后再返回一个 DataFrame。为了表示这是一个处理过程,所以接收一个 DataFrame。再写一个注释,这个方法对外提供数据处理的过程,拿到一个 DataFrame,经过处理以后,返回一个新的 DataFrame。

3、第三个方法和目标表有关,提供一个 targetTableName,targetTableName 提供的是目标表的表名,所以只需要返回一个 string 出去,不需要接收任何参数。提供一个目标表名出去,还要提供目标表的 schema 信息,接下来还需要有一个方法提供目标表的分区键。提供目标表的 scheme 信息要写一个方法,叫做 targetTableSchema,targetTableSchema 当中返回一个数据,返回 Kudo 的 schema,然后提供目标表的分区键,def targetTableKeys,返回一个 List,List 当中是 string 类型,这样的话 trait 就做好了。

package cn. itcast. Report

import org. apache. spark. sql.DataFrame

/**

*所有的 Report 生成,都继承自这个类

*这是一个公共的父类,是一个基类

*/

trait ReportProcessor {

/**

*对外提供源表的表名

/*

def sourceTableName(): String

/**

*对外提供数据处理的过程

*拿到一个 DataFrame,经过处理以后,返回一个新的 DataFrame

*/

def process(dataFrame: DataFrame): DataFrame

/**

*提供一个目标表名出去

*/

def targetTableName(): String

/**

*提供目标表的 Schema 信息

*/

def targetTableSchema(): Schema

/**

*提供标表的分区键

*/

def targetTableKeys(): List[String]

}

 

三、编写 DailyReportRunner

1、对于不同的报表处理过程来说,他们只需要继承 report possessor,接下来去编写 DailyReportRunner。第一步,创建 SparkSession;第二步,创建一个容器,放置所有的 processor;第三步,循环容器,拿到每一个 processor,每一个processor 都代表一个报表的统计过程;第四步应该写在循环里,读取源表;第五步,进行数据处理;第六步进行数据落地。

2、第一步创建一个 sparksession ,new sparksession.builder,master (master = “local【6】”),接下来再给一个 appName,叫做 daily report runner,接下来进行 load config,但是在 load config 之前,应该先导入编写的工具类的隐式转换。

import cn. itcast. utils.SparkConfigHelper._

import cn. itcast. utils.KuduHelper._

再接下来 getOrCreate,创建 sparksession,创建 sparksession 以后,创建一个容器,放置所有的 processor。直接 processors,new List,这个 List 当中放所有的 processor,但是现在还没有这个东西。所以给个 List ,里面给 ReportProcessor,这个时候这份代码就搞定了,processes 类型也确定了。

3、确定了以后要循环 processors,因为现在还没有编写任何的 processe,所以直接进行循环。for processor,从 processors 里面取出,这个时候 for 循环就并写好了,4、5、6三个步骤应该放在循环里面做。接下来应该读取源表,spark.readKudoTable,从 processor 当中拿到表名 sourceTableName,读取出来源表以后,给它命名,叫做 source。下面进行数据处理,数据处理的结果叫做 result,拿到 processor,processor 当中有一个方法叫做 process,这个 process 方法当中要接受一个 source data frame,直接把 source 传入,传入以后就拿到了 result。第四步的 source 是一个 option,option 是不能进行这种处理的,所以option 要先进行一个非空性的判断。

4、接下来 if (source.isDefined),如果它里面是有内容的,才继续往下走,如果没有内容就结束了。代码要写好看,否则在后期维护的时候会带来很大的麻烦。接下来拿到一个新的 source,叫做 sourceDF,从 source 当中获取 get。因为前面判断过已经 defined 过,已经有了 data frame,所以可以直接 get。继续往下走,这个 process 当中传入的就是 processDF,这个时候已经对数据处理过了,处理过以后可以进行数据的落地。spark.createKudoTable,KudoTable 应该是 processor 当中获取 targetTableName,这是第一个参数。第二个参数在 processor 中获取到 targetTableSchema,第三个参数 processor.targetTableKeys,这样数据落地的这张表就完成了。接下来,通过已经处理过的数据 saveToKudo,把它保存到 Kudo 当中。通过processor 当中取得 targetTableName,来获取到目标表的表名,这个时候整体上的流程就处理完了。

package cn.itcast. report

import org. apache. spark. sql.SparkSession

object DailyReportRunner {

def main(args:Array[String]):Unit={

import cn. itcast. utils.SparkConfigHelper._

import cn. itcast. utils.KuduHelper._

//1、创建 SparkgessEph

val spark=SparkSession. builder()

. master(master="local[6]")

. appName(name="daily report runner")

. loadConfig()

. getOrCreate()

//2.创建一个容器,放置所有的 Processor

val processors= List[ReportProcessor](

//3.循环容器,拿到每一个 Processor,每一个 Processor 都代表了一个报表的统计过程

for(processor<-processors){

//4.(读取源表

val source= spark.readKuduTable(processor.

sourceTableName())

if (source.isDefined) {

val sourceDF=source. get

//5.数据处理

val result=processor. process(sourceDF)

// 6. 数据落地

spark. createKuduTable( -

processor.targetTableName(),

processor.targetTableSchema(),

processor.targetTableKeys()

)

result.saveToKuduo(processor.targetTableName())

5、整个代码的逻辑从上层来看,从俯视视角来看,大致分为两部分。第一部分叫做 DailyReportRunner,这是整体上的一个类,整体流程的一个类,第二部分叫做 reportpossessor,reportpossessor 是具体的每一个报表处理器要继承的公共父类。所以整体的处理器当中要写一堆流程代码,每一个流程代码会有一些个性化的东西,比如说源表表名、目标表表名等,这些东西都从 report processor 当中取得,取到以后进行相应的处理,大致的逻辑就是这样。

相关文章
|
2月前
|
小程序 JavaScript 前端开发
【经验分享】如何实现小程序代码热更新| 江海计划
【经验分享】如何实现小程序代码热更新| 江海计划
43 1
|
6月前
|
开发框架 小程序 前端开发
阿里云小程序框架
阿里云小程序框架
|
6月前
|
存储 小程序 定位技术
小程序框架
小程序框架
76 0
|
11月前
|
机器学习/深度学习 数据管理 Go
量化交易系统开发逻辑策略编写 | 量化交易系统开发源码示例(go语言版)
一个基本的量化交易系统大致上有两个分层:资金管理层与 ( 商品 策略 ) 层。 框架 ( 模组 ) 决定好了,再更有系统地强化各个模组,进而更接近交易本质。 初阶的交易者多数先选定一个邻近市场,如外汇或是熟悉的台股、台指期、台指选择权,并进行策略的开发。 一个基本策略的框架大致上如下,可以分成数个模组,设计者可以依循这样的框架进行一个初步策略开发或交易程式撰写:
|
设计模式 分布式计算 搜索推荐
报表统计_执行框架_设计 | 学习笔记
快速学习报表统计_执行框架_设计
90 0
报表统计_执行框架_设计 | 学习笔记
|
BI 数据处理 Scala
报表统计_执行框架_旧模块改造 | 学习笔记
快速学习报表统计_执行框架_旧模块改造
76 0
报表统计_执行框架_旧模块改造 | 学习笔记
|
Java 编译器 开发工具
程序的执行流程和开发工具介绍 - 第五课
程序的执行流程和开发工具介绍 - 第五课
137 0
程序的执行流程和开发工具介绍 - 第五课
|
Java 数据库 开发者
项目框架分析| 学习笔记
快速学习项目框架分析
|
存储 SQL XML
工作流引擎Activiti使用进阶!详细解析工作流框架中高级功能的使用示例
本篇文章介绍了Activiti的几个高级用例。主要包括监听流程解析,使用UUID生成器,多租户,执行自定义的SQL,实现流程引擎配置,安全的BPMN 2.0结构以及事件日志的使用。使用这些高级功能,可以使得集成工作流Activiti的项目具有更多的可操作性。
942 0
工作流引擎Activiti使用进阶!详细解析工作流框架中高级功能的使用示例
|
开发框架 小程序 IDE
mPaaS 小程序架构解析 | 实操演示小程序如何实现多端开发
mPaaS 小程序开发框架已全面开放,免费接入,欢迎体验。
3216 0
mPaaS 小程序架构解析 | 实操演示小程序如何实现多端开发