报表统计_执行框架_旧模块改造 | 学习笔记

简介: 快速学习报表统计_执行框架_旧模块改造

开发者学堂课程【2020版大数据实战项目之 DMP 广告系统(第五阶段)报表统计_执行框架_旧模块改造】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/680/detail/11818


报表统计_执行框架_旧模块改造

 

内容介绍:

一、创建 processor 的类

二、到 DailyReport 中注册

三、总结经验

 

一、创建 processor 的类

1、在前面已经写了一个 RegionReportProcessor,也就是已经有一个报表的处理类了。这个报表的处理类明显不是遵循 report possessor 的写法。

image.png

可以把这个类改成 report possessor 的写法。大致分为两步,第一步,创建一个 processor 的类;第二步,到 DailyReport 当中注册。

2、先去创建,编写一个 scala class,名称叫做 NewRegionReportProcessor。这个 processor 第一件事是让他继承 ReportProcessor,把 class 改为 object,重写其中的方法,实现其中的方法,把把每一个方法都实现。第一个方法 sourceTableName,应该找 ETLRunner,从其中取出 ODS_TABLE_NAME。targetTableName 可以直接到 RegionReportProcessor 中拷贝,也可以手写。在这里我们手写,这个可以叫做 report_data_region_"+KudoHelper.formattedDate,formattedDate 就是格式化过的日期字符串。

3、下面要提供目标表的 schema 信息,拷贝目标表的 schema 信息.

import scala. collection.JavaConverters._

private val schema=new Schema(

List(

new ColumnSchemaBuilder(name="region",Type.STRING). nullable(false). key(true). build,

new ColumnSchemaBuilder(name=“city”, Type.STRING). nullable(true). key(true). build(),

new ColumnSchemaBuilder(name="count",Type.STRING). nullable(true). key(false). build()

) .asJava

放到 NewRegionReportProcessor 当中。可以直接不 new 出 schema,直接把 schema 返回,这个方法对外提供了一个 schema 信息。

4、倒数第二步提供目标表的分区键,一个是 region,还有一个是 city,这个时候就把 List 提供出去了,提供出去以后,整体上该提供的信息都提供了,就差数据处理的过程了。直接拿到 dataFrame 然后 groupBy,groupBy 之前要用到“.”或者“$”,import dataFrame.sparkSession.implicits._,导入 implicits 后点一下可以找到 region,再点一下可以找到 city 。通过这两个列进行相应的分组,分组完了以后使用 agg,在使用 agg 的时候还要再导入一个 import_org.apache.spark.sql.functions._,这个时候 app 里就可以进行 count,这个 count 里面可以写“*”,对整个 count 进行一个统计,对所有的行进行一个统计,统计完以后,每个分组的行数就有了,可以 as count。

5、最后一步,select(cols = 'region,'city,'count),整体的处理流程就结束了。经过这样一步处理,生成了一个新的 DataFrame,顺着返回值返回到外面去,所以整个功能已经全部实现。

import org. apache. spark. sql.DataFrame

object NewRegionReportProcessor extends ReportProcessor {

/**

* 对外提供源表的表名

*/

override def sourceTableName(): String={

ETLRunner.ODS TABLE NAME

}

/**

*对外提供数据处理的过程

*拿到一个DataFrame,经过处理以后,返回一个新的DataFrame

*/

override def process(dataFrame:Dataframe):DataFrame={

import_org.apache.spark.sql.functions._

import dataFrame.sparkSession.implicits._

dataFrame.groupBy(cols = 'region,'city,)

agg(count(columnName=”*”)as “count”)

select(cols = 'region,'city,'count)

/**

*提供一个目标表名出去

*/

override def targetTableName(): String={

"report_data_region_ "+KuduHelper.formattedDate()

}

/**

*提供目标表的Schema信息

*/

override def targetTableSchema():Schema= {

import scala. collection.JavaConverters.

new Schema(

List(

new ColumnSchemaBuilder(name="region",Type.STRING). nullable(false). key(true). build,

new ColumnSchemaBuilder(name=“city”, Type.STRING). nullable(true). key(true). build(),

new ColumnSchemaBuilder(name="count",Type.STRING). nullable(true). key(false). build()

). as]ava

)

}

/**

*提供目标表的分区键

*/

override def targetTableKeys(): List[String] = {

List (“region”,”city”)

}

 

二、到DailyReport 中注册

1、总共两大步,第一步写这个类,第二步注册这个类,在下图地方写上 NewRegionReportProcessor。因为这是 object,所以不需要把它创建出来,直接放到 processor 里就可以了,写完以后直接运行 main 方法。

2、运行结果是没有任何问题的,整个程序是可以运行的,没有报错。

image.png

 

三、总结经验

1、下面总结一下整个的经验,DailyReportRunner 是整个的入口,

image.png

其次,会有多个 processor,比如说刚才写了 NewRegionProcessor, NewRegionProcessor 是某一个具体报表的处理类。在写一个新的报表,还是两个步骤,第一步,继承 report processor 写一个新的 processor object;第二步把新的 processor,比如说 NewRegionReportProcessor 注册到 DailyReportRunner 当中,经过这两步就可以完成一个新的功能。

2.NewRegionReportProcessor 代码更干净,并且所实现的功能是一样的,但是这个代码会把不同的功能放在不同的代码当中,每个代码各司其职,这样的代码会相对干净一些。这符合软件开发原则当中的单一职责原则,这种代码会多一层体系,就是他每一个东西都要放在一个函数当中,但是整体的流程是干净的。整体流程是干净的,以后要改的时候就方便知道去哪个地方改,并且在这个方法里面改源表名能确定改了这个地方不影响其他地方,因为这是模块化的,这是第一个好处。

3、第二个好处,查看代码,总共12行有效代码,再来查一查 RegionReportPossess,共27行代码,算的都是有效代码。一个只有12行的核心代码,一个有27行代码的核心代码,大家觉得哪一个类会更好些?现在说的这些都是开放性的,如果不喜欢现在这种写法,可以采用原来的写法。我有一个同事,他有一个想法是为什么要搞一个流程出来?不搞这个流程,写一个方法,这个方法接收十来个参数。从业界主流的声音来看,这样不是好做法,其实通过这样的做法能实现功能也是好的,所以大家可以采用自己喜欢的方式。这就是整个的内容,流程希望大家都搞清楚,总共两大块。

image.png

看代码框时要着重考虑这个代码的结构是怎样的,而不是流程。看到 dailyreport runner 的时候,才应该关注流程。

相关文章
|
13天前
|
存储 弹性计算 人工智能
【2025云栖精华内容】 打造持续领先,全球覆盖的澎湃算力底座——通用计算产品发布与行业实践专场回顾
2025年9月24日,阿里云弹性计算团队多位产品、技术专家及服务器团队技术专家共同在【2025云栖大会】现场带来了《通用计算产品发布与行业实践》的专场论坛,本论坛聚焦弹性计算多款通用算力产品发布。同时,ECS云服务器安全能力、资源售卖模式、计算AI助手等用户体验关键环节也宣布升级,让用云更简单、更智能。海尔三翼鸟云服务负责人刘建锋先生作为特邀嘉宾,莅临现场分享了关于阿里云ECS g9i推动AIoT平台的场景落地实践。
【2025云栖精华内容】 打造持续领先,全球覆盖的澎湃算力底座——通用计算产品发布与行业实践专场回顾
|
5天前
|
云安全 人工智能 安全
Dify平台集成阿里云AI安全护栏,构建AI Runtime安全防线
阿里云 AI 安全护栏加入Dify平台,打造可信赖的 AI
|
12天前
|
人工智能 自然语言处理 自动驾驶
关于举办首届全国大学生“启真问智”人工智能模型&智能体大赛决赛的通知
关于举办首届全国大学生“启真问智”人工智能模型&智能体大赛决赛的通知
|
8天前
|
人工智能 运维 Java
Spring AI Alibaba Admin 开源!以数据为中心的 Agent 开发平台
Spring AI Alibaba Admin 正式发布!一站式实现 Prompt 管理、动态热更新、评测集构建、自动化评估与全链路可观测,助力企业高效构建可信赖的 AI Agent 应用。开源共建,现已上线!
776 23
|
7天前
|
人工智能 Java Nacos
基于 Spring AI Alibaba + Nacos 的分布式 Multi-Agent 构建指南
本文将针对 Spring AI Alibaba + Nacos 的分布式多智能体构建方案展开介绍,同时结合 Demo 说明快速开发方法与实际效果。
497 37
|
7天前
|
机器学习/深度学习 人工智能 搜索推荐
万字长文深度解析最新Deep Research技术:前沿架构、核心技术与未来展望
近期发生了什么自 2025 年 2 月 OpenAI 正式发布Deep Research以来,深度研究/深度搜索(Deep Research / Deep Search)正在成为信息检索与知识工作的全新范式:系统以多步推理驱动大规模联网检索、跨源证据。
488 41
|
1天前
|
文字识别 监控 物联网
这是我写的实施一地两检的跨境高铁站旅客资料预报系统的系统架构
本文设计了一套基于IAPIS理念的高铁跨境旅客预报与边检联动系统,覆盖青青草原内地与喜羊羊特别行政区间“一地两检”场景。系统在旅客购票后即采集证件、生物特征及行程信息,通过Advance Passenger Info Checker等模块,向出发地和目的地移民管理机构实时推送数据,实现出入境许可预审。支持线上/线下购票、检票、退票全流程管控,结合面部识别、行为追踪技术监控旅客状态,防止滞留或非法通行。列车发车前进行最终核验,确保所有跨境旅客获边检许可。若旅行被中途取消,系统自动改签、退票并通知各方,保障安全与效率。(239字)