Iceberg Hands-On Pitfall Guide (Part 3)

Summary: Iceberg Hands-On Pitfall Guide

3) Write DwdIcebergService

package com.atguigu.iceberg.warehouse.service

import java.sql.Timestamp
import java.time.LocalDate
import java.time.format.DateTimeFormatter

import com.atguigu.iceberg.warehouse.bean.{BaseWebsite, MemberRegType, VipLevel}
import org.apache.spark.sql.SparkSession

object DwdIcebergService {

  def readOdsData(sparkSession: SparkSession): Unit = {
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._

    sparkSession.read.json("/ods/baseadlog.log")
      .withColumn("adid", col("adid").cast("int"))
      .writeTo("hadoop_prod.db.dwd_base_ad").overwritePartitions()

    sparkSession.read.json("/ods/baswewebsite.log").map(item => {
      val createtime = item.getAs[String]("createtime")
      val str = LocalDate.parse(createtime, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
        .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
      BaseWebsite(item.getAs[String]("siteid").toInt, item.getAs[String]("sitename"),
        item.getAs[String]("siteurl"), item.getAs[String]("delete").toInt,
        Timestamp.valueOf(str), item.getAs[String]("creator"), item.getAs[String]("dn"))
    }).writeTo("hadoop_prod.db.dwd_base_website").overwritePartitions()

    sparkSession.read.json("/ods/member.log").drop("dn")
      .withColumn("uid", col("uid").cast("int"))
      .withColumn("ad_id", col("ad_id").cast("int"))
      .writeTo("hadoop_prod.db.dwd_member").overwritePartitions()

    sparkSession.read.json("/ods/memberRegtype.log").drop("domain").drop("dn")
      .withColumn("regsourcename", col("regsource"))
      .map(item => {
        val createtime = item.getAs[String]("createtime")
        val str = LocalDate.parse(createtime, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
          .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        MemberRegType(item.getAs[String]("uid").toInt, item.getAs[String]("appkey"),
          item.getAs[String]("appregurl"), item.getAs[String]("bdp_uuid"), Timestamp.valueOf(str),
          item.getAs[String]("isranreg"), item.getAs[String]("regsource"),
          item.getAs[String]("regsourcename"), item.getAs[String]("websiteid").toInt,
          item.getAs[String]("dt"))
      }).writeTo("hadoop_prod.db.dwd_member_regtype").overwritePartitions()

    sparkSession.read.json("/ods/pcenterMemViplevel.log").drop("discountval")
      .map(item => {
        val startTime = item.getAs[String]("start_time")
        val endTime = item.getAs[String]("end_time")
        val last_modify_time = item.getAs[String]("last_modify_time")
        val startTimeStr = LocalDate.parse(startTime, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
          .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        val endTimeStr = LocalDate.parse(endTime, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
          .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        val last_modify_timeStr = LocalDate.parse(last_modify_time, DateTimeFormatter.ofPattern("yyyy-MM-dd"))
          .atStartOfDay().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        VipLevel(item.getAs[String]("vip_id").toInt, item.getAs[String]("vip_level"),
          Timestamp.valueOf(startTimeStr), Timestamp.valueOf(endTimeStr),
          Timestamp.valueOf(last_modify_timeStr),
          item.getAs[String]("max_free"), item.getAs[String]("min_free"),
          item.getAs[String]("next_level"), item.getAs[String]("operator"),
          item.getAs[String]("dn"))
      }).writeTo("hadoop_prod.db.dwd_vip_level").overwritePartitions()

    sparkSession.read.json("/ods/pcentermempaymoney.log")
      .withColumn("uid", col("uid").cast("int"))
      .withColumn("siteid", col("siteid").cast("int"))
      .withColumn("vip_id", col("vip_id").cast("int"))
      .writeTo("hadoop_prod.db.dwd_pcentermempaymoney").overwritePartitions()
  }
}

4) Write DwdIcebergController

package com.atguigu.iceberg.warehouse.controller

import com.atguigu.iceberg.warehouse.service.DwdIcebergService
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DwdIcebergController {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
      .set("spark.sql.catalog.catalog-name.type", "hadoop")
      .set("spark.sql.catalog.catalog-name.default-namespace", "default")
      .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .set("spark.sql.session.timeZone", "GMT+8")
      .setMaster("local[*]").setAppName("table_operations")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    DwdIcebergService.readOdsData(sparkSession)
  }
}

4.6.4.2 dws layer (there is a bug when a table specifies multiple partition columns)

1) Create the case classes

case class DwsMember(
  uid: Int, ad_id: Int,
  fullname: String, iconurl: String, lastlogin: String, mailaddr: String,
  memberlevel: String, password: String, paymoney: String, phone: String,
  qq: String, register: String,
  regupdatetime: String, unitname: String, userip: String, zipcode: String,
  appkey: String, appregurl: String, bdp_uuid: String, reg_createtime: String,
  isranreg: String, regsource: String, regsourcename: String, adname: String,
  siteid: String, sitename: String, siteurl: String, site_delete: String,
  site_createtime: String, site_creator: String, vip_id: String, vip_level: String,
  vip_start_time: String, vip_end_time: String,
  vip_last_modify_time: String, vip_max_free: String, vip_min_free: String,
  vip_next_level: String, vip_operator: String,
  dt: String, dn: String
)

case class DwsMember_Result(
  uid: Int, ad_id: Int,
  fullname: String, iconurl: String, lastlogin: String, mailaddr: String,
  memberlevel: String, password: String,
  paymoney: String, phone: String, qq: String, register: String,
  regupdatetime: String, unitname: String, userip: String, zipcode: String,
  appkey: String, appregurl: String, bdp_uuid: String,
  reg_createtime: Timestamp, isranreg: String, regsource: String,
  regsourcename: String, adname: String,
  siteid: Int, sitename: String, siteurl: String, site_delete: String,
  site_createtime: String, site_creator: String, vip_id: Int,
  vip_level: String, vip_start_time: Timestamp, vip_end_time: Timestamp,
  vip_last_modify_time: Timestamp, vip_max_free: String, vip_min_free: String,
  vip_next_level: String, vip_operator: String,
  dt: String, dn: String
)

2) Create DwDIcebergDao to read the six base tables

package com.atguigu.iceberg.warehouse.dao

import org.apache.spark.sql.SparkSession

object DwDIcebergDao {

  def getDwdMember(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,ad_id,birthday,email,fullname,iconurl,lastlogin,mailaddr,memberlevel," +
      "password,phone,qq,register,regupdatetime,unitname,userip,zipcode,dt from hadoop_prod.db.dwd_member")
  }

  def getDwdPcentermempaymoney(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,paymoney,siteid,vip_id,dt,dn from hadoop_prod.db.dwd_pcentermempaymoney")
  }

  def getDwdVipLevel(sparkSession: SparkSession) = {
    sparkSession.sql("select vip_id,vip_level,start_time as vip_start_time,end_time as vip_end_time," +
      "last_modify_time as vip_last_modify_time,max_free as vip_max_free,min_free as vip_min_free,next_level as vip_next_level," +
      "operator as vip_operator,dn from hadoop_prod.db.dwd_vip_level")
  }

  def getDwdBaseWebsite(sparkSession: SparkSession) = {
    sparkSession.sql("select siteid,sitename,siteurl,delete as site_delete,createtime as site_createtime,creator as site_creator" +
      ",dn from hadoop_prod.db.dwd_base_website")
  }

  def getDwdMemberRegtyp(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,appkey,appregurl,bdp_uuid,createtime as reg_createtime,isranreg,regsource,regsourcename,websiteid," +
      "dt from hadoop_prod.db.dwd_member_regtype")
  }

  def getDwdBaseAd(sparkSession: SparkSession) = {
    sparkSession.sql("select adid as ad_id,adname,dn from hadoop_prod.db.dwd_base_ad")
  }
}

3) Write DwsIcebergService to handle the business logic

package com.atguigu.iceberg.warehouse.service

import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import com.atguigu.iceberg.warehouse.bean.{DwsMember, DwsMember_Result}
import com.atguigu.iceberg.warehouse.dao.DwDIcebergDao
import org.apache.spark.sql.SparkSession

object DwsIcebergService {

  def getDwsMemberData(sparkSession: SparkSession, dt: String) = {
    import sparkSession.implicits._
    val dwdPcentermempaymoney = DwDIcebergDao.getDwdPcentermempaymoney(sparkSession).where($"dt" === dt)
    val dwdVipLevel = DwDIcebergDao.getDwdVipLevel(sparkSession)
    val dwdMember = DwDIcebergDao.getDwdMember(sparkSession).where($"dt" === dt)
    val dwdBaseWebsite = DwDIcebergDao.getDwdBaseWebsite(sparkSession)
    val dwdMemberRegtype = DwDIcebergDao.getDwdMemberRegtyp(sparkSession).where($"dt" === dt)
    val dwdBaseAd = DwDIcebergDao.getDwdBaseAd(sparkSession)

    val result = dwdMember.join(dwdMemberRegtype.drop("dt"), Seq("uid"), "left")
      .join(dwdPcentermempaymoney.drop("dt"), Seq("uid"), "left")
      .join(dwdBaseAd, Seq("ad_id", "dn"), "left")
      .join(dwdBaseWebsite, Seq("siteid", "dn"), "left")
      .join(dwdVipLevel, Seq("vip_id", "dn"), "left_outer")
      .select("uid", "ad_id", "fullname", "iconurl", "lastlogin", "mailaddr",
        "memberlevel", "password", "paymoney", "phone", "qq", "register",
        "regupdatetime", "unitname", "userip", "zipcode", "appkey",
        "appregurl", "bdp_uuid", "reg_createtime", "isranreg", "regsource",
        "regsourcename", "adname", "siteid", "sitename", "siteurl", "site_delete",
        "site_createtime", "site_creator", "vip_id", "vip_level",
        "vip_start_time", "vip_end_time", "vip_last_modify_time", "vip_max_free",
        "vip_min_free", "vip_next_level", "vip_operator", "dt", "dn").as[DwsMember]

    val resultData = result.groupByKey(item => item.uid + "_" + item.dn)
      .mapGroups {
        case (key, iters) =>
          val keys = key.split("_")
          val uid = Integer.parseInt(keys(0))
          val dn = keys(1)
          val dwsMembers = iters.toList
          val paymoney = dwsMembers.filter(_.paymoney != null).map(item => BigDecimal.apply(item.paymoney))
            .reduceOption(_ + _).getOrElse(BigDecimal.apply(0.00)).toString
          val ad_id = dwsMembers.map(_.ad_id).head
          val fullname = dwsMembers.map(_.fullname).head
          val iconurl = dwsMembers.map(_.iconurl).head
          val lastlogin = dwsMembers.map(_.lastlogin).head
          val mailaddr = dwsMembers.map(_.mailaddr).head
          val memberlevel = dwsMembers.map(_.memberlevel).head
          val password = dwsMembers.map(_.password).head
          val phone = dwsMembers.map(_.phone).head
          val qq = dwsMembers.map(_.qq).head
          val register = dwsMembers.map(_.register).head
          val regupdatetime = dwsMembers.map(_.regupdatetime).head
          val unitname = dwsMembers.map(_.unitname).head
          val userip = dwsMembers.map(_.userip).head
          val zipcode = dwsMembers.map(_.zipcode).head
          val appkey = dwsMembers.map(_.appkey).head
          val appregurl = dwsMembers.map(_.appregurl).head
          val bdp_uuid = dwsMembers.map(_.bdp_uuid).head
          val reg_createtime = if (dwsMembers.map(_.reg_createtime).head != null)
            dwsMembers.map(_.reg_createtime).head else "1970-01-01 00:00:00"
          val isranreg = dwsMembers.map(_.isranreg).head
          val regsource = dwsMembers.map(_.regsource).head
          val regsourcename = dwsMembers.map(_.regsourcename).head
          val adname = dwsMembers.map(_.adname).head
          val siteid = if (dwsMembers.map(_.siteid).head != null) dwsMembers.map(_.siteid).head else "0"
          val sitename = dwsMembers.map(_.sitename).head
          val siteurl = dwsMembers.map(_.siteurl).head
          val site_delete = dwsMembers.map(_.site_delete).head
          val site_createtime = dwsMembers.map(_.site_createtime).head
          val site_creator = dwsMembers.map(_.site_creator).head
          val vip_id = if (dwsMembers.map(_.vip_id).head != null) dwsMembers.map(_.vip_id).head else "0"
          val vip_level = dwsMembers.map(_.vip_level).max
          val vip_start_time = if (dwsMembers.map(_.vip_start_time).min != null)
            dwsMembers.map(_.vip_start_time).min else "1970-01-01 00:00:00"
          val vip_end_time = if (dwsMembers.map(_.vip_end_time).max != null)
            dwsMembers.map(_.vip_end_time).max else "1970-01-01 00:00:00"
          val vip_last_modify_time = if (dwsMembers.map(_.vip_last_modify_time).max != null)
            dwsMembers.map(_.vip_last_modify_time).max else "1970-01-01 00:00:00"
          val vip_max_free = dwsMembers.map(_.vip_max_free).head
          val vip_min_free = dwsMembers.map(_.vip_min_free).head
          val vip_next_level = dwsMembers.map(_.vip_next_level).head
          val vip_operator = dwsMembers.map(_.vip_operator).head
          val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
          val reg_createtimeStr = LocalDateTime.parse(reg_createtime, formatter)
          val vip_start_timeStr = LocalDateTime.parse(vip_start_time, formatter)
          val vip_end_timeStr = LocalDateTime.parse(vip_end_time, formatter)
          val vip_last_modify_timeStr = LocalDateTime.parse(vip_last_modify_time, formatter)
          DwsMember_Result(uid, ad_id, fullname, iconurl, lastlogin, mailaddr,
            memberlevel, password, paymoney, phone, qq, register, regupdatetime,
            unitname, userip, zipcode, appkey, appregurl, bdp_uuid,
            Timestamp.valueOf(reg_createtimeStr), isranreg, regsource, regsourcename,
            adname, siteid.toInt, sitename, siteurl, site_delete, site_createtime,
            site_creator, vip_id.toInt, vip_level, Timestamp.valueOf(vip_start_timeStr),
            Timestamp.valueOf(vip_end_timeStr), Timestamp.valueOf(vip_last_modify_timeStr),
            vip_max_free, vip_min_free, vip_next_level, vip_operator, dt, dn)
      }
    resultData.write.format("iceberg").mode("overwrite").save("hadoop_prod.db.dws_member")
  }
}

4) Write DwsIcebergController and run a test

package com.atguigu.iceberg.warehouse.controller

import com.atguigu.iceberg.warehouse.service.DwsIcebergService
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DwsIcebergController {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
      .set("spark.sql.catalog.catalog-name.type", "hadoop")
      .set("spark.sql.catalog.catalog-name.default-namespace", "default")
      .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .set("spark.sql.session.timeZone", "GMT+8")
      .setMaster("local[*]").setAppName("table_operations")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    DwsIcebergService.getDwsMemberData(sparkSession, "20190722")
  }
}

5) An error occurs, the same error seen earlier when testing in the spark-sql shell: when a batch of data is inserted into the partitions, it reports that the partition has already been closed and the insert fails.

6) Recreate the table with dn removed from the partition columns, partitioning by dt only. Bug: multiple partition columns cannot be specified; only a single partition column works.

spark-sql (default)> drop table hadoop_prod.db.dws_member;

create table hadoop_prod.db.dws_member(
  uid int, ad_id int,
  fullname string, iconurl string, lastlogin string, mailaddr string,
  memberlevel string, password string, paymoney string, phone string,
  qq string, register string, regupdatetime string, unitname string, userip string,
  zipcode string, appkey string, appregurl string, bdp_uuid string,
  reg_createtime timestamp, isranreg string, regsource string, regsourcename string,
  adname string,
  siteid int, sitename string, siteurl string, site_delete string,
  site_createtime string, site_creator string, vip_id int,
  vip_level string, vip_start_time timestamp, vip_end_time timestamp,
  vip_last_modify_time timestamp, vip_max_free string, vip_min_free string,
  vip_next_level string, vip_operator string,
  dt string, dn string)
using iceberg
partitioned by(dt);

7) After the table is recreated, rerun the test; the data is inserted successfully.
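As a quick sanity check (not part of the original steps; a minimal sketch that reuses the sparkSession and the dt value from the controllers above), you can count what landed in the rebuilt table:

// Hypothetical verification snippet: row count for the test partition
sparkSession.sql("select count(*) from hadoop_prod.db.dws_member where dt='20190722'").show()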

4.6.4.3 ads layer

1) Write the required case class

case class QueryResult(
  uid: Int, ad_id: Int,
  memberlevel: String, register: String,
  appregurl: String, // registration source url
  regsource: String, regsourcename: String,
  adname: String, siteid: String, sitename: String, vip_level: String,
  paymoney: BigDecimal, dt: String,
  dn: String
)

2) Write DwsIcebergDao to query the wide table

package com.atguigu.iceberg.warehouse.dao

import org.apache.spark.sql.SparkSession

object DwsIcebergDao {

  /**
   * Query the user wide-table data.
   *
   * @param sparkSession
   * @return
   */
  def queryDwsMemberData(sparkSession: SparkSession) = {
    sparkSession.sql("select uid,ad_id,memberlevel,register,appregurl,regsource,regsourcename,adname," +
      "siteid,sitename,vip_level,cast(paymoney as decimal(10,4)) as paymoney,dt,dn from hadoop_prod.db.dws_member")
  }
}

3) Write AdsIcebergService to compute the metrics

package com.atguigu.iceberg.warehouse.service

import com.atguigu.iceberg.warehouse.bean.QueryResult
import com.atguigu.iceberg.warehouse.dao.DwsIcebergDao
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{SaveMode, SparkSession}

object AdsIcebergService {

  def queryDetails(sparkSession: SparkSession, dt: String) = {
    import sparkSession.implicits._
    val result = DwsIcebergDao.queryDwsMemberData(sparkSession).as[QueryResult].where(s"dt='${dt}'")
    result.cache()

    // Count registrations per appregurl (a word-count style aggregation)
    result.mapPartitions(partition => {
      partition.map(item => (item.appregurl + "_" + item.dn + "_" + item.dt, 1))
    }).groupByKey(_._1).mapValues(item => item._2).reduceGroups(_ + _).map(item => {
      val keys = item._1.split("_")
      val appregurl = keys(0)
      val dn = keys(1)
      val dt = keys(2)
      (appregurl, item._2, dt, dn)
    }).toDF("appregurl", "num", "dt", "dn")
      .writeTo("hadoop_prod.db.ads_register_appregurlnum").overwritePartitions()

    // Top 3 users by paymoney within each memberlevel
    import org.apache.spark.sql.functions._
    result.withColumn("rownum", row_number().over(Window.partitionBy("memberlevel").orderBy(desc("paymoney"))))
      .where("rownum<4").orderBy("memberlevel", "rownum")
      .select("uid", "memberlevel", "register", "appregurl", "regsourcename",
        "adname", "sitename", "vip_level", "paymoney", "rownum", "dt", "dn")
      .writeTo("hadoop_prod.db.ads_register_top3memberpay").overwritePartitions()
  }
}

4) Write AdsIcebergController and test locally

package com.atguigu.iceberg.warehouse.controller

import com.atguigu.iceberg.warehouse.service.AdsIcebergService
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object AdsIcebergController {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
      .set("spark.sql.catalog.catalog-name.type", "hadoop")
      .set("spark.sql.catalog.catalog-name.default-namespace", "default")
      .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .set("spark.sql.session.timeZone", "GMT+8")
      .setMaster("local[*]").setAppName("dwd_app")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    AdsIcebergService.queryDetails(sparkSession, "20190722")
  }
}

5) Query and verify the results
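For example, a minimal check (assuming a SparkSession configured with the same hadoop_prod catalog settings as the controllers above) can inspect the two ads tables directly:

// Hypothetical verification snippet: inspect the two ads result tables
sparkSession.sql("select * from hadoop_prod.db.ads_register_appregurlnum where dt='20190722'").show(false)
sparkSession.sql("select * from hadoop_prod.db.ads_register_top3memberpay where dt='20190722'").show(false)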

4.6.5 yarn testing

1) After local-mode testing is complete, package the code into a jar and submit it to the cluster for testing. Since every insert currently runs in overwrite mode, there is no need to delete historical data when testing on yarn.
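If you want to confirm that a rerun replaces the partition rather than appending duplicates (an optional check, not part of the original walkthrough), Iceberg's metadata tables can be queried through the same catalog:

// Each overwrite run commits a new snapshot; the partition's row count should stay stable across reruns
sparkSession.sql("select committed_at, snapshot_id, operation from hadoop_prod.db.dws_member.snapshots").show(false)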

2) Before building the jar, remember to comment out setMaster("local[*]") in the code, and mark the dependencies already available on the cluster as <scope>provided</scope> so that a thin jar is produced.
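A minimal sketch of what a controller's SparkConf might look like for cluster submission, with setMaster removed so that --master yarn from spark-submit takes effect (the catalog settings are the same as in the local-mode controllers above):

// Sketch for yarn submission: no setMaster, the master is supplied by spark-submit
val sparkConf = new SparkConf()
  .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
  .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
  .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .set("spark.sql.session.timeZone", "GMT+8")
  .setAppName("dwd_app")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()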

3) Build the jar, upload it to the cluster, and run the spark-submit commands below in yarn mode.

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 \
  --executor-cores 4 --executor-memory 2g --queue spark \
  --class com.atguigu.iceberg.warehouse.controller.DwdIcebergController \
  iceberg-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 \
  --executor-cores 4 --executor-memory 2g --queue spark \
  --class com.atguigu.iceberg.warehouse.controller.DwsIcebergController \
  iceberg-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 \
  --executor-cores 4 --executor-memory 2g --queue spark \
  --class com.atguigu.iceberg.warehouse.controller.AdsIcebergController \
  iceberg-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar
