Open-Source Online Education Project Based on Big Data Technology (Part 2)

Overview: An open-source online education project built on big data technology, part two.

Create DwdController

import com.catelf.qz.service.EtlDataService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
 * Parse the quiz-answering data and load it into the DWD layer
 */
object DwdController {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val sparkConf = new SparkConf().setAppName("dwd_qz_controller").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    HiveUtil.openDynamicPartition(sparkSession) //enable dynamic partitioning
    HiveUtil.openCompression(sparkSession) //enable compression
    HiveUtil.useSnappyCompression(sparkSession) //use snappy compression
    EtlDataService.etlQzChapter(ssc, sparkSession)
    EtlDataService.etlQzChapterList(ssc, sparkSession)
    EtlDataService.etlQzPoint(ssc, sparkSession)
    EtlDataService.etlQzPointQuestion(ssc, sparkSession)
    EtlDataService.etlQzSiteCourse(ssc, sparkSession)
    EtlDataService.etlQzCourse(ssc, sparkSession)
    EtlDataService.etlQzCourseEdusubject(ssc, sparkSession)
    EtlDataService.etlQzWebsite(ssc, sparkSession)
    EtlDataService.etlQzMajor(ssc, sparkSession)
    EtlDataService.etlQzBusiness(ssc, sparkSession)
    EtlDataService.etlQzPaperView(ssc, sparkSession)
    EtlDataService.etlQzCenterPaper(ssc, sparkSession)
    EtlDataService.etlQzPaper(ssc, sparkSession)
    EtlDataService.etlQzCenter(ssc, sparkSession)
    EtlDataService.etlQzQuestion(ssc, sparkSession)
    EtlDataService.etlQzQuestionType(ssc, sparkSession)
    EtlDataService.etlQzMemberPaperQuestion(ssc, sparkSession)
  }
}
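
The HiveUtil helper used above belongs to the project's util package and is not reproduced in this article. Presumably it only sets the usual Hive session parameters; a minimal sketch of what it might look like (the property names are standard Hive/Hadoop settings, not the project's actual code):

import org.apache.spark.sql.SparkSession
object HiveUtil {
  //enable dynamic partitioning for the current session
  def openDynamicPartition(sparkSession: SparkSession): Unit = {
    sparkSession.sql("set hive.exec.dynamic.partition=true")
    sparkSession.sql("set hive.exec.dynamic.partition.mode=nonstrict")
  }
  //enable compressed output
  def openCompression(sparkSession: SparkSession): Unit = {
    sparkSession.sql("set mapred.output.compress=true")
    sparkSession.sql("set hive.exec.compress.output=true")
  }
  //use Snappy as the compression codec
  def useSnappyCompression(sparkSession: SparkSession): Unit = {
    sparkSession.sql("set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec")
    sparkSession.sql("set mapreduce.output.fileoutputformat.compress=true")
    sparkSession.sql("set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec")
  }
}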

Run this main class to produce the parsed DWD tables in Hive.
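
Those DWD tables are produced by the EtlDataService methods called in DwdController, which are not listed in this article. As a rough idea only, one of them could look like the sketch below, assuming the raw chapter records sit in HDFS as JSON lines; the path, the dt/dn handling, and the column list are illustrative assumptions, not the project's actual implementation:

import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{SaveMode, SparkSession}
object EtlDataServiceSketch {
  //hypothetical sketch of one DWD ETL method (ssc is unused here; the parameter only mirrors the signature used in DwdController)
  def etlQzChapter(ssc: SparkContext, sparkSession: SparkSession): Unit = {
    val raw = sparkSession.read.json("hdfs:///user/catelf/ods/QzChapter.log") //illustrative ODS path
    raw.withColumn("dt", lit("20190722"))   //partition columns; real code would derive them from the data
      .withColumn("dn", lit("webA"))
      .selectExpr("chapterid", "chapterlistid", "chaptername", "sequence", "showstatus", "creator",
        "createtime", "courseid", "chapternum", "outchapterid", "dt", "dn") //assumed column order of dwd.dwd_qz_chapter
      .coalesce(1)                          //small dimension data: avoid many tiny files
      .write.mode(SaveMode.Append)
      .insertInto("dwd.dwd_qz_chapter")     //dt/dn are written as dynamic partitions
  }
}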

Create QzChapterDao, the DAO class for the chapter tables

import org.apache.spark.sql.SparkSession
object QzChapterDao {
  /**
   * Query the base qz_chapter data
   *
   * @param sparkSession
   * @return
   */
  def getDwdQzChapter(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select chapterid,chapterlistid,chaptername,sequence,showstatus,creator as " +
      "chapter_creator,createtime as chapter_createtime,courseid as chapter_courseid,chapternum,outchapterid,dt,dn from dwd.dwd_qz_chapter where " +
      s"dt='$dt'")
  }
  /**
   * Query the base qz_chapter_list data
   *
   * @param sparkSession
   * @param dt
   */
  def getDwdQzChapterList(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select chapterlistid,chapterlistname,chapterallnum,dn from dwd.dwd_qz_chapter_list " +
      s"where dt='$dt'")
  }
  /**
   * Query the base qz_point data
   *
   * @param sparkSession
   * @param dt
   */
  def getDwdQzPoint(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select pointid,pointname,pointyear,chapter,excisenum,pointlistid,chapterid," +
      "pointdescribe,pointlevel,typelist,score as point_score,thought,remid,pointnamelist,typelistids,pointlist,dn from " +
      s"dwd.dwd_qz_point where dt='$dt'")
  }
  /**
   * Query the base qz_point_question data
   *
   * @param sparkSession
   * @param dt
   */
  def getDwdQzPointQuestion(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select pointid,questionid,questype,dn from dwd.dwd_qz_point_question where dt='$dt'")
  }
}

Create QzCourseDao, the DAO class for the course tables

import org.apache.spark.sql.SparkSession
object QzCourseDao {
  def getDwdQzSiteCourse(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select sitecourseid,siteid,courseid,sitecoursename,coursechapter,sequence,status," +
      "creator as sitecourse_creator,createtime as sitecourse_createtime,helppaperstatus,servertype,boardid,showstatus,dt,dn " +
      s"from dwd.dwd_qz_site_course where dt='${dt}'")
  }
  def getDwdQzCourse(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select courseid,majorid,coursename,isadvc,chapterlistid,pointlistid,dn from " +
      s"dwd.dwd_qz_course where dt='${dt}'")
  }
  def getDwdQzCourseEduSubject(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select courseeduid,edusubjectid,courseid,dn from dwd.dwd_qz_course_edusubject " +
      s"where dt='${dt}'")
  }
}

Create QzMajorDao, the DAO class for the major tables

import org.apache.spark.sql.SparkSession
object QzMajorDao {
  def getQzMajor(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select majorid,businessid,siteid,majorname,shortname,status,sequence,creator as major_creator," +
      s"createtime as major_createtime,dt,dn from dwd.dwd_qz_major where dt='$dt'")
  }
  def getQzWebsite(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select siteid,sitename,domain,multicastserver,templateserver,creator," +
      s"createtime,multicastgateway,multicastport,dn from dwd.dwd_qz_website where dt='$dt'")
  }
  def getQzBusiness(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select businessid,businessname,dn from dwd.dwd_qz_business where dt='$dt'")
  }
}

Create QzPaperDao, the DAO class for the exam-paper tables

import org.apache.spark.sql.SparkSession
object QzPaperDao {
  def getDwdQzPaperView(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperviewid,paperid,paperviewname,paperparam,openstatus,explainurl,iscontest," +
      "contesttime,conteststarttime,contestendtime,contesttimelimit,dayiid,status,creator as paper_view_creator," +
      "createtime as paper_view_createtime,paperviewcatid,modifystatus,description,papertype,downurl,paperuse," +
      s"paperdifficult,testreport,paperuseshow,dt,dn from dwd.dwd_qz_paper_view where dt='$dt'")
  }
  def getDwdQzCenterPaper(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select paperviewid,sequence,centerid,dn from dwd.dwd_qz_center_paper where dt='$dt'")
  }
  def getDwdQzPaper(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperid,papercatid,courseid,paperyear,chapter,suitnum,papername,totalscore,chapterid," +
      s"chapterlistid,dn from dwd.dwd_qz_paper where dt='$dt'")
  }
  def getDwdQzCenter(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select centerid,centername,centeryear,centertype,centerparam,provideuser," +
      s"centerviewtype,stage,dn from dwd.dwd_qz_center where dt='$dt'")
  }
}

Create QzQuestionDao, the DAO class for the question tables

import org.apache.spark.sql.SparkSession
object QzQuestionDao {
  def getQzQuestion(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select questionid,parentid,questypeid,quesviewtype,content,answer,analysis,limitminute," +
      "score,splitscore,status,optnum,lecture,creator,createtime,modifystatus,attanswer,questag,vanalysisaddr,difficulty," +
      s"quesskill,vdeoaddr,dt,dn from  dwd.dwd_qz_question where dt='$dt'")
  }
  def getQzQuestionType(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select questypeid,viewtypename,description,papertypename,remark,splitscoretype,dn from " +
      s"dwd.dwd_qz_question_type where dt='$dt'")
  }
}

Create UserPaperDetailDao, the DAO class for the wide table

import org.apache.spark.sql.SparkSession
object UserPaperDetailDao {
  def getDwdQzMemberPaperQuestion(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select userid,paperviewid,chapterid,sitecourseid,questionid,majorid,useranswer,istrue,lasttime,opertype," +
      s"paperid,spendtime,score,question_answer,dt,dn from dwd.dwd_qz_member_paper_question where dt='$dt'")
  }
  def getDwsQzChapter(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select chapterid,chapterlistid,chaptername,sequence as chapter_sequence,status as chapter_status," +
      "chapter_courseid,chapternum,chapterallnum,outchapterid,chapterlistname,pointid,questype,pointname,pointyear" +
      ",chapter,excisenum,pointlistid,pointdescribe,pointlevel,typelist,point_score,thought,remid,pointnamelist," +
      s"typelistids,pointlist,dn from dws.dws_qz_chapter where dt='$dt'")
  }
  def getDwsQzCourse(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select sitecourseid,siteid,courseid,sitecoursename,coursechapter,sequence as course_sequence," +
      "status as course_status,sitecourse_creator,sitecourse_createtime,helppaperstatus,servertype,boardid,showstatus,majorid," +
      s"coursename,isadvc,chapterlistid,pointlistid,courseeduid,edusubjectid,dn from dws.dws_qz_course where dt='$dt'")
  }
  def getDwsQzMajor(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select majorid,businessid,majorname,shortname,status as major_status,sequence  as major_sequence," +
      "major_creator,major_createtime,businessname,sitename,domain,multicastserver,templateserver,multicastgateway,multicastport," +
      s"dn from dws.dws_qz_major where dt='$dt'")
  }
  def getDwsQzPaper(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperviewid,paperid,paperviewname,paperparam,openstatus,explainurl,iscontest,contesttime," +
      "conteststarttime,contestendtime,contesttimelimit,dayiid,status as paper_status,paper_view_creator,paper_view_createtime," +
      "paperviewcatid,modifystatus,description,paperuse,testreport,centerid,sequence as paper_sequence,centername,centeryear," +
      "centertype,provideuser,centerviewtype,stage as paper_stage,papercatid,courseid,paperyear,suitnum,papername,totalscore,dn" +
      s" from dws.dws_qz_paper where dt='$dt'")
  }
  def getDwsQzQuestion(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select questionid,parentid as question_parentid,questypeid,quesviewtype,content as question_content," +
      "answer as question_answer,analysis as question_analysis,limitminute as question_limitminute,score as question_score," +
      "splitscore,lecture,creator as question_creator,createtime as question_createtime,modifystatus as question_modifystatus," +
      "attanswer as question_attanswer,questag as question_questag,vanalysisaddr as question_vanalysisaddr,difficulty as question_difficulty," +
      "quesskill,vdeoaddr,description as question_description,splitscoretype as question_splitscoretype,dn " +
      s" from dws.dws_qz_question where dt='$dt'")
  }
}

Dimension degradation and wide-table assembly: the service class

Requirement 2: Based on the DWD-layer base tables, perform dimension degradation and aggregate them into dws.dws_qz_chapter (chapter dimension table), dws.dws_qz_course (course dimension table), dws.dws_qz_major (major dimension table), dws.dws_qz_paper (exam-paper dimension table), and dws.dws_qz_question (question dimension table), using Spark SQL and the DataFrame API.

dws.dws_qz_chapter: a four-table join. dwd.dwd_qz_chapter inner join dwd.dwd_qz_chapter_list on chapterlistid and dn, inner join dwd.dwd_qz_point on chapterid and dn, inner join dwd.dwd_qz_point_question on pointid and dn.

dws.dws_qz_course: a three-table join. dwd.dwd_qz_site_course inner join dwd.dwd_qz_course on courseid and dn, inner join dwd.dwd_qz_course_edusubject on courseid and dn.

dws.dws_qz_major: a three-table join. dwd.dwd_qz_major inner join dwd.dwd_qz_website on siteid and dn, inner join dwd.dwd_qz_business on businessid and dn.

dws.dws_qz_paper: a four-table join. dwd.dwd_qz_paper_view left join dwd.dwd_qz_center_paper on paperviewid and dn, left join dwd.dwd_qz_center on centerid and dn, inner join dwd.dwd_qz_paper on paperid and dn.

dws.dws_qz_question: a two-table join. dwd.dwd_qz_question inner join dwd.dwd_qz_question_type on questypeid and dn.

Requirement 3: Based on dws.dws_qz_chapter, dws.dws_qz_course, dws.dws_qz_major, dws.dws_qz_paper, dws.dws_qz_question, and dwd.dwd_qz_member_paper_question, build the wide table dws.dws_user_paper_detail, using Spark SQL and the DataFrame API.

dws.dws_user_paper_detail: dwd_qz_member_paper_question inner join dws_qz_chapter on chapterid and dn, inner join dws_qz_course on sitecourseid and dn, inner join dws_qz_major on majorid and dn, inner join dws_qz_paper on paperviewid and dn, inner join dws_qz_question on questionid and dn.

import com.catelf.qz.dao.{QzChapterDao, QzCourseDao, QzMajorDao, QzPaperDao, QzQuestionDao, UserPaperDetailDao}
import org.apache.spark.sql.{SaveMode, SparkSession}
object DwsQzService {
  def saveDwsQzChapter(sparkSession: SparkSession, dt: String) = {
    val dwdQzChapter = QzChapterDao.getDwdQzChapter(sparkSession, dt)
    val dwdQzChapterlist = QzChapterDao.getDwdQzChapterList(sparkSession, dt)
    val dwdQzPoint = QzChapterDao.getDwdQzPoint(sparkSession, dt)
    val dwdQzPointQuestion = QzChapterDao.getDwdQzPointQuestion(sparkSession, dt)
    val result = dwdQzChapter.join(dwdQzChapterlist, Seq("chapterlistid", "dn"))
      .join(dwdQzPoint, Seq("chapterid", "dn"))
      .join(dwdQzPointQuestion, Seq("pointid", "dn"))
    result.select("chapterid", "chapterlistid", "chaptername", "sequence", "showstatus", "status",
      "chapter_creator", "chapter_createtime", "chapter_courseid", "chapternum", "chapterallnum", "outchapterid", "chapterlistname",
      "pointid", "questionid", "questype", "pointname", "pointyear", "chapter", "excisenum", "pointlistid", "pointdescribe",
      "pointlevel", "typelist", "point_score", "thought", "remid", "pointnamelist", "typelistids", "pointlist", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_chapter")
  }
  def saveDwsQzCourse(sparkSession: SparkSession, dt: String) = {
    val dwdQzSiteCourse = QzCourseDao.getDwdQzSiteCourse(sparkSession, dt)
    val dwdQzCourse = QzCourseDao.getDwdQzCourse(sparkSession, dt)
    val dwdQzCourseEdusubject = QzCourseDao.getDwdQzCourseEduSubject(sparkSession, dt)
    val result = dwdQzSiteCourse.join(dwdQzCourse, Seq("courseid", "dn"))
      .join(dwdQzCourseEdusubject, Seq("courseid", "dn"))
      .select("sitecourseid", "siteid", "courseid", "sitecoursename", "coursechapter",
        "sequence", "status", "sitecourse_creator", "sitecourse_createtime", "helppaperstatus", "servertype", "boardid",
        "showstatus", "majorid", "coursename", "isadvc", "chapterlistid", "pointlistid", "courseeduid", "edusubjectid"
        , "dt", "dn")
    result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_course")
  }
  def saveDwsQzMajor(sparkSession: SparkSession, dt: String) = {
    val dwdQzMajor = QzMajorDao.getQzMajor(sparkSession, dt)
    val dwdQzWebsite = QzMajorDao.getQzWebsite(sparkSession, dt)
    val dwdQzBusiness = QzMajorDao.getQzBusiness(sparkSession, dt)
    val result = dwdQzMajor.join(dwdQzWebsite, Seq("siteid", "dn"))
      .join(dwdQzBusiness, Seq("businessid", "dn"))
      .select("majorid", "businessid", "siteid", "majorname", "shortname", "status", "sequence",
        "major_creator", "major_createtime", "businessname", "sitename", "domain", "multicastserver", "templateserver",
        "multicastgateway", "multicastport", "dt", "dn")
    result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_major")
  }
  def saveDwsQzPaper(sparkSession: SparkSession, dt: String) = {
    val dwdQzPaperView = QzPaperDao.getDwdQzPaperView(sparkSession, dt)
    val dwdQzCenterPaper = QzPaperDao.getDwdQzCenterPaper(sparkSession, dt)
    val dwdQzCenter = QzPaperDao.getDwdQzCenter(sparkSession, dt)
    val dwdQzPaper = QzPaperDao.getDwdQzPaper(sparkSession, dt)
    val result = dwdQzPaperView.join(dwdQzCenterPaper, Seq("paperviewid", "dn"), "left")
      .join(dwdQzCenter, Seq("centerid", "dn"), "left")
      .join(dwdQzPaper, Seq("paperid", "dn"))
      .select("paperviewid", "paperid", "paperviewname", "paperparam", "openstatus", "explainurl", "iscontest"
        , "contesttime", "conteststarttime", "contestendtime", "contesttimelimit", "dayiid", "status", "paper_view_creator",
        "paper_view_createtime", "paperviewcatid", "modifystatus", "description", "paperuse", "paperdifficult", "testreport",
        "paperuseshow", "centerid", "sequence", "centername", "centeryear", "centertype", "provideuser", "centerviewtype",
        "stage", "papercatid", "courseid", "paperyear", "suitnum", "papername", "totalscore", "chapterid", "chapterlistid",
        "dt", "dn")
    result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_paper")
  }
  def saveDwsQzQuestionTpe(sparkSession: SparkSession, dt: String) = {
    val dwdQzQuestion = QzQuestionDao.getQzQuestion(sparkSession, dt)
    val dwdQzQuestionType = QzQuestionDao.getQzQuestionType(sparkSession, dt)
    val result = dwdQzQuestion.join(dwdQzQuestionType, Seq("questypeid", "dn"))
      .select("questionid", "parentid", "questypeid", "quesviewtype", "content", "answer", "analysis"
        , "limitminute", "score", "splitscore", "status", "optnum", "lecture", "creator", "createtime", "modifystatus"
        , "attanswer", "questag", "vanalysisaddr", "difficulty", "quesskill", "vdeoaddr", "viewtypename", "papertypename",
        "remark", "splitscoretype", "dt", "dn")
    result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_question")
  }
  def saveDwsUserPaperDetail(sparkSession: SparkSession, dt: String) = {
    val dwdQzMemberPaperQuestion = UserPaperDetailDao.getDwdQzMemberPaperQuestion(sparkSession, dt).drop("paperid")
      .withColumnRenamed("question_answer", "user_question_answer")
    val dwsQzChapter = UserPaperDetailDao.getDwsQzChapter(sparkSession, dt).drop("courseid")
    val dwsQzCourse = UserPaperDetailDao.getDwsQzCourse(sparkSession, dt).withColumnRenamed("sitecourse_creator", "course_creator")
      .withColumnRenamed("sitecourse_createtime", "course_createtime").drop("majorid")
      .drop("chapterlistid").drop("pointlistid")
    val dwsQzMajor = UserPaperDetailDao.getDwsQzMajor(sparkSession, dt)
    val dwsQzPaper = UserPaperDetailDao.getDwsQzPaper(sparkSession, dt).drop("courseid")
    val dwsQzQuestion = UserPaperDetailDao.getDwsQzQuestion(sparkSession, dt)
    dwdQzMemberPaperQuestion.join(dwsQzCourse, Seq("sitecourseid", "dn")).
      join(dwsQzChapter, Seq("chapterid", "dn")).join(dwsQzMajor, Seq("majorid", "dn"))
      .join(dwsQzPaper, Seq("paperviewid", "dn")).join(dwsQzQuestion, Seq("questionid", "dn"))
      .select("userid", "courseid", "questionid", "useranswer", "istrue", "lasttime", "opertype",
        "paperid", "spendtime", "chapterid", "chaptername", "chapternum",
        "chapterallnum", "outchapterid", "chapterlistname", "pointid", "questype", "pointyear", "chapter", "pointname"
        , "excisenum", "pointdescribe", "pointlevel", "typelist", "point_score", "thought", "remid", "pointnamelist",
        "typelistids", "pointlist", "sitecourseid", "siteid", "sitecoursename", "coursechapter", "course_sequence", "course_status"
        , "course_creator", "course_createtime", "servertype", "helppaperstatus", "boardid", "showstatus", "majorid", "coursename",
        "isadvc", "chapterlistid", "pointlistid", "courseeduid", "edusubjectid", "businessid", "majorname", "shortname",
        "major_status", "major_sequence", "major_creator", "major_createtime", "businessname", "sitename",
        "domain", "multicastserver", "templateserver", "multicastgateway", "multicastport", "paperviewid", "paperviewname", "paperparam",
        "openstatus", "explainurl", "iscontest", "contesttime", "conteststarttime", "contestendtime", "contesttimelimit",
        "dayiid", "paper_status", "paper_view_creator", "paper_view_createtime", "paperviewcatid", "modifystatus", "description", "paperuse",
        "testreport", "centerid", "paper_sequence", "centername", "centeryear", "centertype", "provideuser", "centerviewtype",
        "paper_stage", "papercatid", "paperyear", "suitnum", "papername", "totalscore", "question_parentid", "questypeid",
        "quesviewtype", "question_content", "question_answer", "question_analysis", "question_limitminute", "score",
        "splitscore", "lecture", "question_creator", "question_createtime", "question_modifystatus", "question_attanswer",
        "question_questag", "question_vanalysisaddr", "question_difficulty", "quesskill", "vdeoaddr", "question_description",
        "question_splitscoretype", "user_question_answer", "dt", "dn").coalesce(1)
      .write.mode(SaveMode.Append).insertInto("dws.dws_user_paper_detail")
  }
}

Create DwsController

import com.catelf.qz.service.DwsQzService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object DwsController {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val sparkConf = new SparkConf().setAppName("dws_qz_controller").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    HiveUtil.openDynamicPartition(sparkSession) //enable dynamic partitioning
    HiveUtil.openCompression(sparkSession) //enable compression
    HiveUtil.useSnappyCompression(sparkSession) //use snappy compression
    val dt = "20190722"
    DwsQzService.saveDwsQzChapter(sparkSession, dt)
    DwsQzService.saveDwsQzCourse(sparkSession, dt)
    DwsQzService.saveDwsQzMajor(sparkSession, dt)
    DwsQzService.saveDwsQzPaper(sparkSession, dt)
    DwsQzService.saveDwsQzQuestionTpe(sparkSession, dt)
    DwsQzService.saveDwsUserPaperDetail(sparkSession, dt)
  }
}

After running it, you can see that the DWS table data has been generated.
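
A quick way to spot-check the output (not part of the project code) is to query the new DWS tables for the target partition:

//sanity checks on the generated DWS data for the 20190722 partition
sparkSession.sql("select count(*) from dws.dws_user_paper_detail where dt='20190722'").show()
sparkSession.sql("select * from dws.dws_qz_chapter where dt='20190722' limit 10").show()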

Metric statistics for the reporting (ADS) layer

Requirement 4: Based on the wide table, compute each paper's average time spent and average score, first with Spark SQL and then with the Spark DataFrame API.

Requirement 5: Compute each paper's highest and lowest scores, first with Spark SQL and then with the Spark DataFrame API.

Requirement 6: Group by paper and report the details of the top-3 users for each paper, first with Spark SQL and then with the Spark DataFrame API.

Requirement 7: Group by paper and report the details of the bottom-3 users for each paper, first with Spark SQL and then with the Spark DataFrame API.

Requirement 8: For each paper, list the user ids in each score bracket: 0-20, 20-40, 40-60, 60-80, 80-100.

Requirement 9: For each paper, count the users who failed, the users who passed, and the pass rate (passing score: 60).

Requirement 10: For each question, count wrong answers, correct answers, and the error rate.

Create AdsQzDao

import org.apache.spark.sql.SparkSession
object AdsQzDao {
  /**
   * Compute each paper's average time spent and average score
   *
   * @param sparkSession
   * @param dt
   * @return
   */
  def getAvgSPendTimeAndScore(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select paperviewid,paperviewname,cast(avg(score) as decimal(4,1)) score,cast(avg(spendtime) as decimal(10,2))" +
      s" spendtime,dt,dn from dws.dws_user_paper_detail where dt='$dt' group by " +
      "paperviewid,paperviewname,dt,dn order by score desc,spendtime desc");
  }
  /**
   * Compute each paper's highest and lowest scores
   *
   * @param sparkSession
   * @param dt
   */
  def getTopScore(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperviewid,paperviewname,cast(max(score) as decimal(4,1)) maxscore,cast(min(score) as decimal(4,1)) minscore" +
      s",dt,dn from dws.dws_user_paper_detail where dt='$dt' group by paperviewid,paperviewname,dt,dn ")
  }
  /**
   * Group by paper and get the details of the top-3 users by score for each paper
   *
   * @param sparkSession
   * @param dt
   */
  def getTop3UserDetail(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select * from (select userid,paperviewname,chaptername,pointname,sitecoursename,coursename,majorname,shortname," +
      s"sitename,papername,score,dense_rank() over (partition by paperviewid order by score desc) as rk,dt,dn from dws.dws_user_paper_detail where dt='$dt') " +
      "where rk<4")
  }
  /**
   * Group by paper and get the details of the bottom-3 users by score for each paper
   *
   * @param sparkSession
   * @param dt
   * @return
   */
  def getLow3UserDetail(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select * from (select userid,paperviewname,chaptername,pointname,sitecoursename,coursename,majorname,shortname," +
      s"sitename,papername,score,dense_rank() over (partition by paperviewid order by score asc) as rk,dt,dn from dws.dws_user_paper_detail where dt='$dt') where rk<4")
  }
  /**
   * For each paper, list the users in each score bracket
   */
  def getPaperScoreSegmentUser(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperviewid,paperviewname,score_segment,concat_ws(',',collect_list(cast(userid as string))) as userids,dt,dn" +
      " from (select paperviewid,paperviewname,userid," +
      " case  when score >=0  and score <=20 then '0-20'" +
      "       when score >20 and score <=40 then '20-40' " +
      "       when score >40 and score <=60 then '40-60' " +
      "       when score >60 and score <=80 then '60-80' " +
      "       when score >80 and score <=100 then '80-100' end  as score_segment" +
      s",dt,dn from  dws.dws_user_paper_detail where dt='$dt') group by paperviewid,paperviewname,score_segment,dt,dn order by paperviewid,score_segment")
  }
  /**
   * For each paper, count failing users, passing users, and the pass rate
   *
   * @param sparkSession
   * @param dt
   */
  def getPaperPassDetail(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select t.*,cast(t.passcount/(t.passcount+t.countdetail) as decimal(4,2)) as rate" +
      "   from(select a.paperviewid,a.paperviewname,a.countdetail,a.dt,a.dn,b.passcount from " +
      s"(select paperviewid,paperviewname,count(*) countdetail,dt,dn from dws.dws_user_paper_detail where dt='$dt' and score between 0 and 60 group by" +
      s" paperviewid,paperviewname,dt,dn) a join (select paperviewid,count(*) passcount,dn from  dws.dws_user_paper_detail  where dt='$dt' and score >60  " +
      "group by paperviewid,dn) b on a.paperviewid=b.paperviewid and a.dn=b.dn)t")
  }
  /**
   * For each question, count correct and wrong answers and compute the error rate, ordered with the most-missed questionids first
   *
   * @param sparkSession
   * @param dt
   */
  def getQuestionDetail(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select t.*,cast(t.errcount/(t.errcount+t.rightcount) as decimal(4,2))as rate" +
      s" from((select questionid,count(*) errcount,dt,dn from dws.dws_user_paper_detail where dt='$dt' and user_question_answer='0' " +
      s"group by questionid,dt,dn) a join(select questionid,count(*) rightcount,dt,dn from dws.dws_user_paper_detail where dt='$dt' and user_question_answer='1' " +
      s"group by questionid,dt,dn) b on a.questionid=b.questionid and a.dn=b.dn)t order by errcount desc")
  }
}

Create AdsQzService

import com.catelf.qz.dao.AdsQzDao
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{SaveMode, SparkSession}
object AdsQzService {
  def getTarget(sparkSession: SparkSession, dt: String) = {
    val avgDetail = AdsQzDao.getAvgSPendTimeAndScore(sparkSession, dt)
    val topscore = AdsQzDao.getTopScore(sparkSession, dt)
    val top3UserDetail = AdsQzDao.getTop3UserDetail(sparkSession, dt)
    val low3UserDetail = AdsQzDao.getLow3UserDetail(sparkSession, dt)
    val paperScore = AdsQzDao.getPaperScoreSegmentUser(sparkSession, dt)
    val paperPassDetail = AdsQzDao.getPaperPassDetail(sparkSession, dt)
    val questionDetail = AdsQzDao.getQuestionDetail(sparkSession, dt)
  }
  def getTargetApi(sparkSession: SparkSession, dt: String) = {
    import org.apache.spark.sql.functions._
    val avgDetail = sparkSession.sql("select paperviewid,paperviewname,score,spendtime,dt,dn from dws.dws_user_paper_detail ")
      .where(s"dt='$dt'").groupBy("paperviewid", "paperviewname", "dt", "dn").
      agg(avg("score").cast("decimal(4,1)").as("avgscore"),
        avg("spendtime").cast("decimal(10,1)").as("avgspendtime"))
      .select("paperviewid", "paperviewname", "avgscore", "avgspendtime", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_paper_avgtimeandscore")
    val topscore = sparkSession.sql("select paperviewid,paperviewname,score,dt,dn from dws.dws_user_paper_detail")
      .where(s"dt='$dt'").groupBy("paperviewid", "paperviewname", "dt", "dn")
      .agg(max("score").as("maxscore"), min("score").as("minscore"))
      .select("paperviewid", "paperviewname", "maxscore", "minscore", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_paper_maxdetail")
    val top3UserDetail = sparkSession.sql("select * from dws.dws_user_paper_detail")
      .where(s"dt='$dt'").select("userid", "paperviewid", "paperviewname", "chaptername", "pointname"
      , "sitecoursename", "coursename", "majorname", "shortname", "papername", "score", "dt", "dn")
      .withColumn("rk", dense_rank().over(Window.partitionBy("paperviewid").orderBy(desc("score"))))
      .where("rk<4")
      .select("userid", "paperviewid", "paperviewname", "chaptername", "pointname", "sitecoursename"
        , "coursename", "majorname", "shortname", "papername", "score", "rk", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_top3_userdetail")
    val low3UserDetail = sparkSession.sql("select * from dws.dws_user_paper_detail")
      .where(s"dt='$dt'").select("userid", "paperviewid", "paperviewname", "chaptername", "pointname"
      , "sitecoursename", "coursename", "majorname", "shortname", "papername", "score", "dt", "dn")
      .withColumn("rk", dense_rank().over(Window.partitionBy("paperviewid").orderBy("score")))
      .where("rk<4")
      .select("userid", "paperviewid", "paperviewname", "chaptername", "pointname", "sitecoursename"
        , "coursename", "majorname", "shortname", "papername", "score", "rk", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_low3_userdetail")
    val paperScore = sparkSession.sql("select * from dws.dws_user_paper_detail")
      .where(s"dt='$dt'")
      .select("paperviewid", "paperviewname", "userid", "score", "dt", "dn")
      .withColumn("score_segment",
        when(col("score").between(0, 20), "0-20")
          .when(col("score") > 20 && col("score") <= 40, "20-40")
          .when(col("score") > 40 && col("score") <= 60, "40-60")
          .when(col("score") > 60 && col("score") <= 80, "60-80")
          .when(col("score") > 80 && col("score") <= 100, "80-100"))
      .drop("score").groupBy("paperviewid", "paperviewname", "score_segment", "dt", "dn")
      .agg(concat_ws(",", collect_list(col("userid").cast("string"))).as("userids"))
      .select("paperviewid", "paperviewname", "score_segment", "userids", "dt", "dn")
      .orderBy("paperviewid", "score_segment")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_paper_scoresegment_user")
    val paperPassDetail = sparkSession.sql("select * from dws.dws_user_paper_detail").cache()
    val unPassDetail = paperPassDetail.select("paperviewid", "paperviewname", "dn", "dt")
      .where(s"dt='$dt'").where("score between 0 and 60")
      .groupBy("paperviewid", "paperviewname", "dn", "dt")
      .agg(count("paperviewid").as("unpasscount"))
    val passDetail = paperPassDetail.select("paperviewid", "dn")
      .where(s"dt='$dt'").where("score >60")
      .groupBy("paperviewid", "dn")
      .agg(count("paperviewid").as("passcount"))
    unPassDetail.join(passDetail, Seq("paperviewid", "dn")).
      withColumn("rate", (col("passcount")./(col("passcount") + col("unpasscount")))
        .cast("decimal(4,2)"))
      .select("paperviewid", "paperviewname", "unpasscount", "passcount", "rate", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_user_paper_detail")
    paperPassDetail.unpersist()
    val userQuestionDetail = sparkSession.sql("select * from dws.dws_user_paper_detail").cache()
    val userQuestionError = userQuestionDetail.select("questionid", "dt", "dn", "user_question_answer")
      .where(s"dt='$dt'").where("user_question_answer='0'").drop("user_question_answer")
      .groupBy("questionid", "dt", "dn")
      .agg(count("questionid").as("errcount"))
    val userQuestionRight = userQuestionDetail.select("questionid", "dn", "user_question_answer")
      .where(s"dt='$dt'").where("user_question_answer='1'").drop("user_question_answer")
      .groupBy("questionid", "dn")
      .agg(count("questionid").as("rightcount"))
    userQuestionError.join(userQuestionRight, Seq("questionid", "dn"))
      .withColumn("rate", (col("errcount") / (col("errcount") + col("rightcount"))).cast("decimal(4,2)"))
      .orderBy(desc("errcount")).coalesce(1)
      .select("questionid", "errcount", "rightcount", "rate", "dt", "dn")
      .write.mode(SaveMode.Append).insertInto("ads.ads_user_question_detail")
  }
}
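
The ADS result tables that getTargetApi writes to (ads.ads_paper_avgtimeandscore, ads.ads_paper_maxdetail, and so on) are assumed to already exist; their DDL is not part of this article. A hedged sketch for one of them, with column names and types inferred from the casts above and the same dt/dn partitioning used by the DWS tables, might look like this:

//hypothetical DDL for one ADS table; names, types, and storage format are inferred, not taken from the project
sparkSession.sql(
  """
    |create table if not exists ads.ads_paper_avgtimeandscore(
    |  paperviewid int,
    |  paperviewname string,
    |  avgscore decimal(4,1),
    |  avgspendtime decimal(10,1)
    |)
    |partitioned by (dt string, dn string)
    |stored as parquet
  """.stripMargin)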

Create AdsController

import com.catelf.qz.service.AdsQzService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object AdsController {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val sparkConf = new SparkConf().setAppName("ads_controller").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    HiveUtil.openDynamicPartition(sparkSession) //enable dynamic partitioning
    HiveUtil.openCompression(sparkSession) //enable compression
    HiveUtil.useSnappyCompression(sparkSession) //use snappy compression
    val dt = "20190722"
    //following the same pattern as DwdController and DwsController
    AdsQzService.getTarget(sparkSession, dt)
    AdsQzService.getTargetApi(sparkSession, dt)
  }
}

Export the data to MySQL

Finally, the computed metrics are exported to MySQL with DataX.
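
DataX handles the export through a job JSON (a reader on the warehouse/HDFS side and a mysqlwriter on the MySQL side), which is not shown here. Purely as an illustrative alternative for a quick local check, and not the DataX job the project uses, one ADS table could also be pushed out through Spark's JDBC writer (all connection details below are placeholders):

import java.util.Properties
import org.apache.spark.sql.SaveMode

val props = new Properties()
props.setProperty("user", "root")            //placeholder credentials
props.setProperty("password", "******")
props.setProperty("driver", "com.mysql.jdbc.Driver")

//write one ADS partition into a MySQL table of the same name (URL and database are placeholders)
sparkSession.sql("select * from ads.ads_paper_avgtimeandscore where dt='20190722'")
  .write.mode(SaveMode.Append)
  .jdbc("jdbc:mysql://mysql-host:3306/qz_report?useSSL=false", "ads_paper_avgtimeandscore", props)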
