In the previous article we covered the quiz module of the offline data warehouse; in this one we look at the real-time part of the online education project.
The code for this article can be found in the open-source project https://github.com/SoundHearer/kuaiban.
The architecture of the real-time part is shown below.
Raw data formats and the corresponding topics
Real-time registration count
Topic: register_topic
Data format (fields in the raw records are tab-separated; the "|" below only marks column boundaries):
User ID | Platform ID (1: PC, 2: APP, 3: Others) | Creation time
85571 | 1 | 2019-07-16 16:01:55
Quiz correctness rate and knowledge-point mastery data format
Topic: qz_log
User ID | Course ID | Knowledge point ID | Question ID | Correct (0: wrong, 1: correct) | Creation time
1005 | 505 | 29 | 1 | 1 | 2019-09-12 11:17:48
Data format for product page to order page and order page to payment page
{"app_id":"1","device_id":"102","distinct_id":"5fa401c8-dd45-4425-b8c6-700f9f74c532","event_name":"-","ip":"121.76.152.135","last_event_name":"-","last_page_id":"0","next_event_name":"-","next_page_id":"2","page_id":"1","server_time":"-","uid":"245494"}
Topic: page_topic
uid: user ID
app_id: platform ID
device_id: device ID
distinct_id: unique identifier
ip: user IP address
last_page_id: previous page ID
page_id: current page ID (0: home page, 1: product/course page, 2: order page, 3: payment page)
next_page_id: next page ID
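As a standalone illustration of this format, the sketch below pulls out the fields that PageStreaming uses later. It assumes fastjson (com.alibaba.fastjson), my guess at what the project's ParseJsonData helper wraps; PageRecordSketch is an illustrative name, not project code.

import com.alibaba.fastjson.JSON

// Minimal sketch, not project code: parse one page_topic record and read the jump fields.
object PageRecordSketch {
  def main(args: Array[String]): Unit = {
    val record = """{"app_id":"1","device_id":"102","distinct_id":"5fa401c8-dd45-4425-b8c6-700f9f74c532","event_name":"-","ip":"121.76.152.135","last_event_name":"-","last_page_id":"0","next_event_name":"-","next_page_id":"2","page_id":"1","server_time":"-","uid":"245494"}"""
    val obj = JSON.parseObject(record)
    val lastPageId = obj.getString("last_page_id") // previous page (0 = home page)
    val pageId = obj.getString("page_id")          // current page (1 = product/course page)
    val nextPageId = obj.getString("next_page_id") // next page (2 = order page)
    println(s"uid=${obj.getString("uid")} jump: $lastPageId -> $pageId -> $nextPageId")
  }
}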
Real-time statistics of students' video playback durations
Topic: course_learn
{"biz":"bdfb58e5-d14c-45d2-91bc-1d9409800ac3","chapterid":"1","cwareid":"3","edutypeid":"3","pe":"55","ps":"41","sourceType":"APP","speed":"2","subjectid":"2","te":"1563352166417","ts":"1563352159417","uid":"235","videoid":"2"}
biz: unique identifier
chapterid: chapter ID
cwareid: courseware ID
edutypeid: tutoring type ID
subjectid: subject ID
ps: start position of the playback interval within the video (seconds)
pe: end position of the playback interval within the video (seconds)
sourceType: playback platform
speed: playback speed
ts: playback start time (timestamp)
te: playback end time (timestamp)
videoid: video ID
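The streaming job for course_learn is not shown in this section, so here is a minimal, hedged sketch of how the interval fields fit together. It assumes fastjson (com.alibaba.fastjson) as the JSON parser, which is my guess at what the project's ParseJsonData helper wraps; CourseLearnDurationSketch and its variable names are illustrative only.

import com.alibaba.fastjson.JSON

// Minimal sketch, not project code: derive per-record play time from one course_learn record.
// pe - ps is the stretch of video covered; (te - ts) / 1000 is the wall-clock play time in seconds.
// With the sample record below, 55 - 41 = 14 seconds of video watched at 2x speed take
// (1563352166417 - 1563352159417) / 1000 = 7 seconds of wall-clock time.
object CourseLearnDurationSketch {
  def main(args: Array[String]): Unit = {
    val record = """{"biz":"bdfb58e5-d14c-45d2-91bc-1d9409800ac3","chapterid":"1","cwareid":"3","edutypeid":"3","pe":"55","ps":"41","sourceType":"APP","speed":"2","subjectid":"2","te":"1563352166417","ts":"1563352159417","uid":"235","videoid":"2"}"""
    val obj = JSON.parseObject(record)
    val videoSeconds = obj.getString("pe").toInt - obj.getString("ps").toInt
    val wallClockSeconds = (obj.getString("te").toLong - obj.getString("ts").toLong) / 1000.0
    println(s"uid=${obj.getString("uid")} watched $videoSeconds s of video in $wallClockSeconds s")
  }
}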
Creating the topics
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic register_topic --partitions 10 --replication-factor 2
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic page_topic --partitions 10 --replication-factor 2
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic course_learn --partitions 10 --replication-factor 2
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic qz_log --partitions 10 --replication-factor 2
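To double-check that the four topics were created with the expected partition and replica counts, the same kafka-topics CLI can list or describe them (shown here for register_topic; the other topics are analogous):

kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --list
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --describe --topic register_topic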
Simulating data collection
The log files are sent to the topics through a Kafka producer; the source log files can be found in the open-source project https://github.com/SoundHearer/kuaiban.
Taking course_learn.log as an example, we first upload the log to HDFS; the producer code is as follows:
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}

object CourseLearnProducer {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("courseProducer").setMaster("local[*]")
    val ssc = new SparkContext(sparkConf)
    // System.setProperty("hadoop.home.dir", "D:\\hadoop\\hadoop-common-2.2.0-bin-master")
    // Read the uploaded log from HDFS and, per partition, send each line to the course_learn topic
    val resultLog = ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/data/course_learn.log", 10)
      .foreachPartition(partitoin => {
        // One Kafka producer per partition
        val props = new Properties()
        props.put("bootstrap.servers", "cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092")
        props.put("acks", "1")
        props.put("batch.size", "16384")
        props.put("linger.ms", "10")
        props.put("buffer.memory", "33554432")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        partitoin.foreach(item => {
          val msg = new ProducerRecord[String, String]("course_learn", item)
          producer.send(msg)
        })
        producer.flush()
        producer.close()
      })
  }
}
Implementation
Real-time registration statistics
Users register on the website or APP; the backend collects the events in real time and sends them to Kafka, and Spark Streaming consumes them to count registrations in real time.
Requirement 1: count registrations in real time with 3-second batches, using the updateStateByKey operator to combine the historical total with the current batch. Only this requirement uses updateStateByKey; the later requirements do not.
Requirement 2: every 6 seconds, count the registrations of the last 1 minute; no historical data is needed. Hint: the reduceByKeyAndWindow operator (a minimal sketch follows).
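Requirement 2 only appears in the full job below as a commented-out line, so here is a minimal sketch of the windowed count on its own. The method name registerCountLastMinute is illustrative (not part of the project); it assumes the same (platform, 1) pair DStream that RegisterStreaming builds, and could be dropped into that object.

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// Count registrations per platform over the last 60 seconds, recomputed every 6 seconds.
// No historical state is involved, so updateStateByKey and its checkpoint are not needed here.
def registerCountLastMinute(resultDStream: DStream[(String, Int)]): DStream[(String, Int)] =
  resultDStream.reduceByKeyAndWindow(
    (x: Int, y: Int) => x + y, // merge counts inside the window
    Seconds(60),               // window length: the last 1 minute
    Seconds(6)                 // slide interval: every 6 seconds
  )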
import java.lang
import java.sql.ResultSet
import java.util.Random

import com.catelf.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object RegisterStreaming {
  private val groupid = "register_group_test"

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
      .set("spark.streaming.kafka.maxRatePerPartition", "50")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("register_topic")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    ssc.checkpoint("hdfs://cdh1.macro.com:8020/user/catelf/sparkstreaming/checkpoint")
    // Check whether MySQL already holds offsets for this consumer group
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close() // close the cursor
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }
    // Build the Kafka stream: resume from the stored offsets if present, otherwise consume from scratch
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }
    val resultDStream = stream.filter(item => item.value().split("\t").length == 3).
      mapPartitions(partitions => {
        partitions.map(item => {
          val line = item.value()
          val arr = line.split("\t")
          val app_name = arr(1) match {
            case "1" => "PC"
            case "2" => "APP"
            case _ => "Other"
          }
          (app_name, 1)
        })
      })
    resultDStream.cache()
    // resultDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(6)).print()

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum // total of the current batch
      val previousCount = state.getOrElse(0) // historical total
      Some(currentCount + previousCount)
    }
    resultDStream.updateStateByKey(updateFunc).print()

    // After the business logic, commit the offsets manually to MySQL
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
Real-time calculation of quiz correctness rate and knowledge-point mastery
Users answer questions on the website or APP and click the submit button; the answer records are sent to Kafka, and downstream Spark Streaming consumes them to compute the correctness rate and mastery in real time and writes both to MySQL, so that right after submitting, a user can refresh the page and see the details of their answers.
Requirement 1: Spark Streaming must guarantee no data loss at a processing rate of 100 records per second, with manually maintained offsets.
Requirement 2: answers by the same user on the same course and knowledge point must be deduplicated against the historical data, and the deduplicated question IDs and their count recorded.
Requirement 3: compute the correctness rate of a knowledge point. Formula: number of correctly answered questions / total number of questions answered, kept to two decimal places.
Requirement 4: compute the mastery of a knowledge point: (deduplicated question count / total questions under the knowledge point, known to be 30) * the correctness rate of the knowledge point. A worked sketch follows.
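Before the full job, here is a quick worked sketch of Requirements 3 and 4 with made-up numbers (not project data), mirroring the variable names used in qzQuestionUpdate below:

// Made-up numbers: the user has answered 10 questions under this knowledge point (qz_sum = 10),
// 8 of them correctly (qz_istrue = 8), and 6 distinct question ids remain after deduplication
// (countSize = 6); the knowledge point is known to have 30 questions in total.
val qz_sum = 10
val qz_istrue = 8
val countSize = 6

val correct_rate = qz_istrue.toDouble / qz_sum.toDouble // 8 / 10 = 0.80
val qz_detail_rate = countSize.toDouble / 30            // 6 / 30 = 0.20 (share of the 30 questions attempted)
val mastery_rate = qz_detail_rate * correct_rate        // 0.20 * 0.80 = 0.16
println(f"correct_rate=$correct_rate%.2f, mastery_rate=$mastery_rate%.2f")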
import java.lang
import java.sql.{Connection, ResultSet}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import com.catelf.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
 * Real-time knowledge-point mastery statistics
 */
object QzPointStreaming {

  private val groupid = "qz_point_group"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "50")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("qz_log")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    // Check whether MySQL already holds offsets for this consumer group
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close() // close the cursor
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }
    // Build the Kafka stream: resume from the stored offsets if present, otherwise consume from scratch
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }
    // Filter out malformed records and extract the fields
    val dsStream = stream.filter(item => item.value().split("\t").length == 6).
      mapPartitions(partition => partition.map(item => {
        val line = item.value()
        val arr = line.split("\t")
        val uid = arr(0) // user id
        val courseid = arr(1) // course id
        val pointid = arr(2) // knowledge point id
        val questionid = arr(3) // question id
        val istrue = arr(4) // correct or not
        val createtime = arr(5) // creation time
        (uid, courseid, pointid, questionid, istrue, createtime)
      }))
    dsStream.foreachRDD(rdd => {
      // Group records of the same user, same course and same knowledge point
      val groupRdd = rdd.groupBy(item => item._1 + "-" + item._2 + "-" + item._3)
      groupRdd.foreachPartition(partition => {
        // Get a JDBC connection per partition
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partition.foreach { case (key, iters) =>
            qzQuestionUpdate(key, iters, sqlProxy, client) // update the question tables
          }
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })
    })
    // After the business logic, commit the offsets manually to MySQL
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Update the question tables for one (user, course, knowledge point) group
   *
   * @param key
   * @param iters
   * @param sqlProxy
   * @param client
   * @return
   */
  def qzQuestionUpdate(key: String, iters: Iterable[(String, String, String, String, String, String)], sqlProxy: SqlProxy, client: Connection) = {
    val keys = key.split("-")
    val userid = keys(0).toInt
    val courseid = keys(1).toInt
    val pointid = keys(2).toInt
    val array = iters.toArray
    val questionids = array.map(_._4).distinct // deduplicate the question ids of the current batch
    // Query the historical question ids
    var questionids_history: Array[String] = Array()
    sqlProxy.executeQuery(client, "select questionids from qz_point_history where userid=? and courseid=? and pointid=?",
      Array(userid, courseid, pointid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            questionids_history = rs.getString(1).split(",")
          }
          rs.close() // close the cursor
        }
      })
    // Merge the current batch with the historical ids and deduplicate again
    val resultQuestionid = questionids.union(questionids_history).distinct
    val countSize = resultQuestionid.length
    val resultQuestionid_str = resultQuestionid.mkString(",")
    val qz_count = questionids.length // number of deduplicated question ids in this batch
    var qz_sum = array.length // total questions answered in this batch
    var qz_istrue = array.filter(_._5.equals("1")).size // correctly answered questions in this batch
    val createtime = array.map(_._6).min // earliest creation time, used as the row's creation time
    // Update qz_point_history, which stores the question ids this user has already answered
    val updatetime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now())
    sqlProxy.executeUpdate(client, "insert into qz_point_history(userid,courseid,pointid,questionids,createtime,updatetime) values(?,?,?,?,?,?) " +
      " on duplicate key update questionids=?,updatetime=?",
      Array(userid, courseid, pointid, resultQuestionid_str, createtime, createtime, resultQuestionid_str, updatetime))
    var qzSum_history = 0
    var istrue_history = 0
    sqlProxy.executeQuery(client, "select qz_sum,qz_istrue from qz_point_detail where userid=? and courseid=? and pointid=?",
      Array(userid, courseid, pointid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            qzSum_history += rs.getInt(1)
            istrue_history += rs.getInt(2)
          }
          rs.close()
        }
      })
    qz_sum += qzSum_history
    qz_istrue += istrue_history
    val correct_rate = qz_istrue.toDouble / qz_sum.toDouble // correctness rate
    // Mastery: assume each knowledge point has 30 questions in total; first compute the share attempted,
    // then multiply by the correctness rate. If all 30 questions have been attempted, mastery equals the correctness rate.
    val qz_detail_rate = countSize.toDouble / 30
    val mastery_rate = qz_detail_rate * correct_rate
    sqlProxy.executeUpdate(client, "insert into qz_point_detail(userid,courseid,pointid,qz_sum,qz_count,qz_istrue,correct_rate,mastery_rate,createtime,updatetime)" +
      " values(?,?,?,?,?,?,?,?,?,?) on duplicate key update qz_sum=?,qz_count=?,qz_istrue=?,correct_rate=?,mastery_rate=?,updatetime=?",
      Array(userid, courseid, pointid, qz_sum, countSize, qz_istrue, correct_rate, mastery_rate, createtime, updatetime, qz_sum, countSize, qz_istrue, correct_rate, mastery_rate, updatetime))
  }
}
Real-time conversion rates from product page to order page and from order page to payment page
Users browse the course home page, click to place an order and jump to the order page, then click pay and jump to the payment page. The page-jump JSON events are collected, parsed, and used to compute per-page click counts and conversion rates, plus a top-3 ranking of clicks by region (based on the ip field, accumulated with historical data).
Requirement 1: compute the total views of the home page, the order page and the payment page.
Requirement 2: compute the conversion rate from the product/course page to the order page, and from the order page to the payment page (see the worked sketch after this list).
Requirement 3: resolve the province from the IP and show the top 3 provinces by clicks, accumulated with historical data.
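As a quick worked example of Requirement 2 with made-up numbers (not project data), using the same NumberFormat-based percentage formatting as calcJumRate in the job below:

import java.text.NumberFormat

// Made-up numbers: 1000 course-page views, 250 order-page views, 100 payment-page views.
val page1_num = 1000L
val page2_num = 250L
val page3_num = 100L

val nf = NumberFormat.getPercentInstance
val page1ToPage2Rate = if (page1_num == 0) "0%" else nf.format(page2_num.toDouble / page1_num.toDouble) // 25%
val page2ToPage3Rate = if (page2_num == 0) "0%" else nf.format(page3_num.toDouble / page2_num.toDouble) // 40%
println(s"course page -> order page: $page1ToPage2Rate, order page -> payment page: $page2ToPage3Rate")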
import java.lang
import java.sql.{Connection, ResultSet}
import java.text.NumberFormat

import com.catelf.qzpoint.util.{DataSourceUtil, ParseJsonData, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkFiles}
import org.lionsoul.ip2region.{DbConfig, DbSearcher}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
 * Real-time page conversion-rate statistics
 */
object PageStreaming {

  private val groupid = "vip_count_groupid"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "30")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("page_topic")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    // Check whether MySQL already holds offsets for this consumer group
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close()
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }
    // Build the Kafka stream: resume from the stored offsets if present, otherwise consume from scratch
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }
    // Parse the JSON records
    val dsStream = stream.map(item => item.value()).mapPartitions(partition => {
      partition.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val uid = if (jsonObject.containsKey("uid")) jsonObject.getString("uid") else ""
        val app_id = if (jsonObject.containsKey("app_id")) jsonObject.getString("app_id") else ""
        val device_id = if (jsonObject.containsKey("device_id")) jsonObject.getString("device_id") else ""
        val ip = if (jsonObject.containsKey("ip")) jsonObject.getString("ip") else ""
        val last_page_id = if (jsonObject.containsKey("last_page_id")) jsonObject.getString("last_page_id") else ""
        val pageid = if (jsonObject.containsKey("page_id")) jsonObject.getString("page_id") else ""
        val next_page_id = if (jsonObject.containsKey("next_page_id")) jsonObject.getString("next_page_id") else ""
        (uid, app_id, device_id, ip, last_page_id, pageid, next_page_id)
      })
    }).filter(item => {
      !item._5.equals("") && !item._6.equals("") && !item._7.equals("")
    })
    dsStream.cache()
    val pageValueDStream = dsStream.map(item => (item._5 + "_" + item._6 + "_" + item._7, 1))
    val resultDStream = pageValueDStream.reduceByKey(_ + _)
    resultDStream.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // Get a JDBC connection per partition
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partition.foreach(item => {
            calcPageJumpCount(sqlProxy, item, client) // count page jumps
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })
    })
    ssc.sparkContext.addFile("hdfs://cdh1.macro.com:8020/user/catelf/data/ip2region.db") // ship the ip2region db file to the executors
    val ipDStream = dsStream.mapPartitions(patitions => {
      val dbFile = SparkFiles.get("ip2region.db")
      val ipsearch = new DbSearcher(new DbConfig(), dbFile)
      patitions.map { item =>
        val ip = item._4
        val province = ipsearch.memorySearch(ip).getRegion().split("\\|")(2) // region string looks like 中国|0|上海|上海市|有线通
        (province, 1l) // count clicks per province
      }
    }).reduceByKey(_ + _)
    ipDStream.foreachRDD(rdd => {
      // Load the historical counts from MySQL and turn them into an RDD
      val ipSqlProxy = new SqlProxy()
      val ipClient = DataSourceUtil.getConnection
      try {
        val history_data = new ArrayBuffer[(String, Long)]()
        ipSqlProxy.executeQuery(ipClient, "select province,num from tmp_city_num_detail", null, new QueryCallback {
          override def process(rs: ResultSet): Unit = {
            while (rs.next()) {
              val tuple = (rs.getString(1), rs.getLong(2))
              history_data += tuple
            }
          }
        })
        val history_rdd = ssc.sparkContext.makeRDD(history_data)
        val resultRdd = history_rdd.fullOuterJoin(rdd).map(item => {
          val province = item._1
          val nums = item._2._1.getOrElse(0l) + item._2._2.getOrElse(0l)
          (province, nums)
        })
        resultRdd.foreachPartition(partitions => {
          val sqlProxy = new SqlProxy()
          val client = DataSourceUtil.getConnection
          try {
            partitions.foreach(item => {
              val province = item._1
              val num = item._2
              // Write the latest accumulated count back to MySQL
              sqlProxy.executeUpdate(client, "insert into tmp_city_num_detail(province,num)values(?,?) on duplicate key update num=?",
                Array(province, num, num))
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            sqlProxy.shutdown(client)
          }
        })
        val top3Rdd = resultRdd.sortBy[Long](_._2, false).take(3)
        sqlProxy.executeUpdate(ipClient, "truncate table top_city_num", null)
        top3Rdd.foreach(item => {
          sqlProxy.executeUpdate(ipClient, "insert into top_city_num (province,num) values(?,?)", Array(item._1, item._2))
        })
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(ipClient)
      }
    })
    // Compute the conversion rates and, after the business logic, commit the offsets manually to MySQL
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        calcJumRate(sqlProxy, client) // compute the conversion rates
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Count page jumps
   *
   * @param sqlProxy
   * @param item
   * @param client
   */
  def calcPageJumpCount(sqlProxy: SqlProxy, item: (String, Int), client: Connection): Unit = {
    val keys = item._1.split("_")
    var num: Long = item._2
    val page_id = keys(1).toInt // current page_id
    val last_page_id = keys(0).toInt // previous page_id
    val next_page_id = keys(2).toInt // next page_id
    // Query the historical num for the current page_id
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(page_id), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          num += rs.getLong(1)
        }
        rs.close()
      }

      // Update num; page_id 1 (the entry page of the funnel) always gets a jump_rate of 100%
      if (page_id == 1) {
        sqlProxy.executeUpdate(client, "insert into page_jump_rate(last_page_id,page_id,next_page_id,num,jump_rate)" +
          "values(?,?,?,?,?) on duplicate key update num=num+?",
          Array(last_page_id, page_id, next_page_id, num, "100%", num))
      } else {
        sqlProxy.executeUpdate(client, "insert into page_jump_rate(last_page_id,page_id,next_page_id,num)" +
          "values(?,?,?,?) on duplicate key update num=num+?",
          Array(last_page_id, page_id, next_page_id, num, num))
      }
    })
  }

  /**
   * Compute the conversion rates
   */
  def calcJumRate(sqlProxy: SqlProxy, client: Connection): Unit = {
    var page1_num = 0l
    var page2_num = 0l
    var page3_num = 0l
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(1), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          page1_num = rs.getLong(1)
        }
      }
    })
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(2), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          page2_num = rs.getLong(1)
        }
      }
    })
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(3), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          page3_num = rs.getLong(1)
        }
      }
    })
    val nf = NumberFormat.getPercentInstance
    val page1ToPage2Rate = if (page1_num == 0) "0%" else nf.format(page2_num.toDouble / page1_num.toDouble)
    val page2ToPage3Rate = if (page2_num == 0) "0%" else nf.format(page3_num.toDouble / page2_num.toDouble)
    sqlProxy.executeUpdate(client, "update page_jump_rate set jump_rate=? where page_id=?", Array(page1ToPage2Rate, 2))
    sqlProxy.executeUpdate(client, "update page_jump_rate set jump_rate=? where page_id=?", Array(page2ToPage3Rate, 3))
  }
}