基于大数据技术的开源在线教育项目

简介: 基于大数据技术的开源在线教育项目

毫无疑问最近几年是在线教育和内容付费的风口期,尤其是今年,大量做教育的公司都获得数额不小的投资。我们不是要去追这个风口,而是因为在线教育的成熟,开始产生巨量有价值的数据,率先采用和融合大数据和AI技术的公司,必将强力推动公司的发展,从而转型成为科技公司。

本文代码可以在github找到https://github.com/SoundHearer/kuaiban

离线数仓流程图

我们的离线数仓目前完成了两个模块,用户注册和做题模块。

首先我们来看用户注册模块

原始数据格式及字段含义

1.baseadlog 广告基础表原始json数据
{
  "adid": "0",     //基础广告表广告id 
  "adname": "注册弹窗广告0",  //广告详情名称
  "dn": "webA"     //网站分区
}
2.basewebsitelog 网站基础表原始json数据
{
  "createtime": "2000-01-01",  
  "creator": "admin",
  "delete": "0",
  "dn": "webC",  //网站分区
  "siteid": "2",  //网站id  
  "sitename": "114",  //网站名称
  "siteurl": "www.114.com/webC"  //网站地址
}
3.memberRegtype 用户跳转地址注册表
{
  "appkey": "-",
  "appregurl": "http:www.webA.com/product/register/index.html",  //注册时跳转地址
  "bdp_uuid": "-",
  "createtime": "2015-05-11",
"dt":"20190722",  //日期分区
  "dn": "webA",    //网站分区 
  "domain": "-",
  "isranreg": "-",
  "regsource": "4", //所属平台 1.PC  2.MOBILE  3.APP   4.WECHAT
  "uid": "0", //用户id
  "websiteid": "0" //对应basewebsitelog 下的siteid网站
}

数据分层

在hdfs中创建ods目录

hadoop dfs -mkdir /user/catelf/ods

在hive里分别建立三个库,dwd、dws、ads 分别用于存储etl清洗后的数据、宽表和拉链表数据、各报表层统计指标数据。

create database dwd;

create database dws;

create database ads;

各层级 ods 存放原始数据

dwd 结构与原始表结构保持一致,对ods层数据进行清洗

dws 以dwd为基础进行轻度汇总

ads 报表层,为各种统计报表提供数据

在hive中各层新建数据表

create external table `dwd`.`dwd_member`(
   uid int,
   ad_id int,
   birthday string,
   email string,
   fullname string,
   iconurl string,
   lastlogin string,
   mailaddr string,
   memberlevel string,
   password string,
   paymoney string,
   phone string,
   qq string,
   register string,
   regupdatetime string,
   unitname string,
   userip string,
   zipcode string)
   partitioned by(
    dt string,
    dn string
   )  
   ROW FORMAT DELIMITED
   FIELDS TERMINATED BY '\t'
   STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
create external table `dwd`.`dwd_member_regtype`(
  uid int,
  appkey string,
  appregurl string,
  bdp_uuid string,
  createtime timestamp,
  isranreg string,
  regsource string,
  regsourcename string,
  websiteid int)
  partitioned by(
   dt string,
   dn string)
    ROW FORMAT DELIMITED
   FIELDS TERMINATED BY '\t'
   STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
create external table `dwd`.`dwd_base_ad`(
adid int,
adname string)
partitioned by (
 dn string)    
   ROW FORMAT DELIMITED
   FIELDS TERMINATED BY '\t'
   STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
 create external table `dwd`.`dwd_base_website`(
  siteid int,
  sitename string,
  siteurl string,
 `delete` int,
  createtime timestamp,
  creator string)
partitioned by (
 dn string)    
   ROW FORMAT DELIMITED
   FIELDS TERMINATED BY '\t'
   STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
create external table `dwd`.`dwd_pcentermempaymoney`(
  uid int,
  paymoney string,
  siteid int,
  vip_id int
)   
 partitioned by(
   dt string,
   dn string
 )
   ROW FORMAT DELIMITED
   FIELDS TERMINATED BY '\t'
   STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');  
 create external table `dwd`.`dwd_vip_level`(
   vip_id int,
   vip_level string,
   start_time timestamp,
   end_time timestamp,
   last_modify_time timestamp,
   max_free string,
   min_free string,
   next_level string,
   operator string
 )partitioned by(
   dn string
 )
   ROW FORMAT DELIMITED
   FIELDS TERMINATED BY '\t'
   STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY

dwd层六张基础表

dwd_base_ad

dwd_base_website

dwd_member

dwd_member_regtype

dwd_pcentermempaymoney

dwd_viplevel

dws层 宽表和拉链表

宽表

dws_member

拉链表

dws_member_zipper

报表层各统计表

ads_register_adnamenum

ads_register_appregurlnum

ads_register_memberlevelnum

ads_register_regsourcenamenum

ads_register_sitenamenum

ads_register_top3memberpay

ads_register_viplevelnum

模拟数据采集上传数据

将日志文件数据直接上传到hadoop集群上

-rw-r--r--   3 hdfs supergroup        570 2020-12-02 14:59 /user/catelf/ods/baseadlog.log
-rw-r--r--   3 hdfs supergroup       2001 2020-12-02 15:01 /user/catelf/ods/baswewebsite.log
-rw-r--r--   3 hdfs supergroup   33618728 2020-12-02 15:05 /user/catelf/ods/member.log
-rw-r--r--   3 hdfs supergroup   20160318 2020-12-02 15:00 /user/catelf/ods/memberRegtype.log
-rw-r--r--   3 hdfs supergroup       1086 2020-12-02 15:04 /user/catelf/ods/pcenterMemViplevel.log
-rw-r--r--   3 hdfs supergroup   71645082 2020-12-02 15:03 /user/catelf/ods/pcentermempaymoney.log

ETL数据清洗

需求1:必须使用Spark进行数据清洗,对用户名、手机号、密码进行脱敏处理,并使用Spark将数据导入到dwd层hive表中

清洗规则 用户名:王XX 手机号:137789 密码直接替换成*

收集日志原始数据后 我们需要对日志原始数据进行清洗 将清洗后的数据存入dwd层表

我们在IDEA中新建warehouse项目,包组织结构如下所示

Bean 目录下存放实体类

Controller 目录下存放程序入口类

Dao 目录下存放各表sql类

Service 目录下存放各表业务类

Util目录下存放工具类

创建EtlDatService清洗类,使用该类读取hdfs上的原始日志数据,对原始日志进行清洗处理,对敏感字段姓名、电话做脱敏处理。filter对不能正常转换json数据的日志数据进行过滤,mappartiton针对每个分区去做数据循环map操作组装成对应表需要的字段,重组完之后coalesce缩小分区(减少文件个数)刷新到目标表中。

