Create DwdController
import com.catelf.qz.service.EtlDataService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Parse the quiz (qz) data and load it into the dwd layer.
 */
object DwdController {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val sparkConf = new SparkConf().setAppName("dwd_qz_controller").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    HiveUtil.openDynamicPartition(sparkSession) // enable dynamic partitioning
    HiveUtil.openCompression(sparkSession)      // enable compression
    HiveUtil.useSnappyCompression(sparkSession) // use snappy compression
    EtlDataService.etlQzChapter(ssc, sparkSession)
    EtlDataService.etlQzChapterList(ssc, sparkSession)
    EtlDataService.etlQzPoint(ssc, sparkSession)
    EtlDataService.etlQzPointQuestion(ssc, sparkSession)
    EtlDataService.etlQzSiteCourse(ssc, sparkSession)
    EtlDataService.etlQzCourse(ssc, sparkSession)
    EtlDataService.etlQzCourseEdusubject(ssc, sparkSession)
    EtlDataService.etlQzWebsite(ssc, sparkSession)
    EtlDataService.etlQzMajor(ssc, sparkSession)
    EtlDataService.etlQzBusiness(ssc, sparkSession)
    EtlDataService.etlQzPaperView(ssc, sparkSession)
    EtlDataService.etlQzCenterPaper(ssc, sparkSession)
    EtlDataService.etlQzPaper(ssc, sparkSession)
    EtlDataService.etlQzCenter(ssc, sparkSession)
    EtlDataService.etlQzQuestion(ssc, sparkSession)
    EtlDataService.etlQzQuestionType(ssc, sparkSession)
    EtlDataService.etlQzMemberPaperQuestion(ssc, sparkSession)
  }
}
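HiveUtil is referenced here but its source is not shown in this section. A minimal sketch of what it might look like, assuming the helper methods do nothing more than set the corresponding standard Hive session parameters (the method names are taken from the controller above; the exact settings are an assumption):

import org.apache.spark.sql.SparkSession

object HiveUtil {
  // Allow dynamic-partition inserts, needed for insertInto on the partitioned dwd/dws tables
  def openDynamicPartition(spark: SparkSession): Unit = {
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
  }

  // Turn on compression for Hive output
  def openCompression(spark: SparkSession): Unit = {
    spark.sql("set mapred.output.compress=true")
    spark.sql("set hive.exec.compress.output=true")
  }

  // Use the Snappy codec for the compressed output
  def useSnappyCompression(spark: SparkSession): Unit = {
    spark.sql("set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec")
    spark.sql("set mapred.output.compression.type=BLOCK")
  }
}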
Run this main class and the parsed dwd tables will be available in Hive.
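A quick spot check of the result, reusing the SparkSession created above (the partition value 20190722 is only an example; use whichever dt the ETL actually loaded):

sparkSession.sql("show tables in dwd").show(false)
sparkSession.sql("select * from dwd.dwd_qz_chapter where dt='20190722' limit 10").show(false)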
Create QzChapterDao, the DAO class for the chapter tables
import org.apache.spark.sql.SparkSession

object QzChapterDao {

  /**
   * Query the qz_chapter base data.
   *
   * @param sparkSession
   * @return
   */
  def getDwdQzChapter(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select chapterid,chapterlistid,chaptername,sequence,showstatus,status,creator as " +
      "chapter_creator,createtime as chapter_createtime,courseid as chapter_courseid,chapternum,outchapterid,dt,dn from dwd.dwd_qz_chapter where " +
      s"dt='$dt'")
  }

  /**
   * Query the qz_chapter_list base data.
   *
   * @param sparkSession
   * @param dt
   */
  def getDwdQzChapterList(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select chapterlistid,chapterlistname,chapterallnum,dn from dwd.dwd_qz_chapter_list " +
      s"where dt='$dt'")
  }

  /**
   * Query the qz_point base data.
   *
   * @param sparkSession
   * @param dt
   */
  def getDwdQzPoint(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select pointid,pointname,pointyear,chapter,excisenum,pointlistid,chapterid," +
      "pointdescribe,pointlevel,typelist,score as point_score,thought,remid,pointnamelist,typelistids,pointlist,dn from " +
      s"dwd.dwd_qz_point where dt='$dt'")
  }

  /**
   * Query the qz_point_question base data.
   *
   * @param sparkSession
   * @param dt
   */
  def getDwdQzPointQuestion(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select pointid,questionid,questype,dn from dwd.dwd_qz_point_question where dt='$dt'")
  }
}
Create QzCourseDao, the DAO class for the course tables
import org.apache.spark.sql.SparkSession

object QzCourseDao {

  def getDwdQzSiteCourse(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select sitecourseid,siteid,courseid,sitecoursename,coursechapter,sequence,status," +
      "creator as sitecourse_creator,createtime as sitecourse_createtime,helppaperstatus,servertype,boardid,showstatus,dt,dn " +
      s"from dwd.dwd_qz_site_course where dt='${dt}'")
  }

  def getDwdQzCourse(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select courseid,majorid,coursename,isadvc,chapterlistid,pointlistid,dn from " +
      s"dwd.dwd_qz_course where dt='${dt}'")
  }

  def getDwdQzCourseEduSubject(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select courseeduid,edusubjectid,courseid,dn from dwd.dwd_qz_course_edusubject " +
      s"where dt='${dt}'")
  }
}
Create QzMajorDao, the DAO class for the major tables
import org.apache.spark.sql.SparkSession

object QzMajorDao {

  def getQzMajor(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select majorid,businessid,siteid,majorname,shortname,status,sequence,creator as major_creator," +
      s"createtime as major_createtime,dt,dn from dwd.dwd_qz_major where dt='$dt'")
  }

  def getQzWebsite(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select siteid,sitename,domain,multicastserver,templateserver,creator," +
      s"createtime,multicastgateway,multicastport,dn from dwd.dwd_qz_website where dt='$dt'")
  }

  def getQzBusiness(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select businessid,businessname,dn from dwd.dwd_qz_business where dt='$dt'")
  }
}
Create QzPaperDao, the DAO class for the paper tables
import org.apache.spark.sql.SparkSession

object QzPaperDao {

  def getDwdQzPaperView(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperviewid,paperid,paperviewname,paperparam,openstatus,explainurl,iscontest," +
      "contesttime,conteststarttime,contestendtime,contesttimelimit,dayiid,status,creator as paper_view_creator," +
      "createtime as paper_view_createtime,paperviewcatid,modifystatus,description,papertype,downurl,paperuse," +
      s"paperdifficult,testreport,paperuseshow,dt,dn from dwd.dwd_qz_paper_view where dt='$dt'")
  }

  def getDwdQzCenterPaper(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select paperviewid,sequence,centerid,dn from dwd.dwd_qz_center_paper where dt='$dt'")
  }

  def getDwdQzPaper(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperid,papercatid,courseid,paperyear,chapter,suitnum,papername,totalscore,chapterid," +
      s"chapterlistid,dn from dwd.dwd_qz_paper where dt='$dt'")
  }

  def getDwdQzCenter(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select centerid,centername,centeryear,centertype,centerparam,provideuser," +
      s"centerviewtype,stage,dn from dwd.dwd_qz_center where dt='$dt'")
  }
}
Create QzQuestionDao, the DAO class for the question tables
import org.apache.spark.sql.SparkSession

object QzQuestionDao {

  def getQzQuestion(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select questionid,parentid,questypeid,quesviewtype,content,answer,analysis,limitminute," +
      "score,splitscore,status,optnum,lecture,creator,createtime,modifystatus,attanswer,questag,vanalysisaddr,difficulty," +
      s"quesskill,vdeoaddr,dt,dn from dwd.dwd_qz_question where dt='$dt'")
  }

  def getQzQuestionType(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select questypeid,viewtypename,description,papertypename,remark,splitscoretype,dn from " +
      s"dwd.dwd_qz_question_type where dt='$dt'")
  }
}
Create UserPaperDetailDao, the DAO class for the wide table
import org.apache.spark.sql.SparkSession

object UserPaperDetailDao {

  def getDwdQzMemberPaperQuestion(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select userid,paperviewid,chapterid,sitecourseid,questionid,majorid,useranswer,istrue,lasttime,opertype," +
      s"paperid,spendtime,score,question_answer,dt,dn from dwd.dwd_qz_member_paper_question where dt='$dt'")
  }

  def getDwsQzChapter(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select chapterid,chapterlistid,chaptername,sequence as chapter_sequence,status as chapter_status," +
      "chapter_courseid,chapternum,chapterallnum,outchapterid,chapterlistname,pointid,questype,pointname,pointyear" +
      ",chapter,excisenum,pointlistid,pointdescribe,pointlevel,typelist,point_score,thought,remid,pointnamelist," +
      s"typelistids,pointlist,dn from dws.dws_qz_chapter where dt='$dt'")
  }

  def getDwsQzCourse(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select sitecourseid,siteid,courseid,sitecoursename,coursechapter,sequence as course_sequence," +
      "status as course_status,sitecourse_creator,sitecourse_createtime,helppaperstatus,servertype,boardid,showstatus,majorid," +
      s"coursename,isadvc,chapterlistid,pointlistid,courseeduid,edusubjectid,dn from dws.dws_qz_course where dt='$dt'")
  }

  def getDwsQzMajor(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select majorid,businessid,majorname,shortname,status as major_status,sequence as major_sequence," +
      "major_creator,major_createtime,businessname,sitename,domain,multicastserver,templateserver,multicastgateway,multicastport," +
      s"dn from dws.dws_qz_major where dt='$dt'")
  }

  def getDwsQzPaper(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperviewid,paperid,paperviewname,paperparam,openstatus,explainurl,iscontest,contesttime," +
      "conteststarttime,contestendtime,contesttimelimit,dayiid,status as paper_status,paper_view_creator,paper_view_createtime," +
      "paperviewcatid,modifystatus,description,paperuse,testreport,centerid,sequence as paper_sequence,centername,centeryear," +
      "centertype,provideuser,centerviewtype,stage as paper_stage,papercatid,courseid,paperyear,suitnum,papername,totalscore,dn" +
      s" from dws.dws_qz_paper where dt='$dt'")
  }

  def getDwsQzQuestion(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select questionid,parentid as question_parentid,questypeid,quesviewtype,content as question_content," +
      "answer as question_answer,analysis as question_analysis,limitminute as question_limitminute,score as question_score," +
      "splitscore,lecture,creator as question_creator,createtime as question_createtime,modifystatus as question_modifystatus," +
      "attanswer as question_attanswer,questag as question_questag,vanalysisaddr as question_vanalysisaddr,difficulty as question_difficulty," +
      "quesskill,vdeoaddr,description as question_description,splitscoretype as question_splitscoretype,dn " +
      s" from dws.dws_qz_question where dt='$dt'")
  }
}
Dimension degeneration and wide-table composition: the service class
Requirement 2: Based on the dwd-layer base tables, degenerate the dimensions and aggregate the tables into dws.dws_qz_chapter (chapter dimension table), dws.dws_qz_course (course dimension table), dws.dws_qz_major (major dimension table), dws.dws_qz_paper (paper dimension table) and dws.dws_qz_question (question dimension table), using Spark SQL and the DataFrame API.
dws.dws_qz_chapter: join of 4 tables — dwd.dwd_qz_chapter inner join dwd.dwd_qz_chapter_list on chapterlistid and dn, inner join dwd.dwd_qz_point on chapterid and dn, inner join dwd.dwd_qz_point_question on pointid and dn
dws.dws_qz_course: join of 3 tables — dwd.dwd_qz_site_course inner join dwd.dwd_qz_course on courseid and dn, inner join dwd.dwd_qz_course_edusubject on courseid and dn
dws.dws_qz_major: join of 3 tables — dwd.dwd_qz_major inner join dwd.dwd_qz_website on siteid and dn, inner join dwd.dwd_qz_business on businessid and dn
dws.dws_qz_paper: join of 4 tables — dwd.dwd_qz_paper_view left join dwd.dwd_qz_center_paper on paperviewid and dn, left join dwd.dwd_qz_center on centerid and dn, inner join dwd.dwd_qz_paper on paperid and dn
dws.dws_qz_question: join of 2 tables — dwd.dwd_qz_question inner join dwd.dwd_qz_question_type on questypeid and dn
Requirement 3: Based on dws.dws_qz_chapter, dws.dws_qz_course, dws.dws_qz_major, dws.dws_qz_paper, dws.dws_qz_question and dwd.dwd_qz_member_paper_question, compose the wide table dws.dws_user_paper_detail, using Spark SQL and the DataFrame API.
dws.dws_user_paper_detail: dwd_qz_member_paper_question inner join dws_qz_chapter on chapterid and dn, inner join dws_qz_course on sitecourseid and dn, inner join dws_qz_major on majorid and dn, inner join dws_qz_paper on paperviewid and dn, inner join dws_qz_question on questionid and dn
import com.catelf.qz.dao.{QzChapterDao, QzCourseDao, QzMajorDao, QzPaperDao, QzQuestionDao, UserPaperDetailDao}
import org.apache.spark.sql.{SaveMode, SparkSession}

object DwsQzService {

  def saveDwsQzChapter(sparkSession: SparkSession, dt: String) = {
    val dwdQzChapter = QzChapterDao.getDwdQzChapter(sparkSession, dt)
    val dwdQzChapterlist = QzChapterDao.getDwdQzChapterList(sparkSession, dt)
    val dwdQzPoint = QzChapterDao.getDwdQzPoint(sparkSession, dt)
    val dwdQzPointQuestion = QzChapterDao.getDwdQzPointQuestion(sparkSession, dt)
    val result = dwdQzChapter.join(dwdQzChapterlist, Seq("chapterlistid", "dn"))
      .join(dwdQzPoint, Seq("chapterid", "dn"))
      .join(dwdQzPointQuestion, Seq("pointid", "dn"))
    result.select("chapterid", "chapterlistid", "chaptername", "sequence", "showstatus", "status",
      "chapter_creator", "chapter_createtime", "chapter_courseid", "chapternum", "chapterallnum", "outchapterid",
      "chapterlistname", "pointid", "questionid", "questype", "pointname", "pointyear", "chapter", "excisenum",
      "pointlistid", "pointdescribe", "pointlevel", "typelist", "point_score", "thought", "remid", "pointnamelist",
      "typelistids", "pointlist", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_chapter")
  }

  def saveDwsQzCourse(sparkSession: SparkSession, dt: String) = {
    val dwdQzSiteCourse = QzCourseDao.getDwdQzSiteCourse(sparkSession, dt)
    val dwdQzCourse = QzCourseDao.getDwdQzCourse(sparkSession, dt)
    val dwdQzCourseEdusubject = QzCourseDao.getDwdQzCourseEduSubject(sparkSession, dt)
    val result = dwdQzSiteCourse.join(dwdQzCourse, Seq("courseid", "dn"))
      .join(dwdQzCourseEdusubject, Seq("courseid", "dn"))
      .select("sitecourseid", "siteid", "courseid", "sitecoursename", "coursechapter", "sequence", "status",
        "sitecourse_creator", "sitecourse_createtime", "helppaperstatus", "servertype", "boardid", "showstatus",
        "majorid", "coursename", "isadvc", "chapterlistid", "pointlistid", "courseeduid", "edusubjectid", "dt", "dn")
    result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_course")
  }

  def saveDwsQzMajor(sparkSession: SparkSession, dt: String) = {
    val dwdQzMajor = QzMajorDao.getQzMajor(sparkSession, dt)
    val dwdQzWebsite = QzMajorDao.getQzWebsite(sparkSession, dt)
    val dwdQzBusiness = QzMajorDao.getQzBusiness(sparkSession, dt)
    val result = dwdQzMajor.join(dwdQzWebsite, Seq("siteid", "dn"))
      .join(dwdQzBusiness, Seq("businessid", "dn"))
      .select("majorid", "businessid", "siteid", "majorname", "shortname", "status", "sequence",
        "major_creator", "major_createtime", "businessname", "sitename", "domain", "multicastserver",
        "templateserver", "multicastgateway", "multicastport", "dt", "dn")
    result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_major")
  }

  def saveDwsQzPaper(sparkSession: SparkSession, dt: String) = {
    val dwdQzPaperView = QzPaperDao.getDwdQzPaperView(sparkSession, dt)
    val dwdQzCenterPaper = QzPaperDao.getDwdQzCenterPaper(sparkSession, dt)
    val dwdQzCenter = QzPaperDao.getDwdQzCenter(sparkSession, dt)
    val dwdQzPaper = QzPaperDao.getDwdQzPaper(sparkSession, dt)
    val result = dwdQzPaperView.join(dwdQzCenterPaper, Seq("paperviewid", "dn"), "left")
      .join(dwdQzCenter, Seq("centerid", "dn"), "left")
      .join(dwdQzPaper, Seq("paperid", "dn"))
      .select("paperviewid", "paperid", "paperviewname", "paperparam", "openstatus", "explainurl", "iscontest",
        "contesttime", "conteststarttime", "contestendtime", "contesttimelimit", "dayiid", "status",
        "paper_view_creator", "paper_view_createtime", "paperviewcatid", "modifystatus", "description", "paperuse",
        "paperdifficult", "testreport", "paperuseshow", "centerid", "sequence", "centername", "centeryear",
        "centertype", "provideuser", "centerviewtype", "stage", "papercatid", "courseid", "paperyear", "suitnum",
        "papername", "totalscore", "chapterid", "chapterlistid", "dt", "dn")
    result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_paper")
  }

  def saveDwsQzQuestionTpe(sparkSession: SparkSession, dt: String) = {
    val dwdQzQuestion = QzQuestionDao.getQzQuestion(sparkSession, dt)
    val dwdQzQuestionType = QzQuestionDao.getQzQuestionType(sparkSession, dt)
    val result = dwdQzQuestion.join(dwdQzQuestionType, Seq("questypeid", "dn"))
      .select("questionid", "parentid", "questypeid", "quesviewtype", "content", "answer", "analysis",
        "limitminute", "score", "splitscore", "status", "optnum", "lecture", "creator", "createtime", "modifystatus",
        "attanswer", "questag", "vanalysisaddr", "difficulty", "quesskill", "vdeoaddr", "viewtypename",
        "papertypename", "remark", "splitscoretype", "dt", "dn")
    result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_question")
  }

  def saveDwsUserPaperDetail(sparkSession: SparkSession, dt: String) = {
    val dwdQzMemberPaperQuestion = UserPaperDetailDao.getDwdQzMemberPaperQuestion(sparkSession, dt).drop("paperid")
      .withColumnRenamed("question_answer", "user_question_answer")
    val dwsQzChapter = UserPaperDetailDao.getDwsQzChapter(sparkSession, dt).drop("courseid")
    val dwsQzCourse = UserPaperDetailDao.getDwsQzCourse(sparkSession, dt)
      .withColumnRenamed("sitecourse_creator", "course_creator")
      .withColumnRenamed("sitecourse_createtime", "course_createtime")
      .drop("majorid").drop("chapterlistid").drop("pointlistid")
    val dwsQzMajor = UserPaperDetailDao.getDwsQzMajor(sparkSession, dt)
    val dwsQzPaper = UserPaperDetailDao.getDwsQzPaper(sparkSession, dt).drop("courseid")
    val dwsQzQuestion = UserPaperDetailDao.getDwsQzQuestion(sparkSession, dt)
    dwdQzMemberPaperQuestion.join(dwsQzCourse, Seq("sitecourseid", "dn"))
      .join(dwsQzChapter, Seq("chapterid", "dn"))
      .join(dwsQzMajor, Seq("majorid", "dn"))
      .join(dwsQzPaper, Seq("paperviewid", "dn"))
      .join(dwsQzQuestion, Seq("questionid", "dn"))
      .select("userid", "courseid", "questionid", "useranswer", "istrue", "lasttime", "opertype",
        "paperid", "spendtime", "chapterid", "chaptername", "chapternum", "chapterallnum", "outchapterid",
        "chapterlistname", "pointid", "questype", "pointyear", "chapter", "pointname", "excisenum", "pointdescribe",
        "pointlevel", "typelist", "point_score", "thought", "remid", "pointnamelist", "typelistids", "pointlist",
        "sitecourseid", "siteid", "sitecoursename", "coursechapter", "course_sequence", "course_status",
        "course_creator", "course_createtime", "servertype", "helppaperstatus", "boardid", "showstatus",
        "majorid", "coursename", "isadvc", "chapterlistid", "pointlistid", "courseeduid", "edusubjectid",
        "businessid", "majorname", "shortname", "major_status", "major_sequence", "major_creator", "major_createtime",
        "businessname", "sitename", "domain", "multicastserver", "templateserver", "multicastgateway", "multicastport",
        "paperviewid", "paperviewname", "paperparam", "openstatus", "explainurl", "iscontest", "contesttime",
        "conteststarttime", "contestendtime", "contesttimelimit", "dayiid", "paper_status", "paper_view_creator",
        "paper_view_createtime", "paperviewcatid", "modifystatus", "description", "paperuse", "testreport",
        "centerid", "paper_sequence", "centername", "centeryear", "centertype", "provideuser", "centerviewtype",
        "paper_stage", "papercatid", "paperyear", "suitnum", "papername", "totalscore", "question_parentid",
        "questypeid", "quesviewtype", "question_content", "question_answer", "question_analysis",
        "question_limitminute", "score", "splitscore", "lecture", "question_creator", "question_createtime",
        "question_modifystatus", "question_attanswer", "question_questag", "question_vanalysisaddr",
        "question_difficulty", "quesskill", "vdeoaddr", "question_description", "question_splitscoretype",
        "user_question_answer", "dt", "dn")
      .coalesce(1)
      .write.mode(SaveMode.Append).insertInto("dws.dws_user_paper_detail")
  }
}
Create DwsController
import com.catelf.qz.service.DwsQzService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DwsController {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val sparkConf = new SparkConf().setAppName("dws_qz_controller").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    HiveUtil.openDynamicPartition(sparkSession) // enable dynamic partitioning
    HiveUtil.openCompression(sparkSession)      // enable compression
    HiveUtil.useSnappyCompression(sparkSession) // use snappy compression
    val dt = "20190722"
    DwsQzService.saveDwsQzChapter(sparkSession, dt)
    DwsQzService.saveDwsQzCourse(sparkSession, dt)
    DwsQzService.saveDwsQzMajor(sparkSession, dt)
    DwsQzService.saveDwsQzPaper(sparkSession, dt)
    DwsQzService.saveDwsQzQuestionTpe(sparkSession, dt)
    DwsQzService.saveDwsUserPaperDetail(sparkSession, dt)
  }
}
Run it and you can see that the dws table data has been generated.
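A quick check of the dws layer (dt=20190722 matches the partition hard-coded in DwsController):

sparkSession.sql("select count(*) from dws.dws_qz_chapter where dt='20190722'").show()
sparkSession.sql("select * from dws.dws_user_paper_detail where dt='20190722' limit 5").show(false)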
Metric statistics for the reporting (ads) layer
Requirement 4: Based on the wide table, compute each paper's average time spent and average score, first with Spark SQL and then with the Spark DataFrame API.
Requirement 5: Compute each paper's highest and lowest score, first with Spark SQL and then with the Spark DataFrame API.
Requirement 6: Group by paper and list the top-3 users of each paper with their details, first with Spark SQL and then with the Spark DataFrame API.
Requirement 7: Group by paper and list the bottom-3 users of each paper with their details, first with Spark SQL and then with the Spark DataFrame API.
Requirement 8: For each paper, collect the user ids in each score segment; the segments are 0-20, 20-40, 40-60, 60-80 and 80-100.
Requirement 9: For each paper, count the users who failed and who passed, and compute the pass rate (passing score: 60).
Requirement 10: For each question, count the wrong and correct answers and compute the error rate.
Create AdsQzDao
import org.apache.spark.sql.SparkSession

object AdsQzDao {

  /**
   * Average time spent and average score per paper.
   *
   * @param sparkSession
   * @param dt
   * @return
   */
  def getAvgSPendTimeAndScore(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select paperviewid,paperviewname,cast(avg(score) as decimal(4,1)) score,cast(avg(spendtime) as decimal(10,2))" +
      s" spendtime,dt,dn from dws.dws_user_paper_detail where dt='$dt' group by " +
      "paperviewid,paperviewname,dt,dn order by score desc,spendtime desc")
  }

  /**
   * Highest and lowest score per paper.
   *
   * @param sparkSession
   * @param dt
   */
  def getTopScore(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperviewid,paperviewname,cast(max(score) as decimal(4,1)),cast(min(score) as decimal(4,1)) " +
      s",dt,dn from dws.dws_user_paper_detail where dt='$dt' group by paperviewid,paperviewname,dt,dn ")
  }

  /**
   * Top-3 users by score for each paper, with details.
   *
   * @param sparkSession
   * @param dt
   */
  def getTop3UserDetail(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select * from (select userid,paperviewname,chaptername,pointname,sitecoursename,coursename,majorname,shortname," +
      "sitename,papername,score,dense_rank() over (partition by paperviewid order by score desc) as rk,dt,dn from dws.dws_user_paper_detail " +
      s"where dt='$dt') where rk<4")
  }

  /**
   * Bottom-3 users by score for each paper, with details.
   *
   * @param sparkSession
   * @param dt
   * @return
   */
  def getLow3UserDetail(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select * from (select userid,paperviewname,chaptername,pointname,sitecoursename,coursename,majorname,shortname," +
      s"sitename,papername,score,dense_rank() over (partition by paperviewid order by score asc) as rk,dt,dn from dws.dws_user_paper_detail where dt='$dt') where rk<4")
  }

  /**
   * User ids per score segment for each paper.
   */
  def getPaperScoreSegmentUser(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperviewid,paperviewname,score_segment,concat_ws(',',collect_list(cast(userid as string))),dt,dn" +
      " from (select paperviewid,paperviewname,userid," +
      " case when score >=0 and score <=20 then '0-20'" +
      " when score >20 and score <=40 then '20-40' " +
      " when score >40 and score <=60 then '40-60' " +
      " when score >60 and score <=80 then '60-80' " +
      " when score >80 and score <=100 then '80-100' end as score_segment" +
      s",dt,dn from dws.dws_user_paper_detail where dt='$dt') group by paperviewid,paperviewname,score_segment,dt,dn order by paperviewid,score_segment")
  }

  /**
   * Fail count, pass count and pass rate per paper.
   *
   * @param sparkSession
   * @param dt
   */
  def getPaperPassDetail(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select t.*,cast(t.passcount/(t.passcount+t.countdetail) as decimal(4,2)) as rate,dt,dn" +
      " from(select a.paperviewid,a.paperviewname,a.countdetail,a.dt,a.dn,b.passcount from " +
      s"(select paperviewid,paperviewname,count(*) countdetail,dt,dn from dws.dws_user_paper_detail where dt='$dt' and score between 0 and 60 group by" +
      s" paperviewid,paperviewname,dt,dn) a join (select paperviewid,count(*) passcount,dn from dws.dws_user_paper_detail where dt='$dt' and score >60 " +
      "group by paperviewid,dn) b on a.paperviewid=b.paperviewid and a.dn=b.dn)t")
  }

  /**
   * Correct count, wrong count and error rate per question, ordered by error count descending.
   *
   * @param sparkSession
   * @param dt
   */
  def getQuestionDetail(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select t.*,cast(t.errcount/(t.errcount+t.rightcount) as decimal(4,2)) as rate" +
      s" from((select questionid,count(*) errcount,dt,dn from dws.dws_user_paper_detail where dt='$dt' and user_question_answer='0' " +
      s"group by questionid,dt,dn) a join(select questionid,count(*) rightcount,dt,dn from dws.dws_user_paper_detail where dt='$dt' and user_question_answer='1' " +
      s"group by questionid,dt,dn) b on a.questionid=b.questionid and a.dn=b.dn)t order by errcount desc")
  }
}
Create AdsQzService
import com.catelf.qz.dao.AdsQzDao
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{SaveMode, SparkSession}

object AdsQzService {

  // Spark SQL versions of the metrics (requirements 4-10); the queries live in AdsQzDao
  def getTarget(sparkSession: SparkSession, dt: String) = {
    val avgDetail = AdsQzDao.getAvgSPendTimeAndScore(sparkSession, dt)
    val topscore = AdsQzDao.getTopScore(sparkSession, dt)
    val top3UserDetail = AdsQzDao.getTop3UserDetail(sparkSession, dt)
    val low3UserDetail = AdsQzDao.getLow3UserDetail(sparkSession, dt)
    val paperScore = AdsQzDao.getPaperScoreSegmentUser(sparkSession, dt)
    val paperPassDetail = AdsQzDao.getPaperPassDetail(sparkSession, dt)
    val questionDetail = AdsQzDao.getQuestionDetail(sparkSession, dt)
  }

  // DataFrame API versions of the same metrics, written into the ads tables
  def getTargetApi(sparkSession: SparkSession, dt: String) = {
    import org.apache.spark.sql.functions._
    val avgDetail = sparkSession.sql("select paperviewid,paperviewname,score,spendtime,dt,dn from dws.dws_user_paper_detail")
      .where(s"dt='$dt'").groupBy("paperviewid", "paperviewname", "dt", "dn")
      .agg(avg("score").cast("decimal(4,1)").as("avgscore"),
        avg("spendtime").cast("decimal(10,1)").as("avgspendtime"))
      .select("paperviewid", "paperviewname", "avgscore", "avgspendtime", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_paper_avgtimeandscore")

    val topscore = sparkSession.sql("select paperviewid,paperviewname,score,dt,dn from dws.dws_user_paper_detail")
      .where(s"dt='$dt'").groupBy("paperviewid", "paperviewname", "dt", "dn")
      .agg(max("score").as("maxscore"), min("score").as("minscore"))
      .select("paperviewid", "paperviewname", "maxscore", "minscore", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_paper_maxdetail")

    val top3UserDetail = sparkSession.sql("select * from dws.dws_user_paper_detail")
      .where(s"dt='$dt'").select("userid", "paperviewid", "paperviewname", "chaptername", "pointname",
        "sitecoursename", "coursename", "majorname", "shortname", "papername", "score", "dt", "dn")
      .withColumn("rk", dense_rank().over(Window.partitionBy("paperviewid").orderBy(desc("score"))))
      .where("rk<4")
      .select("userid", "paperviewid", "paperviewname", "chaptername", "pointname", "sitecoursename",
        "coursename", "majorname", "shortname", "papername", "score", "rk", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_top3_userdetail")

    val low3UserDetail = sparkSession.sql("select * from dws.dws_user_paper_detail")
      .where(s"dt='$dt'").select("userid", "paperviewid", "paperviewname", "chaptername", "pointname",
        "sitecoursename", "coursename", "majorname", "shortname", "papername", "score", "dt", "dn")
      .withColumn("rk", dense_rank().over(Window.partitionBy("paperviewid").orderBy("score")))
      .where("rk<4")
      .select("userid", "paperviewid", "paperviewname", "chaptername", "pointname", "sitecoursename",
        "coursename", "majorname", "shortname", "papername", "score", "rk", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_low3_userdetail")

    val paperScore = sparkSession.sql("select * from dws.dws_user_paper_detail")
      .where(s"dt='$dt'")
      .select("paperviewid", "paperviewname", "userid", "score", "dt", "dn")
      .withColumn("score_segment", when(col("score").between(0, 20), "0-20")
        .when(col("score") > 20 && col("score") <= 40, "20-40")
        .when(col("score") > 40 && col("score") <= 60, "40-60")
        .when(col("score") > 60 && col("score") <= 80, "60-80")
        .when(col("score") > 80 && col("score") <= 100, "80-100"))
      .drop("score").groupBy("paperviewid", "paperviewname", "score_segment", "dt", "dn")
      .agg(concat_ws(",", collect_list(col("userid").cast("string"))).as("userids"))
      .select("paperviewid", "paperviewname", "score_segment", "userids", "dt", "dn")
      .orderBy("paperviewid", "score_segment")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_paper_scoresegment_user")

    val paperPassDetail = sparkSession.sql("select * from dws.dws_user_paper_detail").cache()
    val unPassDetail = paperPassDetail.select("paperviewid", "paperviewname", "dn", "dt")
      .where(s"dt='$dt'").where("score between 0 and 60")
      .groupBy("paperviewid", "paperviewname", "dn", "dt")
      .agg(count("paperviewid").as("unpasscount"))
    val passDetail = paperPassDetail.select("paperviewid", "dn")
      .where(s"dt='$dt'").where("score >60")
      .groupBy("paperviewid", "dn")
      .agg(count("paperviewid").as("passcount"))
    unPassDetail.join(passDetail, Seq("paperviewid", "dn"))
      .withColumn("rate", (col("passcount") / (col("passcount") + col("unpasscount"))).cast("decimal(4,2)"))
      .select("paperviewid", "paperviewname", "unpasscount", "passcount", "rate", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_user_paper_detail")
    paperPassDetail.unpersist()

    val userQuestionDetail = sparkSession.sql("select * from dws.dws_user_paper_detail").cache()
    val userQuestionError = userQuestionDetail.select("questionid", "dt", "dn", "user_question_answer")
      .where(s"dt='$dt'").where("user_question_answer='0'").drop("user_question_answer")
      .groupBy("questionid", "dt", "dn")
      .agg(count("questionid").as("errcount"))
    val userQuestionRight = userQuestionDetail.select("questionid", "dn", "user_question_answer")
      .where(s"dt='$dt'").where("user_question_answer='1'").drop("user_question_answer")
      .groupBy("questionid", "dn")
      .agg(count("questionid").as("rightcount"))
    userQuestionError.join(userQuestionRight, Seq("questionid", "dn"))
      .withColumn("rate", (col("errcount") / (col("errcount") + col("rightcount"))).cast("decimal(4,2)"))
      .orderBy(desc("errcount")).coalesce(1)
      .select("questionid", "errcount", "rightcount", "rate", "dt", "dn")
      .write.mode(SaveMode.Append).insertInto("ads.ads_user_question_detail")
  }
}
Create AdsController
import com.catelf.qz.service.AdsQzService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object AdsController {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val sparkConf = new SparkConf().setAppName("ads_controller").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    HiveUtil.openDynamicPartition(sparkSession) // enable dynamic partitioning
    HiveUtil.openCompression(sparkSession)      // enable compression
    HiveUtil.useSnappyCompression(sparkSession) // use snappy compression
    val dt = "20190722"
    AdsQzService.getTarget(sparkSession, dt)
    AdsQzService.getTargetApi(sparkSession, dt)
  }
}
Export the data to MySQL
Finally, export the computed metrics to MySQL with DataX.
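DataX jobs are configured in their own JSON files, which are not shown in this section. As an illustrative alternative only (not the DataX approach used here), the same export can also be sketched with Spark's JDBC writer; the connection details and the MySQL table name ads_paper_avgtimeandscore below are assumptions:

import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

object AdsExportToMysql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ads_export_mysql").enableHiveSupport().getOrCreate()
    // Hypothetical connection details; replace with the real host, database and credentials
    val url = "jdbc:mysql://localhost:3306/qz_report?useSSL=false&characterEncoding=utf8"
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.jdbc.Driver")
    // Read one ads table from Hive and append it into a MySQL table of the same name
    spark.table("ads.ads_paper_avgtimeandscore")
      .where("dt='20190722'")
      .write.mode(SaveMode.Append)
      .jdbc(url, "ads_paper_avgtimeandscore", props)
  }
}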