基于大数据技术的开源在线教育项目 三1

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 基于大数据技术的开源在线教育项目 三

上篇文章我们介绍了离线数仓的做题模块,本文我们来看下在线教育项目的实时部分。

本文代码可在开源项目https://github.com/SoundHearer/kuaiban中找到。

实时部分的架构图如下

原始数据格式及对应的topic

实时统计注册人数

topic:register_topic

数据格式

用户ID 平台ID 1:PC 2:APP 3:Others 创建时间
85571 1 2019-07-16 16:01:55

做题正确率与知识掌握度数据格式

topic:qz_log

用户ID 课程ID 知识点ID 题目ID 是否正确 0 错误 1 正确 创建时间
1005 505 29 1 1 2019-09-12 11:17:48

商品页面到订单页,订单页到支付页数据格式

{"app_id":"1","device_id":"102","distinct_id":"5fa401c8-dd45-4425-b8c6-700f9f74c532","event_name":"-","ip":"121.76.152.135","last_event_name":"-","last_page_id":"0","next_event_name":"-","next_page_id":"2","page_id":"1","server_time":"-","uid":"245494"}

topic: page_topic

uid:用户id

app_id:平台id

deviceid:平台id

disinct_id:唯一标识

Ip:用户ip地址

last_page_id :上一页面id

page_id:当前页面id 0:首页 1:商品课程页 2:订单页面 3:支付页面

next_page_id:下一页面id

实时统计学员播放视频各时长

topic: course_learn

uid:用户id

app_id:平台id

deviceid:平台id

disinct_id:唯一标识

Ip:用户ip地址

last_page_id :上一页面id

page_id:当前页面id 0:首页 1:商品课程页 2:订单页面 3:支付页面

next_page_id:下一页面id

实时统计学员播放视频各时长

topic: course_learn

{"biz":"bdfb58e5-d14c-45d2-91bc-1d9409800ac3","chapterid":"1","cwareid":"3","edutypeid":"3","pe":"55","ps":"41","sourceType":"APP","speed":"2","subjectid":"2","te":"1563352166417","ts":"1563352159417","uid":"235","videoid":"2"}

biz:唯一标识

chapterid:章节id

cwareid:课件id

edutypeid:辅导id

ps:视频播放时间区间

pe:视频播放结束区间

sourceType:播放平台

speed:播放倍速

ts:视频播放开始时间(时间戳)

te:视频播放结束时间(时间戳)

videoid:视频id

新建Topic

kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic register_topic --partitions 10 --replication-factor 2
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic page_topic --partitions 10 --replication-factor 2
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic course_learn --partitions 10 --replication-factor 2
 kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic qz_log --partitions 10 --replication-factor 2

模拟数据采集

将log文件通过kafka生产者发送到topic中去,log源文件可以在开源项目https://github.com/SoundHearer/kuaiban中找到

以course_learn.log为例,我们将log上传到了hdfs中,生产者代码为下:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}
object CourseLearnProducer {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("courseProducer").setMaster("local[*]")
    val ssc = new SparkContext(sparkConf)
//    System.setProperty("hadoop.home.dir", "D:\\hadoop\\hadoop-common-2.2.0-bin-master")
    val resultLog = ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/data/course_learn.log", 10)
      .foreachPartition(partitoin => {
        val props = new Properties()
        props.put("bootstrap.servers", "cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092")
        props.put("acks", "1")
        props.put("batch.size", "16384")
        props.put("linger.ms", "10")
        props.put("buffer.memory", "33554432")
        props.put("key.serializer",
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer",
          "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        partitoin.foreach(item => {
          val msg = new ProducerRecord[String, String]("course_learn", item)
          producer.send(msg)
        })
        producer.flush()
        producer.close()
      })
  }
}

实现

实时统计注册人员信息

用户使用网站或APP进行注册,后台实时收集数据传输Kafka,Spark Streaming进行对接统计,实时统计注册人数。

需求1:实时统计注册人数,批次为3秒一批,使用updateStateBykey算子计算历史数据和当前批次的数据总数,仅此需求使用updateStateBykey,后续需求不使用updateStateBykey。

需求2:每6秒统统计一次1分钟内的注册数据,不需要历史数据 提示:reduceByKeyAndWindow算子

import java.lang
import java.sql.ResultSet
import java.util.Random
import com.catelf.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object RegisterStreaming {
  private val groupid = "register_group_test"
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
      .set("spark.streaming.kafka.maxRatePerPartition", "50")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("register_topic")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    ssc.checkpoint("hdfs://cdh1.macro.com:8020/user/catelf/sparkstreaming/checkpoint")
    //查询mysql中是否有偏移量
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close() //关闭游标
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }
    //设置kafka消费数据的参数  判断本地是否有偏移量  有则根据偏移量继续消费 无则重新消费
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }
    val resultDStream = stream.filter(item => item.value().split("\t").length == 3).
      mapPartitions(partitions => {
        partitions.map(item => {
          val line = item.value()
          val arr = line.split("\t")
          val app_name = arr(1) match {
            case "1" => "PC"
            case "2" => "APP"
            case _ => "Other"
          }
          (app_name, 1)
        })
      })
    resultDStream.cache()
    //    resultDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(6)).print()
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum //本批次求和
      val previousCount = state.getOrElse(0) //历史数据
      Some(currentCount + previousCount)
    }
    resultDStream.updateStateByKey(updateFunc).print()
    //处理完 业务逻辑后 手动提交offset维护到本地 mysql中
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

实时计算学员做题算正确率与知识点掌握度

用户在网站或APP进行做题,做完题点击交卷按钮,程序将做题记录提交,传输到Kafka中,下游Spark Streaming对接kafka实现实时计算做题正确率和掌握度,将正确率和掌握度存入mysql中,用户点击交卷后刷新页面能立马看到自己做题的详情。

需求1:要求Spark Streaming 保证数据不丢失,每秒100条处理速度,需要手动维护偏移量

需求2:同一个用户做在同一门课程同一知识点下做题需要去重,需要根据历史数据进行去重并且记录去重后的做题id与个数。

需求3:计算知识点正确率 正确率计算公式:做题正确总个数/做题总数 保留两位小数

需求4:计算知识点掌握度 去重后的做题个数/当前知识点总题数(已知30题)*当前知识点的正确率

import java.lang
import java.sql.{Connection, ResultSet}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import com.catelf.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
/**
 * 知识点掌握度实时统计
 */
object QzPointStreaming {
  private val groupid = "qz_point_group"
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "50")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("qz_log")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    //查询mysql中是否存在偏移量
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close() //关闭游标
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }
    //设置kafka消费数据的参数  判断本地是否有偏移量  有则根据偏移量继续消费 无则重新消费
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }
    //过滤不正常数据 获取数据
    val dsStream = stream.filter(item => item.value().split("\t").length == 6).
      mapPartitions(partition => partition.map(item => {
        val line = item.value()
        val arr = line.split("\t")
        val uid = arr(0) //用户id
        val courseid = arr(1) //课程id
        val pointid = arr(2) //知识点id
        val questionid = arr(3) //题目id
        val istrue = arr(4) //是否正确
        val createtime = arr(5) //创建时间
        (uid, courseid, pointid, questionid, istrue, createtime)
      }))
    dsStream.foreachRDD(rdd => {
      //获取相同用户 同一课程 同一知识点的数据
      val groupRdd = rdd.groupBy(item => item._1 + "-" + item._2 + "-" + item._3)
      groupRdd.foreachPartition(partition => {
        //在分区下获取jdbc连接
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partition.foreach { case (key, iters) =>
            qzQuestionUpdate(key, iters, sqlProxy, client) //对题库进行更新操作
          }
        } catch {
          case e: Exception => e.printStackTrace()
        }
        finally {
          sqlProxy.shutdown(client)
        }
      }
      )
    })
    //处理完 业务逻辑后 手动提交offset维护到本地 mysql中
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
  /**
   * 对题目表进行更新操作
   *
   * @param key
   * @param iters
   * @param sqlProxy
   * @param client
   * @return
   */
  def qzQuestionUpdate(key: String, iters: Iterable[(String, String, String, String, String, String)], sqlProxy: SqlProxy, client: Connection) = {
    val keys = key.split("-")
    val userid = keys(0).toInt
    val courseid = keys(1).toInt
    val pointid = keys(2).toInt
    val array = iters.toArray
    val questionids = array.map(_._4).distinct //对当前批次的数据下questionid 去重
    //查询历史数据下的 questionid
    var questionids_history: Array[String] = Array()
    sqlProxy.executeQuery(client, "select questionids from qz_point_history where userid=? and courseid=? and pointid=?",
      Array(userid, courseid, pointid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            questionids_history = rs.getString(1).split(",")
          }
          rs.close() //关闭游标
        }
      })
    //获取到历史数据后再与当前数据进行拼接 去重
    val resultQuestionid = questionids.union(questionids_history).distinct
    val countSize = resultQuestionid.length
    val resultQuestionid_str = resultQuestionid.mkString(",")
    val qz_count = questionids.length //去重后的题个数
    var qz_sum = array.length //获取当前批次题总数
    var qz_istrue = array.filter(_._5.equals("1")).size //获取当前批次做正确的题个数
    val createtime = array.map(_._6).min //获取最早的创建时间 作为表中创建时间
    //更新qz_point_set 记录表 此表用于存当前用户做过的questionid表
    val updatetime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now())
    sqlProxy.executeUpdate(client, "insert into qz_point_history(userid,courseid,pointid,questionids,createtime,updatetime) values(?,?,?,?,?,?) " +
      " on duplicate key update questionids=?,updatetime=?", Array(userid, courseid, pointid, resultQuestionid_str, createtime, createtime, resultQuestionid_str, updatetime))
    var qzSum_history = 0
    var istrue_history = 0
    sqlProxy.executeQuery(client, "select qz_sum,qz_istrue from qz_point_detail where userid=? and courseid=? and pointid=?",
      Array(userid, courseid, pointid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            qzSum_history += rs.getInt(1)
            istrue_history += rs.getInt(2)
          }
          rs.close()
        }
      })
    qz_sum += qzSum_history
    qz_istrue += istrue_history
    val correct_rate = qz_istrue.toDouble / qz_sum.toDouble //计算正确率
    //计算完成率
    //假设每个知识点下一共有30道题  先计算题的做题情况 再计知识点掌握度
    val qz_detail_rate = countSize.toDouble / 30 //算出做题情况乘以 正确率 得出完成率 假如30道题都做了那么正确率等于 知识点掌握度
    val mastery_rate = qz_detail_rate * correct_rate
    sqlProxy.executeUpdate(client, "insert into qz_point_detail(userid,courseid,pointid,qz_sum,qz_count,qz_istrue,correct_rate,mastery_rate,createtime,updatetime)" +
      " values(?,?,?,?,?,?,?,?,?,?) on duplicate key update qz_sum=?,qz_count=?,qz_istrue=?,correct_rate=?,mastery_rate=?,updatetime=?",
      Array(userid, courseid, pointid, qz_sum, countSize, qz_istrue, correct_rate, mastery_rate, createtime, updatetime, qz_sum, countSize, qz_istrue, correct_rate, mastery_rate, updatetime))
  }
}

实时统计商品页到订单页,订单页到支付页转换率

用户浏览课程首页点击下订单,跳转到订单页面,再点击支付跳转到支付页面进行支付,收集各页面跳转json数据,解析json数据计算各页面点击数和转换率,计算top3点击量按地区排名(ip字段,需要根据历史数据累计)

需求1:计算首页总浏览数、订单页总浏览数、支付页面总浏览数

需求2:计算商品课程页面到订单页的跳转转换率、订单页面到支付页面的跳转转换率

需求3:根据ip得出相应省份,展示出top3省份的点击数,需要根据历史数据累加

import java.lang
import java.sql.{Connection, ResultSet}
import java.text.NumberFormat
import com.catelf.qzpoint.util.{DataSourceUtil, ParseJsonData, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkFiles}
import org.lionsoul.ip2region.{DbConfig, DbSearcher}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
 * 页面转换率实时统计
 */
object PageStreaming {
  private val groupid = "vip_count_groupid"
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "30")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("page_topic")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    //查询mysql中是否存在偏移量
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select *from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close()
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }
    //设置kafka消费数据的参数 判断本地是否有偏移量  有则根据偏移量继续消费 无则重新消费
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }
    //解析json数据
    val dsStream = stream.map(item => item.value()).mapPartitions(partition => {
      partition.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val uid = if (jsonObject.containsKey("uid")) jsonObject.getString("uid") else ""
        val app_id = if (jsonObject.containsKey("app_id")) jsonObject.getString("app_id") else ""
        val device_id = if (jsonObject.containsKey("device_id")) jsonObject.getString("device_id") else ""
        val ip = if (jsonObject.containsKey("ip")) jsonObject.getString("ip") else ""
        val last_page_id = if (jsonObject.containsKey("last_page_id")) jsonObject.getString("last_page_id") else ""
        val pageid = if (jsonObject.containsKey("page_id")) jsonObject.getString("page_id") else ""
        val next_page_id = if (jsonObject.containsKey("next_page_id")) jsonObject.getString("next_page_id") else ""
        (uid, app_id, device_id, ip, last_page_id, pageid, next_page_id)
      })
    }).filter(item => {
      !item._5.equals("") && !item._6.equals("") && !item._7.equals("")
    })
    dsStream.cache()
    val pageValueDStream = dsStream.map(item => (item._5 + "_" + item._6 + "_" + item._7, 1))
    val resultDStream = pageValueDStream.reduceByKey(_ + _)
    resultDStream.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        //在分区下获取jdbc连接
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partition.foreach(item => {
            calcPageJumpCount(sqlProxy, item, client) //计算页面跳转个数
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })
    })
    ssc.sparkContext.addFile("hdfs://cdh1.macro.com:8020/user/catelf/data/ip2region.db") //广播文件
    val ipDStream = dsStream.mapPartitions(patitions => {
      val dbFile = SparkFiles.get("ip2region.db")
      val ipsearch = new DbSearcher(new DbConfig(), dbFile)
      patitions.map { item =>
        val ip = item._4
        val province = ipsearch.memorySearch(ip).getRegion().split("\\|")(2) //获取ip详情   中国|0|上海|上海市|有线通
        (province, 1l) //根据省份 统计点击个数
      }
    }).reduceByKey(_ + _)
    ipDStream.foreachRDD(rdd => {
      //查询mysql历史数据 转成rdd
      val ipSqlProxy = new SqlProxy()
      val ipClient = DataSourceUtil.getConnection
      try {
        val history_data = new ArrayBuffer[(String, Long)]()
        ipSqlProxy.executeQuery(ipClient, "select province,num from tmp_city_num_detail", null, new QueryCallback {
          override def process(rs: ResultSet): Unit = {
            while (rs.next()) {
              val tuple = (rs.getString(1), rs.getLong(2))
              history_data += tuple
            }
          }
        })
        val history_rdd = ssc.sparkContext.makeRDD(history_data)
        val resultRdd = history_rdd.fullOuterJoin(rdd).map(item => {
          val province = item._1
          val nums = item._2._1.getOrElse(0l) + item._2._2.getOrElse(0l)
          (province, nums)
        })
        resultRdd.foreachPartition(partitions => {
          val sqlProxy = new SqlProxy()
          val client = DataSourceUtil.getConnection
          try {
            partitions.foreach(item => {
              val province = item._1
              val num = item._2
              //修改mysql数据 并重组返回最新结果数据
              sqlProxy.executeUpdate(client, "insert into tmp_city_num_detail(province,num)values(?,?) on duplicate key update num=?",
                Array(province, num, num))
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
              sqlProxy.shutdown(client)
          }
        })
        val top3Rdd = resultRdd.sortBy[Long](_._2, false).take(3)
        sqlProxy.executeUpdate(ipClient, "truncate table top_city_num", null)
        top3Rdd.foreach(item => {
          sqlProxy.executeUpdate(ipClient, "insert into top_city_num (province,num) values(?,?)", Array(item._1, item._2))
        })
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(ipClient)
      }
    })
    //计算转换率
    //处理完 业务逻辑后 手动提交offset维护到本地 mysql中
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        calcJumRate(sqlProxy, client) //计算转换率
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
  /**
   * 计算页面跳转个数
   *
   * @param sqlProxy
   * @param item
   * @param client
   */
  def calcPageJumpCount(sqlProxy: SqlProxy, item: (String, Int), client: Connection): Unit = {
    val keys = item._1.split("_")
    var num: Long = item._2
    val page_id = keys(1).toInt //获取当前page_id
    val last_page_id = keys(0).toInt //获取上一page_id
    val next_page_id = keys(2).toInt //获取下页面page_id
    //查询当前page_id的历史num个数
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(page_id), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          num += rs.getLong(1)
        }
        rs.close()
      }
      //对num 进行修改 并且判断当前page_id是否为首页
      if (page_id == 1) {
        sqlProxy.executeUpdate(client, "insert into page_jump_rate(last_page_id,page_id,next_page_id,num,jump_rate)" +
          "values(?,?,?,?,?) on duplicate key update num=num+?", Array(last_page_id, page_id, next_page_id, num, "100%", num))
      } else {
        sqlProxy.executeUpdate(client, "insert into page_jump_rate(last_page_id,page_id,next_page_id,num)" +
          "values(?,?,?,?) on duplicate key update num=num+?", Array(last_page_id, page_id, next_page_id, num, num))
      }
    })
  }
  /**
   * 计算转换率
   */
  def calcJumRate(sqlProxy: SqlProxy, client: Connection): Unit = {
    var page1_num = 0l
    var page2_num = 0l
    var page3_num = 0l
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(1), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          page1_num = rs.getLong(1)
        }
      }
    })
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(2), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          page2_num = rs.getLong(1)
        }
      }
    })
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(3), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          page3_num = rs.getLong(1)
        }
      }
    })
    val nf = NumberFormat.getPercentInstance
    val page1ToPage2Rate = if (page1_num == 0) "0%" else nf.format(page2_num.toDouble / page1_num.toDouble)
    val page2ToPage3Rate = if (page2_num == 0) "0%" else nf.format(page3_num.toDouble / page2_num.toDouble)
    sqlProxy.executeUpdate(client, "update page_jump_rate set jump_rate=? where page_id=?", Array(page1ToPage2Rate, 2))
    sqlProxy.executeUpdate(client, "update page_jump_rate set jump_rate=? where page_id=?", Array(page2ToPage3Rate, 3))
  }
}


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
2天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
2天前
|
分布式计算 大数据 BI
MaxCompute产品使用合集之MaxCompute项目的数据是否可以被接入到阿里云的Quick BI中
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
存储 数据采集 数据可视化
大数据处理技术
【4月更文挑战第10天】大数据处理涵盖采集、预处理、存储、分析挖掘、展现和应用等关键步骤。采集涉及多种类型数据,预处理确保数据质量,存储管理关注规模、速度和安全,分析挖掘利用机器学习发现价值,展现和应用则通过可视化和检索实现数据价值。云计算和AI强化了大数据处理能力,整体目标是提取数据中的价值,驱动企业和社会进步。
35 4
大数据处理技术
|
2天前
|
机器学习/深度学习 运维 算法
大数据基础工程技术团队4篇论文入选ICLR,ICDE,WWW
近日,由阿里云计算平台大数据基础工程技术团队主导的四篇时间序列相关论文分别被国际顶会ICLR2024、ICDE2024和WWW2024接收。
|
2天前
|
存储 机器学习/深度学习 数据采集
大数据处理与分析实战:技术深度剖析与案例分享
【5月更文挑战第2天】本文探讨了大数据处理与分析的关键环节,包括数据采集、预处理、存储、分析和可视化,并介绍了Hadoop、Spark和机器学习等核心技术。通过电商推荐系统和智慧城市交通管理的实战案例,展示了大数据在提高用户体验和解决实际问题上的效能。随着技术进步,大数据处理与分析将在更多领域发挥作用,推动社会进步。
|
2天前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用合集之要查看MaxCompute Studio中的项目中的计算任务代码,我该怎么操作
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用合集之该怎么创建MaxCompute的项目
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
存储 分布式计算 Hadoop
【专栏】Hadoop,开源大数据处理框架:驭服数据洪流的利器
【4月更文挑战第28天】Hadoop,开源大数据处理框架,由Hadoop Common、HDFS、YARN和MapReduce组成,提供大规模数据存储和并行处理。其优势在于可扩展性、容错性、高性能、灵活性及社区支持。然而,数据安全、处理速度、系统复杂性和技能短缺是挑战。通过加强安全措施、结合Spark、自动化工具和培训,Hadoop在应对大数据问题中保持关键地位。
|
2天前
|
存储 数据可视化 大数据
大数据技术框架
【4月更文挑战第20天】大数据一般需要经过6个主要环节,包括数据收集、数据存储、资源管理与服务协调、计算引擎、数据分析和数据可视化。
|
2天前
|
分布式计算 容灾 大数据
MaxCompute( 原名ODPS)大数据容灾方案与实现(及项目落地实例)专有云
一,背景与概述    复杂系统的灾难恢复是个难题,具有海量数据及复杂业务场景的大数据容灾是个大难题。    MaxCompute是集团内重要数据平台,是自主研发的大数据解决方案,其规模和稳定性在业界都是领先的。
1509 12

热门文章

最新文章