import com.alibaba.fastjson.JSONObject
import com.catelf.util.ParseJsonData
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}
object EtlDataService {
  /**
   * etl用户注册信息
   *
   * @param ssc
   * @param sparkSession
   */
  def etlMemberRegtypeLog(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._ //隐式转换
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/memberRegtype.log")
      .filter(item => {
        val obj = ParseJsonData.getJsonData(item)
        obj.isInstanceOf[JSONObject]
      }).mapPartitions(partitoin => {
      partitoin.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val appkey = jsonObject.getString("appkey")
        val appregurl = jsonObject.getString("appregurl")
        val bdp_uuid = jsonObject.getString("bdp_uuid")
        val createtime = jsonObject.getString("createtime")
        val isranreg = jsonObject.getString("isranreg")
        val regsource = jsonObject.getString("regsource")
        val regsourceName = regsource match {
          case "1" => "PC"
          case "2" => "Mobile"
          case "3" => "App"
          case "4" => "WeChat"
          case _ => "other"
        }
        val uid = jsonObject.getIntValue("uid")
        val websiteid = jsonObject.getIntValue("websiteid")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (uid, appkey, appregurl, bdp_uuid, createtime, isranreg, regsource, regsourceName, websiteid, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_member_regtype")
  }
  /**
   * etl用户表数据
   *
   * @param ssc
   * @param sparkSession
   */
  def etlMemberLog(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._ //隐式转换
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/member.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partition => {
      partition.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val ad_id = jsonObject.getIntValue("ad_id")
        val birthday = jsonObject.getString("birthday")
        val email = jsonObject.getString("email")
        val fullname = jsonObject.getString("fullname").substring(0, 1) + "xx"
        val iconurl = jsonObject.getString("iconurl")
        val lastlogin = jsonObject.getString("lastlogin")
        val mailaddr = jsonObject.getString("mailaddr")
        val memberlevel = jsonObject.getString("memberlevel")
        val password = "******"
        val paymoney = jsonObject.getString("paymoney")
        val phone = jsonObject.getString("phone")
        val newphone = phone.substring(0, 3) + "*****" + phone.substring(7, 11)
        val qq = jsonObject.getString("qq")
        val register = jsonObject.getString("register")
        val regupdatetime = jsonObject.getString("regupdatetime")
        val uid = jsonObject.getIntValue("uid")
        val unitname = jsonObject.getString("unitname")
        val userip = jsonObject.getString("userip")
        val zipcode = jsonObject.getString("zipcode")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (uid, ad_id, birthday, email, fullname, iconurl, lastlogin, mailaddr, memberlevel, password, paymoney, newphone, qq,
          register, regupdatetime, unitname, userip, zipcode, dt, dn)
      })
    }).toDF().coalesce(2).write.mode(SaveMode.Append).insertInto("dwd.dwd_member")
  }
  /**
   * 导入广告表基础数据
   *
   * @param ssc
   * @param sparkSession
   */
  def etlBaseAdLog(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._ //隐式转换
    val result = ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/baseadlog.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partition => {
      partition.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val adid = jsonObject.getIntValue("adid")
        val adname = jsonObject.getString("adname")
        val dn = jsonObject.getString("dn")
        (adid, adname, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Overwrite).insertInto("dwd.dwd_base_ad")
  }
  /**
   * 导入网站表基础数据
   *
   * @param ssc
   * @param sparkSession
   */
  def etlBaseWebSiteLog(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._ //隐式转换
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/baswewebsite.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partition => {
      partition.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val siteid = jsonObject.getIntValue("siteid")
        val sitename = jsonObject.getString("sitename")
        val siteurl = jsonObject.getString("siteurl")
        val delete = jsonObject.getIntValue("delete")
        val createtime = jsonObject.getString("createtime")
        val creator = jsonObject.getString("creator")
        val dn = jsonObject.getString("dn")
        (siteid, sitename, siteurl, delete, createtime, creator, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Overwrite).insertInto("dwd.dwd_base_website")
  }

创建DwdMemberController作为主类

import com.catelf.member.service.EtlDataService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object DwdMemberController {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val sparkConf = new SparkConf().setAppName("dwd_member_import").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    HiveUtil.openDynamicPartition(sparkSession) //开启动态分区
    HiveUtil.openCompression(sparkSession) //开启压缩
    HiveUtil.useSnappyCompression(sparkSession) //使用snappy压缩
    //对用户原始数据进行数据清洗 存入bdl层表中
    EtlDataService.etlBaseAdLog(ssc, sparkSession) //导入基础广告表数据
    EtlDataService.etlBaseWebSiteLog(ssc, sparkSession) //导入基础网站表数据
    EtlDataService.etlMemberLog(ssc, sparkSession) //清洗用户数据
    EtlDataService.etlMemberRegtypeLog(ssc, sparkSession) //清洗用户注册数据
    EtlDataService.etlMemPayMoneyLog(ssc, sparkSession) //导入用户支付情况记录
    EtlDataService.etlMemVipLevelLog(ssc, sparkSession) //导入vip基础数据
  }
}

查看dwd层数据,已经成功导入

基于dwd层表合成dws层的宽表,拉链表

需求2:对dwd层的6张表进行合并,生成一张宽表,先使用Spark Sql实现。有时间的同学需要使用DataFrame api实现功能,并对join进行优化。

需求3:针对dws层宽表的支付金额(paymoney)和vip等级(vip_level)这两个会变动的字段生成一张拉链表,需要一天进行一次更新

创建DwdMemberDao,目的是为了获取数据,提供给上一层使用

import org.apache.spark.sql.SparkSession
object DwdMemberDao {
  def getDwdMember(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,ad_id,email,fullname,iconurl,lastlogin,mailaddr,memberlevel," +
      "password,phone,qq,register,regupdatetime,unitname,userip,zipcode,dt,dn from dwd.dwd_member")
  }
  def getDwdMemberRegType(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,appkey,appregurl,bdp_uuid,createtime as reg_createtime,isranreg," +
      "regsource,regsourcename,websiteid as siteid,dn from dwd.dwd_member_regtype ")
  }
  def getDwdBaseAd(sparkSession: SparkSession) = {
    sparkSession.sql("select adid as ad_id,adname,dn from dwd.dwd_base_ad")
  }
  def getDwdBaseWebSite(sparkSession: SparkSession) = {
    sparkSession.sql("select siteid,sitename,siteurl,delete as site_delete," +
      "createtime as site_createtime,creator as site_creator,dn from dwd.dwd_base_website")
  }
  def getDwdVipLevel(sparkSession: SparkSession) = {
    sparkSession.sql("select vip_id,vip_level,start_time as vip_start_time,end_time as vip_end_time," +
      "last_modify_time as vip_last_modify_time,max_free as vip_max_free,min_free as vip_min_free," +
      "next_level as vip_next_level,operator as vip_operator,dn from dwd.dwd_vip_level")
  }
  def getDwdPcentermemPayMoney(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,cast(paymoney as decimal(10,4)) as paymoney,vip_id,dn from dwd.dwd_pcentermempaymoney")
  }
}

基于dwd层表合成dws层的宽表和拉链表

宽表两种方式实现:

一种查询各单表基于单表dataframe使用 join算子得到结果,再使用groupbykey算子去重和取最大最小值等操作得到最终结果。

一种使用spark sql直接实现。

import com.catelf.member.bean.{DwsMember, DwsMember_Result, MemberZipper, MemberZipperResult}
import com.catelf.member.dao.DwdMemberDao
import org.apache.spark.sql.{SaveMode, SparkSession}
object DwsMemberService {
  def importMemberUseApi(sparkSession: SparkSession, dt: String) = {
    import sparkSession.implicits._ //隐式转换
    val dwdMember = DwdMemberDao.getDwdMember(sparkSession).where(s"dt='${dt}'") //主表用户表
    val dwdMemberRegtype = DwdMemberDao.getDwdMemberRegType(sparkSession)
    val dwdBaseAd = DwdMemberDao.getDwdBaseAd(sparkSession)
    val dwdBaseWebsite = DwdMemberDao.getDwdBaseWebSite(sparkSession)
    val dwdPcentermemPaymoney = DwdMemberDao.getDwdPcentermemPayMoney(sparkSession)
    val dwdVipLevel = DwdMemberDao.getDwdVipLevel(sparkSession)
    import org.apache.spark.sql.functions.broadcast
    val result = dwdMember.join(dwdMemberRegtype, Seq("uid", "dn"), "left_outer")
      .join(broadcast(dwdBaseAd), Seq("ad_id", "dn"), "left_outer")
      .join(broadcast(dwdBaseWebsite), Seq("siteid", "dn"), "left_outer")
      .join(broadcast(dwdPcentermemPaymoney), Seq("uid", "dn"), "left_outer")
      .join(broadcast(dwdVipLevel), Seq("vip_id", "dn"), "left_outer")
      .select("uid", "ad_id", "fullname", "iconurl", "lastlogin", "mailaddr", "memberlevel", "password"
        , "paymoney", "phone", "qq", "register", "regupdatetime", "unitname", "userip", "zipcode", "appkey"
        , "appregurl", "bdp_uuid", "reg_createtime", "isranreg", "regsource", "regsourcename", "adname"
        , "siteid", "sitename", "siteurl", "site_delete", "site_createtime", "site_creator", "vip_id", "vip_level",
        "vip_start_time", "vip_end_time", "vip_last_modify_time", "vip_max_free", "vip_min_free", "vip_next_level"
        , "vip_operator", "dt", "dn").as[DwsMember]
    result.groupByKey(item => item.uid + "_" + item.dn)
      .mapGroups { case (key, iters) =>
        val keys = key.split("_")
        val uid = Integer.parseInt(keys(0))
        val dn = keys(1)
        val dwsMembers = iters.toList
        val paymoney = dwsMembers.filter(_.paymoney != null).map(_.paymoney).reduceOption(_ + _).getOrElse(BigDecimal.apply(0.00)).toString
        val ad_id = dwsMembers.map(_.ad_id).head
        val fullname = dwsMembers.map(_.fullname).head
        val icounurl = dwsMembers.map(_.iconurl).head
        val lastlogin = dwsMembers.map(_.lastlogin).head
        val mailaddr = dwsMembers.map(_.mailaddr).head
        val memberlevel = dwsMembers.map(_.memberlevel).head
        val password = dwsMembers.map(_.password).head
        val phone = dwsMembers.map(_.phone).head
        val qq = dwsMembers.map(_.qq).head
        val register = dwsMembers.map(_.register).head
        val regupdatetime = dwsMembers.map(_.regupdatetime).head
        val unitname = dwsMembers.map(_.unitname).head
        val userip = dwsMembers.map(_.userip).head
        val zipcode = dwsMembers.map(_.zipcode).head
        val appkey = dwsMembers.map(_.appkey).head
        val appregurl = dwsMembers.map(_.appregurl).head
        val bdp_uuid = dwsMembers.map(_.bdp_uuid).head
        val reg_createtime = dwsMembers.map(_.reg_createtime).head
//        val domain = dwsMembers.map(_.domain).head
        val isranreg = dwsMembers.map(_.isranreg).head
        val regsource = dwsMembers.map(_.regsource).head
        val regsourcename = dwsMembers.map(_.regsourcename).head
        val adname = dwsMembers.map(_.adname).head
        val siteid = dwsMembers.map(_.siteid).head
        val sitename = dwsMembers.map(_.sitename).head
        val siteurl = dwsMembers.map(_.siteurl).head
        val site_delete = dwsMembers.map(_.site_delete).head
        val site_createtime = dwsMembers.map(_.site_createtime).head
        val site_creator = dwsMembers.map(_.site_creator).head
        val vip_id = dwsMembers.map(_.vip_id).head
        val vip_level = dwsMembers.map(_.vip_level).max
        val vip_start_time = dwsMembers.map(_.vip_start_time).min
        val vip_end_time = dwsMembers.map(_.vip_end_time).max
        val vip_last_modify_time = dwsMembers.map(_.vip_last_modify_time).max
        val vip_max_free = dwsMembers.map(_.vip_max_free).head
        val vip_min_free = dwsMembers.map(_.vip_min_free).head
        val vip_next_level = dwsMembers.map(_.vip_next_level).head
        val vip_operator = dwsMembers.map(_.vip_operator).head
        DwsMember_Result(uid, ad_id, fullname, icounurl, lastlogin, mailaddr, memberlevel, password, paymoney,
          phone, qq, register, regupdatetime, unitname, userip, zipcode, appkey, appregurl,
          bdp_uuid, reg_createtime, isranreg, regsource, regsourcename, adname, siteid,
          sitename, siteurl, site_delete, site_createtime, site_creator, vip_id, vip_level,
          vip_start_time, vip_end_time, vip_last_modify_time, vip_max_free, vip_min_free,
          vip_next_level, vip_operator, dt, dn)
      }.show()
  }
  def importMember(sparkSession: SparkSession, time: String) = {
    import sparkSession.implicits._ //隐式转换
    //查询全量数据 刷新到宽表
    sparkSession.sql("select uid,first(ad_id),first(fullname),first(iconurl),first(lastlogin)," +
      "first(mailaddr),first(memberlevel),first(password),sum(cast(paymoney as decimal(10,4))),first(phone),first(qq)," +
      "first(register),first(regupdatetime),first(unitname),first(userip),first(zipcode)," +
      "first(appkey),first(appregurl),first(bdp_uuid),first(reg_createtime)," +
      "first(isranreg),first(regsource),first(regsourcename),first(adname),first(siteid),first(sitename)," +
      "first(siteurl),first(site_delete),first(site_createtime),first(site_creator),first(vip_id),max(vip_level)," +
      "min(vip_start_time),max(vip_end_time),max(vip_last_modify_time),first(vip_max_free),first(vip_min_free),max(vip_next_level)," +
      "first(vip_operator),dt,dn from" +
      "(select a.uid,a.ad_id,a.fullname,a.iconurl,a.lastlogin,a.mailaddr,a.memberlevel," +
      "a.password,e.paymoney,a.phone,a.qq,a.register,a.regupdatetime,a.unitname,a.userip," +
      "a.zipcode,a.dt,b.appkey,b.appregurl,b.bdp_uuid,b.createtime as reg_createtime,b.isranreg,b.regsource," +
      "b.regsourcename,c.adname,d.siteid,d.sitename,d.siteurl,d.delete as site_delete,d.createtime as site_createtime," +
      "d.creator as site_creator,f.vip_id,f.vip_level,f.start_time as vip_start_time,f.end_time as vip_end_time," +
      "f.last_modify_time as vip_last_modify_time,f.max_free as vip_max_free,f.min_free as vip_min_free," +
      "f.next_level as vip_next_level,f.operator as vip_operator,a.dn " +
      s"from dwd.dwd_member a left join dwd.dwd_member_regtype b on a.uid=b.uid " +
      "and a.dn=b.dn left join dwd.dwd_base_ad c on a.ad_id=c.adid and a.dn=c.dn left join " +
      " dwd.dwd_base_website d on b.websiteid=d.siteid and b.dn=d.dn left join dwd.dwd_pcentermempaymoney e" +
      s" on a.uid=e.uid and a.dn=e.dn left join dwd.dwd_vip_level f on e.vip_id=f.vip_id and e.dn=f.dn where a.dt='${time}')r  " +
      "group by uid,dn,dt").coalesce(3).write.mode(SaveMode.Overwrite).insertInto("dws.dws_member")
    //查询当天增量数据
    val dayResult = sparkSession.sql(s"select a.uid,sum(cast(a.paymoney as decimal(10,4))) as paymoney,max(b.vip_level) as vip_level," +
      s"from_unixtime(unix_timestamp('$time','yyyyMMdd'),'yyyy-MM-dd') as start_time,'9999-12-31' as end_time,first(a.dn) as dn " +
      " from dwd.dwd_pcentermempaymoney a join " +
      s"dwd.dwd_vip_level b on a.vip_id=b.vip_id and a.dn=b.dn where a.dt='$time' group by uid").as[MemberZipper]
    //查询历史拉链表数据
    val historyResult = sparkSession.sql("select *from dws.dws_member_zipper").as[MemberZipper]
    //两份数据根据用户id进行聚合 对end_time进行重新修改
    val reuslt = dayResult.union(historyResult).groupByKey(item => item.uid + "_" + item.dn)
      .mapGroups { case (key, iters) =>
        val keys = key.split("_")
        val uid = keys(0)
        val dn = keys(1)
        val list = iters.toList.sortBy(item => item.start_time) //对开始时间进行排序
        if (list.size > 1 && "9999-12-31".equals(list(list.size - 2).end_time)) {
          //如果存在历史数据 需要对历史数据的end_time进行修改
          //获取历史数据的最后一条数据
          val oldLastModel = list(list.size - 2)
          //获取当前时间最后一条数据
          val lastModel = list(list.size - 1)
          oldLastModel.end_time = lastModel.start_time
          lastModel.paymoney = list.map(item => BigDecimal.apply(item.paymoney)).sum.toString
        }
        MemberZipperResult(list)
      }.flatMap(_.list).coalesce(3).write.mode(SaveMode.Overwrite).insertInto("dws.dws_member_zipper") //重组对象打散 刷新拉链表
  }
}

创建主类DwsMemberController

import com.catelf.member.service.DwsMemberService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object DwsMemberController {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val sparkConf = new SparkConf().setAppName("dws_member_import")
      .setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    HiveUtil.openDynamicPartition(sparkSession) //开启动态分区
    HiveUtil.openCompression(sparkSession) //开启压缩
    HiveUtil.useSnappyCompression(sparkSession) //使用snappy压缩
    DwsMemberService.importMember(sparkSession, "20190722") //根据用户信息聚合用户表数据
//        DwsMemberService.importMemberUseApi(sparkSession, "20190722")
  }
}
  • 调用该方法,查看hive数据,可以看到dws层的宽表和拉链表已经生成

报表层各指标统计

需求4:使用Spark DataFrame Api统计通过各注册跳转地址(appregurl)进行注册的用户数,有时间的再写Spark Sql

需求5:使用Spark DataFrame Api统计各所属网站(sitename)的用户数,有时间的再写Spark Sql

需求6:使用Spark DataFrame Api统计各所属平台的(regsourcename)用户数,有时间的再写Spark Sql

需求7:使用Spark DataFrame Api统计通过各广告跳转(adname)的用户数,有时间的再写Spark Sql

需求8:使用Spark DataFrame Api统计各用户级别(memberlevel)的用户数,有时间的再写Spark Sql

需求9:使用Spark DataFrame Api统计各分区网站、用户级别下(dn、memberlevel)的top3用户,有时间的再写Spark Sql

创建DwsMemberDao

import org.apache.spark.sql.SparkSession
object DwsMemberDao {
  /**
   * 查询用户宽表数据
   *
   * @param sparkSession
   * @return
   */
  def queryIdlMemberData(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,ad_id,memberlevel,register,appregurl,regsource,regsourcename,adname," +
      "siteid,sitename,vip_level,cast(paymoney as decimal(10,4)) as paymoney,dt,dn from dws.dws_member ")
  }
  /**
   * 统计注册来源url人数
   *
   * @param sparkSession
   */
  def queryAppregurlCount(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select appregurl,count(uid),dn,dt from dws.dws_member where dt='${dt}' group by appregurl,dn,dt")
  }
  //统计所属网站人数
  def querySiteNameCount(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select sitename,count(uid),dn,dt from dws.dws_member where dt='${dt}' group by sitename,dn,dt")
  }
  //统计所属来源人数
  def queryRegsourceNameCount(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select regsourcename,count(uid),dn,dt from dws.dws_member where dt='${dt}' group by regsourcename,dn,dt ")
  }
  //统计通过各广告注册的人数
  def queryAdNameCount(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select adname,count(uid),dn,dt from dws.dws_member where dt='${dt}' group by adname,dn,dt")
  }
  //统计各用户等级人数
  def queryMemberLevelCount(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select memberlevel,count(uid),dn,dt from dws.dws_member where dt='${dt}' group by memberlevel,dn,dt")
  }
  //统计各用户vip等级人数
  def queryVipLevelCount(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select vip_level,count(uid),dn,dt from dws.dws_member group where dt='${dt}' by vip_level,dn,dt")
  }
  //统计各memberlevel等级 支付金额前三的用户
  def getTop3MemberLevelPayMoneyUser(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select *from(select uid,ad_id,memberlevel,register,appregurl,regsource" +
      ",regsourcename,adname,siteid,sitename,vip_level,cast(paymoney as decimal(10,4)),row_number() over" +
      s" (partition by dn,memberlevel order by cast(paymoney as decimal(10,4)) desc) as rownum,dn from dws.dws_member where dt='${dt}') " +
      " where rownum<4 order by memberlevel,rownum")
  }
}

报表层统计指标AdsMemberService

import com.catelf.member.bean.QueryResult
import com.catelf.member.dao.DwsMemberDao
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{SaveMode, SparkSession}
object AdsMemberService {
  /**
   * 统计各项指标 使用api
   *
   * @param sparkSession
   */
  def queryDetailApi(sparkSession: SparkSession, dt: String) = {
    import sparkSession.implicits._ //隐式转换
    val result = DwsMemberDao.queryIdlMemberData(sparkSession).as[QueryResult].where(s"dt='${dt}'")
    result.cache()
    //统计注册来源url人数
    val a = result.mapPartitions(partition => {
      partition.map(item => (item.appregurl + "_" + item.dn + "_" + item.dt, 1))
    }).groupByKey(_._1)
      .mapValues(item => item._2).reduceGroups(_ + _)
      .map(item => {
        val keys = item._1.split("_")
        val appregurl = keys(0)
        val dn = keys(1)
        val dt = keys(2)
        (appregurl, item._2, dt, dn)
      }).toDF().coalesce(1).write.mode(SaveMode.Overwrite).insertInto("ads.ads_register_appregurlnum")
    //统计所属网站人数
    result.mapPartitions(partiton => {
      partiton.map(item => (item.sitename + "_" + item.dn + "_" + item.dt, 1))
    }).groupByKey(_._1).mapValues((item => item._2)).reduceGroups(_ + _)
      .map(item => {
        val keys = item._1.split("_")
        val sitename = keys(0)
        val dn = keys(1)
        val dt = keys(2)
        (sitename, item._2, dt, dn)
      }).toDF().coalesce(1).write.mode(SaveMode.Overwrite).insertInto("ads.ads_register_sitenamenum")
    //统计所属来源人数 pc mobile wechat app
    result.mapPartitions(partition => {
      partition.map(item => (item.regsourcename + "_" + item.dn + "_" + item.dt, 1))
    }).groupByKey(_._1).mapValues(item => item._2).reduceGroups(_ + _)
      .map(item => {
        val keys = item._1.split("_")
        val regsourcename = keys(0)
        val dn = keys(1)
        val dt = keys(2)
        (regsourcename, item._2, dt, dn)
      }).toDF().coalesce(1).write.mode(SaveMode.Overwrite).insertInto("ads.ads_register_regsourcenamenum")
    //统计通过各广告进来的人数
    result.mapPartitions(partition => {
      partition.map(item => (item.adname + "_" + item.dn + "_" + item.dt, 1))
    }).groupByKey(_._1).mapValues(_._2).reduceGroups(_ + _)
      .map(item => {
        val keys = item._1.split("_")
        val adname = keys(0)
        val dn = keys(1)
        val dt = keys(2)
        (adname, item._2, dt, dn)
      }).toDF().coalesce(1).write.mode(SaveMode.Overwrite).insertInto("ads.ads_register_adnamenum")
    //统计各用户等级人数
    result.mapPartitions(partition => {
      partition.map(item => (item.memberlevel + "_" + item.dn + "_" + item.dt, 1))
    }).groupByKey(_._1).mapValues(_._2).reduceGroups(_ + _)
      .map(item => {
        val keys = item._1.split("_")
        val memberlevel = keys(0)
        val dn = keys(1)
        val dt = keys(2)
        (memberlevel, item._2, dt, dn)
      }).toDF().coalesce(1).write.mode(SaveMode.Overwrite).insertInto("ads.ads_register_memberlevelnum")
    //统计各用户vip等级人数
    result.mapPartitions(partition => {
      partition.map(item => (item.vip_level + "_" + item.dn + "_" + item.dt, 1))
    }).groupByKey(_._1).mapValues(_._2).reduceGroups(_ + _)
      .map(item => {
        val keys = item._1.split("_")
        val vip_level = keys(0)
        val dn = keys(1)
        val dt = keys(2)
        (vip_level, item._2, dt, dn)
      }).toDF().coalesce(1).write.mode(SaveMode.Overwrite).insertInto("ads.ads_register_viplevelnum")
    //统计各memberlevel等级 支付金额前三的用户
    import org.apache.spark.sql.functions._
    result.withColumn("rownum", row_number().over(Window.partitionBy("dn", "memberlevel").orderBy(desc("paymoney"))))
      .where("rownum<4").orderBy("memberlevel", "rownum")
      .select("uid", "memberlevel", "register", "appregurl", "regsourcename", "adname"
        , "sitename", "vip_level", "paymoney", "rownum", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Overwrite).insertInto("ads.ads_register_top3memberpay")
  }
  /**
   * 统计各项指标 使用sql
   *
   * @param sparkSession
   */
  def queryDetailSql(sparkSession: SparkSession, dt: String) = {
    val appregurlCount = DwsMemberDao.queryAppregurlCount(sparkSession, dt)
    val siteNameCount = DwsMemberDao.querySiteNameCount(sparkSession, dt)
    val regsourceNameCount = DwsMemberDao.queryRegsourceNameCount(sparkSession, dt)
    val adNameCount = DwsMemberDao.queryAdNameCount(sparkSession, dt)
    val memberLevelCount = DwsMemberDao.queryMemberLevelCount(sparkSession, dt)
    val vipLevelCount = DwsMemberDao.queryMemberLevelCount(sparkSession, dt)
    val top3MemberLevelPayMoneyUser = DwsMemberDao.getTop3MemberLevelPayMoneyUser(sparkSession, dt).show()
  }
}

创建主类AdsMemberController

import com.catelf.member.service.EtlDataService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object DwdMemberController {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val sparkConf = new SparkConf().setAppName("dwd_member_import").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    HiveUtil.openDynamicPartition(sparkSession) //开启动态分区
    HiveUtil.openCompression(sparkSession) //开启压缩
    HiveUtil.useSnappyCompression(sparkSession) //使用snappy压缩
    //对用户原始数据进行数据清洗 存入bdl层表中
    EtlDataService.etlBaseAdLog(ssc, sparkSession) //导入基础广告表数据
    EtlDataService.etlBaseWebSiteLog(ssc, sparkSession) //导入基础网站表数据
    EtlDataService.etlMemberLog(ssc, sparkSession) //清洗用户数据
    EtlDataService.etlMemberRegtypeLog(ssc, sparkSession) //清洗用户注册数据
    EtlDataService.etlMemPayMoneyLog(ssc, sparkSession) //导入用户支付情况记录
    EtlDataService.etlMemVipLevelLog(ssc, sparkSession) //导入vip基础数据
  }
}

查看ads表数据,已经生成

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
2天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
2天前
|
分布式计算 大数据 BI
MaxCompute产品使用合集之MaxCompute项目的数据是否可以被接入到阿里云的Quick BI中
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
存储 数据采集 数据可视化
大数据处理技术
【4月更文挑战第10天】大数据处理涵盖采集、预处理、存储、分析挖掘、展现和应用等关键步骤。采集涉及多种类型数据,预处理确保数据质量,存储管理关注规模、速度和安全,分析挖掘利用机器学习发现价值,展现和应用则通过可视化和检索实现数据价值。云计算和AI强化了大数据处理能力,整体目标是提取数据中的价值,驱动企业和社会进步。
35 4
大数据处理技术
|
2天前
|
机器学习/深度学习 运维 算法
大数据基础工程技术团队4篇论文入选ICLR,ICDE,WWW
近日,由阿里云计算平台大数据基础工程技术团队主导的四篇时间序列相关论文分别被国际顶会ICLR2024、ICDE2024和WWW2024接收。
|
2天前
|
存储 机器学习/深度学习 数据采集
大数据处理与分析实战:技术深度剖析与案例分享
【5月更文挑战第2天】本文探讨了大数据处理与分析的关键环节,包括数据采集、预处理、存储、分析和可视化,并介绍了Hadoop、Spark和机器学习等核心技术。通过电商推荐系统和智慧城市交通管理的实战案例,展示了大数据在提高用户体验和解决实际问题上的效能。随着技术进步,大数据处理与分析将在更多领域发挥作用,推动社会进步。
|
2天前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用合集之要查看MaxCompute Studio中的项目中的计算任务代码,我该怎么操作
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用合集之该怎么创建MaxCompute的项目
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
存储 分布式计算 Hadoop
【专栏】Hadoop,开源大数据处理框架:驭服数据洪流的利器
【4月更文挑战第28天】Hadoop,开源大数据处理框架,由Hadoop Common、HDFS、YARN和MapReduce组成,提供大规模数据存储和并行处理。其优势在于可扩展性、容错性、高性能、灵活性及社区支持。然而,数据安全、处理速度、系统复杂性和技能短缺是挑战。通过加强安全措施、结合Spark、自动化工具和培训,Hadoop在应对大数据问题中保持关键地位。
|
2天前
|
存储 数据可视化 大数据
大数据技术框架
【4月更文挑战第20天】大数据一般需要经过6个主要环节,包括数据收集、数据存储、资源管理与服务协调、计算引擎、数据分析和数据可视化。
|
2天前
|
分布式计算 容灾 大数据
MaxCompute( 原名ODPS)大数据容灾方案与实现(及项目落地实例)专有云
一,背景与概述    复杂系统的灾难恢复是个难题,具有海量数据及复杂业务场景的大数据容灾是个大难题。    MaxCompute是集团内重要数据平台,是自主研发的大数据解决方案,其规模和稳定性在业界都是领先的。
1502 12

热门文章

最新文章