学习项目前提
熟悉Scala基础 熟悉Spark基础 熟悉Kafka基础 熟悉Redis基础 熟悉IDEA、Git、Maven基础操作
第一章 数据采集
一、 简介
1.1 离线计算
离线计算一般指通过批处理的方式计算已知的所有输入数据,输入数据不会产生变化,一般计算量级较大, 计算时间较长。
1.1.1 离线计算特点
(1)数据确定,不会发生变化
(2)数据量大,保存时间长
(3)大量数据进行的复杂批量计算
(4)方便查看计算后的结果
1.2 实时计算
实时计算一般是指通过流处理方式计算当日的数据都算是实时计算。
也会有一些准实时计算,利用离线框架通过批处理完成(小时、 10 分钟级)的计算,一般为过渡产品,不能算是实时计算。
1.2.1 实时计算特点
(1)局部计算
每次计算以输入的每条数据,或者微批次、小窗口的数据范围进行计算,没法像离线数据一样能够基于当日全部数据进行统计排序分组
(2)开发成本高
离线的批处理 SQL,实时计算需要通过代码,往往需要对接多种数据容器完成,相对开发较为复杂。
(3)资源成本高
(4)时效性
(5)可视化性
1.3 数仓架构设计
1.3.1 离线架构
1.3.2 实时架构
1.4 项目需求
1.4.1 BI分析
1.4.2 数据接口应用
二、 日志数据采集和分流
2.1 整体架构
2.2 采集日志数据
1.采集到数据---数据生成器
(1) 上传模拟日志数据生成器到/export/server/applog 目录
(2) 根据业务修改application.yml配置
#业务日期 mock.date: 2023-02-05 #模拟数据发送模式 mock.type: "kafka" #kafka模式下,发送的地址 mock: kafka-server: "node1:9092,node2:9092,node3:9092" kafka-topic: "ODS_BASE_LOG_1018"
(3) 生成日志数据
java -jar gmall2020-mock-log-2021-11-29.jar
(4) 消费数据测试
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic ODS_BASE_LOG
2.3 辅助脚本
(1) 模拟生成数据脚本 lg.sh cd/bin/lg.sh
chmod u+x lg.sh
#!/bin/bash if [ $# -ge 1 ] then sed -i "/mock.date/c mock.date: $1" /opt/module/applog/application.yml fi cd /opt/module/applog; java -jar gmall2020-mock-log-2021-11-29.jar >/dev/null 2>&1 &
2.编写时间脚本动态修改日期
标准输出放到黑洞里面,错误输出跟随标准输出,这样没有消息输出
但是他也有个阻塞,把窗口阻塞住,直到生成完才结束
所以用个&推到后台
(2)Kafka脚本
#!/bin/bash if [ $# -lt 1 ] then echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" exit fi case $1 in "start") #!/bin/bash if [ $# -lt 1 ] then echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" exit fi case $1 in "start") ;; ;; "kp") if [ $2 ] then kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic $2 else echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" #!/bin/bash echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" exit fi for i in node1 node2 node3 do echo "--- 启动 $i kafka ---" ssh $i "/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties" done ;; "stop") for i in node1 node2 node3 do echo "--- 停止 $i kafka ---" ssh $i "/export/server/kafka/bin/kafka-server-stop.sh" done if [ $2 ] then kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic $2 else echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" fi if [ $2 ] else echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" fi ;; "list") kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092 ;; "describe") if [ $2 ] then kafka-topics.sh --describe --bootstrap-server node1:9092,node2:9092,node3:9092 --topic $2 else echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" fi ;; "delete") if [ $2 ] then kafka-topics.sh --delete --bootstrap-server node1:9092,node2:9092,node3:9092 --topic $2 else echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" fi ;; *) echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" exit ;; esac
2.4 准备开发环境
2.4.1 创建工程
2.4.2 添加依赖
<properties> <spark.version>3.0.0</spark.version> <scala.version>2.12.11</scala.version> <kafka.version>2.4.1</kafka.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.11.0</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j</artifactId> <version>2.11.0</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.3.0</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.8.0</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.8.0</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.10</version> </dependency> </dependencies> <build> <plugins> <!-- 该插件用于将 Scala 代码编译成 class 文件 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.4.6</version> <executions> <execution> <!-- 声明绑定到 maven 的 compile 阶段 --> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
指定分区,
2.4.3 添加配置文件
(1)添加config.properties文件
#kafka配置 kafka_bootstrap_servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
(2)添加log4j.properties文件
log4j.appender.atguigu.MyConsole=org.apache.log4j.ConsoleAppender log4j.appender.atguigu.MyConsole.target=System.out log4j.appender.atguigu.MyConsole.layout=org.apache.log4j.PatternL ayout log4j.appender.atguigu.MyConsole.layout.ConversionPattern=%d{yyyy -MM-dd HH:mm:ss} %10p (%c:%M) - %m%n log4j.rootLogger =error,atguigu.MyConsole
2.4.4 添加工具类
(1)properties工具类
package com.atguigu.gmall.realtime.util import java.util.ResourceBundle /** * @author sxr * @create 2022-05-29-23:10 */ object MyPropsUtils { private val bundle: ResourceBundle = ResourceBundle.getBundle("config") def apply(propsKey:String): String ={ bundle.getString(propsKey) } def main(args: Array[String]): Unit = { println(MyPropsUtils.apply("kafka_bootstrap_servers")) println(MyPropsUtils("kafka_bootstrap_servers")) } }
(2)Config配置类
package com.atguigu.gmall.realtime.util /** * @author sxr * @create 2022-05-31-20:59 * * 配置类 * */ object MyConfig { val KAFKA_BOOTSTRAP_SERVERS :String = "kafka_bootstrap_servers" val REDIS_HOST :String = "redis.host" val REDIS_POST:String = "redis.post" }
(3)kafka工具类
package com.atguigu.gmall.realtime.util import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import java.util import scala.collection.mutable /** * @author sxr * @create 2022-05-26-22:42 */ /* * Kafka 工具类,用于生产和消费 */ object MyKafkaUtils { //消费者配置ConsumerConfig private val consumerConfigs: mutable.Map[String, Object] = mutable.Map[String, Object]( //kafka集群位置 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS), //kv反序列化器 deserializer ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer", //groupId //offset的提交 自动 手动 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ->"true", // ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG //offset的重置 默认:"latest", "earliest" ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"//latest //.... ) /** * 消费 */ def getKafkaDStream(ssc:StreamingContext,topic:String,groupId:String)={ consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG,groupId) val KafkaDStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(topic), consumerConfigs) ) KafkaDStream } //使用指定的offset进行消费 def getKafkaDStream(ssc:StreamingContext,topic:String,groupId:String,offsets:Map[TopicPartition, Long])={ consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG,groupId) val KafkaDStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(topic), consumerConfigs,offsets) ) KafkaDStream } /** * 生产 */ val producer : KafkaProducer[String,String]=createProducer() def createProducer(): KafkaProducer[String,String] = { //生产者配置ProducerConfig val producerConfigs = new util.HashMap[String, AnyRef] //kafka集群位置 producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG ,MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS)) //kv序列化器 serializer producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer") producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer") //ack producerConfigs.put(ProducerConfig.ACKS_CONFIG,"all") //batch.size //linger.ms //retries //幂等性 producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true") val producer = new KafkaProducer[String, String](producerConfigs) producer } //(按照默认的粘性分区策略) def send(topic:String,msg:String): Unit ={ producer.send(new ProducerRecord[String,String](topic,msg)) } //(按照key进行分区) def send(topic:String,key:String,msg:String): Unit ={ producer.send(new ProducerRecord[String,String](topic,key,msg)) } /** * 关闭生产对象 */ def close(): Unit ={ if(producer !=null){ producer.close() } } /** * 刷写,将缓冲区的数据刷写到磁盘 */ def flush(): Unit ={ producer.flush() } }
2.5 日志数据消费分流
2.5.1相关实体bean
(1) 页面日志(PageLog)
case class PageLog( mid :String, user_id:String, province_id:String, channel:String, is_new:String, model:String, operate_system:String, version_code:String, brand:String, page_id:String , last_page_id:String, page_item:String, page_item_type:String, during_time:Long, sourceType:String, ts:Long ){ }
(2)启动日志(StartLog)
case class StartLog( mid :String, user_id:String, province_id:String, channel:String, is_new:String, model:String, operate_system:String, version_code:String, brand:String, entry:String, open_ad_id:String, loading_time_ms:Long, open_ad_ms:Long, open_ad_skip_ms:Long, ts:Long ){ }
(3)页面动作日志(PageActionLog)
case class PageActionLog( mid :String, user_id:String, province_id:String, channel:String, is_new:String, model:String, operate_system:String, version_code:String, brand:String, page_id:String , last_page_id:String, page_item:String, page_item_type:String, during_time:Long, sourceType:String, action_id:String, action_item:String, action_item_type:String, ts:Long ){ }
(4)页面曝光日志(PageDisplayLog)
case class PageDisplayLog( mid :String, user_id:String, province_id:String, channel:String, is_new:String, model:String, operate_system:String, version_code:String, brand:String, page_id:String , last_page_id:String, page_item:String, page_item_type:String, during_time:Long, sourceType:String, display_type:String, display_item: String, display_item_type:String, display_order:String , display_pos_id:String, ts:Long, ){ }
2.5.2 消费分流代码(优化后代码)
spark核数并行度最好和kafka分区的并行度保持一致
SparkConf().setAppName("ods_base_log_app").setMaster("local[4]")
修改kafka默认分区数的地方
package com.atguigu.gmall.realtime.app import com.alibaba.fastjson.serializer.SerializeConfig import com.alibaba.fastjson.{JSON, JSONObject} import com.atguigu.gmall.realtime.bean.{PageActionLog, PageDisplayLog, PageLog, StartLog} import com.atguigu.gmall.realtime.util import com.atguigu.gmall.realtime.util.{MyKafkaUtils, MyOffsetUtils} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext} import java.lang /** * @author sxr * @create 2022-05-31-21:25 * * 1.准备实时处理环境StreamingContext * 2.从kafka消费数据 * 3.处理数据 * 3.1转换数据结构 * 专用结构 Bean * 通用结构 Map JsonObject * 3.2分流 :将数据拆分到不同的主题中 * 启动主题: DWD_START_LOG * 页面访问主题: DWD_PAGE_LOG * 页面动作主题:DWD_PAGE_ACTION * 页面曝光主题:DWD_PAGE_DISPLAY * 错误主题:DWD_ERROR_INFO * * 4.写到DWD层 */ object OdsBaseLogApp { def main(args: Array[String]): Unit = { //1.准备实时环境 val conf: SparkConf = new SparkConf().setAppName("ods_base_log_app").setMaster("local[*]") val ssc: StreamingContext = new StreamingContext(conf, Seconds(5)) //2.从kafka消费数据 val topicName: String = "ODS_BASE_LOG_1018" val groupId: String = "ODS_BASE_LOG_GROUP" // TODO 从redis中读取offset,指定offset进行消费 val offsets: Map[TopicPartition, Long] = MyOffsetUtils.readOffset(topicName, groupId) var KafkaDStream: InputDStream[ConsumerRecord[String, String]] = null if (offsets!=null && offsets.nonEmpty){ //指定offset进行消费 KafkaDStream = MyKafkaUtils.getKafkaDStream(ssc, topicName, groupId,offsets) }else{ //默认的offset进行消费 KafkaDStream = MyKafkaUtils.getKafkaDStream(ssc, topicName, groupId) } // TODO 补充:从当前消费到的数据中提取offset,不对流中的数据做任何处理 var offsetRanges: Array[OffsetRange] = null val offsetRangesDStream: DStream[ConsumerRecord[String, String]] = KafkaDStream.transform( rdd => { offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd } ) // 3.处理数据 // 3.1转换数据结构 val jsonObjDStream: DStream[JSONObject] = offsetRangesDStream.map ( ConsumerRecord => { val log: String = ConsumerRecord.value() val jSONObject: JSONObject = JSON.parseObject(log) jSONObject } ) // jsonObjDStream.print(1000) ///3.2分流 :将数据拆分到不同的主题中 val DWD_PAGE_LOG_TOPIC :String = "DWD_PAGE_LOG_TOPIC" //页面访问 val DWD_ERROR_LOG_TOPIC : String = "DWD_ERROR_LOG_TOPIC" //错误数据 val DWD_START_LOG_TOPIC : String = "DWD_START_LOG_TOPIC" //启动数据 val DWD_PAGE_DISPLAY_TOPIC : String = "DWD_PAGE_DISPLAY_TOPIC" //页面曝光 val DWD_PAGE_ACTION_TOPIC :String = "DWD_PAGE_ACTION_TOPIC" //页面事件 //分流规则: // 错误数据:不做任何的拆分,只要包含错误字段,直接整条数据发送到对应的topic // 页面数据:拆分到页面访问,曝光,事件 // 启动数据:发动到对应的topic jsonObjDStream.foreachRDD( rdd => { rdd.foreachPartition( jsonObjIter => { for (JSONObject <- jsonObjIter) { //分流过程 //分流错误数据 val errObj: JSONObject = JSONObject.getJSONObject("err") if (errObj != null) { //将该错误数据发送到DWD_ERROR_LOG_TOPIC MyKafkaUtils.send(DWD_ERROR_LOG_TOPIC, JSONObject.toJSONString) } else { //提取公共字段 val commontObj: JSONObject = JSONObject.getJSONObject("common") val ar: String = commontObj.getString("ar") val uid: String = commontObj.getString("uid") val os: String = commontObj.getString("os") val ch: String = commontObj.getString("ch") val is_new: String = commontObj.getString("is_new") val md: String = commontObj.getString("md") val mid: String = commontObj.getString("mid") val vc: String = commontObj.getString("vc") val ba: String = commontObj.getString("ba") val ts: Long = JSONObject.getLong("ts") //时间戳 //页面数据 val pageObj: JSONObject = JSONObject.getJSONObject("page") if (pageObj != null) { //提取page字段 val page_id: String = pageObj.getString("page_id") val item: String = pageObj.getString("item") val during_time: Long = pageObj.getLong("during_time") val item_type: String = pageObj.getString("item_type") val last_page_id: String = pageObj.getString("last_page_id") val source_type: String = pageObj.getString("source_type") //封装到pageLog val pageLog: PageLog = PageLog(mid, uid, ar, ch, is_new, md, os, vc, ba, page_id, last_page_id, item, item_type, during_time, source_type, ts) //发送到 DWD_PAGE_LOG_TOPIC MyKafkaUtils.send(DWD_PAGE_LOG_TOPIC, JSON.toJSONString(pageLog, new SerializeConfig(true))) //JSON.toJSONString()是java的方法,会在里面找get(), set()方法,这是scala所以加一个SerializeConfig去找字段,不找get,set方法 //提取曝光字段 val displaysJsonArr = JSONObject.getJSONArray("displays") if (displaysJsonArr != null && displaysJsonArr.size() > 0) { for (i <- 0 until displaysJsonArr.size()) { val displayObj: JSONObject = displaysJsonArr.getJSONObject(i) val display_type: String = displayObj.getString("display_type") val item: String = displayObj.getString("item") val item_type: String = displayObj.getString("item_type") val pos_id: String = displayObj.getString("pos_id") val order: String = displayObj.getString("order") //封装到对象 val displayLog: PageDisplayLog = PageDisplayLog( mid, uid, ar, ch, is_new, md, os, vc, ba, page_id, last_page_id, item, item_type, during_time, source_type, display_type, item, item_type, order, pos_id, ts ) //写到 DWD_PAGE_DISPLAY_TOPIC MyKafkaUtils.send( DWD_PAGE_DISPLAY_TOPIC, JSON.toJSONString(displayLog, new SerializeConfig(true)) ) } } //提取事件字段 val actionsJsonArr = JSONObject.getJSONArray("actions") if (actionsJsonArr != null && actionsJsonArr.size() > 0) { for (i <- 0 until actionsJsonArr.size()) { val actionObj: JSONObject = actionsJsonArr.getJSONObject(i) val item: String = actionObj.getString("item") val action_id: String = actionObj.getString("action_id") val item_type: String = actionObj.getString("item_type") val ts: Long = actionObj.getLong("ts") //封装成pageActionLog对象 val pageActionLog: PageActionLog = PageActionLog( mid, uid, ar, ch, is_new, md, os, vc, ba, page_id, last_page_id, item, item_type, during_time, source_type, action_id, item, item_type, ts ) //发送到 DWD_PAGE_ACTION_TOPIC MyKafkaUtils.send( DWD_PAGE_ACTION_TOPIC, JSON.toJSONString(pageActionLog, new SerializeConfig(true)) ) } } } //启动数据 val startObj: JSONObject = JSONObject.getJSONObject("start") if (startObj != null) { val entry: String = startObj.getString("entry") val open_ad_skip_ms: Long = startObj.getLong("open_ad_skip_ms") val open_ad_ms: Long = startObj.getLong("open_ad_ms") val loading_time: Long = startObj.getLong("loading_time") val open_ad_id: String = startObj.getString("open_ad_id") //封装到对象 val startLog: StartLog = StartLog(mid, uid, ar, ch, is_new, md, os, vc, ba, entry, open_ad_id, loading_time, open_ad_ms, open_ad_skip_ms, ts) // 发送到 DWD_START_LOG_TOPIC MyKafkaUtils.send( DWD_START_LOG_TOPIC, JSON.toJSONString(startLog, new SerializeConfig(true)) ) } } } //算子(foreachPartition)里面 : Executor端执行, 每批次每分区执行一次. MyKafkaUtils.flush() } ) // rdd.foreach( // JSONObject=>{ // //C : 算子(foreach)里面 : Executor端执行, 每批次每条数据执行一次. // } // ) //B: foreachRDD里面, 算子(foreach)外面: Driver端执行. 每批次执行一次. MyOffsetUtils.saveOffset(topicName,groupId,offsetRanges) } ) //A: foreachRDD外面: Driver端执行. 程序启动的时候执行一次。 ssc.start() ssc.awaitTermination() } }
2.6 优化-精准一次消费
2.6.1 相关定义
(1)至少一次消费(at least once)
写出数据 ==> 提交offset 若offset提交失败,导致数据重发。主要保证数据不丢失
(2)最多一次消费 (at most once)
提交offset ==> 写出数据 若写出数据失败,导致数据丢失。保证数据不重复
(3) 精确一次消费(Exactly-once)
不出现丢数据和数据重复
2.6.2 消费问题
(1)漏消费 (数据丢失)
(2)重复消费(数据重复)
(3)所以
目前 Kafka 默认每 5 秒钟做一次自动提交偏移量,这样并不能保证精准一次消费。
enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。 auto.commit.interval.ms 的默认值是 5000,单位是毫秒。
2.6.3 解决问题-方案一 (原子性)
2.6.4 解决问题-方案二(后置提交offset + 幂等性)
(1)策略:手动提交偏移量 + 幂等性处理
解决数据重复和数据丢失,就可以达到精准一次消费。
首先,数据丢失问题。手动后置提交offset 就可以解决 数据丢失问题,但会有数据重复。(至少消费一次)
幂等性,就是解决数据重复。
(2)难点
在实际的开发中手动提交偏移量其实不难,难的是幂等性的保存,有的时候并不一定能保证,这个需要看使用的数据库,如果数据库本身不支持幂等性操作,那只能优先保证的数据不丢失,数据重复难以避免即只保证了至少一次消费的语义。一般有主键的数据库都支持幂等性操作 Upsert。
(3) 使用场景
处理数据较多,或者数据保存在不支持事务的数据库上
2.6.5 手动提交偏移量
(1)偏移量保存在哪
Kafka 0.9版本以后consumer的偏移量是保存在Kafka的__consumer_offsets主题中。
但是如果用这种方式管理偏移量,有一个限制就是在提交偏移量时,数据流的元素结构不能
发生转变,即提交偏移量时数据流,必须是 InputDStream[ConsumerRecord[String, String]]
这种结构。但是在实际计算中,数据难免发生转变,或聚合,或关联,一旦发生转变, 就无
法在利用以下语句进行偏移量的提交:
xxDstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
因为 offset 的存储于 hasOffsetRanges,只有 KafkaRDD 继承了他,所以假如我们对KafkaRDD 进行了转化之后,其它 RDD 没有继承 HasOffsetRanges,所以就无法再获取 offset
了 。
所以实际生产中通常会利用 ZooKeeper,Redis,Mysql 等工具手动对偏移量进行保存 。
(2)流程
2.6.6 代码实现
(1)在 config.properties 中添加 redis 的连接配置
#redis redis.host=hadoop102 redis.post=6379
(2) 添加Redis工具类
package com.atguigu.gmall.realtime.util import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig} /** * @author sxr * @create 2022-06-04-18:42 */ object MyRedisUtils { var jedisPool : JedisPool = null def getJedisClient : Jedis = { if(jedisPool == null ){ var host : String = MyPropsUtils(MyConfig.REDIS_HOST) var port : String = MyPropsUtils(MyConfig.REDIS_POST) val jedisPoolConfig = new JedisPoolConfig() jedisPoolConfig.setMaxTotal(100) //最大连接数 jedisPoolConfig.setMaxIdle(20) //最大空闲 jedisPoolConfig.setMinIdle(20) //最小空闲 jedisPoolConfig.setBlockWhenExhausted(true) //忙碌时是否等待 jedisPoolConfig.setMaxWaitMillis(5000) //忙碌时等待时长 毫秒 jedisPoolConfig.setTestOnBorrow(true) //每次获得连接的进行测试 jedisPool = new JedisPool(jedisPoolConfig,host,port.toInt) } jedisPool.getResource } }
(3) 偏移量工具类
package com.atguigu.gmall.realtime.util import org.apache.kafka.common.TopicPartition import org.apache.spark.streaming.kafka010.OffsetRange import redis.clients.jedis.Jedis import java.util import scala.collection.mutable /** * @author sxr * @create 2022-06-04-19:41 * * Offset管理工具类 用于redis往中存储和读取offset * * 管理方案: * 1. 后置提交偏移量 -> 手动控制偏移量提交 * 2. 手动控制偏移量提交 -> SparkStreaming提供了手动提交方案,但是我们不能用,因为我们会对DStream的结构进行转换. * 3. 手动的提取偏移量维护到redis中 * -> 从kafka中消费到数据,先提取偏移量 * -> 等数据成功写出后,将偏移量存储到redis中 * -> 从kafka中消费数据之前,先到redis中读取偏移量, 使用读取到的偏移量到kafka中消费数据 * * 4. 手动的将偏移量存储到redis中,每次消费数据需要使用存储的offset进行消费,每次消费数据后,要将本次消费的offset存储到redis中。 * */ object MyOffsetUtils { /** * 往Redis中存储offset * 问题:存的offset从哪里来? * 从消费的数据中提取出来的,传入到该方法中 * offsetRanges: Array[OffsetRange] * offset的结构是什么? * Kafka中offset维护的结构 * groupID + topic + partition => offset * 从传入的offset中提取信息 * 在redis中怎么存? * 类型 : hash * key : groupID + topic * value : partition - offset * 写入API : hset/hmset * 读取API : hgetall * 是否过期 : 不过期 */ def saveOffset(topic : String, groupId : String, offsetRanges : Array[OffsetRange]): Unit ={ if(offsetRanges!=null && offsetRanges.length>0){ val offsets: util.HashMap[String, String] = new util.HashMap[String, String]() for (offsetRange <- offsetRanges) { val partition: Int = offsetRange.partition val offset: Long = offsetRange.untilOffset offsets.put(partition.toString,offset.toString) } // 往redis中存 val jedis: Jedis = MyRedisUtils.getJedisClient val redisKey:String = s"offsets:$topic:$groupId" jedis.hset(redisKey,offsets) jedis.close() println("提交offset:"+offsets) } } /** * 从Redis中读取存储的offset * * 问题: * 如何让SparkStreaming通过指定的offset进行消费? * * SparkStreaming要求的offset格式是什么? * Map[TopicPartition,Long] */ def readOffset(topic:String,groupId :String):Map[TopicPartition,Long]={ val jedis: Jedis = MyRedisUtils.getJedisClient val redisKey:String = s"offsets:$topic:$groupId" val offsets: util.Map[String, String] = jedis.hgetAll(redisKey) println("读取到offset:"+offsets) val results: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]() if (offsets!=null && offsets.size() > 0) { //将java的map转化为Scala的map进行迭代 import scala.collection.JavaConverters._ for ((partition, offset) <- offsets.asScala) { val topicPartition: TopicPartition = new TopicPartition(topic, partition.toInt) results.put(topicPartition, offset.toLong) } } jedis.close() results.toMap } }
2.6.7 幂等性操作
目前处理完的数据写到了 kafka,如果程序出现宕机重试, kafka 是没有办法通过唯一性标识实现幂等性识别,但是也没有关系,因为 kafka 中的数据只是用于中间存储,并不会进行统计,所以只要保证不丢失即可,重复数据的幂等性处理可以交给下游处理,只要保证最终统计结果是不会有重复即可 。
2.7 优化-Kafka消息发送问题
2.7.1 缓冲区问题
Kafka 消息的发送分为同步发送和异步发送。 Kafka 默认使用异步发送的方式。 Kafka的生产者将消息进行发送时,会先将消息发送到缓冲区中,待缓冲区写满或者到达指定的时间,才会真正的将缓冲区的数据写Broker。
假设消息发送到缓冲区中还未写到 Broker,我们认为数据已经成功写给了 Kafka,接下来会手动的提offset, 如果 offset 提交成功,但此刻 Kafka 集群突然出现故障。 缓冲区的数据会丢失,最终导致的问题就是数据没有成功写到 Kafka ,而 offset 已经提交,此部分的数据就会被漏掉。
2.7.2 问题解决-方法一
将消息的发送修改为同步发送,保证每条数据都能发送到 Broker。 但带来的问题就是消息是一条一条写给 Broker,会牺牲性能,一般不推荐。
2.7.3 问题解决-方法二
(1) 策略: 在手动提交 offset 之前,强制将缓冲区的数据 flush 到 broker 。
Kafka 的生产者对象提供了 flush 方法, 可以强制将缓冲区的数据刷到 Broker。
(2)修补代码
//算子(foreachPartition)里面 : Executor端执行, 每批次每分区执行一次. MyKafkaUtils.flush()
三、 业务数据采集和分流
3.1 整体架构
业务数据库的采集主要是基于对数据库的变化的实时监控。目前市面上的开源产品主要是 Canal 和 Maxwell。要利用这些工具实时采集数据到 kafka,以备后续处理。
Maxwell 采集的日志数据,默认是放在一个统一的 kafka 的 Topic 中,为了后续方便处理要进行以表为单位拆分到不同 kafka 的 Topic 中。
针对维度数据,要单独保存。通常考虑用 redis、 hbase、 mysql、 kudu 等通过唯一键查询性能较快的数据库中。
3.2 MaxWell
3.2.1 简介
Maxwell 是由美国 Zendesk 公司开源,用 Java 编写的 MySQL 变更数据抓取软件。它会实时监控 Mysql 数据库的数据变更操作(包括 insert、 update、 delete),并将变更数据以 JSON格式发送给 Kafka、 Kinesi 等流数据处理平台。
官网地址: http://maxwells-daemon.io/
3.2.2 MySQL主从复制
1) 主从复制的应用场景
(1) 做数据库的热备:主数据库服务器故障后,可切换到从数据库继续工作 。
(2) 分离:主数据库只负责业务数据的写入操作,而多个从数据库只负责业务数据的查询工作,在读多写少场景下,可以提高数据库工作效率 。
2) MySQL 主从复制工作原理
(1) Master 主库将数据变更记录,写到二进制日志(binary log)中。
(2) Slave 从库向 mysql master 发送 dump 协议,将 master 主库的 binary log events 拷贝到它的中继日志(relay log)。
(3) Slave 从库读取并回放中继日志中的事件,将改变的数据同步到自己的数据库 。
3) binlog
(1) 什么是 binlog
MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间, MySQL 的二进制日志是事务安全型的。
一般来说开启二进制日志大概会有 1%的性能损耗。
二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件 。
(2) binlog 的分类设置
binlog 格式: statement | row | mixed
statement : 语句级, 只记录写操作的sql语句. (可能造成数据不一致)
row : 行级,记录每行数据的变化 (综合对比 ,Maxwell想做监控分析,选择 row 格式比较合适。 )
mixed : statement升级版,正常情况下都是statement,当sql中使用了随机、时间、自定函数等时会变成row
3.2.3 MaxWell 工作原理
maxwell就是将自己伪装成 slave,并遵循 MySQL 主从复制的协议,从 master 同步数据。
3.2.4 Maxwell 安装
3.3 采集业务数据
3.3.1 MySQL 部分
1) 安装 MySQL(略)
2) 创建业务数据库(直接使用离线数仓的即可)
3) 导入数据表(直接使用离线数仓的即可)
4) 修改/etc/my.cnf 文件,开启 binlog
[sxr@hadoop102 maxwell-1.29.2]$ sudo vim /etc/my.cnf server-id=1 log-bin=mysql-bin binlog_format=row binlog-do-db=gmall
注意: binlog-do-db 根据自己的情况进行修改,指定具体要同步的数据库。
5) 重启 MySQL 使配置生效
sudo systemctl restart mysqld
到/var/lib/mysql 目录下查看是否生成 binlog 文件
-rw-r-----. 1 mysql mysql 250740 7月 12 18:02 mysql-bin.000001 -rw-r-----. 1 mysql mysql 133 7月 21 12:59 mysql-bin.index
3.3.2 模拟数据
1) 上传 jar 和 properties 文件上传到/opt/module/db_log 目录下
2) 修改 application.properties 中数据库连接信息
3) 模拟生成数据
java -jar gmall-mock-db-2020-03-16-SNAPSHOT.jar
4)再次到/var/lib/mysql 目录下,查看 binlog 的改动
3.3.3 赋权限
1) 给 maxwell 创建用户并赋权限
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell'; mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
3.3.4 Maxwell 部分
1) 创建数据库,用于存储 Maxwell 运行过程中的一些数据,包括 binlog 同步的断点位置等
mysql> create database maxwell; mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
2) 修改 Maxwell 配置文件名字
3) 修改 Maxwell 配置文件内容
#Maxwell 数 据 发 送 目 的 地 , 可 选 配 置 有 #stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis producer=kafka #目标 Kafka 集群地址 kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092 kafka_topic=ODS_BASE_DB_M # mysql login info host=hadoop102 user=maxwell password=maxwell
4) 启动 Maxwell
bin/maxwell --config config.properties --daemon
3.4 业务数据消费分流
3.4.1 Maxwell 数据格式
3.4.2 消费分流
Maxwell 会追踪整个数据库的变更,把所有的数据变化都发到一个 topic 中了,但是为了后续处理方便,应该将事实表的数据分流到不同的 topic 中,将维度表的数据写入到 redis中
1) 分流业务代码
package com.atguigu.gmall.realtime.app import com.alibaba.fastjson.{JSON, JSONObject} import com.atguigu.gmall.realtime.util.{MyKafkaUtils, MyOffsetUtils, MyRedisUtils} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkConf import org.apache.spark.broadcast.Broadcast import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext} import redis.clients.jedis.Jedis import java.util /** * @author sxr * @create 2022-07-13-10:08 * * 业务数据消费分流 * * 1. 准备实时环境 * 2. 从redis读取偏移量 * 3. 从Kafka消费数据 * 4. 提取偏移量结束点 * 5. 数据处理 * 5.1 转换数据结构 * 5.2 分流 * 事实数据 => kafka * 维度数据 => redis * 6. flush kafka的缓冲区 * 7. 提交offset */ object OdsBaseDbApp { def main(args: Array[String]): Unit = { //1.准备实时环境 val conf: SparkConf = new SparkConf().setAppName("ods_base_db_app").setMaster("local[*]") val ssc: StreamingContext = new StreamingContext(conf, Seconds(5)) val topicName: String = "ODS_BASE_DB_M" val groupId: String = "ODS_BASE_DB_GROUP" //2. 从redis读取偏移量 val offsets: Map[TopicPartition, Long] = MyOffsetUtils.readOffset(topicName, groupId) //3. 从Kafka消费数据 var kafkaDStream: InputDStream[ConsumerRecord[String, String]] = null if(offsets!=null && offsets.nonEmpty){ kafkaDStream = MyKafkaUtils.getKafkaDStream(ssc, topicName, groupId, offsets) }else{ kafkaDStream = MyKafkaUtils.getKafkaDStream(ssc, topicName, groupId) } //4. 提取偏移量结束点 var offsetRanges: Array[OffsetRange] = null val offsetRangesDStream: DStream[ConsumerRecord[String, String]] = kafkaDStream.transform( rdd => { offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd } ) //5. 数据处理 //5.1 转换数据结构 val JsonObjDStream: DStream[JSONObject] = offsetRangesDStream.map( consumerRecord => { val jsonData: String = consumerRecord.value() val jSONObject: JSONObject = JSON.parseObject(jsonData) jSONObject } ) // JsonObjDStream.print(100) //5.2 分流 // //事实表清单 // val factTables :Array[String] = Array[String]("order_info","order_detail") // //维度表清单 // val dimTables :Array[String] = Array[String]("user_info","base_province") JsonObjDStream.foreachRDD( rdd =>{ // Driver //TODO 如何动态维护表清单? // 将表清单维护到redis中,代码中只要保证能够周期性的读取redis中维护的表清单. // Redis中如何存储: // type: set // key : FACT:TABLES DIM:TABLES // value: 表名的集合 // 写入API: sadd // 读取API: smembers // 是否过期: 不过期 val redisFactKeys :String = "FACT:TABLES" val redisDimKeys : String = "DIM:TABLES" val jedisClient : Jedis = MyRedisUtils.getJedisClient //事实表清单 val factTables: util.Set[String] = jedisClient.smembers(redisFactKeys) val factTablesBC: Broadcast[util.Set[String]] = ssc.sparkContext.broadcast(factTables) println("factTables: " + factTables) //维度表清单 val dimTables = jedisClient.smembers(redisDimKeys) val dimTablesBC: Broadcast[util.Set[String]] = ssc.sparkContext.broadcast(dimTables) println("dimTables: " + dimTables) jedisClient.close() rdd.foreachPartition( jsonObjIter =>{ val jedisClient: Jedis = MyRedisUtils.getJedisClient for (jsonObj <- jsonObjIter) { //提取操作类型 val operType: String = jsonObj.getString("type") val opValue: String = operType match { case "bootstrap-insert" => "I" case "insert" => "I" case "update" => "U" case "delete" => "D" case _ => null } //判断操作类型:1.明确什么操作 2.过滤不感兴趣的操作 if (opValue != null){ // 提取表名 val tableName: String = jsonObj.getString("table") // 提取数据 val dataObj: JSONObject = jsonObj.getJSONObject("data") if(factTablesBC.value.contains(tableName)){ println(dataObj.toJSONString) //事实数据 val dwdTopicName:String = s"DWD_${tableName.toUpperCase}_${opValue}" MyKafkaUtils.send(dwdTopicName,dataObj.toJSONString) } if(dimTablesBC.value.contains(tableName)){ //维度数据 /* * 类型 : string * key : DIM:表名:ID * value : 整条数据的jsonString * 写入API : set * 读取API : get * 是否过期 : 不过期 */ val id: String = dataObj.getString("id") val redisKey :String = s"DIM:${tableName.toUpperCase()}:$id" // TODO 此处获取Redis连接好不好? //不好, 每条数据都要获取一次连接, 用完直接还回去. //我们希望获取一次连接,反复使用,最终还回去. // val jedisClient: Jedis = MyRedisUtils.getJedisClient jedisClient.set(redisKey,dataObj.toJSONString) } } } //关闭redis连接 jedisClient.close() //刷新kafka缓冲区 MyKafkaUtils.flush() } ) //提交offset MyOffsetUtils.saveOffset(topicName,groupId,offsetRanges) } ) ssc.start() ssc.awaitTermination() } }
3.4.3 历史维度数据初始引导
1) Maxwell 提供了 bootstrap 功能来进行历史数据的全量同步,命令如下:
bin/maxwell-bootstrap --config config.properties --database gmall --table user_info
2) Bootstrap 数据格式
{ "database": "fooDB", "table": "barTable", "type": "bootstrap-start", "ts": 1450557744, "data": {} } { "database": "fooDB", "table": "barTable", "type": "bootstrap-insert", "ts": 1450557744, "data": { "txt": "hello" } } { "database": "fooDB", "table": "barTable", "type": "bootstrap-insert", "ts": 1450557744, "data": { "txt": "bootstrap!" } } { "database": "fooDB", "table": "barTable", "type": "bootstrap-complete", "ts": 1450557744, "data": {} }
第一条 type 为 bootstrap-start 和最后一条 type 为 bootstrap-complete 的数据,是 bootstrap开始和结束的标志,不包含数据,中间的 type 为 bootstrap-insert 的数据才包含数据。
一次 bootstrap 输出的所有记录的 ts 都相同,为 bootstrap 开始的时间
3) 修改分流代码 --将维度数据初始引导的代码进行分流
case "bootstrap-insert" => "I"
3.5 数据处理顺序性
在实时计算中, 对业务数据的计算, 要考虑到数据处理的顺序, 即能否依照数据改变的顺序进行处理。
假设一个场景,如果将某个用户数据的姓名字段进行多次更改,由原先的 A 改为 B 再改为 C, 在数据库层面最终的结果为 C, 但是我们能否保证数据经过实时处理后,在 DIM层存储的结果也为 C,可不可能存储的结果为 B。
我们依次审视一下,在实时处理的各个环节中,是否能保证数据的顺序?如果不能保证,是在哪个环节出的问题,最终导致存储的结果不正确。
3.5.2 解决
通过分析,目前我们的计算过程中,只有可能在 Kafka 环节出现数据乱序,导致最终存储的结果不正确。如果想要保证数据处理的顺序性,我们可以将同一条数据的修改发往 topic的同一个分区中。需要修改 maxwell 的配置文件,指定发送数据到 kafka 时要使用分区键。
1) 修改 config.properties 文件中的如下配置
producer_partition_by=column producer_partition_columns=id producer_partition_by_fallback=table
第二章 分层处理
一、 DWD到DWS层数据处理
二、 日活宽表
三、 订单业务宽表
第三章 Elasticsearch
第四章 可视化
第五章 总结
一、Kafka常用命令
# 消费数据 bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic # # 生成数据 bin/kafka-console-producer.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic # # 显示topic列表 bin/kafka-topics.sh --list --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 # 删除topic bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic # # 描述topic bin/kafka-topics.sh --describe --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --to