Open-Source Online Education Project Based on Big Data Technology (Part 2)

Summary: An open-source online education project based on big data technology, Part 2.

In the previous article we covered the user-registration module of the offline data warehouse; this article covers the quiz (question-answering) module.

The simulated log data formats are listed below; for details see my open-source project: https://github.com/SoundHearer/kuaiban

1. QzWebsite.log — quiz website log data
{
    "createtime": "2019-07-22 11:47:18",  // creation time
    "creator": "admin",   // creator
    "dn": "webA",   // website partition
    "domain": "-",
    "dt": "20190722",  // date partition
    "multicastgateway": "-",
    "multicastport": "-",
    "multicastserver": "-",
    "sequence": "-",
    "siteid": 0,   // website id
    "sitename": "sitename0",  // website name
    "status": "-",
    "templateserver": "-"
}
2. QzSiteCourse.log — site course log data
{
    "boardid": 64,  // course template id
    "coursechapter": "-",
    "courseid": 66,  // course id
    "createtime": "2019-07-22 11:43:32",  // creation time
    "creator": "admin",   // creator
    "dn": "webA",   // website partition
    "dt": "20190722",  // date partition
    "helpparperstatus": "-",
    "sequence": "-",
    "servertype": "-",
    "showstatus": "-",
    "sitecourseid": 2,  // site course id
    "sitecoursename": "sitecoursename2",  // site course name
    "siteid": 77,  // website id
    "status": "-"
}
3. QzQuestionType.log — question type data
{
    "createtime": "2019-07-22 10:42:47",   // creation time
    "creator": "admin",    // creator
    "description": "-",
    "dn": "webA",   // website partition
    "dt": "20190722",  // date partition
    "papertypename": "-",
    "questypeid": 0,  // question type id
    "quesviewtype": 0,
    "remark": "-",
    "sequence": "-",
    "splitscoretype": "-",
    "status": "-",
    "viewtypename": "viewtypename0"
}
4. QzQuestion.log — question log data
{
    "analysis": "-",
    "answer": "-",
    "attanswer": "-",
    "content": "-",
    "createtime": "2019-07-22 11:33:46",  // creation time
    "creator": "admin",  // creator
    "difficulty": "-",
    "dn": "webA",   // website partition
    "dt": "20190722",  // date partition
    "lecture": "-",
    "limitminute": "-",
    "modifystatus": "-",
    "optnum": 8,
    "parentid": 57,
    "quesskill": "-",
    "questag": "-",
    "questionid": 0,  // question id
    "questypeid": 57, // question type id
    "quesviewtype": 44,
    "score": 24.124501582742543, // question score
    "splitscore": 0.0,
    "status": "-",
    "vanalysisaddr": "-",
    "vdeoaddr": "-"
}
5. QzPointQuestion.log — question / knowledge-point association data
{
    "createtime": "2019-07-22 09:16:46",   // creation time
    "creator": "admin",  // creator
    "dn": "webA",  // website partition
    "dt": "20190722", // date partition
    "pointid": 0,  // knowledge point id
    "questionid": 0, // question id
    "questype": 0
}

Simulating data collection and uploading the data
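The simulated logs need to land in the ODS directory on HDFS that the ETL code below reads from (hdfs://cdh1.macro.com:8020/user/catelf/ods/). A minimal sketch of that upload step, assuming the logs sit in a hypothetical local staging directory /tmp/qz_logs (running hdfs dfs -put from the shell works just as well):

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object UploadQzLogs {
  def main(args: Array[String]): Unit = {
    // Connect to the cluster's HDFS namenode
    val fs = FileSystem.get(new URI("hdfs://cdh1.macro.com:8020"), new Configuration())
    // The five formats shown above; the remaining Qz*.log files follow the same path
    val logs = Seq("QzWebsite.log", "QzSiteCourse.log", "QzQuestionType.log",
      "QzQuestion.log", "QzPointQuestion.log")
    logs.foreach { name =>
      // /tmp/qz_logs is a hypothetical local staging directory
      fs.copyFromLocalFile(new Path(s"/tmp/qz_logs/$name"), new Path(s"/user/catelf/ods/$name"))
    }
    fs.close()
  }
}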

Creating the tables

The DDL is fairly long, so only two of the tables are shown here; see the open-source project for the full set.

create external  table `dwd`.`dwd_qz_chapter`(
chapterid int ,
chapterlistid int ,
chaptername string ,
sequence string ,
showstatus string  ,
creator string  ,
createtime timestamp,
courseid int  ,
chapternum int,
outchapterid int)
partitioned by(
dt string,
dn string)
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '\t'
 STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
create external table `dwd`.`dwd_qz_chapter_list`(
chapterlistid int ,
chapterlistname string ,
courseid int ,
chapterallnum int ,
sequence string,
status string,
creator string ,
createtime timestamp 
)
partitioned by(
dt string,
dn string)
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '\t'
 STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

Parsing the data

Requirement 1: use Spark to parse the ODS-layer data and load it into the corresponding Hive tables; every score field must be kept to one decimal place, rounded half-up.
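The ETL code below relies on a small project utility, ParseJsonData (from com.catelf.util), to turn each raw log line into a fastjson JSONObject; lines that fail to parse are dropped by the obj.isInstanceOf[JSONObject] filter. A minimal sketch of what that helper presumably looks like (the authoritative version is in the open-source project):

import com.alibaba.fastjson.{JSON, JSONObject}

object ParseJsonData {
  // Returns the parsed JSONObject, or null when the line is not valid JSON,
  // so callers can keep only well-formed lines via obj.isInstanceOf[JSONObject].
  def getJsonData(data: String): JSONObject = {
    try {
      JSON.parseObject(data)
    } catch {
      case _: Exception => null
    }
  }
}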

import com.alibaba.fastjson.JSONObject
import com.catelf.qz.bean.{DwdQzPaperView, DwdQzPoint, DwdQzQuestion}
import com.catelf.util.ParseJsonData
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
 * ETL for user quiz (question-answering) data
 */
object EtlDataService {
  /**
   * Parse chapter data
   *
   * @param ssc
   * @param sparkSession
   * @return
   */
  def etlQzChapter(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._ // implicit conversions for toDF
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzChapter.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val chapterid = jsonObject.getIntValue("chapterid")
        val chapterlistid = jsonObject.getIntValue("chapterlistid")
        val chaptername = jsonObject.getString("chaptername")
        val sequence = jsonObject.getString("sequence")
        val showstatus = jsonObject.getString("showstatus")
//        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val courseid = jsonObject.getIntValue("courseid")
        val chapternum = jsonObject.getIntValue("chapternum")
        val outchapterid = jsonObject.getIntValue("outchapterid")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (chapterid, chapterlistid, chaptername, sequence, showstatus, creator, createtime,
          courseid, chapternum, outchapterid, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_chapter")
  }
  /**
   * Parse chapter-list data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzChapterList(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzChapterList.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val chapterlistid = jsonObject.getIntValue("chapterlistid")
        val chapterlistname = jsonObject.getString("chapterlistname")
        val courseid = jsonObject.getIntValue("courseid")
        val chapterallnum = jsonObject.getIntValue("chapterallnum")
        val sequence = jsonObject.getString("sequence")
        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (chapterlistid, chapterlistname, courseid, chapterallnum, sequence, status, creator, createtime, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_chapter_list")
  }
  /**
   * Parse knowledge-point data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzPoint(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzPoint.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val pointid = jsonObject.getIntValue("pointid")
        val courseid = jsonObject.getIntValue("courseid")
        val pointname = jsonObject.getString("pointname")
        val pointyear = jsonObject.getString("pointyear")
        val chapter = jsonObject.getString("chapter")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val status = jsonObject.getString("status")
        val modifystatus = jsonObject.getString("modifystatus")
        val excisenum = jsonObject.getIntValue("excisenum")
        val pointlistid = jsonObject.getIntValue("pointlistid")
        val chapterid = jsonObject.getIntValue("chapterid")
        val sequence = jsonObject.getString("sequence")
        val pointdescribe = jsonObject.getString("pointdescribe")
        val pointlevel = jsonObject.getString("pointlevel")
        val typeslist = jsonObject.getString("typelist")
        val score = BigDecimal(jsonObject.getDouble("score")).setScale(1, BigDecimal.RoundingMode.HALF_UP) // keep 1 decimal place, rounded half-up
        val thought = jsonObject.getString("thought")
        val remid = jsonObject.getString("remid")
        val pointnamelist = jsonObject.getString("pointnamelist")
        val typelistids = jsonObject.getString("typelistids")
        val pointlist = jsonObject.getString("pointlist")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        DwdQzPoint(pointid, courseid, pointname, pointyear, chapter, creator, createtime, status, modifystatus, excisenum, pointlistid,
          chapterid, sequence, pointdescribe, pointlevel, typeslist, score, thought, remid, pointnamelist, typelistids,
          pointlist, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_point")
  }
  /**
   * Parse question data under each knowledge point
   *
   * @param ssc
   * @param sparkSession
   * @return
   */
  def etlQzPointQuestion(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzPointQuestion.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val pointid = jsonObject.getIntValue("pointid")
        val questionid = jsonObject.getIntValue("questionid")
        val questtype = jsonObject.getIntValue("questtype")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (pointid, questionid, questtype, creator, createtime, dt, dn)
      })
    }).toDF().write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_point_question")
  }
  /**
   * Parse site course data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzSiteCourse(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzSiteCourse.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val sitecourseid = jsonObject.getIntValue("sitecourseid")
        val siteid = jsonObject.getIntValue("siteid")
        val courseid = jsonObject.getIntValue("courseid")
        val sitecoursename = jsonObject.getString("sitecoursename")
        val coursechapter = jsonObject.getString("coursechapter")
        val sequence = jsonObject.getString("sequence")
        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val helppaperstatus = jsonObject.getString("helppaperstatus")
        val servertype = jsonObject.getString("servertype")
        val boardid = jsonObject.getIntValue("boardid")
        val showstatus = jsonObject.getString("showstatus")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (sitecourseid, siteid, courseid, sitecoursename, coursechapter, sequence, status, creator
          , createtime, helppaperstatus, servertype, boardid, showstatus, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_site_course")
  }
  /**
   * Parse course data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzCourse(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzCourse.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val courseid = jsonObject.getIntValue("courseid")
        val majorid = jsonObject.getIntValue("majorid")
        val coursename = jsonObject.getString("coursename")
        val coursechapter = jsonObject.getString("coursechapter")
        val sequence = jsonObject.getString("sequence")
        val isadvc = jsonObject.getString("isadvc")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val status = jsonObject.getString("status")
        val chapterlistid = jsonObject.getIntValue("chapterlistid")
        val pointlistid = jsonObject.getIntValue("pointlistid")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (courseid, majorid, coursename, coursechapter, sequence, isadvc, creator, createtime, status
          , chapterlistid, pointlistid, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_course")
  }
  /**
   * Parse course education-subject data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzCourseEdusubject(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzCourseEduSubject.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val courseeduid = jsonObject.getIntValue("courseeduid")
        val edusubjectid = jsonObject.getIntValue("edusubjectid")
        val courseid = jsonObject.getIntValue("courseid")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val majorid = jsonObject.getIntValue("majorid")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (courseeduid, edusubjectid, courseid, creator, createtime, majorid, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_course_edusubject")
  }
  /**
   * Parse website data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzWebsite(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzWebsite.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val siteid = jsonObject.getIntValue("siteid")
        val sitename = jsonObject.getString("sitename")
        val domain = jsonObject.getString("domain")
        val sequence = jsonObject.getString("sequence")
        val multicastserver = jsonObject.getString("multicastserver")
        val templateserver = jsonObject.getString("templateserver")
        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val multicastgateway = jsonObject.getString("multicastgateway")
        val multicastport = jsonObject.getString("multicastport")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (siteid, sitename, domain, sequence, multicastserver, templateserver, status, creator, createtime,
          multicastgateway, multicastport, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_website")
  }
  /**
   * Parse major data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzMajor(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzMajor.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val majorid = jsonObject.getIntValue("majorid")
        val businessid = jsonObject.getIntValue("businessid")
        val siteid = jsonObject.getIntValue("siteid")
        val majorname = jsonObject.getString("majorname")
        val shortname = jsonObject.getString("shortname")
        val status = jsonObject.getString("status")
        val sequence = jsonObject.getString("sequence")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val columm_sitetype = jsonObject.getString("columm_sitetype")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (majorid, businessid, siteid, majorname, shortname, status, sequence, creator, createtime, columm_sitetype, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_major")
  }
  /**
   * Parse business data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzBusiness(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzBusiness.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val businessid = jsonObject.getIntValue("businessid")
        val businessname = jsonObject.getString("businessname")
        val sequence = jsonObject.getString("sequence")
        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val siteid = jsonObject.getIntValue("siteid")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (businessid, businessname, sequence, status, creator, createtime, siteid, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_business")
  }
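  /**
   * Parse paper-view data
   *
   * @param ssc
   * @param sparkSession
   */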
  def etlQzPaperView(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzPaperView.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val paperviewid = jsonObject.getIntValue("paperviewid")
        val paperid = jsonObject.getIntValue("paperid")
        val paperviewname = jsonObject.getString("paperviewname")
        val paperparam = jsonObject.getString("paperparam")
        val openstatus = jsonObject.getString("openstatus")
        val explainurl = jsonObject.getString("explainurl")
        val iscontest = jsonObject.getString("iscontest")
        val contesttime = jsonObject.getString("contesttime")
        val conteststarttime = jsonObject.getString("conteststarttime")
        val contestendtime = jsonObject.getString("contestendtime")
        val contesttimelimit = jsonObject.getString("contesttimelimit")
        val dayiid = jsonObject.getIntValue("dayiid")
        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val paperviewcatid = jsonObject.getIntValue("paperviewcatid")
        val modifystatus = jsonObject.getString("modifystatus")
        val description = jsonObject.getString("description")
        val papertype = jsonObject.getString("papertype")
        val downurl = jsonObject.getString("downurl")
        val paperuse = jsonObject.getString("paperuse")
        val paperdifficult = jsonObject.getString("paperdifficult")
        val testreport = jsonObject.getString("testreport")
        val paperuseshow = jsonObject.getString("paperuseshow")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        DwdQzPaperView(paperviewid, paperid, paperviewname, paperparam, openstatus, explainurl, iscontest, contesttime,
          conteststarttime, contestendtime, contesttimelimit, dayiid, status, creator, createtime, paperviewcatid, modifystatus,
          description, papertype, downurl, paperuse, paperdifficult, testreport, paperuseshow, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_paper_view")
  }
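  /**
   * Parse center-paper association data
   *
   * @param ssc
   * @param sparkSession
   */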
  def etlQzCenterPaper(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzCenterPaper.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val paperviewid = jsonObject.getIntValue("paperviewid")
        val centerid = jsonObject.getIntValue("centerid")
        val openstatus = jsonObject.getString("openstatus")
        val sequence = jsonObject.getString("sequence")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (paperviewid, centerid, openstatus, sequence, creator, createtime, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_center_paper")
  }
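  /**
   * Parse paper data
   *
   * @param ssc
   * @param sparkSession
   */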
  def etlQzPaper(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzPaper.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val paperid = jsonObject.getIntValue("paperid")
        val papercatid = jsonObject.getIntValue("papercatid")
        val courseid = jsonObject.getIntValue("courseid")
        val paperyear = jsonObject.getString("paperyear")
        val chapter = jsonObject.getString("chapter")
        val suitnum = jsonObject.getString("suitnum")
        val papername = jsonObject.getString("papername")
        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val totalscore = BigDecimal.apply(jsonObject.getString("totalscore")).setScale(1, BigDecimal.RoundingMode.HALF_UP)
        val chapterid = jsonObject.getIntValue("chapterid")
        val chapterlistid = jsonObject.getIntValue("chapterlistid")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (paperid, papercatid, courseid, paperyear, chapter, suitnum, papername, status, creator, createtime, totalscore, chapterid,
          chapterlistid, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_paper")
  }
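  /**
   * Parse paper center data
   *
   * @param ssc
   * @param sparkSession
   */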
  def etlQzCenter(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzCenter.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val centerid = jsonObject.getIntValue("centerid")
        val centername = jsonObject.getString("centername")
        val centeryear = jsonObject.getString("centeryear")
        val centertype = jsonObject.getString("centertype")
        val openstatus = jsonObject.getString("openstatus")
        val centerparam = jsonObject.getString("centerparam")
        val description = jsonObject.getString("description")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val sequence = jsonObject.getString("sequence")
        val provideuser = jsonObject.getString("provideuser")
        val centerviewtype = jsonObject.getString("centerviewtype")
        val stage = jsonObject.getString("stage")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (centerid, centername, centeryear, centertype, openstatus, centerparam, description, creator, createtime,
          sequence, provideuser, centerviewtype, stage, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_center")
  }
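  /**
   * Parse question data
   *
   * @param ssc
   * @param sparkSession
   */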
  def etlQzQuestion(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzQuestion.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val questionid = jsonObject.getIntValue("questionid")
        val parentid = jsonObject.getIntValue("parentid")
        val questypeid = jsonObject.getIntValue("questypeid")
        val quesviewtype = jsonObject.getIntValue("quesviewtype")
        val content = jsonObject.getString("content")
        val answer = jsonObject.getString("answer")
        val analysis = jsonObject.getString("analysis")
        val limitminute = jsonObject.getString("limitminute")
        val score = BigDecimal.apply(jsonObject.getDoubleValue("score")).setScale(1, BigDecimal.RoundingMode.HALF_UP)
        val splitscore = BigDecimal.apply(jsonObject.getDoubleValue("splitscore")).setScale(1, BigDecimal.RoundingMode.HALF_UP)
        val status = jsonObject.getString("status")
        val optnum = jsonObject.getIntValue("optnum")
        val lecture = jsonObject.getString("lecture")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val modifystatus = jsonObject.getString("modifystatus")
        val attanswer = jsonObject.getString("attanswer")
        val questag = jsonObject.getString("questag")
        val vanalysisaddr = jsonObject.getString("vanalysisaddr")
        val difficulty = jsonObject.getString("difficulty")
        val quesskill = jsonObject.getString("quesskill")
        val vdeoaddr = jsonObject.getString("vdeoaddr")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        DwdQzQuestion(questionid, parentid, questypeid, quesviewtype, content, answer, analysis, limitminute, score, splitscore,
          status, optnum, lecture, creator, createtime, modifystatus, attanswer, questag, vanalysisaddr, difficulty, quesskill,
          vdeoaddr, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_question")
  }
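  /**
   * Parse question-type data
   *
   * @param ssc
   * @param sparkSession
   */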
  def etlQzQuestionType(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzQuestionType.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val quesviewtype = jsonObject.getIntValue("quesviewtype")
        val viewtypename = jsonObject.getString("viewtypename")
        val questiontypeid = jsonObject.getIntValue("questypeid")
        val description = jsonObject.getString("description")
        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val papertypename = jsonObject.getString("papertypename")
        val sequence = jsonObject.getString("sequence")
        val remark = jsonObject.getString("remark")
        val splitscoretype = jsonObject.getString("splitscoretype")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (quesviewtype, viewtypename, questiontypeid, description, status, creator, createtime, papertypename, sequence,
          remark, splitscoretype, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_question_type")
  }
  /**
   * Parse users' answer-record data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzMemberPaperQuestion(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzMemberPaperQuestion.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val userid = jsonObject.getIntValue("userid")
        val paperviewid = jsonObject.getIntValue("paperviewid")
        val chapterid = jsonObject.getIntValue("chapterid")
        val sitecourseid = jsonObject.getIntValue("sitecourseid")
        val questionid = jsonObject.getIntValue("questionid")
        val majorid = jsonObject.getIntValue("majorid")
        val useranswer = jsonObject.getString("useranswer")
        val istrue = jsonObject.getString("istrue")
        val lasttime = jsonObject.getString("lasttime")
        val opertype = jsonObject.getString("opertype")
        val paperid = jsonObject.getIntValue("paperid")
        val spendtime = jsonObject.getIntValue("spendtime")
        val score = BigDecimal.apply(jsonObject.getString("score")).setScale(1, BigDecimal.RoundingMode.HALF_UP)
        val question_answer = jsonObject.getIntValue("question_answer")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (userid, paperviewid, chapterid, sitecourseid, questionid, majorid, useranswer, istrue, lasttime, opertype, paperid, spendtime, score,question_answer, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_member_paper_question")
  }
}
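
EtlDataService only defines the per-table ETL methods; a driver still has to build a Hive-enabled SparkSession and invoke them. A minimal sketch of such a driver, assuming the package layout used above (the object name DwdQzController, the import path for EtlDataService, and the dynamic-partition settings are assumptions, not the project's exact code):

import com.catelf.qz.service.EtlDataService // package path for EtlDataService is assumed
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DwdQzController {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("dwd_qz_import")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    // insertInto on the partitioned dwd tables requires dynamic partitioning in nonstrict mode
    sparkSession.sql("set hive.exec.dynamic.partition=true")
    sparkSession.sql("set hive.exec.dynamic.partition.mode=nonstrict")

    EtlDataService.etlQzChapter(ssc, sparkSession)
    EtlDataService.etlQzChapterList(ssc, sparkSession)
    EtlDataService.etlQzPoint(ssc, sparkSession)
    EtlDataService.etlQzPointQuestion(ssc, sparkSession)
    EtlDataService.etlQzSiteCourse(ssc, sparkSession)
    EtlDataService.etlQzCourse(ssc, sparkSession)
    EtlDataService.etlQzCourseEdusubject(ssc, sparkSession)
    EtlDataService.etlQzWebsite(ssc, sparkSession)
    EtlDataService.etlQzMajor(ssc, sparkSession)
    EtlDataService.etlQzBusiness(ssc, sparkSession)
    EtlDataService.etlQzPaperView(ssc, sparkSession)
    EtlDataService.etlQzCenterPaper(ssc, sparkSession)
    EtlDataService.etlQzPaper(ssc, sparkSession)
    EtlDataService.etlQzCenter(ssc, sparkSession)
    EtlDataService.etlQzQuestion(ssc, sparkSession)
    EtlDataService.etlQzQuestionType(ssc, sparkSession)
    EtlDataService.etlQzMemberPaperQuestion(ssc, sparkSession)
  }
}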

