Iceberg实战踩坑指南(三)

简介: Iceberg实战踩坑指南

3)编写 DwdIcebergService

package com.atguigu.iceberg.warehouse.service
import java.sql.Timestamp
import java.time.LocalDate
import java.time.format.DateTimeFormatter

import com.atguigu.iceberg.warehouse.bean.{BaseWebsite, MemberRegType, VipLevel}
import org.apache.spark.sql.SparkSession

/**
  * Loads the raw ODS JSON logs, normalises their column types and overwrites
  * the corresponding DWD Iceberg tables under `hadoop_prod.db`.
  */
object DwdIcebergService {

  /** Pattern of the day-only date strings found in the ODS logs. */
  private val DayPattern = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  /**
    * Converts a `yyyy-MM-dd` day string into the SQL timestamp of that day's midnight.
    *
    * @param day day string in `yyyy-MM-dd` form
    * @return timestamp at 00:00:00 of that day
    * @throws java.time.format.DateTimeParseException if `day` does not match the pattern
    */
  private def midnightOf(day: String): Timestamp =
    Timestamp.valueOf(LocalDate.parse(day, DayPattern).atStartOfDay())

  /**
    * Reads every ODS log file and overwrites the partitions of the matching DWD table.
    *
    * @param sparkSession session whose catalog exposes the `hadoop_prod` Iceberg catalog
    */
  def readOdsData(sparkSession: SparkSession): Unit = {
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._

    // Advertisement dimension: only the id needs a numeric type.
    sparkSession.read.json("/ods/baseadlog.log")
      .withColumn("adid", col("adid").cast("Int"))
      .writeTo("hadoop_prod.db.dwd_base_ad").overwritePartitions()

    // Website dimension.
    // NOTE(review): the file name is spelled "baswewebsite" — confirm it matches the file on HDFS.
    sparkSession.read.json("/ods/baswewebsite.log")
      .map { item =>
        BaseWebsite(
          item.getAs[String]("siteid").toInt,
          item.getAs[String]("sitename"),
          item.getAs[String]("siteurl"),
          item.getAs[String]("delete").toInt,
          midnightOf(item.getAs[String]("createtime")),
          item.getAs[String]("creator"),
          item.getAs[String]("dn"))
      }
      .writeTo("hadoop_prod.db.dwd_base_website").overwritePartitions()

    // Member table: `dn` is dropped and the two ids are cast to int.
    sparkSession.read.json("/ods/member.log").drop("dn")
      .withColumn("uid", col("uid").cast("int"))
      .withColumn("ad_id", col("ad_id").cast("int"))
      .writeTo("hadoop_prod.db.dwd_member").overwritePartitions()

    // Member registration type: `regsourcename` starts out as a copy of `regsource`.
    sparkSession.read.json("/ods/memberRegtype.log").drop("domain").drop("dn")
      .withColumn("regsourcename", col("regsource"))
      .map { item =>
        MemberRegType(
          item.getAs[String]("uid").toInt,
          item.getAs[String]("appkey"),
          item.getAs[String]("appregurl"),
          item.getAs[String]("bdp_uuid"),
          midnightOf(item.getAs[String]("createtime")),
          item.getAs[String]("isranreg"),
          item.getAs[String]("regsource"),
          item.getAs[String]("regsourcename"),
          item.getAs[String]("websiteid").toInt,
          item.getAs[String]("dt"))
      }
      .writeTo("hadoop_prod.db.dwd_member_regtype").overwritePartitions()

    // VIP level dimension: the three day strings become midnight timestamps.
    sparkSession.read.json("/ods/pcenterMemViplevel.log").drop("discountval")
      .map { item =>
        VipLevel(
          item.getAs[String]("vip_id").toInt,
          item.getAs[String]("vip_level"),
          midnightOf(item.getAs[String]("start_time")),
          midnightOf(item.getAs[String]("end_time")),
          midnightOf(item.getAs[String]("last_modify_time")),
          item.getAs[String]("max_free"),
          item.getAs[String]("min_free"),
          item.getAs[String]("next_level"),
          item.getAs[String]("operator"),
          item.getAs[String]("dn"))
      }
      .writeTo("hadoop_prod.db.dwd_vip_level").overwritePartitions()

    // Payment records: the three ids are cast to int.
    sparkSession.read.json("/ods/pcentermempaymoney.log")
      .withColumn("uid", col("uid").cast("int"))
      .withColumn("siteid", col("siteid").cast("int"))
      .withColumn("vip_id", col("vip_id").cast("int"))
      .writeTo("hadoop_prod.db.dwd_pcentermempaymoney").overwritePartitions()
  }
}

4)编写 DwdIcebergController

package com.atguigu.iceberg.warehouse.controller
import com.atguigu.iceberg.warehouse.service.DwdIcebergService
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/** Entry point that configures the `hadoop_prod` Iceberg catalog and runs the ODS -> DWD load. */
object DwdIcebergController {

  /**
    * Builds the Spark session and runs [[DwdIcebergService.readOdsData]].
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
      // NOTE(review): "catalog-name" looks like a placeholder copied from the docs — confirm
      // whether these two settings were meant for the hadoop_prod catalog.
      .set("spark.sql.catalog.catalog-name.type", "hadoop")
      .set("spark.sql.catalog.catalog-name.default-namespace", "default")
      .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .set("spark.sql.session.timeZone", "GMT+8")
      .setAppName("table_operations")
      // Fall back to local mode only when no master was supplied, so the same jar
      // also runs under `spark-submit --master yarn` without editing the code.
      .setIfMissing("spark.master", "local[*]")

    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    try {
      DwdIcebergService.readOdsData(sparkSession)
    } finally {
      sparkSession.stop()
    }
  }
}

4.6.4.2 dws 层(表指定多个分区列会有 bug)

1)创建 case class

/**
  * One joined row of the member wide table before aggregation.
  * Apart from the two integer ids every column is carried as a string.
  */
case class DwsMember(
  uid: Int,
  ad_id: Int,
  fullname: String,
  iconurl: String,
  lastlogin: String,
  mailaddr: String,
  memberlevel: String,
  password: String,
  paymoney: String,
  phone: String,
  qq: String,
  register: String,
  regupdatetime: String,
  unitname: String,
  userip: String,
  zipcode: String,
  // registration columns
  appkey: String,
  appregurl: String,
  bdp_uuid: String,
  reg_createtime: String,
  isranreg: String,
  regsource: String,
  regsourcename: String,
  // advertisement and website columns
  adname: String,
  siteid: String,
  sitename: String,
  siteurl: String,
  site_delete: String,
  site_createtime: String,
  site_creator: String,
  // vip columns
  vip_id: String,
  vip_level: String,
  vip_start_time: String,
  vip_end_time: String,
  vip_last_modify_time: String,
  vip_max_free: String,
  vip_min_free: String,
  vip_next_level: String,
  vip_operator: String,
  // partition-style columns
  dt: String,
  dn: String
)
/**
  * Aggregated member wide-table row as written to the DWS table.
  * Compared with [[DwsMember]], `siteid`/`vip_id` are integers and
  * `reg_createtime` plus the three vip times are timestamps.
  */
case class DwsMember_Result(
  uid: Int,
  ad_id: Int,
  fullname: String,
  iconurl: String,
  lastlogin: String,
  mailaddr: String,
  memberlevel: String,
  password: String,
  paymoney: String,
  phone: String,
  qq: String,
  register: String,
  regupdatetime: String,
  unitname: String,
  userip: String,
  zipcode: String,
  // registration columns
  appkey: String,
  appregurl: String,
  bdp_uuid: String,
  reg_createtime: Timestamp,
  isranreg: String,
  regsource: String,
  regsourcename: String,
  // advertisement and website columns
  adname: String,
  siteid: Int,
  sitename: String,
  siteurl: String,
  site_delete: String,
  site_createtime: String,
  site_creator: String,
  // vip columns
  vip_id: Int,
  vip_level: String,
  vip_start_time: Timestamp,
  vip_end_time: Timestamp,
  vip_last_modify_time: Timestamp,
  vip_max_free: String,
  vip_min_free: String,
  vip_next_level: String,
  vip_operator: String,
  // partition-style columns
  dt: String,
  dn: String
)

2)创建 DwDIcebergDao,操作六张基础表

package com.atguigu.iceberg.warehouse.dao 
import org.apache.spark.sql.SparkSession
/** Read-only queries over the six DWD Iceberg tables in `hadoop_prod.db`. */
object DwDIcebergDao {
  import org.apache.spark.sql.DataFrame

  /** Member columns, including the `dt` partition column. */
  def getDwdMember(sparkSession: SparkSession): DataFrame =
    sparkSession.sql(
      """select uid,ad_id,birthday,email,fullname,iconurl,lastlogin,mailaddr,memberlevel,
        |password,phone,qq,register,regupdatetime,unitname,userip,zipcode,dt
        |from hadoop_prod.db.dwd_member""".stripMargin)

  /** Payment records: uid, amount, site id and vip id plus `dt`/`dn`. */
  def getDwdPcentermempaymoney(sparkSession: SparkSession): DataFrame =
    sparkSession.sql(
      """select uid,paymoney,siteid,vip_id,dt,dn
        |from hadoop_prod.db.dwd_pcentermempaymoney""".stripMargin)

  /** VIP level dimension with every descriptive column renamed to a `vip_` prefix. */
  def getDwdVipLevel(sparkSession: SparkSession): DataFrame =
    sparkSession.sql(
      """select vip_id,vip_level,start_time as vip_start_time,end_time as vip_end_time,
        |last_modify_time as vip_last_modify_time,max_free as vip_max_free,
        |min_free as vip_min_free,next_level as vip_next_level,
        |operator as vip_operator,dn
        |from hadoop_prod.db.dwd_vip_level""".stripMargin)

  /** Website dimension with `delete`/`createtime`/`creator` renamed to a `site_` prefix. */
  def getDwdBaseWebsite(sparkSession: SparkSession): DataFrame =
    sparkSession.sql(
      """select siteid,sitename,siteurl,delete as site_delete,
        |createtime as site_createtime,creator as site_creator,dn
        |from hadoop_prod.db.dwd_base_website""".stripMargin)

  /** Member registration type; `createtime` is exposed as `reg_createtime`. */
  def getDwdMemberRegtyp(sparkSession: SparkSession): DataFrame =
    sparkSession.sql(
      """select uid,appkey,appregurl,bdp_uuid,createtime as reg_createtime,
        |isranreg,regsource,regsourcename,websiteid,dt
        |from hadoop_prod.db.dwd_member_regtype""".stripMargin)

  /** Advertisement dimension; `adid` is exposed as `ad_id` so it can be joined by name. */
  def getDwdBaseAd(sparkSession: SparkSession): DataFrame =
    // No trailing ';' — SparkSession.sql takes a single statement without a terminator.
    sparkSession.sql("select adid as ad_id,adname,dn from hadoop_prod.db.dwd_base_ad")
}

3)编写 DwsIcebergService,处理业务

package com.atguigu.iceberg.warehouse.service
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import com.atguigu.iceberg.warehouse.bean.{DwsMember, DwsMember_Result}
import com.atguigu.iceberg.warehouse.dao.DwDIcebergDao
import org.apache.spark.sql.SparkSession

/** Builds the member wide table (DWS layer) from the six DWD tables. */
object DwsIcebergService {

  /** Text used when a timestamp column is null for the whole group. */
  private val EpochText = "1970-01-01 00:00:00"

  private val TimestampFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  /** Parses a `yyyy-MM-dd HH:mm:ss` string, falling back to the epoch text when absent. */
  private def toTimestamp(text: Option[String]): Timestamp =
    Timestamp.valueOf(LocalDateTime.parse(text.getOrElse(EpochText), TimestampFormat))

  /** Smallest non-null string of `values`, if any. */
  private def minNonNull(values: Seq[String]): Option[String] =
    values.filter(_ != null).reduceOption(Ordering[String].min)

  /** Largest non-null string of `values`, if any. */
  private def maxNonNull(values: Seq[String]): Option[String] =
    values.filter(_ != null).reduceOption(Ordering[String].max)

  /**
    * Joins the DWD tables for one day, collapses the rows of each (uid, dn) pair into
    * a single [[DwsMember_Result]] and overwrites `hadoop_prod.db.dws_member`.
    *
    * @param sparkSession session with the `hadoop_prod` Iceberg catalog configured
    * @param dt           day to process; used to filter the `dt` column of the partitioned inputs
    */
  def getDwsMemberData(sparkSession: SparkSession, dt: String): Unit = {
    import sparkSession.implicits._

    val dwdPcentermempaymoney =
      DwDIcebergDao.getDwdPcentermempaymoney(sparkSession).where($"dt" === dt)
    val dwdVipLevel = DwDIcebergDao.getDwdVipLevel(sparkSession)
    val dwdMember = DwDIcebergDao.getDwdMember(sparkSession).where($"dt" === dt)
    val dwdBaseWebsite = DwDIcebergDao.getDwdBaseWebsite(sparkSession)
    val dwdMemberRegtype =
      DwDIcebergDao.getDwdMemberRegtyp(sparkSession).where($"dt" === dt)
    val dwdBaseAd = DwDIcebergDao.getDwdBaseAd(sparkSession)

    val result = dwdMember.join(dwdMemberRegtype.drop("dt"), Seq("uid"), "left")
      .join(dwdPcentermempaymoney.drop("dt"), Seq("uid"), "left")
      .join(dwdBaseAd, Seq("ad_id", "dn"), "left")
      .join(dwdBaseWebsite, Seq("siteid", "dn"), "left")
      .join(dwdVipLevel, Seq("vip_id", "dn"), "left_outer")
      .select("uid", "ad_id", "fullname", "iconurl", "lastlogin", "mailaddr",
        "memberlevel", "password", "paymoney", "phone", "qq",
        "register", "regupdatetime", "unitname", "userip", "zipcode", "appkey",
        "appregurl", "bdp_uuid", "reg_createtime", "isranreg", "regsource",
        "regsourcename", "adname", "siteid", "sitename", "siteurl", "site_delete",
        "site_createtime", "site_creator", "vip_id", "vip_level",
        "vip_start_time", "vip_end_time", "vip_last_modify_time", "vip_max_free",
        "vip_min_free", "vip_next_level", "vip_operator", "dt", "dn").as[DwsMember]

    // Group on the (uid, dn) pair itself rather than a "uid_dn" string that is split
    // again afterwards: a `dn` containing '_' would be truncated by the split, and a
    // null `dn` would become the literal text "null".
    val resultData = result
      .groupByKey(item => (item.uid, item.dn))
      .mapGroups { (key, rows) =>
        val (uid, dn) = key
        val members = rows.toList
        // mapGroups only invokes this function for keys that have at least one row.
        val first = members.head

        // Total payment of the group; rows without a payment are skipped.
        val paymoney = members
          .flatMap(member => Option(member.paymoney))
          .map(amount => BigDecimal(amount))
          .reduceOption(_ + _)
          .getOrElse(BigDecimal(0.00))
          .toString

        // Null-safe aggregates: comparing strings with a null among them throws NPE.
        val vipLevel = maxNonNull(members.map(_.vip_level)).orNull
        val vipStartTime = toTimestamp(minNonNull(members.map(_.vip_start_time)))
        val vipEndTime = toTimestamp(maxNonNull(members.map(_.vip_end_time)))
        val vipLastModifyTime = toTimestamp(maxNonNull(members.map(_.vip_last_modify_time)))

        DwsMember_Result(
          uid, first.ad_id, first.fullname, first.iconurl, first.lastlogin, first.mailaddr,
          first.memberlevel, first.password, paymoney, first.phone, first.qq, first.register,
          first.regupdatetime, first.unitname, first.userip, first.zipcode, first.appkey,
          first.appregurl, first.bdp_uuid, toTimestamp(Option(first.reg_createtime)),
          first.isranreg, first.regsource, first.regsourcename, first.adname,
          Option(first.siteid).getOrElse("0").toInt,
          first.sitename, first.siteurl, first.site_delete, first.site_createtime,
          first.site_creator,
          Option(first.vip_id).getOrElse("0").toInt,
          vipLevel, vipStartTime, vipEndTime, vipLastModifyTime,
          first.vip_max_free, first.vip_min_free, first.vip_next_level, first.vip_operator,
          dt, dn)
      }

    resultData.write.format("iceberg").mode("overwrite")
      .save("hadoop_prod.db.dws_member")
  }
}

4)编写 DwsIcebergController,进行运行测试

package com.atguigu.iceberg.warehouse.controller
import com.atguigu.iceberg.warehouse.service.DwsIcebergService 
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/** Entry point that configures the `hadoop_prod` Iceberg catalog and builds the DWS member table. */
object DwsIcebergController {

  /** Day processed when no argument is given. */
  private val DefaultDt = "20190722"

  /**
    * Builds the Spark session and runs [[DwsIcebergService.getDwsMemberData]].
    *
    * @param args optional first argument: the `dt` value to process (defaults to 20190722)
    */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
      // NOTE(review): "catalog-name" looks like a placeholder copied from the docs — confirm
      // whether these two settings were meant for the hadoop_prod catalog.
      .set("spark.sql.catalog.catalog-name.type", "hadoop")
      .set("spark.sql.catalog.catalog-name.default-namespace", "default")
      .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .set("spark.sql.session.timeZone", "GMT+8")
      .setAppName("table_operations")
      // Fall back to local mode only when no master was supplied, so the same jar
      // also runs under `spark-submit --master yarn` without editing the code.
      .setIfMissing("spark.master", "local[*]")

    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    try {
      DwsIcebergService.getDwsMemberData(sparkSession, args.headOption.getOrElse(DefaultDt))
    } finally {
      sparkSession.stop()
    }
  }
}

5)发生报错,和上面在 spark sql 黑窗口测试的错误一致,当有批量数据插入分区时提示分区已关闭无法插入

6)重新建表,分区列去掉 dn,只用 dt,bug:不能指定多个分区,只能指定一个分区列

spark-sql (default)> drop table hadoop_prod.db.dws_member;
spark-sql (default)> create table hadoop_prod.db.dws_member(
  uid int, ad_id int,
  fullname string, iconurl string, lastlogin string, mailaddr string,
  memberlevel string, password string, paymoney string, phone string,
  -- qq must be present: DwsMember_Result carries it between phone and register
  qq string, register string, regupdatetime string, unitname string, userip string,
  zipcode string, appkey string, appregurl string, bdp_uuid string,
  reg_createtime timestamp, isranreg string, regsource string, regsourcename string,
  adname string,
  siteid int, sitename string, siteurl string, site_delete string,
  site_createtime string, site_creator string, vip_id int,
  vip_level string, vip_start_time timestamp, vip_end_time timestamp,
  vip_last_modify_time timestamp, vip_max_free string, vip_min_free string,
  vip_next_level string, vip_operator string,
  dt string, dn string)
using iceberg
-- single partition column only; dn stays an ordinary column
partitioned by(dt);

7)建完表后,重新测试,插入数据成功

4.6.4.3 ads 层

1)编写所需 case class

/** Subset of the member wide table used by the ADS statistics. */
case class QueryResult(
  uid: Int,
  ad_id: Int,
  memberlevel: String,
  register: String,
  appregurl: String, // registration source url
  regsource: String,
  regsourcename: String,
  adname: String,
  siteid: String,
  sitename: String,
  vip_level: String,
  paymoney: BigDecimal,
  dt: String,
  dn: String
)

2)编写 DwsIcebergDao,查询宽表

package com.atguigu.iceberg.warehouse.dao 
import org.apache.spark.sql.SparkSession
/** Read-only queries over the DWS member wide table. */
object DwsIcebergDao {
  import org.apache.spark.sql.DataFrame

  /**
    * Queries the member wide table.
    *
    * `paymoney` is cast to `decimal(10,4)`; all other columns are selected as stored.
    *
    * @param sparkSession session with the `hadoop_prod` Iceberg catalog configured
    * @return the selected columns of `hadoop_prod.db.dws_member`
    */
  def queryDwsMemberData(sparkSession: SparkSession): DataFrame =
    sparkSession.sql(
      """select uid,ad_id,memberlevel,register,appregurl,regsource,regsourcename,adname,
        |siteid,sitename,vip_level,cast(paymoney as decimal(10,4)) as paymoney,dt,dn
        |from hadoop_prod.db.dws_member""".stripMargin)
}

3)编写 AdsIcebergService,统计指标

package com.atguigu.iceberg.warehouse.service
import com.atguigu.iceberg.warehouse.bean.QueryResult 
import com.atguigu.iceberg.warehouse.dao.DwsIcebergDao 
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{SaveMode, SparkSession}
/** Computes the ADS-layer statistics from the member wide table. */
object AdsIcebergService {

  /**
    * Writes two statistics for one day:
    *  - the number of members per registration url into `ads_register_appregurlnum`;
    *  - the three highest-paying members of each `memberlevel` into `ads_register_top3memberpay`.
    *
    * @param sparkSession session with the `hadoop_prod` Iceberg catalog configured
    * @param dt           day to process; used to filter the `dt` column
    */
  def queryDetails(sparkSession: SparkSession, dt: String): Unit = {
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._

    // A column comparison instead of an interpolated SQL string keeps `dt` a plain value.
    val result = DwsIcebergDao.queryDwsMemberData(sparkSession)
      .as[QueryResult]
      .where($"dt" === dt)
    // Both statistics below read the same data.
    result.cache()
    try {
      // Member count per registration url (word-count style). The key is kept as a tuple:
      // joining the parts with '_' and splitting again breaks for any url containing '_'.
      result
        .groupByKey(item => (item.appregurl, item.dn, item.dt))
        .mapValues(_ => 1)
        .reduceGroups(_ + _)
        .map { counted =>
          val ((appregurl, dn, day), num) = counted
          (appregurl, num, day, dn)
        }
        .toDF("appregurl", "num", "dt", "dn")
        .writeTo("hadoop_prod.db.ads_register_appregurlnum").overwritePartitions()

      // Top three members by payment within each memberlevel.
      result
        .withColumn("rownum",
          row_number().over(Window.partitionBy("memberlevel").orderBy(desc("paymoney"))))
        .where("rownum<4").orderBy("memberlevel", "rownum")
        .select("uid", "memberlevel", "register", "appregurl", "regsourcename",
          "adname", "sitename", "vip_level", "paymoney", "rownum", "dt", "dn")
        .writeTo("hadoop_prod.db.ads_register_top3memberpay").overwritePartitions()
    } finally {
      // Release the cached blocks once both writes have run (or one of them failed).
      result.unpersist()
    }
  }
}

4)编写 AdsIcebergController,进行本地测试

package com.atguigu.iceberg.warehouse.controller
import com.atguigu.iceberg.warehouse.service.AdsIcebergService 
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/** Entry point that configures the `hadoop_prod` Iceberg catalog and computes the ADS statistics. */
object AdsIcebergController {

  /** Day processed when no argument is given. */
  private val DefaultDt = "20190722"

  /**
    * Builds the Spark session and runs [[AdsIcebergService.queryDetails]].
    *
    * @param args optional first argument: the `dt` value to process (defaults to 20190722)
    */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
      // NOTE(review): "catalog-name" looks like a placeholder copied from the docs — confirm
      // whether these two settings were meant for the hadoop_prod catalog.
      .set("spark.sql.catalog.catalog-name.type", "hadoop")
      .set("spark.sql.catalog.catalog-name.default-namespace", "default")
      .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .set("spark.sql.session.timeZone", "GMT+8")
      .setAppName("dwd_app")
      // Fall back to local mode only when no master was supplied, so the same jar
      // also runs under `spark-submit --master yarn` without editing the code.
      .setIfMissing("spark.master", "local[*]")

    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    try {
      AdsIcebergService.queryDetails(sparkSession, args.headOption.getOrElse(DefaultDt))
    } finally {
      sparkSession.stop()
    }
  }
}

5)查询,验证结果

4.6.5 yarn 测试

1)local 模式测试完毕后,将代码打成 jar 包提交到集群上进行测试。由于当前各层的写入均为 overwrite(覆盖)模式,在 yarn 上测试时无需先删除历史数据

2)打 jar 包之前,注意将代码中的 setMaster("local[*]") 注释掉;集群上已有的依赖可用 <scope>provided</scope> 剔除,打成一个瘦包

3)打成 jar 包提交到集群,运行 spark-submit 命令运行 yarn 模式。

# DWD layer: ODS logs -> DWD tables
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 \
  --executor-cores 4 --executor-memory 2g --queue spark \
  --class com.atguigu.iceberg.warehouse.controller.DwdIcebergController \
  iceberg-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

# DWS layer: member wide table
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 \
  --executor-cores 4 --executor-memory 2g --queue spark \
  --class com.atguigu.iceberg.warehouse.controller.DwsIcebergController \
  iceberg-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

# ADS layer: statistics
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 \
  --executor-cores 4 --executor-memory 2g --queue spark \
  --class com.atguigu.iceberg.warehouse.controller.AdsIcebergController \
  iceberg-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

相关文章
|
存储 分布式计算 监控
深入浅出 HBase 实战 | 青训营笔记
Hbase是一种NoSQL数据库,这意味着它不像传统的RDBMS数据库那样支持SQL作为查询语言。Hbase是一种分布式存储的数据库,技术上来讲,它更像是分布式存储而不是分布式数据库,它缺少很多RDBMS系统的特性,比如列类型,辅助索引,触发器,和高级查询语言等等。
988 0
深入浅出 HBase 实战 | 青训营笔记
|
1月前
|
SQL 分布式计算 数据处理
FlinkSQL开发经验分享
FlinkSQL开发经验分享
|
8月前
|
SQL 资源调度 前端开发
深度剖析Dinky源码(下)
深度剖析Dinky源码(下)
227 0
|
8月前
|
资源调度 前端开发 Java
深度剖析Dinky源码(上)
深度剖析Dinky源码
254 0
|
10月前
|
SQL 分布式计算 DataX
HIVE3 深度剖析 (下篇)
HIVE3 深度剖析 (下篇)
|
12月前
|
SQL 消息中间件 监控
|
12月前
|
分布式计算 Hadoop
|
12月前
|
SQL 分布式计算 Hadoop
|
11月前
|
Java Apache 开发工具
Flink 源码阅读环境搭建
阅读优秀的源码是提升我们代码技能最重要的手段之一,工欲善其事必先利其器,所以,搭建好源码阅读环境是我们阅读的第一步。
|
12月前
|
SQL 存储 分布式计算