Iceberg Hands-On Pitfall Guide (Part 3)

Summary: a hands-on guide to the pitfalls of working with Apache Iceberg.

3) Write DwdIcebergService

package com.atguigu.iceberg.warehouse.service

import java.sql.Timestamp
import java.time.LocalDate
import java.time.format.DateTimeFormatter

import com.atguigu.iceberg.warehouse.bean.{BaseWebsite, MemberRegType, VipLevel}
import org.apache.spark.sql.SparkSession

object DwdIcebergService {

  def readOdsData(sparkSession: SparkSession): Unit = {
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._

    // dwd_base_ad: cast adid to int, then overwrite the matching partitions
    sparkSession.read.json("/ods/baseadlog.log")
      .withColumn("adid", col("adid").cast("int"))
      .writeTo("hadoop_prod.db.dwd_base_ad").overwritePartitions()

    // dwd_base_website: convert createtime (yyyy-MM-dd) into a timestamp
    sparkSession.read.json("/ods/baswewebsite.log").map(item => {
      val createtime = item.getAs[String]("createtime")
      val str = LocalDate.parse(createtime, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
        .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
      BaseWebsite(item.getAs[String]("siteid").toInt, item.getAs[String]("sitename"),
        item.getAs[String]("siteurl"), item.getAs[String]("delete").toInt,
        Timestamp.valueOf(str), item.getAs[String]("creator"), item.getAs[String]("dn"))
    }).writeTo("hadoop_prod.db.dwd_base_website").overwritePartitions()

    // dwd_member: cast uid and ad_id to int
    sparkSession.read.json("/ods/member.log").drop("dn")
      .withColumn("uid", col("uid").cast("int"))
      .withColumn("ad_id", col("ad_id").cast("int"))
      .writeTo("hadoop_prod.db.dwd_member").overwritePartitions()

    // dwd_member_regtype: convert createtime into a timestamp
    sparkSession.read.json("/ods/memberRegtype.log").drop("domain").drop("dn")
      .withColumn("regsourcename", col("regsource"))
      .map(item => {
        val createtime = item.getAs[String]("createtime")
        val str = LocalDate.parse(createtime, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
          .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        MemberRegType(item.getAs[String]("uid").toInt, item.getAs[String]("appkey"),
          item.getAs[String]("appregurl"), item.getAs[String]("bdp_uuid"), Timestamp.valueOf(str),
          item.getAs[String]("isranreg"), item.getAs[String]("regsource"),
          item.getAs[String]("regsourcename"), item.getAs[String]("websiteid").toInt,
          item.getAs[String]("dt"))
      }).writeTo("hadoop_prod.db.dwd_member_regtype").overwritePartitions()

    // dwd_vip_level: convert the three date columns into timestamps
    sparkSession.read.json("/ods/pcenterMemViplevel.log").drop("discountval")
      .map(item => {
        val startTime = item.getAs[String]("start_time")
        val endTime = item.getAs[String]("end_time")
        val last_modify_time = item.getAs[String]("last_modify_time")
        val startTimeStr = LocalDate.parse(startTime, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
          .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        val endTimeStr = LocalDate.parse(endTime, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
          .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        val last_modify_timeStr = LocalDate.parse(last_modify_time, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
          .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        VipLevel(item.getAs[String]("vip_id").toInt, item.getAs[String]("vip_level"),
          Timestamp.valueOf(startTimeStr), Timestamp.valueOf(endTimeStr),
          Timestamp.valueOf(last_modify_timeStr),
          item.getAs[String]("max_free"), item.getAs[String]("min_free"),
          item.getAs[String]("next_level"), item.getAs[String]("operator"),
          item.getAs[String]("dn"))
      }).writeTo("hadoop_prod.db.dwd_vip_level").overwritePartitions()

    // dwd_pcentermempaymoney: cast the id columns to int
    sparkSession.read.json("/ods/pcentermempaymoney.log")
      .withColumn("uid", col("uid").cast("int"))
      .withColumn("siteid", col("siteid").cast("int"))
      .withColumn("vip_id", col("vip_id").cast("int"))
      .writeTo("hadoop_prod.db.dwd_pcentermempaymoney").overwritePartitions()
  }
}

4) Write DwdIcebergController

package com.atguigu.iceberg.warehouse.controller

import com.atguigu.iceberg.warehouse.service.DwdIcebergService
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DwdIcebergController {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
      .set("spark.sql.catalog.catalog-name.type", "hadoop")
      .set("spark.sql.catalog.catalog-name.default-namespace", "default")
      .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .set("spark.sql.session.timeZone", "GMT+8")
      .setMaster("local[*]").setAppName("table_operations")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    DwdIcebergService.readOdsData(sparkSession)
  }
}

4.6.4.2 DWS layer (a bug appears when a table specifies multiple partition columns)

1) Create the case classes

import java.sql.Timestamp // needed for the Timestamp fields below

case class DwsMember(
  uid: Int, ad_id: Int,
  fullname: String, iconurl: String, lastlogin: String, mailaddr: String,
  memberlevel: String, password: String, paymoney: String, phone: String,
  qq: String, register: String,
  regupdatetime: String, unitname: String, userip: String, zipcode: String,
  appkey: String, appregurl: String, bdp_uuid: String, reg_createtime: String,
  isranreg: String, regsource: String, regsourcename: String, adname: String,
  siteid: String, sitename: String, siteurl: String, site_delete: String,
  site_createtime: String, site_creator: String, vip_id: String, vip_level: String,
  vip_start_time: String, vip_end_time: String,
  vip_last_modify_time: String, vip_max_free: String, vip_min_free: String,
  vip_next_level: String, vip_operator: String,
  dt: String, dn: String
)

case class DwsMember_Result(
  uid: Int, ad_id: Int,
  fullname: String, iconurl: String, lastlogin: String, mailaddr: String,
  memberlevel: String, password: String,
  paymoney: String, phone: String, qq: String, register: String,
  regupdatetime: String, unitname: String, userip: String, zipcode: String,
  appkey: String, appregurl: String, bdp_uuid: String,
  reg_createtime: Timestamp, isranreg: String, regsource: String,
  regsourcename: String, adname: String,
  siteid: Int, sitename: String, siteurl: String, site_delete: String,
  site_createtime: String, site_creator: String, vip_id: Int,
  vip_level: String, vip_start_time: Timestamp, vip_end_time: Timestamp,
  vip_last_modify_time: Timestamp, vip_max_free: String, vip_min_free: String,
  vip_next_level: String, vip_operator: String,
  dt: String, dn: String
)

2) Create DwdIcebergDao to read the six base tables

package com.atguigu.iceberg.warehouse.dao

import org.apache.spark.sql.SparkSession

object DwDIcebergDao {

  def getDwdMember(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,ad_id,birthday,email,fullname,iconurl,lastlogin,mailaddr,memberlevel," +
      "password,phone,qq,register,regupdatetime,unitname,userip,zipcode,dt from hadoop_prod.db.dwd_member")
  }

  def getDwdPcentermempaymoney(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,paymoney,siteid,vip_id,dt,dn from hadoop_prod.db.dwd_pcentermempaymoney")
  }

  def getDwdVipLevel(sparkSession: SparkSession) = {
    sparkSession.sql("select vip_id,vip_level,start_time as vip_start_time,end_time as vip_end_time," +
      "last_modify_time as vip_last_modify_time,max_free as vip_max_free,min_free as vip_min_free,next_level as vip_next_level," +
      "operator as vip_operator,dn from hadoop_prod.db.dwd_vip_level")
  }

  def getDwdBaseWebsite(sparkSession: SparkSession) = {
    sparkSession.sql("select siteid,sitename,siteurl,delete as site_delete,createtime as site_createtime,creator as site_creator" +
      ",dn from hadoop_prod.db.dwd_base_website")
  }

  def getDwdMemberRegtyp(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,appkey,appregurl,bdp_uuid,createtime as reg_createtime,isranreg,regsource,regsourcename,websiteid," +
      "dt from hadoop_prod.db.dwd_member_regtype")
  }

  def getDwdBaseAd(sparkSession: SparkSession) = {
    sparkSession.sql("select adid as ad_id,adname,dn from hadoop_prod.db.dwd_base_ad")
  }
}

3) Write DwsIcebergService to implement the business logic

package com.atguigu.iceberg.warehouse.service

import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import com.atguigu.iceberg.warehouse.bean.{DwsMember, DwsMember_Result}
import com.atguigu.iceberg.warehouse.dao.DwDIcebergDao
import org.apache.spark.sql.SparkSession

object DwsIcebergService {

  def getDwsMemberData(sparkSession: SparkSession, dt: String) = {
    import sparkSession.implicits._
    val dwdPcentermempaymoney = DwDIcebergDao.getDwdPcentermempaymoney(sparkSession).where($"dt" === dt)
    val dwdVipLevel = DwDIcebergDao.getDwdVipLevel(sparkSession)
    val dwdMember = DwDIcebergDao.getDwdMember(sparkSession).where($"dt" === dt)
    val dwdBaseWebsite = DwDIcebergDao.getDwdBaseWebsite(sparkSession)
    val dwdMemberRegtype = DwDIcebergDao.getDwdMemberRegtyp(sparkSession).where($"dt" === dt)
    val dwdBaseAd = DwDIcebergDao.getDwdBaseAd(sparkSession)

    // Join the six DWD tables into one wide Dataset[DwsMember]
    val result = dwdMember.join(dwdMemberRegtype.drop("dt"), Seq("uid"), "left")
      .join(dwdPcentermempaymoney.drop("dt"), Seq("uid"), "left")
      .join(dwdBaseAd, Seq("ad_id", "dn"), "left")
      .join(dwdBaseWebsite, Seq("siteid", "dn"), "left")
      .join(dwdVipLevel, Seq("vip_id", "dn"), "left_outer")
      .select("uid", "ad_id", "fullname", "iconurl", "lastlogin", "mailaddr",
        "memberlevel", "password", "paymoney", "phone", "qq", "register",
        "regupdatetime", "unitname", "userip", "zipcode", "appkey",
        "appregurl", "bdp_uuid", "reg_createtime", "isranreg", "regsource",
        "regsourcename", "adname", "siteid", "sitename", "siteurl", "site_delete",
        "site_createtime", "site_creator", "vip_id", "vip_level",
        "vip_start_time", "vip_end_time", "vip_last_modify_time", "vip_max_free",
        "vip_min_free", "vip_next_level", "vip_operator", "dt", "dn").as[DwsMember]

    // Aggregate to one row per (uid, dn)
    val resultData = result.groupByKey(item => item.uid + "_" + item.dn)
      .mapGroups { case (key, iters) =>
        val keys = key.split("_")
        val uid = Integer.parseInt(keys(0))
        val dn = keys(1)
        val dwsMembers = iters.toList
        // Sum the payments; missing values count as 0.00
        val paymoney = dwsMembers.filter(_.paymoney != null).map(item => BigDecimal.apply(item.paymoney))
          .reduceOption(_ + _).getOrElse(BigDecimal.apply(0.00)).toString
        val ad_id = dwsMembers.map(_.ad_id).head
        val fullname = dwsMembers.map(_.fullname).head
        val icounurl = dwsMembers.map(_.iconurl).head
        val lastlogin = dwsMembers.map(_.lastlogin).head
        val mailaddr = dwsMembers.map(_.mailaddr).head
        val memberlevel = dwsMembers.map(_.memberlevel).head
        val password = dwsMembers.map(_.password).head
        val phone = dwsMembers.map(_.phone).head
        val qq = dwsMembers.map(_.qq).head
        val register = dwsMembers.map(_.register).head
        val regupdatetime = dwsMembers.map(_.regupdatetime).head
        val unitname = dwsMembers.map(_.unitname).head
        val userip = dwsMembers.map(_.userip).head
        val zipcode = dwsMembers.map(_.zipcode).head
        val appkey = dwsMembers.map(_.appkey).head
        val appregurl = dwsMembers.map(_.appregurl).head
        val bdp_uuid = dwsMembers.map(_.bdp_uuid).head
        // Default the nullable time and id columns so the parsing/casts below do not fail
        val reg_createtime = if (dwsMembers.map(_.reg_createtime).head != null)
          dwsMembers.map(_.reg_createtime).head else "1970-01-01 00:00:00"
        val isranreg = dwsMembers.map(_.isranreg).head
        val regsource = dwsMembers.map(_.regsource).head
        val regsourcename = dwsMembers.map(_.regsourcename).head
        val adname = dwsMembers.map(_.adname).head
        val siteid = if (dwsMembers.map(_.siteid).head != null) dwsMembers.map(_.siteid).head else "0"
        val sitename = dwsMembers.map(_.sitename).head
        val siteurl = dwsMembers.map(_.siteurl).head
        val site_delete = dwsMembers.map(_.site_delete).head
        val site_createtime = dwsMembers.map(_.site_createtime).head
        val site_creator = dwsMembers.map(_.site_creator).head
        val vip_id = if (dwsMembers.map(_.vip_id).head != null) dwsMembers.map(_.vip_id).head else "0"
        val vip_level = dwsMembers.map(_.vip_level).max
        val vip_start_time = if (dwsMembers.map(_.vip_start_time).min != null)
          dwsMembers.map(_.vip_start_time).min else "1970-01-01 00:00:00"
        val vip_end_time = if (dwsMembers.map(_.vip_end_time).max != null)
          dwsMembers.map(_.vip_end_time).max else "1970-01-01 00:00:00"
        val vip_last_modify_time = if (dwsMembers.map(_.vip_last_modify_time).max != null)
          dwsMembers.map(_.vip_last_modify_time).max else "1970-01-01 00:00:00"
        val vip_max_free = dwsMembers.map(_.vip_max_free).head
        val vip_min_free = dwsMembers.map(_.vip_min_free).head
        val vip_next_level = dwsMembers.map(_.vip_next_level).head
        val vip_operator = dwsMembers.map(_.vip_operator).head
        val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
        val reg_createtimeStr = LocalDateTime.parse(reg_createtime, formatter)
        val vip_start_timeStr = LocalDateTime.parse(vip_start_time, formatter)
        val vip_end_timeStr = LocalDateTime.parse(vip_end_time, formatter)
        val vip_last_modify_timeStr = LocalDateTime.parse(vip_last_modify_time, formatter)
        DwsMember_Result(uid, ad_id, fullname, icounurl, lastlogin, mailaddr,
          memberlevel, password, paymoney, phone, qq, register, regupdatetime,
          unitname, userip, zipcode, appkey,
          appregurl, bdp_uuid, Timestamp.valueOf(reg_createtimeStr), isranreg,
          regsource, regsourcename, adname, siteid.toInt,
          sitename, siteurl, site_delete, site_createtime, site_creator, vip_id.toInt,
          vip_level, Timestamp.valueOf(vip_start_timeStr),
          Timestamp.valueOf(vip_end_timeStr),
          Timestamp.valueOf(vip_last_modify_timeStr), vip_max_free, vip_min_free,
          vip_next_level, vip_operator, dt, dn)
      }
    resultData.write.format("iceberg").mode("overwrite")
      .save("hadoop_prod.db.dws_member")
  }
}

4) Write DwsIcebergController and run it as a test

package com.atguigu.iceberg.warehouse.controller
import com.atguigu.iceberg.warehouse.service.DwsIcebergService 
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object DwsIcebergController {
   def main(args: Array[String]): Unit = { 
     val sparkConf = new SparkConf()
    .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
    .set("spark.sql.catalog.hadoop_prod.warehouse", 
      "hdfs://mycluster/hive/warehouse")
    .set("spark.sql.catalog.catalog-name.type", "hadoop")
    .set("spark.sql.catalog.catalog-name.default-namespace", "default")
    .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .set("spark.sql.session.timeZone", "GMT+8")
    .setMaster("local[*]").setAppName("table_operations")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() 
    DwsIcebergService.getDwsMemberData(sparkSession, "20190722")
   }
}

5) This fails with the same error seen earlier when testing in the spark-sql console: when a batch of data is written into the partitions, Iceberg reports that the partition has already been closed and the rows cannot be inserted.

6) Recreate the table with dn removed from the partition spec, partitioning only by dt. The bug: multiple partition columns cannot be used here; only a single partition column works.

spark-sql (default)> drop table hadoop_prod.db.dws_member;

create table hadoop_prod.db.dws_member(
  uid int, ad_id int,
  fullname string, iconurl string, lastlogin string, mailaddr string,
  memberlevel string, password string, paymoney string, phone string, qq string,
  register string, regupdatetime string, unitname string, userip string,
  zipcode string, appkey string, appregurl string, bdp_uuid string,
  reg_createtime timestamp, isranreg string, regsource string,
  regsourcename string, adname string,
  siteid int, sitename string, siteurl string, site_delete string,
  site_createtime string, site_creator string, vip_id int,
  vip_level string, vip_start_time timestamp, vip_end_time timestamp,
  vip_last_modify_time timestamp, vip_max_free string, vip_min_free string,
  vip_next_level string, vip_operator string,
  dt string, dn string)
using iceberg
partitioned by(dt);

7) After recreating the table, rerun the test; this time the data is inserted successfully.
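As a quick sanity check, the freshly written partition can be counted. This is a minimal sketch, not part of the original code: it assumes the same hadoop_prod catalog settings as the controllers above and the test partition 20190722 used in this run.

// Hypothetical verification snippet: count the rows loaded into the recreated table.
sparkSession.sql("select dt, count(*) as cnt from hadoop_prod.db.dws_member where dt = '20190722' group by dt").show()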

4.6.4.3 ADS layer

1) Write the required case class

case class QueryResult(
  uid: Int, ad_id: Int,
  memberlevel: String, register: String,
  appregurl: String, // registration source url
  regsource: String, regsourcename: String,
  adname: String, siteid: String, sitename: String, vip_level: String,
  paymoney: BigDecimal, dt: String,
  dn: String
)

2) Write DwsIcebergDao to query the wide table

package com.atguigu.iceberg.warehouse.dao

import org.apache.spark.sql.SparkSession

object DwsIcebergDao {

  /**
   * Query the user wide-table data.
   *
   * @param sparkSession
   * @return
   */
  def queryDwsMemberData(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,ad_id,memberlevel,register,appregurl,regsource,regsourcename,adname," +
      "siteid,sitename,vip_level,cast(paymoney as decimal(10,4)) as paymoney,dt,dn from hadoop_prod.db.dws_member")
  }
}

3) Write AdsIcebergService to compute the metrics

package com.atguigu.iceberg.warehouse.service

import com.atguigu.iceberg.warehouse.bean.QueryResult
import com.atguigu.iceberg.warehouse.dao.DwsIcebergDao
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{SaveMode, SparkSession}

object AdsIcebergService {

  def queryDetails(sparkSession: SparkSession, dt: String) = {
    import sparkSession.implicits._
    val result = DwsIcebergDao.queryDwsMemberData(sparkSession).as[QueryResult].where(s"dt='${dt}'")
    result.cache()

    // Metric 1: number of registrations per appregurl (a word-count on the url)
    result.mapPartitions(partition => {
      partition.map(item => (item.appregurl + "_" + item.dn + "_" + item.dt, 1))
    }).groupByKey(_._1).mapValues(item => item._2).reduceGroups(_ + _).map(item => {
      val keys = item._1.split("_")
      val appregurl = keys(0)
      val dn = keys(1)
      val dt = keys(2)
      (appregurl, item._2, dt, dn)
    }).toDF("appregurl", "num", "dt", "dn")
      .writeTo("hadoop_prod.db.ads_register_appregurlnum").overwritePartitions()

    // Metric 2: top 3 paying users within each memberlevel
    import org.apache.spark.sql.functions._
    result.withColumn("rownum", row_number().over(Window.partitionBy("memberlevel").orderBy(desc("paymoney"))))
      .where("rownum<4").orderBy("memberlevel", "rownum")
      .select("uid", "memberlevel", "register", "appregurl", "regsourcename",
        "adname", "sitename", "vip_level", "paymoney", "rownum", "dt", "dn")
      .writeTo("hadoop_prod.db.ads_register_top3memberpay").overwritePartitions()
  }
}

4) Write AdsIcebergController and run a local test

package com.atguigu.iceberg.warehouse.controller
import com.atguigu.iceberg.warehouse.service.AdsIcebergService 
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object AdsIcebergController {
   def main(args: Array[String]): Unit = { 
      val sparkConf = new SparkConf()
     .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
     .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
     .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
     .set("spark.sql.catalog.catalog-name.type", "hadoop")
     .set("spark.sql.catalog.catalog-name.default-namespace", "default")
     .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
     .set("spark.sql.session.timeZone", "GMT+8")
     .setMaster("local[*]").setAppName("dwd_app")
     val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() 
     AdsIcebergService.queryDetails(sparkSession, "20190722")
    }
}

5) Query the tables and verify the results
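For example (a minimal sketch, assuming the two ADS tables already exist from the earlier parts and are partitioned by dt; run it with the same hadoop_prod catalog settings as the controllers):

// Hypothetical spot-check of the two ADS result tables for the test partition.
sparkSession.sql("select * from hadoop_prod.db.ads_register_appregurlnum where dt = '20190722'").show()
sparkSession.sql("select * from hadoop_prod.db.ads_register_top3memberpay where dt = '20190722'").show(false)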

4.6.5 Testing on YARN

1) Once local-mode testing is done, package the code into a jar and submit it to the cluster for testing. Because every insert currently runs in overwrite mode, there is no need to delete historical data when testing on YARN.

2) Before building the jar, remember to comment out setMaster("local[*]") in the code, and mark the dependencies that already exist on the cluster as <scope>provided</scope> so that a thin jar is produced; a sketch of the resulting driver setup follows.
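A minimal sketch of what the controllers' SparkConf then looks like (abridged; the hadoop_prod catalog settings are the same ones used above, and the master now comes from spark-submit rather than the code):

val sparkConf = new SparkConf()
  .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
  .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
  .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .set("spark.sql.session.timeZone", "GMT+8")
  // .setMaster("local[*]")  // commented out: on YARN the master is supplied by spark-submit
  .setAppName("dwd_app")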

3) Build the jar, upload it to the cluster, and launch each job in YARN mode with spark-submit.

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 \
  --executor-cores 4 --executor-memory 2g --queue spark \
  --class com.atguigu.iceberg.warehouse.controller.DwdIcebergController \
  iceberg-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 \
  --executor-cores 4 --executor-memory 2g --queue spark \
  --class com.atguigu.iceberg.warehouse.controller.DwsIcebergController \
  iceberg-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 \
  --executor-cores 4 --executor-memory 2g --queue spark \
  --class com.atguigu.iceberg.warehouse.controller.AdsIcebergController \
  iceberg-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar
