sparksql工程小记

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介:   最近做一个oracle项目迁移工作,跟着spark架构师学着做,进行一些方法的总结。  1、首先,创建SparkSession对象(老版本为sparkContext)  val session = SparkSession.builder().appName("app1").getOrCreate()  2、数据的更新时间配置表,选用mysql,就是说每次结果数据计算写入mysql后,还会将此次数据的更新时间写入数据配置表。

  最近做一个oracle项目迁移工作,跟着spark架构师学着做,进行一些方法的总结。

  1、首先,创建SparkSession对象(老版本为sparkContext)

  val session = SparkSession.builder().appName("app1").getOrCreate()

  2、数据的更新时间配置表,选用mysql,就是说每次结果数据计算写入mysql后,还会将此次数据的更新时间写入数据配置表。 那么在代码里,需要创建配置表的case class,配置与构造数据库schema信息,url,用户名密码等,随后根据配置表中的不同app进行数据的过滤。

  val appId = "1"

  case class DBInformation(url:Stirng,schema:String,user:String,passwd:String)

  val mysqlDB = DBInformation("jdbc:mysql://....",schema,user,passowrd)

  val tableName = mysqlDB.schema + "." + name

  val props = new Properties()

  props.setProperty("user",mysqlDB.user)

  props.setProperty("password",mysqlDB.passwd)

  props.setProperty(JDBCOptions.JDBC_DRIVER_CLASS,"com.mysql.jdbc.Driver")

  val record = session.read.jdbc(mysqlDB.url,tableName,props).filter(row => row.getAs[Int]("app_id") == appId).take(1)

  //第一次写入,木有数据

  if(0 == record.size){

    DBInfoMation(null,null,null)

  }else{

    DBInfoMation(record(0).getTimestmap(1),recode(0).getTimestamp(2),recode(0)..getTimestamp(3))  

  3、注册UDF,由于原来是用oracle的语法,现如今转为sparksql,需要注册一些UDF,来兼容原有oracle的函数

  def registerUDF(session:SparkSession) : Unit = {

    session.udf.register("UDF",(value : String,modifieds:Array[String) => {

      val filter = modifieds.filter(_!=null)

      if(!filter.isEmpty){

        filter.max

      }else{

        null

      }

     })

   {

  4、很多计算是需要过往的历史数据的,在第一次初始化的时候,先对历史数据进行缓存。这里有个知识点,会将一直计算的同步数据进行checkPoint落地磁盘,如果发现历史时间在同步时间之后,则加载历史数据,否则就加载同步数据。

  val (updateTime,initData) = if(historyTime.after(syncTime)){

    (historyTime,initFromHistory(tableName))

  } else {

    (syncTime,initFromCheckPoint(syncTime))

  }

  //记录schema

  schema = initData.schema

  //baseData为缓存在内存的数据,并根据数据量进行repartition

  baseData = initData.repartition(numPartitions,_partitionColumns.map(new Column()):_*).rdd.persisit(storageLevel)

  //触发action动作

  baseData.foreach(_=>Unit)

  5、有一种情况,下游三个表要关联生成一张大表,这三张表的数据来源于消息中间件中的三个topic,但是数据可能不是同时到来,那么就需要将历史加载的大表拆根据ID拆分为三个小表,然后逐个append到三个小表上,随后再根据ID关联起来,再组成最终表。

  val table1 = new createUpdatingTable(session,"tableName1",topicConf,numPartitons,...)

  val table2 = new createUpdatingTable (session,"tableName2",topicConf1,numPartitions,...)

  val table3 = new createUpdatingTable(session,"tableName3","topicConf2,numPartitions,...)

  val mergeBaseTable = (session,"mergeTableName",Array(table1,table2,table3),finallyColumn,finallyPartitions...)

  mergeBaseTable.updateAndGetData(Some(genDataFilter(currentTime)))

  //三表拆分与合并

  val tmpPartitionKey = "pd_code"

  if(baseData != null) {

    val oldData = getOldData(baseData,keyDF.rdd,tmpPartitionKey)

    oldDf = session.createDataFrame(oldData,schema)

    .repartition(numPartitions,new Column(tmpPartitionKey))

    .persist(storageLevel)

  }

  val table1 = updateShardTable(oldDf,inDfs(0)...).sparksession.createDataFrame(data,schema)

  val table2 = ....

  val table3 = ....

  

  6、三表key进行合并,通过sql进行三来源表合并

  val keySet = keys.collect()

  val broadcastKeys = session.sparkContext.broadCast(keySet)

  baseData.mapPartitions({iter =>

    val set = broadcastKey.value.toSet

    iter.filter(row=>set.contains(row.getAs[Any](keyCol)))

  },true)

  val sql ="select a.column,b.column,c.column.... from table1 a left join table2 b on a.pd_code = b.pd_code......

  val finallyTable = session.sql(sql)

 

  7、从历史数据中筛选出此次需要更新的数据(通过ID进行过滤),随后将新数据进行append

  val new Data = baseData.zipPartitions(updateData,true){case(liter,riter)=>

    val rset = new mutable.HashSet[Any]

    for(row <- riter){

      rset.add(row.getAs[Any](keyCol))

    }

    liter.filter(row=>!rset.contains(row.getAs[Any](keyCol))))

    }.zipPartitions(updateData,true){case (liter,riter)=>

      liter++riter

    }.persisit(storageLevel)

  

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
安全 Java 项目管理
云效常见问题之maven私有仓库迁移如何解决
云效(CloudEfficiency)是阿里云提供的一套软件研发效能平台,旨在通过工程效能、项目管理、质量保障等工具与服务,帮助企业提高软件研发的效率和质量。本合集是云效使用中可能遇到的一些常见问题及其答案的汇总。
371 0
|
SQL 分布式计算 监控
Hive性能优化之计算Job执行优化 2
Hive性能优化之计算Job执行优化
340 1
|
存储 资源调度 分布式计算
CDP中配置Apache Hadoop Yarn的安全性
CDP中配置Hadoop Yarn的安全性。
849 0
CDP中配置Apache Hadoop Yarn的安全性
|
10月前
|
存储 人工智能 运维
龙蜥副理事长张东:加速推进 AI+OS 深度融合,打造最 AI 的服务器操作系统
操作系统如何满足 AI 应用场景需求?未来发展趋势如何?
|
数据挖掘 API 开发者
​Email API有哪些,最好的3个API接口有哪些
Email API如SendGrid、Mailgun和AOKSend是企业自动化邮件通信的关键工具。它们提供邮件发送、接收和管理功能,提升效率,优化客户体验。SendGrid以其高可靠性、强大分析和易于集成备受青睐;Mailgun以灵活性和高发送率著称;而AOKSend则以其高效、详细分析和易用性脱颖而出。通过使用这些API,企业能实现定制化邮件服务,跟踪性能,提升邮件营销效果。
|
消息中间件 分布式计算 Kafka
Flink 1.16.2 版本在流式读取 Iceberg upsert primary key 表方面存在一些限制
,Flink 1.16.2 版本在流式读取 Iceberg upsert primary key 表方面存在一些限制
226 2
|
SQL 数据库 流计算
在Flink CDC中处理分区表
在Flink CDC中处理分区表
414 1
|
开发框架 缓存 JavaScript
阿里巴巴加入 Eclipse 基金会,开源一站式 Java 应用诊断平台 -- Eclipse Jifa
### 前言 Java 作为一门主流的编程语言, 在业界拥有着丰富的工具帮助开发者排查与定位研发过程中遇到的各类疑难问题。早在多年前, 阿里巴巴就研发了一款 Java 应用的在线问题诊断平台 -- ZProfiler, 致力于帮助研发同学快速定位生产环境中频繁遇到的 Java 问题, 例如 OOM 异常。 在内部, ZProfiler 平台被研发同学广泛使用, 协助他们解决了大量的生产问题
3868 0
阿里巴巴加入 Eclipse 基金会,开源一站式 Java 应用诊断平台 -- Eclipse Jifa
|
存储 供应链 安全
SaaS+PaaS服务模式,如何担起大型企业转型重任
SaaS+PaaS服务模式,如何担起大型企业转型重任
|
运维 监控 Dubbo
ZooKeeper 避坑实践:如何调优 jute.maxbuffer
本篇文章就深入 ZooKeeper 源码,一起探究一下ZooKeeper 的 jute.maxbuffer 参数的最佳实践。
ZooKeeper 避坑实践:如何调优 jute.maxbuffer

热门文章

最新文章