3) Write DwdIcebergService
package com.atguigu.iceberg.warehouse.service

import java.sql.Timestamp
import java.time.LocalDate
import java.time.format.DateTimeFormatter

import com.atguigu.iceberg.warehouse.bean.{BaseWebsite, MemberRegType, VipLevel}
import org.apache.spark.sql.SparkSession

object DwdIcebergService {

  def readOdsData(sparkSession: SparkSession) = {
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._

    // dwd_base_ad: cast adid to int and overwrite the matching partitions
    sparkSession.read.json("/ods/baseadlog.log")
      .withColumn("adid", col("adid").cast("int"))
      .writeTo("hadoop_prod.db.dwd_base_ad").overwritePartitions()

    // dwd_base_website: convert createtime (yyyy-MM-dd) into a timestamp
    sparkSession.read.json("/ods/baswewebsite.log").map(item => {
      val createtime = item.getAs[String]("createtime")
      val str = LocalDate.parse(createtime, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
        .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
      BaseWebsite(item.getAs[String]("siteid").toInt, item.getAs[String]("sitename"),
        item.getAs[String]("siteurl"), item.getAs[String]("delete").toInt,
        Timestamp.valueOf(str), item.getAs[String]("creator"), item.getAs[String]("dn"))
    }).writeTo("hadoop_prod.db.dwd_base_website").overwritePartitions()

    // dwd_member: drop dn and cast the id columns
    sparkSession.read.json("/ods/member.log").drop("dn")
      .withColumn("uid", col("uid").cast("int"))
      .withColumn("ad_id", col("ad_id").cast("int"))
      .writeTo("hadoop_prod.db.dwd_member").overwritePartitions()

    // dwd_member_regtype: add regsourcename and convert createtime into a timestamp
    sparkSession.read.json("/ods/memberRegtype.log").drop("domain").drop("dn")
      .withColumn("regsourcename", col("regsource"))
      .map(item => {
        val createtime = item.getAs[String]("createtime")
        val str = LocalDate.parse(createtime, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
          .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        MemberRegType(item.getAs[String]("uid").toInt, item.getAs[String]("appkey"),
          item.getAs[String]("appregurl"), item.getAs[String]("bdp_uuid"),
          Timestamp.valueOf(str), item.getAs[String]("isranreg"),
          item.getAs[String]("regsource"), item.getAs[String]("regsourcename"),
          item.getAs[String]("websiteid").toInt, item.getAs[String]("dt"))
      }).writeTo("hadoop_prod.db.dwd_member_regtype").overwritePartitions()

    // dwd_vip_level: convert the three date columns into timestamps
    sparkSession.read.json("/ods/pcenterMemViplevel.log").drop("discountval")
      .map(item => {
        val startTime = item.getAs[String]("start_time")
        val endTime = item.getAs[String]("end_time")
        val last_modify_time = item.getAs[String]("last_modify_time")
        val startTimeStr = LocalDate.parse(startTime, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
          .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        val endTimeStr = LocalDate.parse(endTime, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
          .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        val last_modify_timeStr = LocalDate.parse(last_modify_time, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
          .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        VipLevel(item.getAs[String]("vip_id").toInt, item.getAs[String]("vip_level"),
          Timestamp.valueOf(startTimeStr), Timestamp.valueOf(endTimeStr),
          Timestamp.valueOf(last_modify_timeStr), item.getAs[String]("max_free"),
          item.getAs[String]("min_free"), item.getAs[String]("next_level"),
          item.getAs[String]("operator"), item.getAs[String]("dn"))
      }).writeTo("hadoop_prod.db.dwd_vip_level").overwritePartitions()

    // dwd_pcentermempaymoney: cast the id columns
    sparkSession.read.json("/ods/pcentermempaymoney.log")
      .withColumn("uid", col("uid").cast("int"))
      .withColumn("siteid", col("siteid").cast("int"))
      .withColumn("vip_id", col("vip_id").cast("int"))
      .writeTo("hadoop_prod.db.dwd_pcentermempaymoney").overwritePartitions()
  }
}
4) Write DwdIcebergController
package com.atguigu.iceberg.warehouse.controller

import com.atguigu.iceberg.warehouse.service.DwdIcebergService
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DwdIcebergController {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
      .set("spark.sql.catalog.catalog-name.type", "hadoop")
      .set("spark.sql.catalog.catalog-name.default-namespace", "default")
      .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .set("spark.sql.session.timeZone", "GMT+8")
      .setMaster("local[*]").setAppName("table_operations")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    DwdIcebergService.readOdsData(sparkSession)
  }
}
4.6.4.2 DWS layer (there is a bug when a table specifies multiple partition columns)
1) Create the case classes
import java.sql.Timestamp

case class DwsMember(
  uid: Int, ad_id: Int, fullname: String, iconurl: String, lastlogin: String,
  mailaddr: String, memberlevel: String, password: String, paymoney: String, phone: String,
  qq: String, register: String, regupdatetime: String, unitname: String, userip: String,
  zipcode: String, appkey: String, appregurl: String, bdp_uuid: String, reg_createtime: String,
  isranreg: String, regsource: String, regsourcename: String, adname: String, siteid: String,
  sitename: String, siteurl: String, site_delete: String, site_createtime: String,
  site_creator: String, vip_id: String, vip_level: String, vip_start_time: String,
  vip_end_time: String, vip_last_modify_time: String, vip_max_free: String, vip_min_free: String,
  vip_next_level: String, vip_operator: String, dt: String, dn: String
)

case class DwsMember_Result(
  uid: Int, ad_id: Int, fullname: String, iconurl: String, lastlogin: String,
  mailaddr: String, memberlevel: String, password: String, paymoney: String, phone: String,
  qq: String, register: String, regupdatetime: String, unitname: String, userip: String,
  zipcode: String, appkey: String, appregurl: String, bdp_uuid: String, reg_createtime: Timestamp,
  isranreg: String, regsource: String, regsourcename: String, adname: String, siteid: Int,
  sitename: String, siteurl: String, site_delete: String, site_createtime: String,
  site_creator: String, vip_id: Int, vip_level: String, vip_start_time: Timestamp,
  vip_end_time: Timestamp, vip_last_modify_time: Timestamp, vip_max_free: String,
  vip_min_free: String, vip_next_level: String, vip_operator: String, dt: String, dn: String
)
2) Create DwDIcebergDao to read the six base tables
package com.atguigu.iceberg.warehouse.dao

import org.apache.spark.sql.SparkSession

object DwDIcebergDao {

  def getDwdMember(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,ad_id,birthday,email,fullname,iconurl,lastlogin,mailaddr,memberlevel," +
      "password,phone,qq,register,regupdatetime,unitname,userip,zipcode,dt from hadoop_prod.db.dwd_member")
  }

  def getDwdPcentermempaymoney(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,paymoney,siteid,vip_id,dt,dn from hadoop_prod.db.dwd_pcentermempaymoney")
  }

  def getDwdVipLevel(sparkSession: SparkSession) = {
    sparkSession.sql("select vip_id,vip_level,start_time as vip_start_time,end_time as vip_end_time," +
      "last_modify_time as vip_last_modify_time,max_free as vip_max_free,min_free as vip_min_free,next_level as vip_next_level," +
      "operator as vip_operator,dn from hadoop_prod.db.dwd_vip_level")
  }

  def getDwdBaseWebsite(sparkSession: SparkSession) = {
    sparkSession.sql("select siteid,sitename,siteurl,delete as site_delete,createtime as site_createtime,creator as site_creator" +
      ",dn from hadoop_prod.db.dwd_base_website")
  }

  def getDwdMemberRegtyp(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,appkey,appregurl,bdp_uuid,createtime as reg_createtime,isranreg,regsource,regsourcename,websiteid," +
      "dt from hadoop_prod.db.dwd_member_regtype")
  }

  def getDwdBaseAd(sparkSession: SparkSession) = {
    sparkSession.sql("select adid as ad_id,adname,dn from hadoop_prod.db.dwd_base_ad")
  }
}
3) Write DwsIcebergService to implement the business logic
package com.atguigu.iceberg.warehouse.service

import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import com.atguigu.iceberg.warehouse.bean.{DwsMember, DwsMember_Result}
import com.atguigu.iceberg.warehouse.dao.DwDIcebergDao
import org.apache.spark.sql.SparkSession

object DwsIcebergService {

  def getDwsMemberData(sparkSession: SparkSession, dt: String) = {
    import sparkSession.implicits._
    val dwdPcentermempaymoney = DwDIcebergDao.getDwdPcentermempaymoney(sparkSession).where($"dt" === dt)
    val dwdVipLevel = DwDIcebergDao.getDwdVipLevel(sparkSession)
    val dwdMember = DwDIcebergDao.getDwdMember(sparkSession).where($"dt" === dt)
    val dwdBaseWebsite = DwDIcebergDao.getDwdBaseWebsite(sparkSession)
    val dwdMemberRegtype = DwDIcebergDao.getDwdMemberRegtyp(sparkSession).where($"dt" === dt)
    val dwdBaseAd = DwDIcebergDao.getDwdBaseAd(sparkSession)

    // Join the six DWD tables into one wide record per member
    val result = dwdMember.join(dwdMemberRegtype.drop("dt"), Seq("uid"), "left")
      .join(dwdPcentermempaymoney.drop("dt"), Seq("uid"), "left")
      .join(dwdBaseAd, Seq("ad_id", "dn"), "left")
      .join(dwdBaseWebsite, Seq("siteid", "dn"), "left")
      .join(dwdVipLevel, Seq("vip_id", "dn"), "left_outer")
      .select("uid", "ad_id", "fullname", "iconurl", "lastlogin", "mailaddr", "memberlevel",
        "password", "paymoney", "phone", "qq", "register", "regupdatetime", "unitname", "userip",
        "zipcode", "appkey", "appregurl", "bdp_uuid", "reg_createtime", "isranreg", "regsource",
        "regsourcename", "adname", "siteid", "sitename", "siteurl", "site_delete", "site_createtime",
        "site_creator", "vip_id", "vip_level", "vip_start_time", "vip_end_time", "vip_last_modify_time",
        "vip_max_free", "vip_min_free", "vip_next_level", "vip_operator", "dt", "dn").as[DwsMember]

    // Aggregate to one row per (uid, dn): sum paymoney, take the first value of the other columns
    val resultData = result.groupByKey(item => item.uid + "_" + item.dn)
      .mapGroups { case (key, iters) =>
        val keys = key.split("_")
        val uid = Integer.parseInt(keys(0))
        val dn = keys(1)
        val dwsMembers = iters.toList
        val paymoney = dwsMembers.filter(_.paymoney != null).map(item => BigDecimal.apply(item.paymoney))
          .reduceOption(_ + _).getOrElse(BigDecimal.apply(0.00)).toString
        val ad_id = dwsMembers.map(_.ad_id).head
        val fullname = dwsMembers.map(_.fullname).head
        val icounurl = dwsMembers.map(_.iconurl).head
        val lastlogin = dwsMembers.map(_.lastlogin).head
        val mailaddr = dwsMembers.map(_.mailaddr).head
        val memberlevel = dwsMembers.map(_.memberlevel).head
        val password = dwsMembers.map(_.password).head
        val phone = dwsMembers.map(_.phone).head
        val qq = dwsMembers.map(_.qq).head
        val register = dwsMembers.map(_.register).head
        val regupdatetime = dwsMembers.map(_.regupdatetime).head
        val unitname = dwsMembers.map(_.unitname).head
        val userip = dwsMembers.map(_.userip).head
        val zipcode = dwsMembers.map(_.zipcode).head
        val appkey = dwsMembers.map(_.appkey).head
        val appregurl = dwsMembers.map(_.appregurl).head
        val bdp_uuid = dwsMembers.map(_.bdp_uuid).head
        val reg_createtime = if (dwsMembers.map(_.reg_createtime).head != null)
          dwsMembers.map(_.reg_createtime).head else "1970-01-01 00:00:00"
        val isranreg = dwsMembers.map(_.isranreg).head
        val regsource = dwsMembers.map(_.regsource).head
        val regsourcename = dwsMembers.map(_.regsourcename).head
        val adname = dwsMembers.map(_.adname).head
        val siteid = if (dwsMembers.map(_.siteid).head != null) dwsMembers.map(_.siteid).head else "0"
        val sitename = dwsMembers.map(_.sitename).head
        val siteurl = dwsMembers.map(_.siteurl).head
        val site_delete = dwsMembers.map(_.site_delete).head
        val site_createtime = dwsMembers.map(_.site_createtime).head
        val site_creator = dwsMembers.map(_.site_creator).head
        val vip_id = if (dwsMembers.map(_.vip_id).head != null) dwsMembers.map(_.vip_id).head else "0"
        val vip_level = dwsMembers.map(_.vip_level).max
        val vip_start_time = if (dwsMembers.map(_.vip_start_time).min != null)
          dwsMembers.map(_.vip_start_time).min else "1970-01-01 00:00:00"
        val vip_end_time = if (dwsMembers.map(_.vip_end_time).max != null)
          dwsMembers.map(_.vip_end_time).max else "1970-01-01 00:00:00"
        val vip_last_modify_time = if (dwsMembers.map(_.vip_last_modify_time).max != null)
          dwsMembers.map(_.vip_last_modify_time).max else "1970-01-01 00:00:00"
        val vip_max_free = dwsMembers.map(_.vip_max_free).head
        val vip_min_free = dwsMembers.map(_.vip_min_free).head
        val vip_next_level = dwsMembers.map(_.vip_next_level).head
        val vip_operator = dwsMembers.map(_.vip_operator).head
        // Parse the string timestamps back into java.sql.Timestamp for the result bean
        val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
        val reg_createtimeStr = LocalDateTime.parse(reg_createtime, formatter)
        val vip_start_timeStr = LocalDateTime.parse(vip_start_time, formatter)
        val vip_end_timeStr = LocalDateTime.parse(vip_end_time, formatter)
        val vip_last_modify_timeStr = LocalDateTime.parse(vip_last_modify_time, formatter)
        DwsMember_Result(uid, ad_id, fullname, icounurl, lastlogin, mailaddr, memberlevel, password,
          paymoney, phone, qq, register, regupdatetime, unitname, userip, zipcode, appkey, appregurl,
          bdp_uuid, Timestamp.valueOf(reg_createtimeStr), isranreg, regsource, regsourcename, adname,
          siteid.toInt, sitename, siteurl, site_delete, site_createtime, site_creator, vip_id.toInt,
          vip_level, Timestamp.valueOf(vip_start_timeStr), Timestamp.valueOf(vip_end_timeStr),
          Timestamp.valueOf(vip_last_modify_timeStr), vip_max_free, vip_min_free, vip_next_level,
          vip_operator, dt, dn)
      }
    resultData.write.format("iceberg").mode("overwrite").save("hadoop_prod.db.dws_member")
  }
}
4) Write DwsIcebergController and run a test
package com.atguigu.iceberg.warehouse.controller

import com.atguigu.iceberg.warehouse.service.DwsIcebergService
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DwsIcebergController {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
      .set("spark.sql.catalog.catalog-name.type", "hadoop")
      .set("spark.sql.catalog.catalog-name.default-namespace", "default")
      .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .set("spark.sql.session.timeZone", "GMT+8")
      .setMaster("local[*]").setAppName("table_operations")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    DwsIcebergService.getDwsMemberData(sparkSession, "20190722")
  }
}
5) The job fails with the same error seen earlier when testing in the spark-sql shell: when a batch of data is written across partitions, Iceberg reports that the partition writer has already been closed and no more rows can be inserted into it.
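As a side note that is not part of the original walkthrough: this error usually means the incoming rows are not clustered by the table's partition columns, so each write task keeps closing and reopening partition writers. Besides the fix taken in step 6 below (removing dn from the partition spec), a possible workaround, sketched here under the assumption that dws_member is still partitioned by both dt and dn, is to sort the rows by the partition columns before writing, or, on Iceberg versions that support the property, to enable the fanout writer:

// Hypothetical alternative, not the approach taken in this document:
// cluster the rows by the partition columns so each writer receives its partition's rows contiguously.
resultData
  .sortWithinPartitions("dt", "dn")
  .write.format("iceberg").mode("overwrite")
  .save("hadoop_prod.db.dws_member")

// Or (if the Iceberg version in use supports this table property) keep one open writer per partition:
// sparkSession.sql("ALTER TABLE hadoop_prod.db.dws_member SET TBLPROPERTIES ('write.spark.fanout.enabled'='true')")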
6) Recreate the table with dn removed from the partition spec, keeping only dt. Bug: multiple partition columns cannot be specified here; only a single partition column works.
spark-sql (default)> drop table hadoop_prod.db.dws_member;

create table hadoop_prod.db.dws_member(
  uid int, ad_id int, fullname string, iconurl string, lastlogin string, mailaddr string,
  memberlevel string, password string, paymoney string, phone string, qq string, register string,
  regupdatetime string, unitname string, userip string, zipcode string, appkey string,
  appregurl string, bdp_uuid string, reg_createtime timestamp, isranreg string, regsource string,
  regsourcename string, adname string, siteid int, sitename string, siteurl string,
  site_delete string, site_createtime string, site_creator string, vip_id int, vip_level string,
  vip_start_time timestamp, vip_end_time timestamp, vip_last_modify_time timestamp,
  vip_max_free string, vip_min_free string, vip_next_level string, vip_operator string,
  dt string, dn string)
using iceberg
partitioned by (dt);
7) After recreating the table, rerun the test; the data is inserted successfully.
4.6.4.3 ADS layer
1) Write the required case class
case class QueryResult(
  uid: Int,
  ad_id: Int,
  memberlevel: String,
  register: String,
  appregurl: String, // registration source URL
  regsource: String,
  regsourcename: String,
  adname: String,
  siteid: String,
  sitename: String,
  vip_level: String,
  paymoney: BigDecimal,
  dt: String,
  dn: String
)
2) Write DwsIcebergDao to query the wide table
package com.atguigu.iceberg.warehouse.dao

import org.apache.spark.sql.SparkSession

object DwsIcebergDao {

  /**
   * Query the member wide-table data.
   *
   * @param sparkSession
   * @return
   */
  def queryDwsMemberData(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,ad_id,memberlevel,register,appregurl,regsource,regsourcename,adname," +
      "siteid,sitename,vip_level,cast(paymoney as decimal(10,4)) as paymoney,dt,dn from hadoop_prod.db.dws_member")
  }
}
3) Write AdsIcebergService to compute the metrics
package com.atguigu.iceberg.warehouse.service

import com.atguigu.iceberg.warehouse.bean.QueryResult
import com.atguigu.iceberg.warehouse.dao.DwsIcebergDao
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{SaveMode, SparkSession}

object AdsIcebergService {

  def queryDetails(sparkSession: SparkSession, dt: String) = {
    import sparkSession.implicits._
    val result = DwsIcebergDao.queryDwsMemberData(sparkSession).as[QueryResult].where(s"dt='${dt}'")
    result.cache()

    // Metric 1: number of registrations per appregurl (word-count style)
    result.mapPartitions(partition => {
      partition.map(item => (item.appregurl + "_" + item.dn + "_" + item.dt, 1))
    }).groupByKey(_._1)
      .mapValues(item => item._2)
      .reduceGroups(_ + _)
      .map(item => {
        val keys = item._1.split("_")
        val appregurl = keys(0)
        val dn = keys(1)
        val dt = keys(2)
        (appregurl, item._2, dt, dn)
      }).toDF("appregurl", "num", "dt", "dn")
      .writeTo("hadoop_prod.db.ads_register_appregurlnum").overwritePartitions()

    // Metric 2: top three users by payment amount within each memberlevel
    import org.apache.spark.sql.functions._
    result.withColumn("rownum", row_number().over(Window.partitionBy("memberlevel").orderBy(desc("paymoney"))))
      .where("rownum<4").orderBy("memberlevel", "rownum")
      .select("uid", "memberlevel", "register", "appregurl", "regsourcename", "adname",
        "sitename", "vip_level", "paymoney", "rownum", "dt", "dn")
      .writeTo("hadoop_prod.db.ads_register_top3memberpay").overwritePartitions()
  }
}
4) Write AdsIcebergController and test locally
package com.atguigu.iceberg.warehouse.controller

import com.atguigu.iceberg.warehouse.service.AdsIcebergService
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object AdsIcebergController {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
      .set("spark.sql.catalog.catalog-name.type", "hadoop")
      .set("spark.sql.catalog.catalog-name.default-namespace", "default")
      .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .set("spark.sql.session.timeZone", "GMT+8")
      .setMaster("local[*]").setAppName("dwd_app")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    AdsIcebergService.queryDetails(sparkSession, "20190722")
  }
}
5) Query the result tables to verify the output
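For reference, a minimal check (not shown in the original text, assuming the two ADS table names used above and the dt value from the test run) can be done straight from the Spark session:

// Hypothetical verification snippet: print a few rows of each ADS result table.
sparkSession.sql("select * from hadoop_prod.db.ads_register_appregurlnum where dt='20190722'").show(false)
sparkSession.sql("select * from hadoop_prod.db.ads_register_top3memberpay where dt='20190722'").show(false)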
4.6.5 YARN test
1) After the local-mode test passes, package the code into a jar and submit it to the cluster for testing. Because every insert currently uses overwrite mode, there is no need to delete historical data when testing on YARN.
2) Before building the jar, remember to comment out setMaster("local[*]") in the code, and mark the dependencies already available on the cluster as <scope>provided</scope> so that a thin jar is produced.
3) Build the jar, upload it to the cluster, and run the spark-submit commands in YARN mode.
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g --queue spark --class com.atguigu.iceberg.warehouse.controller.DwdIcebergController iceberg-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g --queue spark --class com.atguigu.iceberg.warehouse.controller.DwsIcebergController iceberg-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g --queue spark --class com.atguigu.iceberg.warehouse.controller.AdsIcebergController iceberg-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar