Spark实时(数据采集)项目

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: Spark实时(数据采集)项目

学习项目前提

熟悉Scala基础
熟悉Spark基础
熟悉Kafka基础
熟悉Redis基础
熟悉IDEA、Git、Maven基础操作



第一章 数据采集

一、 简介

1.1 离线计算

离线计算一般指通过批处理的方式计算已知的所有输入数据,输入数据不会产生变化,一般计算量级较大, 计算时间较长。

1.1.1 离线计算特点

(1)数据确定,不会发生变化

(2)数据量大,保存时间长

(3)大量数据进行的复杂批量计算

(4)方便查看计算后的结果

1.2 实时计算

实时计算一般是指通过流处理方式计算当日的数据都算是实时计算。

也会有一些准实时计算,利用离线框架通过批处理完成(小时、 10 分钟级)的计算,一般为过渡产品,不能算是实时计算。

1.2.1 实时计算特点

(1)局部计算

每次计算以输入的每条数据,或者微批次、小窗口的数据范围进行计算,没法像离线数据一样能够基于当日全部数据进行统计排序分组

(2)开发成本高

离线的批处理 SQL,实时计算需要通过代码,往往需要对接多种数据容器完成,相对开发较为复杂。

(3)资源成本高

(4)时效性

(5)可视化性

1.3 数仓架构设计

1.3.1 离线架构

1.3.2 实时架构

1.4 项目需求

1.4.1 BI分析
1.4.2 数据接口应用

二、 日志数据采集和分流

2.1 整体架构

2.2 采集日志数据

1.采集到数据---数据生成器

(1) 上传模拟日志数据生成器到/export/server/applog 目录

(2) 根据业务修改application.yml配置

#业务日期
mock.date: 2023-02-05
#模拟数据发送模式
mock.type: "kafka"
#kafka模式下,发送的地址
mock:
  kafka-server: "node1:9092,node2:9092,node3:9092"
  kafka-topic: "ODS_BASE_LOG_1018"

(3) 生成日志数据

java -jar gmall2020-mock-log-2021-11-29.jar

image.png

(4) 消费数据测试

bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic ODS_BASE_LOG


image.png


2.3 辅助脚本

(1) 模拟生成数据脚本 lg.sh  cd/bin/lg.sh

       chmod u+x lg.sh

#!/bin/bash
if [ $# -ge 1 ]
then
        sed -i "/mock.date/c mock.date: $1" /opt/module/applog/application.yml
fi
cd /opt/module/applog;
java -jar gmall2020-mock-log-2021-11-29.jar >/dev/null 2>&1 &

2.编写时间脚本动态修改日期

image.png

image.png

标准输出放到黑洞里面,错误输出跟随标准输出,这样没有消息输出

image.png

image.png

但是他也有个阻塞,把窗口阻塞住,直到生成完才结束

所以用个&推到后台

image.png

image.png

(2)Kafka脚本

#!/bin/bash
if [ $# -lt 1 ]
then
   echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}"
   exit
fi
 case $1 in
 "start")
#!/bin/bash
if [ $# -lt 1 ]
then
   echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}"
   exit
fi
 case $1 in
 "start")
 ;;
;;
"kp")
        if [ $2 ]
        then
          kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic $2
        else
         echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}"
#!/bin/bash
   echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}"
   exit
fi
      for i in node1 node2 node3
      do
          echo "---  启动 $i kafka  ---"
          ssh $i "/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties"
      done

 ;;
 "stop")
     for i in node1 node2 node3
      do
          echo "--- 停止 $i kafka  ---"
          ssh $i "/export/server/kafka/bin/kafka-server-stop.sh"
      done
if [ $2 ]
        then
           kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic $2
        else
          echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}"
        fi
        if [ $2 ]
        else
          echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}"
        fi
;;
"list")
        kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
;;
"describe")
        if [ $2 ]
        then
         kafka-topics.sh --describe --bootstrap-server node1:9092,node2:9092,node3:9092 --topic $2
        else
         echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}"
        fi
;;

"delete")
        if [ $2 ]
        then
         kafka-topics.sh --delete --bootstrap-server node1:9092,node2:9092,node3:9092 --topic $2
        else
         echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}"
        fi
;;
*)
   echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}"
   exit
;;
 esac

2.4 准备开发环境

2.4.1 创建工程
2.4.2 添加依赖
<properties>
        <spark.version>3.0.0</spark.version>
        <scala.version>2.12.11</scala.version>
        <kafka.version>2.4.1</kafka.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.11.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.8.0</version>
    </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.10</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- 该插件用于将 Scala 代码编译成 class 文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- 声明绑定到 maven 的 compile 阶段 -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

指定分区,

2.4.3 添加配置文件

(1)添加config.properties文件

#kafka配置
kafka_bootstrap_servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

(2)添加log4j.properties文件

log4j.appender.atguigu.MyConsole=org.apache.log4j.ConsoleAppender
log4j.appender.atguigu.MyConsole.target=System.out
log4j.appender.atguigu.MyConsole.layout=org.apache.log4j.PatternL
ayout
log4j.appender.atguigu.MyConsole.layout.ConversionPattern=%d{yyyy
-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n
log4j.rootLogger =error,atguigu.MyConsole
2.4.4 添加工具类

(1)properties工具类

package com.atguigu.gmall.realtime.util
import java.util.ResourceBundle
/**
 * @author sxr
 * @create 2022-05-29-23:10
 */
object MyPropsUtils {
  private val bundle: ResourceBundle = ResourceBundle.getBundle("config")
  def apply(propsKey:String): String ={
    bundle.getString(propsKey)
  }
  def main(args: Array[String]): Unit = {
    println(MyPropsUtils.apply("kafka_bootstrap_servers"))
    println(MyPropsUtils("kafka_bootstrap_servers"))
  }
}

(2)Config配置类

package com.atguigu.gmall.realtime.util
/**
 * @author sxr
 * @create 2022-05-31-20:59
 *
 * 配置类
 *
 */
object MyConfig {
  val KAFKA_BOOTSTRAP_SERVERS :String = "kafka_bootstrap_servers"
  val REDIS_HOST :String = "redis.host"
  val REDIS_POST:String = "redis.post"
}

(3)kafka工具类

package com.atguigu.gmall.realtime.util
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import java.util
import scala.collection.mutable
/**
 * @author sxr
 * @create 2022-05-26-22:42
 */
/*
 * Kafka 工具类,用于生产和消费
 */
object MyKafkaUtils {
//消费者配置ConsumerConfig
  private val consumerConfigs: mutable.Map[String, Object] = mutable.Map[String, Object](
    //kafka集群位置
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS),
    //kv反序列化器 deserializer
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    //groupId
    //offset的提交 自动 手动
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ->"true",
//    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG
    //offset的重置 默认:"latest", "earliest"
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"//latest
    //....
  )
  /**
   * 消费
   */
  def getKafkaDStream(ssc:StreamingContext,topic:String,groupId:String)={
    consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG,groupId)
    val KafkaDStream = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), consumerConfigs)
    )
    KafkaDStream
  }
  //使用指定的offset进行消费
  def getKafkaDStream(ssc:StreamingContext,topic:String,groupId:String,offsets:Map[TopicPartition, Long])={
    consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG,groupId)
    val KafkaDStream = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), consumerConfigs,offsets)
    )
    KafkaDStream
  }
  /**
   * 生产
   */
  val producer : KafkaProducer[String,String]=createProducer()
  def createProducer(): KafkaProducer[String,String] = {
    //生产者配置ProducerConfig
    val producerConfigs = new util.HashMap[String, AnyRef]
    //kafka集群位置
    producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG ,MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS))
    //kv序列化器 serializer
    producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
    producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
    //ack
    producerConfigs.put(ProducerConfig.ACKS_CONFIG,"all")
    //batch.size
    //linger.ms
    //retries
    //幂等性
    producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true")
    val producer = new KafkaProducer[String, String](producerConfigs)
    producer
  }
  //(按照默认的粘性分区策略)
  def send(topic:String,msg:String): Unit ={
    producer.send(new ProducerRecord[String,String](topic,msg))
  }
  //(按照key进行分区)
  def send(topic:String,key:String,msg:String): Unit ={
    producer.send(new ProducerRecord[String,String](topic,key,msg))
  }
  /**
   * 关闭生产对象
   */
  def close(): Unit ={
    if(producer !=null){
      producer.close()
    }
  }
  /**
   * 刷写,将缓冲区的数据刷写到磁盘
   */
  def flush(): Unit ={
    producer.flush()
  }
}

2.5 日志数据消费分流

2.5.1相关实体bean

(1) 页面日志(PageLog)

case class PageLog(
      mid :String,
      user_id:String,
      province_id:String,
      channel:String,
      is_new:String,
      model:String,
      operate_system:String,
      version_code:String,
      brand:String,
      page_id:String ,
      last_page_id:String,
      page_item:String,
      page_item_type:String,
      during_time:Long,
      sourceType:String,
      ts:Long
                  ){
}

(2)启动日志(StartLog)

case class StartLog(
                     mid :String,
                     user_id:String,
                     province_id:String,
                     channel:String,
                     is_new:String,
                     model:String,
                     operate_system:String,
                     version_code:String,
                     brand:String,
                     entry:String,
                     open_ad_id:String,
                     loading_time_ms:Long,
                     open_ad_ms:Long,
                     open_ad_skip_ms:Long,
                     ts:Long
                   ){
}

(3)页面动作日志(PageActionLog)

case class PageActionLog(
                          mid :String,
                          user_id:String,
                          province_id:String,
                          channel:String,
                          is_new:String,
                          model:String,
                          operate_system:String,
                          version_code:String,
                          brand:String,
                          page_id:String ,
                          last_page_id:String,
                          page_item:String,
                          page_item_type:String,
                          during_time:Long,
                          sourceType:String,
                          action_id:String,
                          action_item:String,
                          action_item_type:String,
                          ts:Long
                        ){
}

(4)页面曝光日志(PageDisplayLog)

case class PageDisplayLog(
                           mid :String,
                           user_id:String,
                           province_id:String,
                           channel:String,
                           is_new:String,
                           model:String,
                           operate_system:String,
                           version_code:String,
                           brand:String,
                           page_id:String ,
                           last_page_id:String,
                           page_item:String,
                           page_item_type:String,
                           during_time:Long,
                           sourceType:String,
                           display_type:String,
                           display_item: String,
                           display_item_type:String,
                           display_order:String ,
                           display_pos_id:String,
                           ts:Long,
                         ){
}
2.5.2 消费分流代码(优化后代码)

spark核数并行度最好和kafka分区的并行度保持一致

SparkConf().setAppName("ods_base_log_app").setMaster("local[4]")

修改kafka默认分区数的地方

image.png

image.png

image.png


package com.atguigu.gmall.realtime.app
import com.alibaba.fastjson.serializer.SerializeConfig
import com.alibaba.fastjson.{JSON, JSONObject}
import com.atguigu.gmall.realtime.bean.{PageActionLog, PageDisplayLog, PageLog, StartLog}
import com.atguigu.gmall.realtime.util
import com.atguigu.gmall.realtime.util.{MyKafkaUtils, MyOffsetUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.lang
/**
 * @author sxr
 * @create 2022-05-31-21:25
 *
 * 1.准备实时处理环境StreamingContext
 
 * 2.从kafka消费数据
 
 * 3.处理数据
 *    3.1转换数据结构
 *        专用结构 Bean
 *        通用结构 Map JsonObject
 
 *    3.2分流 :将数据拆分到不同的主题中
 *    启动主题: DWD_START_LOG
 *    页面访问主题: DWD_PAGE_LOG
 *    页面动作主题:DWD_PAGE_ACTION
 *    页面曝光主题:DWD_PAGE_DISPLAY
 *    错误主题:DWD_ERROR_INFO
 *
 * 4.写到DWD层
 */
object OdsBaseLogApp {
  def main(args: Array[String]): Unit = {
    //1.准备实时环境
    val conf: SparkConf = new SparkConf().setAppName("ods_base_log_app").setMaster("local[*]")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))
    //2.从kafka消费数据
    val topicName: String = "ODS_BASE_LOG_1018"
    val groupId: String = "ODS_BASE_LOG_GROUP"
    // TODO   从redis中读取offset,指定offset进行消费
    val offsets: Map[TopicPartition, Long] = MyOffsetUtils.readOffset(topicName, groupId)
    var KafkaDStream: InputDStream[ConsumerRecord[String, String]] = null
    if (offsets!=null && offsets.nonEmpty){
      //指定offset进行消费
       KafkaDStream = MyKafkaUtils.getKafkaDStream(ssc, topicName, groupId,offsets)
    }else{
      //默认的offset进行消费
      KafkaDStream = MyKafkaUtils.getKafkaDStream(ssc, topicName, groupId)
    }
    // TODO 补充:从当前消费到的数据中提取offset,不对流中的数据做任何处理
    var offsetRanges: Array[OffsetRange] = null
    val offsetRangesDStream: DStream[ConsumerRecord[String, String]] = KafkaDStream.transform(
      rdd => {
        offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      }
    )
//    3.处理数据
//       3.1转换数据结构
val jsonObjDStream: DStream[JSONObject] = offsetRangesDStream.map (
  ConsumerRecord => {
    val log: String = ConsumerRecord.value()
    val jSONObject: JSONObject = JSON.parseObject(log)
    jSONObject
  }
)
//   jsonObjDStream.print(1000)
    ///3.2分流 :将数据拆分到不同的主题中
    val DWD_PAGE_LOG_TOPIC :String = "DWD_PAGE_LOG_TOPIC"          //页面访问
    val DWD_ERROR_LOG_TOPIC : String = "DWD_ERROR_LOG_TOPIC"        //错误数据
    val DWD_START_LOG_TOPIC : String = "DWD_START_LOG_TOPIC"        //启动数据
    val DWD_PAGE_DISPLAY_TOPIC : String = "DWD_PAGE_DISPLAY_TOPIC" //页面曝光
    val DWD_PAGE_ACTION_TOPIC :String = "DWD_PAGE_ACTION_TOPIC"     //页面事件
    //分流规则:
    //  错误数据:不做任何的拆分,只要包含错误字段,直接整条数据发送到对应的topic
    //  页面数据:拆分到页面访问,曝光,事件
    //  启动数据:发动到对应的topic
    jsonObjDStream.foreachRDD(
      rdd => {
        rdd.foreachPartition(
          jsonObjIter => {
            for (JSONObject <- jsonObjIter) {
              //分流过程
              //分流错误数据
              val errObj: JSONObject = JSONObject.getJSONObject("err")
              if (errObj != null) {
                //将该错误数据发送到DWD_ERROR_LOG_TOPIC
                MyKafkaUtils.send(DWD_ERROR_LOG_TOPIC, JSONObject.toJSONString)
              } else {
                //提取公共字段
                val commontObj: JSONObject = JSONObject.getJSONObject("common")
                val ar: String = commontObj.getString("ar")
                val uid: String = commontObj.getString("uid")
                val os: String = commontObj.getString("os")
                val ch: String = commontObj.getString("ch")
                val is_new: String = commontObj.getString("is_new")
                val md: String = commontObj.getString("md")
                val mid: String = commontObj.getString("mid")
                val vc: String = commontObj.getString("vc")
                val ba: String = commontObj.getString("ba")
                val ts: Long = JSONObject.getLong("ts") //时间戳
                //页面数据
                val pageObj: JSONObject = JSONObject.getJSONObject("page")
                if (pageObj != null) {
                  //提取page字段
                  val page_id: String = pageObj.getString("page_id")
                  val item: String = pageObj.getString("item")
                  val during_time: Long = pageObj.getLong("during_time")
                  val item_type: String = pageObj.getString("item_type")
                  val last_page_id: String = pageObj.getString("last_page_id")
                  val source_type: String = pageObj.getString("source_type")
                  //封装到pageLog
                  val pageLog: PageLog = PageLog(mid, uid, ar, ch, is_new, md, os, vc, ba, page_id, last_page_id, item, item_type, during_time, source_type, ts)
                  //发送到 DWD_PAGE_LOG_TOPIC
                  MyKafkaUtils.send(DWD_PAGE_LOG_TOPIC, JSON.toJSONString(pageLog, new SerializeConfig(true)))
                  //JSON.toJSONString()是java的方法,会在里面找get(), set()方法,这是scala所以加一个SerializeConfig去找字段,不找get,set方法
                  //提取曝光字段
                  val displaysJsonArr = JSONObject.getJSONArray("displays")
                  if (displaysJsonArr != null && displaysJsonArr.size() > 0) {
                    for (i <- 0 until displaysJsonArr.size()) {
                      val displayObj: JSONObject = displaysJsonArr.getJSONObject(i)
                      val display_type: String = displayObj.getString("display_type")
                      val item: String = displayObj.getString("item")
                      val item_type: String = displayObj.getString("item_type")
                      val pos_id: String = displayObj.getString("pos_id")
                      val order: String = displayObj.getString("order")
                      //封装到对象
                      val displayLog: PageDisplayLog = PageDisplayLog(
                        mid, uid, ar, ch, is_new, md, os, vc, ba, page_id, last_page_id,
                        item, item_type, during_time, source_type, display_type, item, item_type, order, pos_id, ts
                      )
                      //写到 DWD_PAGE_DISPLAY_TOPIC
                      MyKafkaUtils.send(
                        DWD_PAGE_DISPLAY_TOPIC,
                        JSON.toJSONString(displayLog, new SerializeConfig(true))
                      )
                    }
                  }
                  //提取事件字段
                  val actionsJsonArr = JSONObject.getJSONArray("actions")
                  if (actionsJsonArr != null && actionsJsonArr.size() > 0) {
                    for (i <- 0 until actionsJsonArr.size()) {
                      val actionObj: JSONObject = actionsJsonArr.getJSONObject(i)
                      val item: String = actionObj.getString("item")
                      val action_id: String = actionObj.getString("action_id")
                      val item_type: String = actionObj.getString("item_type")
                      val ts: Long = actionObj.getLong("ts")
                      //封装成pageActionLog对象
                      val pageActionLog: PageActionLog = PageActionLog(
                        mid, uid, ar, ch, is_new, md, os, vc, ba, page_id, last_page_id,
                        item, item_type, during_time, source_type, action_id, item, item_type, ts
                      )
                      //发送到 DWD_PAGE_ACTION_TOPIC
                      MyKafkaUtils.send(
                        DWD_PAGE_ACTION_TOPIC,
                        JSON.toJSONString(pageActionLog, new SerializeConfig(true))
                      )
                    }
                  }
                }
                //启动数据
                val startObj: JSONObject = JSONObject.getJSONObject("start")
                if (startObj != null) {
                  val entry: String = startObj.getString("entry")
                  val open_ad_skip_ms: Long = startObj.getLong("open_ad_skip_ms")
                  val open_ad_ms: Long = startObj.getLong("open_ad_ms")
                  val loading_time: Long = startObj.getLong("loading_time")
                  val open_ad_id: String = startObj.getString("open_ad_id")
                  //封装到对象
                  val startLog: StartLog = StartLog(mid, uid, ar, ch, is_new, md, os, vc, ba, entry, open_ad_id, loading_time, open_ad_ms, open_ad_skip_ms, ts)
                  // 发送到 DWD_START_LOG_TOPIC
                  MyKafkaUtils.send(
                    DWD_START_LOG_TOPIC,
                    JSON.toJSONString(startLog, new SerializeConfig(true))
                  )
                }
              }
            }
            //算子(foreachPartition)里面 : Executor端执行, 每批次每分区执行一次.
            MyKafkaUtils.flush()
          }
        )
//        rdd.foreach(
//          JSONObject=>{
//            //C : 算子(foreach)里面 : Executor端执行, 每批次每条数据执行一次.
//          }
//        )
        //B: foreachRDD里面, 算子(foreach)外面:  Driver端执行. 每批次执行一次.
        MyOffsetUtils.saveOffset(topicName,groupId,offsetRanges)
      }
    )
    //A: foreachRDD外面: Driver端执行. 程序启动的时候执行一次。
    ssc.start()
    ssc.awaitTermination()
  }
}

2.6 优化-精准一次消费

2.6.1 相关定义

(1)至少一次消费(at least once)

写出数据 ==> 提交offset 若offset提交失败,导致数据重发。主要保证数据不丢失

(2)最多一次消费 (at most once)

提交offset ==> 写出数据 若写出数据失败,导致数据丢失。保证数据不重复

(3) 精确一次消费(Exactly-once)

不出现丢数据和数据重复

2.6.2 消费问题

(1)漏消费 (数据丢失)

(2)重复消费(数据重复)

(3)所以

目前 Kafka 默认每 5 秒钟做一次自动提交偏移量,这样并不能保证精准一次消费。

enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。
auto.commit.interval.ms 的默认值是 5000,单位是毫秒。
2.6.3 解决问题-方案一 (原子性)
2.6.4 解决问题-方案二(后置提交offset + 幂等性)

(1)策略:手动提交偏移量 + 幂等性处理

解决数据重复和数据丢失,就可以达到精准一次消费。

首先,数据丢失问题。手动后置提交offset 就可以解决 数据丢失问题,但会有数据重复。(至少消费一次)

幂等性,就是解决数据重复。

(2)难点

在实际的开发中手动提交偏移量其实不难,难的是幂等性的保存,有的时候并不一定能保证,这个需要看使用的数据库,如果数据库本身不支持幂等性操作,那只能优先保证的数据不丢失,数据重复难以避免即只保证了至少一次消费的语义。一般有主键的数据库都支持幂等性操作 Upsert。

(3) 使用场景

处理数据较多,或者数据保存在不支持事务的数据库上

2.6.5 手动提交偏移量

(1)偏移量保存在哪

Kafka 0.9版本以后consumer的偏移量是保存在Kafka的__consumer_offsets主题中。

但是如果用这种方式管理偏移量,有一个限制就是在提交偏移量时,数据流的元素结构不能

发生转变,即提交偏移量时数据流,必须是 InputDStream[ConsumerRecord[String, String]]

这种结构。但是在实际计算中,数据难免发生转变,或聚合,或关联,一旦发生转变, 就无

法在利用以下语句进行偏移量的提交:

xxDstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

因为 offset 的存储于 hasOffsetRanges,只有 KafkaRDD 继承了他,所以假如我们对KafkaRDD 进行了转化之后,其它 RDD 没有继承 HasOffsetRanges,所以就无法再获取 offset

了 。

所以实际生产中通常会利用 ZooKeeper,Redis,Mysql 等工具手动对偏移量进行保存 。

(2)流程

2.6.6 代码实现

(1)在 config.properties 中添加 redis 的连接配置

#redis
redis.host=hadoop102
redis.post=6379

(2) 添加Redis工具类

package com.atguigu.gmall.realtime.util
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
/**
 * @author sxr
 * @create 2022-06-04-18:42
 */
object MyRedisUtils {
  var jedisPool : JedisPool = null
  def getJedisClient : Jedis = {
    if(jedisPool == null ){
      var host : String = MyPropsUtils(MyConfig.REDIS_HOST)
      var port : String = MyPropsUtils(MyConfig.REDIS_POST)
      val jedisPoolConfig = new JedisPoolConfig()
      jedisPoolConfig.setMaxTotal(100) //最大连接数
      jedisPoolConfig.setMaxIdle(20) //最大空闲
      jedisPoolConfig.setMinIdle(20) //最小空闲
      jedisPoolConfig.setBlockWhenExhausted(true) //忙碌时是否等待
      jedisPoolConfig.setMaxWaitMillis(5000) //忙碌时等待时长 毫秒
      jedisPoolConfig.setTestOnBorrow(true) //每次获得连接的进行测试
      jedisPool = new JedisPool(jedisPoolConfig,host,port.toInt)
    }
    jedisPool.getResource
  }
}

(3) 偏移量工具类

package com.atguigu.gmall.realtime.util
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis
import java.util
import scala.collection.mutable
/**
 * @author sxr
 * @create 2022-06-04-19:41
 *
 * Offset管理工具类 用于redis往中存储和读取offset
 *
 *  管理方案:
 *     1.  后置提交偏移量  ->  手动控制偏移量提交
 *     2.  手动控制偏移量提交 ->  SparkStreaming提供了手动提交方案,但是我们不能用,因为我们会对DStream的结构进行转换.
 *     3.  手动的提取偏移量维护到redis中
 *         -> 从kafka中消费到数据,先提取偏移量
 *         -> 等数据成功写出后,将偏移量存储到redis中
 *         -> 从kafka中消费数据之前,先到redis中读取偏移量, 使用读取到的偏移量到kafka中消费数据
 *
 *     4. 手动的将偏移量存储到redis中,每次消费数据需要使用存储的offset进行消费,每次消费数据后,要将本次消费的offset存储到redis中。
 *
 */
object MyOffsetUtils {
  /**
   * 往Redis中存储offset
   * 问题:存的offset从哪里来?
   *      从消费的数据中提取出来的,传入到该方法中
   *      offsetRanges: Array[OffsetRange]
   *    offset的结构是什么?
   *       Kafka中offset维护的结构
   *       groupID + topic + partition => offset
   *       从传入的offset中提取信息
   *    在redis中怎么存?
   *       类型 : hash
   *       key : groupID + topic
   *       value : partition - offset
   *       写入API : hset/hmset
   *       读取API : hgetall
   *       是否过期 : 不过期
   */
  def saveOffset(topic : String, groupId : String, offsetRanges : Array[OffsetRange]): Unit ={
    if(offsetRanges!=null && offsetRanges.length>0){
      val offsets: util.HashMap[String, String] = new util.HashMap[String, String]()
      for (offsetRange <- offsetRanges) {
        val partition: Int = offsetRange.partition
        val offset: Long = offsetRange.untilOffset
        offsets.put(partition.toString,offset.toString)
      }
      // 往redis中存
      val jedis: Jedis = MyRedisUtils.getJedisClient
      val redisKey:String  = s"offsets:$topic:$groupId"
      jedis.hset(redisKey,offsets)
      jedis.close()
      println("提交offset:"+offsets)
    }
  }
  /**
   * 从Redis中读取存储的offset
   *
   * 问题:
   *      如何让SparkStreaming通过指定的offset进行消费?
   *
   *      SparkStreaming要求的offset格式是什么?
   *      Map[TopicPartition,Long]
   */
  def readOffset(topic:String,groupId :String):Map[TopicPartition,Long]={
    val jedis: Jedis = MyRedisUtils.getJedisClient
    val redisKey:String  = s"offsets:$topic:$groupId"
    val offsets: util.Map[String, String] = jedis.hgetAll(redisKey)
    println("读取到offset:"+offsets)
    val results: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()
    if (offsets!=null && offsets.size() > 0) {
      //将java的map转化为Scala的map进行迭代
      import scala.collection.JavaConverters._
      for ((partition, offset) <- offsets.asScala) {
        val topicPartition: TopicPartition = new TopicPartition(topic, partition.toInt)
        results.put(topicPartition, offset.toLong)
      }
    }
    jedis.close()
    results.toMap
  }
}
2.6.7 幂等性操作

目前处理完的数据写到了 kafka,如果程序出现宕机重试, kafka 是没有办法通过唯一性标识实现幂等性识别,但是也没有关系,因为 kafka 中的数据只是用于中间存储,并不会进行统计,所以只要保证不丢失即可,重复数据的幂等性处理可以交给下游处理,只要保证最终统计结果是不会有重复即可 。

2.7 优化-Kafka消息发送问题

2.7.1 缓冲区问题

Kafka 消息的发送分为同步发送和异步发送。 Kafka 默认使用异步发送的方式。 Kafka的生产者将消息进行发送时,会先将消息发送到缓冲区中,待缓冲区写满或者到达指定的时间,才会真正的将缓冲区的数据写Broker。

假设消息发送到缓冲区中还未写到 Broker,我们认为数据已经成功写给了 Kafka,接下来会手动的提offset, 如果 offset 提交成功,但此刻 Kafka 集群突然出现故障。 缓冲区的数据会丢失,最终导致的问题就是数据没有成功写到 Kafka ,而 offset 已经提交,此部分的数据就会被漏掉。

2.7.2 问题解决-方法一

将消息的发送修改为同步发送,保证每条数据都能发送到 Broker。 但带来的问题就是消息是一条一条写给 Broker,会牺牲性能,一般不推荐。

2.7.3 问题解决-方法二

(1) 策略: 在手动提交 offset 之前,强制将缓冲区的数据 flush 到 broker 。

Kafka 的生产者对象提供了 flush 方法, 可以强制将缓冲区的数据刷到 Broker。

(2)修补代码

//算子(foreachPartition)里面 : Executor端执行, 每批次每分区执行一次.
MyKafkaUtils.flush()

三、 业务数据采集和分流

3.1 整体架构

业务数据库的采集主要是基于对数据库的变化的实时监控。目前市面上的开源产品主要是 Canal 和 Maxwell。要利用这些工具实时采集数据到 kafka,以备后续处理。

Maxwell 采集的日志数据,默认是放在一个统一的 kafka 的 Topic 中,为了后续方便处理要进行以表为单位拆分到不同 kafka 的 Topic 中。

针对维度数据,要单独保存。通常考虑用 redis、 hbase、 mysql、 kudu 等通过唯一键查询性能较快的数据库中。

3.2 MaxWell

3.2.1 简介

Maxwell 是由美国 Zendesk 公司开源,用 Java 编写的 MySQL 变更数据抓取软件。它会实时监控 Mysql 数据库的数据变更操作(包括 insert、 update、 delete),并将变更数据以 JSON格式发送给 Kafka、 Kinesi 等流数据处理平台。

官网地址: http://maxwells-daemon.io/

3.2.2 MySQL主从复制

1) 主从复制的应用场景

(1) 做数据库的热备:主数据库服务器故障后,可切换到从数据库继续工作 。

(2) 分离:主数据库只负责业务数据的写入操作,而多个从数据库只负责业务数据的查询工作,在读多写少场景下,可以提高数据库工作效率 。

2) MySQL 主从复制工作原理

(1) Master 主库将数据变更记录,写到二进制日志(binary log)中。

(2) Slave 从库向 mysql master 发送 dump 协议,将 master 主库的 binary log events 拷贝到它的中继日志(relay log)。

(3) Slave 从库读取并回放中继日志中的事件,将改变的数据同步到自己的数据库 。

3) binlog

(1) 什么是 binlog

MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间, MySQL 的二进制日志是事务安全型的。

一般来说开启二进制日志大概会有 1%的性能损耗。

二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件 。

(2) binlog 的分类设置

binlog 格式: statement | row | mixed

statement : 语句级, 只记录写操作的sql语句. (可能造成数据不一致)

row : 行级,记录每行数据的变化 (综合对比 ,Maxwell想做监控分析,选择 row 格式比较合适。 )

mixed : statement升级版,正常情况下都是statement,当sql中使用了随机、时间、自定函数等时会变成row

3.2.3 MaxWell 工作原理

maxwell就是将自己伪装成 slave,并遵循 MySQL 主从复制的协议,从 master 同步数据。

3.2.4 Maxwell 安装

3.3 采集业务数据

3.3.1 MySQL 部分
1) 安装 MySQL(略)
2) 创建业务数据库(直接使用离线数仓的即可)
3) 导入数据表(直接使用离线数仓的即可)
4) 修改/etc/my.cnf 文件,开启 binlog
[sxr@hadoop102 maxwell-1.29.2]$ sudo vim /etc/my.cnf
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall

注意: binlog-do-db 根据自己的情况进行修改,指定具体要同步的数据库。

5) 重启 MySQL 使配置生效
sudo systemctl restart mysqld

到/var/lib/mysql 目录下查看是否生成 binlog 文件

-rw-r-----. 1 mysql mysql   250740 7月  12 18:02 mysql-bin.000001
-rw-r-----. 1 mysql mysql      133 7月  21 12:59 mysql-bin.index
3.3.2 模拟数据

1) 上传 jar 和 properties 文件上传到/opt/module/db_log 目录下

2) 修改 application.properties 中数据库连接信息

3) 模拟生成数据

java -jar gmall-mock-db-2020-03-16-SNAPSHOT.jar

4)再次到/var/lib/mysql 目录下,查看 binlog 的改动

3.3.3 赋权限

1) 给 maxwell 创建用户并赋权限

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
3.3.4 Maxwell 部分

1) 创建数据库,用于存储 Maxwell 运行过程中的一些数据,包括 binlog 同步的断点位置等

mysql> create database maxwell;
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';

2) 修改 Maxwell 配置文件名字

3) 修改 Maxwell 配置文件内容

#Maxwell 数 据 发 送 目 的 地 , 可 选 配 置 有
#stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
#目标 Kafka 集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092
kafka_topic=ODS_BASE_DB_M
# mysql login info
host=hadoop102
user=maxwell
password=maxwell

4) 启动 Maxwell

bin/maxwell --config config.properties --daemon

3.4 业务数据消费分流

3.4.1 Maxwell 数据格式
3.4.2 消费分流

Maxwell 会追踪整个数据库的变更,把所有的数据变化都发到一个 topic 中了,但是为了后续处理方便,应该将事实表的数据分流到不同的 topic 中,将维度表的数据写入到 redis中

1) 分流业务代码
package com.atguigu.gmall.realtime.app
import com.alibaba.fastjson.{JSON, JSONObject}
import com.atguigu.gmall.realtime.util.{MyKafkaUtils, MyOffsetUtils, MyRedisUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
import java.util
/**
 * @author sxr
 * @create 2022-07-13-10:08
 *
 * 业务数据消费分流
 *
 * 1. 准备实时环境
 * 2. 从redis读取偏移量
 * 3. 从Kafka消费数据
 * 4. 提取偏移量结束点
 * 5. 数据处理
 *     5.1 转换数据结构
 *     5.2 分流
 *          事实数据 => kafka
 *          维度数据 => redis
 * 6. flush kafka的缓冲区
 * 7. 提交offset
 */
object OdsBaseDbApp {
  def main(args: Array[String]): Unit = {
    //1.准备实时环境
    val conf: SparkConf = new SparkConf().setAppName("ods_base_db_app").setMaster("local[*]")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))
    val topicName: String = "ODS_BASE_DB_M"
    val groupId: String = "ODS_BASE_DB_GROUP"
    //2. 从redis读取偏移量
    val offsets: Map[TopicPartition, Long] = MyOffsetUtils.readOffset(topicName, groupId)
    //3. 从Kafka消费数据
    var kafkaDStream: InputDStream[ConsumerRecord[String, String]] = null
    if(offsets!=null && offsets.nonEmpty){
      kafkaDStream = MyKafkaUtils.getKafkaDStream(ssc, topicName, groupId, offsets)
    }else{
      kafkaDStream = MyKafkaUtils.getKafkaDStream(ssc, topicName, groupId)
    }
    //4. 提取偏移量结束点
    var offsetRanges: Array[OffsetRange] = null
    val offsetRangesDStream: DStream[ConsumerRecord[String, String]] = kafkaDStream.transform(
      rdd => {
        offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      }
    )
    //5. 数据处理
        //5.1 转换数据结构
        val JsonObjDStream: DStream[JSONObject] = offsetRangesDStream.map(
          consumerRecord => {
            val jsonData: String = consumerRecord.value()
            val jSONObject: JSONObject = JSON.parseObject(jsonData)
            jSONObject
          }
        )
//        JsonObjDStream.print(100)
      //5.2 分流
//      //事实表清单
//      val factTables :Array[String] = Array[String]("order_info","order_detail")
//      //维度表清单
//      val dimTables :Array[String] = Array[String]("user_info","base_province")
      JsonObjDStream.foreachRDD(
        rdd =>{
          // Driver
          //TODO 如何动态维护表清单?
          // 将表清单维护到redis中,代码中只要保证能够周期性的读取redis中维护的表清单.
          // Redis中如何存储:
          // type:     set
          // key :     FACT:TABLES   DIM:TABLES
          // value:   表名的集合
          // 写入API:  sadd
          // 读取API:  smembers
          // 是否过期: 不过期
          val redisFactKeys :String = "FACT:TABLES"
          val redisDimKeys : String = "DIM:TABLES"
          val jedisClient : Jedis = MyRedisUtils.getJedisClient
          //事实表清单
          val factTables: util.Set[String] = jedisClient.smembers(redisFactKeys)
          val factTablesBC: Broadcast[util.Set[String]] = ssc.sparkContext.broadcast(factTables)
          println("factTables: " + factTables)
          //维度表清单
          val dimTables  = jedisClient.smembers(redisDimKeys)
          val dimTablesBC: Broadcast[util.Set[String]] = ssc.sparkContext.broadcast(dimTables)
          println("dimTables: " + dimTables)
          jedisClient.close()
          rdd.foreachPartition(
            jsonObjIter =>{
              val jedisClient: Jedis = MyRedisUtils.getJedisClient
              for (jsonObj <- jsonObjIter) {
                //提取操作类型
                val operType: String = jsonObj.getString("type")
                val opValue: String = operType match {
                  case "bootstrap-insert" => "I"
                  case "insert" => "I"
                  case "update" => "U"
                  case "delete" => "D"
                  case _ => null
                }
                //判断操作类型:1.明确什么操作 2.过滤不感兴趣的操作
                if (opValue != null){
                  // 提取表名
                  val tableName: String = jsonObj.getString("table")
                  // 提取数据
                  val dataObj: JSONObject = jsonObj.getJSONObject("data")
                  if(factTablesBC.value.contains(tableName)){
                    println(dataObj.toJSONString)
                    //事实数据
                    val dwdTopicName:String = s"DWD_${tableName.toUpperCase}_${opValue}"
                    MyKafkaUtils.send(dwdTopicName,dataObj.toJSONString)
                  }
                  if(dimTablesBC.value.contains(tableName)){
                    //维度数据
                    /*
                    *       类型 : string
                    *       key : DIM:表名:ID
                    *       value : 整条数据的jsonString
                    *       写入API : set
                    *       读取API : get
                    *       是否过期 : 不过期
                    */
                    val id: String = dataObj.getString("id")
                    val redisKey :String = s"DIM:${tableName.toUpperCase()}:$id"
                    // TODO 此处获取Redis连接好不好?
                    //不好, 每条数据都要获取一次连接, 用完直接还回去.
                    //我们希望获取一次连接,反复使用,最终还回去.
//                    val jedisClient: Jedis = MyRedisUtils.getJedisClient
                    jedisClient.set(redisKey,dataObj.toJSONString)
                  }
                }
              }
              //关闭redis连接
              jedisClient.close()
              //刷新kafka缓冲区
              MyKafkaUtils.flush()
            }
          )
          //提交offset
          MyOffsetUtils.saveOffset(topicName,groupId,offsetRanges)
        }
      )
    ssc.start()
    ssc.awaitTermination()
  }
}
3.4.3 历史维度数据初始引导

1) Maxwell 提供了 bootstrap 功能来进行历史数据的全量同步,命令如下:

