spark1.4加载mysql数据 创建Dataframe及join操作连接方法问题

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 首先我们使用新的API方法连接mysql加载数据 创建DF import org.apache.spark.sql.DataFrame import org.apache.spark.{SparkContext, SparkConf} import org.

首先我们使用新的API方法连接mysql加载数据 创建DF

import org.apache.spark.sql.DataFrame
import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.sql.{SaveMode, DataFrame} 
import scala.collection.mutable.ArrayBuffer 
import org.apache.spark.sql.hive.HiveContext 
import java.sql.DriverManager 
import java.sql.Connection 
val sqlContext = new HiveContext(sc)
val mySQLUrl = "jdbc:mysql://10.180.211.100:3306/appcocdb?user=appcoc&password=Asia123"

val CI_MDA_SYS_TABLE = sqlContext.jdbc(mySQLUrl,"CI_MDA_SYS_TABLE").cache()

val CI_MDA_SYS_TABLE_COLUMN = sqlContext.jdbc(mySQLUrl,"CI_MDA_SYS_TABLE_COLUMN").cache()

val CI_LABEL_EXT_INFO = sqlContext.jdbc(mySQLUrl,"CI_LABEL_EXT_INFO").cache()

val CI_LABEL_INFO = sqlContext.jdbc(mySQLUrl,"CI_LABEL_INFO").cache()

val CI_APPROVE_STATUS = sqlContext.jdbc(mySQLUrl,"CI_APPROVE_STATUS").cache()

val DIM_COC_LABEL_COUNT_RULES = sqlContext.jdbc(mySQLUrl,"DIM_COC_LABEL_COUNT_RULES").cache()

 

 

根据多表ID进行关联

val labels = CI_MDA_SYS_TABLE.join(CI_MDA_SYS_TABLE_COLUMN,CI_MDA_SYS_TABLE("TABLE_ID") === CI_MDA_SYS_TABLE_COLUMN("TABLE_ID"),"inner").cache()
labels.join(CI_LABEL_EXT_INFO,CI_MDA_SYS_TABLE_COLUMN("COLUMN_ID") === CI_LABEL_EXT_INFO("COLUMN_ID"),"inner").cache()
labels.join(CI_LABEL_INFO,CI_LABEL_EXT_INFO("LABEL_ID") === CI_LABEL_INFO("LABEL_ID"),"inner").cache()
labels.join(CI_APPROVE_STATUS,CI_LABEL_INFO("LABEL_ID") === CI_APPROVE_STATUS("RESOURCE_ID"),"inner").cache()
labels.filter(CI_APPROVE_STATUS("CURR_APPROVE_STATUS_ID") === 107 and (CI_LABEL_INFO("DATA_STATUS_ID") === 1 || CI_LABEL_INFO("DATA_STATUS_ID") === 2) and (CI_LABEL_EXT_INFO("COUNT_RULES_CODE") isNotNull) and CI_MDA_SYS_TABLE("UPDATE_CYCLE") === 1).cache()

于是噼里啪啦的报错了,在第三个join时找不到ID了,这个问题很诡异。。。:

无奈了。。于是使用官网API spark1.4的指定方法尝试

val labels = CI_MDA_SYS_TABLE.join(CI_MDA_SYS_TABLE_COLUMN,"TABLE_ID")
labels.join(CI_LABEL_EXT_INFO,"COLUMN_ID")
labels.join(CI_LABEL_INFO,"LABEL_ID")
labels.join(CI_APPROVE_STATUS).WHERE($"LABEL_ID"===$"RESOURCE_ID")

于是又噼里啪啦的,还是找不到ID。。。。

 

最后无奈。。就用原来的方法 创建软连接,加载数据,发现可以。。这我就不明白了。。。

val CI_MDA_SYS_TABLE_DDL = s"""
             CREATE TEMPORARY TABLE CI_MDA_SYS_TABLE
             USING org.apache.spark.sql.jdbc
             OPTIONS (
               url    '${mySQLUrl}',
               dbtable     'CI_MDA_SYS_TABLE'
             )""".stripMargin

     sqlContext.sql(CI_MDA_SYS_TABLE_DDL)
     val CI_MDA_SYS_TABLE = sql("SELECT * FROM CI_MDA_SYS_TABLE").cache()
    //val CI_MDA_SYS_TABLE  = sqlContext.jdbc(mySQLUrl,"CI_MDA_SYS_TABLE").cache()

    val CI_MDA_SYS_TABLE_COLUMN_DDL = s"""
            CREATE TEMPORARY TABLE CI_MDA_SYS_TABLE_COLUMN
            USING org.apache.spark.sql.jdbc
            OPTIONS (
              url    '${mySQLUrl}',
              dbtable     'CI_MDA_SYS_TABLE_COLUMN'
            )""".stripMargin

    sqlContext.sql(CI_MDA_SYS_TABLE_COLUMN_DDL)
    val CI_MDA_SYS_TABLE_COLUMN = sql("SELECT * FROM CI_MDA_SYS_TABLE_COLUMN").cache()
    //val CI_MDA_SYS_TABLE_COLUMN  = sqlContext.jdbc(mySQLUrl,"CI_MDA_SYS_TABLE_COLUMN").cache()

.........

最终问题是解决了。。可是 为什么直接加载不行呢。。还有待考究。

 

附带一个问题的解决 如果啊报这种错误

15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_3_piece0 on cbg6aocdp9:49897 in memory (size: 8.4 KB, free: 1060.3 MB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_3_piece0 on cbg6aocdp5:45978 in memory (size: 8.4 KB, free: 1060.3 MB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 10.176.238.11:38968 in memory (size: 8.2 KB, free: 4.7 GB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_2_piece0 on cbg6aocdp4:55199 in memory (size: 8.2 KB, free: 1060.3 MB)
15/11/19 10:57:12 INFO ContextCleaner: Cleaned shuffle 0
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.176.238.11:38968 in memory (size: 6.5 KB, free: 4.7 GB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_1_piece0 on cbg6aocdp8:55706 in memory (size: 6.5 KB, free: 1060.3 MB)
TARGET_TABLE_CODE:========================IT03
Exception in thread "main" java.lang.RuntimeException: Error in configuring object
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
        at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
        at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:190)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:121)
        at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:125)
        at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
        at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:176)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:331)
        at main.asiainfo.coc.impl.IndexMakerObj$$anonfun$makeIndexsAndLabels$1.apply(IndexMakerObj.scala:218)
        at main.asiainfo.coc.impl.IndexMakerObj$$anonfun$makeIndexsAndLabels$1.apply(IndexMakerObj.scala:137)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at main.asiainfo.coc.impl.IndexMakerObj$.makeIndexsAndLabels(IndexMakerObj.scala:137)
        at main.asiainfo.coc.CocDss$.main(CocDss.scala:23)
        at main.asiainfo.coc.CocDss.main(CocDss.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
        ... 71 more
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
        at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)
        at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)
        at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
        ... 76 more
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2018)
        at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)
        ... 78 more

一看最后就知道 是hadoop数据压缩格式为lzo spark要想读取 必须引入hadoop lzo的jar包

 

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
3月前
|
关系型数据库 MySQL 索引
MySQL的全文索引查询方法
【8月更文挑战第26天】MySQL的全文索引查询方法
50 0
|
1月前
|
存储 关系型数据库 MySQL
环比、环比增长率、同比、同比增长率 ,占比,Mysql 8.0 实例(最简单的方法之一)(sample database classicmodels _No.2 )
环比、环比增长率、同比、同比增长率 ,占比,Mysql 8.0 实例(最简单的方法之一)(sample database classicmodels _No.2 )
92 1
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
49 3
|
1月前
|
存储 关系型数据库 MySQL
提高MySQL查询性能的方法有很多
提高MySQL查询性能的方法有很多
159 7
|
3月前
|
存储 关系型数据库 MySQL
mysql数据库查询时用到的分页方法有哪些
【8月更文挑战第16天】在MySQL中,实现分页的主要方法包括:1)使用`LIMIT`子句,简单直接但随页数增加性能下降;2)通过子查询优化`LIMIT`分页,提高大页码时的查询效率;3)利用存储过程封装分页逻辑,便于复用但需额外维护;4)借助MySQL变量实现,可能提供更好的性能但实现较复杂。这些方法各有优缺点,可根据实际需求选择适用方案。
369 2
|
16天前
|
关系型数据库 MySQL
Mysql 中日期比较大小的方法有哪些?
在 MySQL 中,可以通过多种方法比较日期的大小,包括使用比较运算符、NOW() 函数、DATEDIFF 函数和 DATE 函数。这些方法可以帮助你筛选出特定日期范围内的记录,确保日期格式一致以避免错误。
|
2月前
|
关系型数据库 MySQL 数据库
Python MySQL查询返回字典类型数据的方法
通过使用 `mysql-connector-python`库并选择 `MySQLCursorDict`作为游标类型,您可以轻松地将MySQL查询结果以字典类型返回。这种方式提高了代码的可读性,使得数据操作更加直观和方便。上述步骤和示例代码展示了如何实现这一功能,希望对您的项目开发有所帮助。
125 4
|
2月前
|
SQL 关系型数据库 MySQL
创建包含MySQL和SQLServer数据库所有字段类型的表的方法
创建一个既包含MySQL又包含SQL Server所有字段类型的表是一个复杂的任务,需要仔细地比较和转换数据类型。通过上述方法,可以在两个数据库系统之间建立起相互兼容的数据结构,为数据迁移和同步提供便利。这一过程不仅要考虑数据类型的直接对应,还要注意特定数据类型在不同系统中的表现差异,确保数据的一致性和完整性。
32 4
|
2月前
|
关系型数据库 MySQL Unix
MySQL配置不区分大小写的方法
结论 通过适当配置 lower_case_table_names参数以及在数据定义和查询中选择合适的校对规则,可以灵活地控制MySQL中的大小写敏感性,以适应不同的应用场景和需求。这样的设置既可以增加数据库的兼容性,又可以在必要时利用大小写敏感性进行精确的数据处理。需要注意的是,修改 lower_case_table_names参数后,最好在数据库初始化时进行,以避免现有表名的大小写问题。
283 3
|
2月前
|
存储 关系型数据库 MySQL
技术解析:MySQL中取最新一条重复数据的方法
以上提供的两种方法都可以有效地从MySQL数据库中提取每个类别最新的重复数据。选择哪种方法取决于具体的使用场景和MySQL版本。子查询加分组的方法兼容性更好,适用于所有版本的MySQL;而窗口函数方法代码更简洁,执行效率可能更高,但需要MySQL 8.0及以上版本。在实际应用中,应根据数据量大小、查询性能需求以及MySQL版本等因素综合考虑,选择最合适的实现方案。
360 6