bin/maxwell-bootstrap --config config.properties --database gmall --table user_info

2) Bootstrap 数据格式

{
"database": "fooDB",
"table": "barTable",
"type": "bootstrap-start",
"ts": 1450557744,
"data": {}
}
{
"database": "fooDB",
"table": "barTable",
"type": "bootstrap-insert",
"ts": 1450557744,
"data": {
"txt": "hello"
}
}
{
"database": "fooDB",
"table": "barTable",
"type": "bootstrap-insert",
"ts": 1450557744,
"data": {
"txt": "bootstrap!"
}
}
{
"database": "fooDB",
"table": "barTable",
"type": "bootstrap-complete",
"ts": 1450557744,
"data": {}
}

第一条 type 为 bootstrap-start 和最后一条 type 为 bootstrap-complete 的数据,是 bootstrap开始和结束的标志,不包含数据,中间的 type 为 bootstrap-insert 的数据才包含数据。

一次 bootstrap 输出的所有记录的 ts 都相同,为 bootstrap 开始的时间

3) 修改分流代码 --将维度数据初始引导的代码进行分流

case "bootstrap-insert" => "I"

3.5 数据处理顺序性

在实时计算中, 对业务数据的计算, 要考虑到数据处理的顺序, 即能否依照数据改变的顺序进行处理。

假设一个场景,如果将某个用户数据的姓名字段进行多次更改,由原先的 A 改为 B 再改为 C, 在数据库层面最终的结果为 C, 但是我们能否保证数据经过实时处理后,在 DIM层存储的结果也为 C,可不可能存储的结果为 B。

我们依次审视一下,在实时处理的各个环节中,是否能保证数据的顺序?如果不能保证,是在哪个环节出的问题,最终导致存储的结果不正确。

3.5.2 解决

通过分析,目前我们的计算过程中,只有可能在 Kafka 环节出现数据乱序,导致最终存储的结果不正确。如果想要保证数据处理的顺序性,我们可以将同一条数据的修改发往 topic的同一个分区中。需要修改 maxwell 的配置文件,指定发送数据到 kafka 时要使用分区键。

1) 修改 config.properties 文件中的如下配置

producer_partition_by=column
producer_partition_columns=id
producer_partition_by_fallback=table

第二章 分层处理

一、 DWD到DWS层数据处理

二、 日活宽表

三、 订单业务宽表

第三章 Elasticsearch

第四章 可视化

第五章 总结

一、Kafka常用命令

# 消费数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic #
# 生成数据
bin/kafka-console-producer.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic #
# 显示topic列表
bin/kafka-topics.sh --list --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181
# 删除topic
bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic #
# 描述topic
bin/kafka-topics.sh --describe --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --to
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
7月前
|
分布式计算 数据可视化 大数据
基于spark的医疗大数据可视化大屏项目
基于spark的医疗大数据可视化大屏项目
164 0
|
8月前
|
SQL 分布式计算 Java
IDEA 打包 Spark 项目 POM 文件依赖
这是一个 Maven POM 示例,用于构建一个使用 Spark 与 Hive 的项目,目标是将数据从 Hive 导入 ClickHouse。POM 文件设置了 Scala 和 Spark 的依赖,包括 `spark-core_2.12`, `spark-sql_2.12`, 和 `spark-hive_2.12`。`maven-assembly-plugin` 插件用于打包,生成包含依赖的和不含依赖的两种 JAR 包。`scope` 说明了依赖的使用范围,如 `compile`(默认),`provided`,`runtime`,`test` 和 `system`。
134 0
|
8月前
|
数据采集 分布式计算 Linux
Spark实时(数据采集)项目小知识点--sed -i命令详解及入门攻略
Spark实时(数据采集)项目小知识点--sed -i命令详解及入门攻略
187 0
|
8月前
|
机器学习/深度学习 分布式计算 大数据
【云计算与大数据技术】Spark实战项目之判别西瓜好坏(附源码和数据集)
【云计算与大数据技术】Spark实战项目之判别西瓜好坏(附源码和数据集)
125 0
|
分布式计算 Java Scala
配置spark,并在idea中搭建项目
配置spark,并在idea中搭建项目
155 0
|
机器学习/深度学习 人工智能 分布式计算
SparK项目原作解读:卷积模型的首个BERT预训练
SparK项目原作解读:卷积模型的首个BERT预训练
334 0
|
设计模式 分布式计算 Scala
Spark Master 和 Worker 项目需求 | 学习笔记
快速学习 Spark Master 和 Worker 项目需求
Spark Master 和 Worker 项目需求 | 学习笔记
|
机器学习/深度学习 分布式计算 算法
Spark项目实战:飞机延误预测项目
Spark项目实战:飞机延误预测项目
568 0
Spark项目实战:飞机延误预测项目
|
分布式计算 Java Hadoop
Spark集群搭建记录 | 云计算[CentOS8] | Scala Maven项目访问Spark(local模式)实现单词计数(下)
step6 创建scala object step7 修改pom文件 step8 配置项目 step9 添加依赖库(Spark的jar包) step10 设置输入路径
171 0
Spark集群搭建记录 | 云计算[CentOS8] | Scala Maven项目访问Spark(local模式)实现单词计数(下)
|
分布式计算 IDE Java
Spark集群搭建记录 | 云计算[CentOS7] | Scala Maven项目访问Spark(local模式)实现单词计数(上)
写在前面 step1 下载Scala IDE step2 解压安装Scala IDE step3 Scala 下载 step4 Scala 配置 step5 创建scala项目
170 0
Spark集群搭建记录 | 云计算[CentOS7] | Scala Maven项目访问Spark(local模式)实现单词计数(上)