Spark SQL实战(08)-整合Hive

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Apache Spark 是一个快速、可扩展的分布式计算引擎,而 Hive 则是一个数据仓库工具,它提供了数据存储和查询功能。在 Spark 中使用 Hive 可以提高数据处理和查询的效率。

1 整合原理及使用


Apache Spark 是一个快速、可扩展的分布式计算引擎,而 Hive 则是一个数据仓库工具,它提供了数据存储和查询功能。在 Spark 中使用 Hive 可以提高数据处理和查询的效率。


场景

历史原因积累下来的,很多数据原先是采用Hive来进行处理的,现想改用Spark操作数据,须要求Spark能够无缝对接已有的Hive的数据,实现平滑过渡。


MetaStore

Hive底层的元数据信息是存储在MySQL中,$HIVE_HOME/conf/hive-site.xml


Spark若能直接访问MySQL中已有的元数据信息 $SPARK_HOME/conf/hive-site.xml


前置条件

在使用 Spark 整合 Hive 之前,需要安装配置以下软件:


Hadoop:用于数据存储和分布式计算。

Hive:用于数据存储和查询。

Spark:用于分布式计算。

整合 Hive

在 Spark 中使用 Hive,需要将 Hive 的依赖库添加到 Spark 的类路径中。在 Java 代码中,可以使用 SparkConf 对象来设置 Spark 应用程序的配置。下面是一个示例代码:


import org.apache.spark.SparkConf;

import org.apache.spark.sql.SparkSession;

public class SparkHiveIntegration {

   public static void main(String[] args) {

       SparkConf conf = new SparkConf()

               .setAppName("SparkHiveIntegration")

               .setMaster("local[*]")

               .set("spark.sql.warehouse.dir", "/user/hive/warehouse");

       SparkSession spark = SparkSession.builder()

               .config(conf)

               .enableHiveSupport()

               .getOrCreate();

       spark.sql("SELECT * FROM mytable").show();

       spark.stop();

   }

}


在上面的代码中,首先创建了一个 SparkConf 对象,设置了应用程序的名称、运行模式以及 Hive 的元数据存储路径。然后,创建了一个 SparkSession 对象,启用了 Hive 支持。最后,使用 Spark SQL 查询语句查询了一个名为 mytable 的 Hive 表,并将结果打印出来。最后,停止了 SparkSession 对象。


需要注意的是,Spark SQL 语法与 Hive SQL 语法略有不同,可以参考 Spark SQL 官方文档。


2 ThiriftServer使用


javaedge@JavaEdgedeMac-mini sbin % pwd

/Users/javaedge/Downloads/soft/spark-2.4.3-bin-2.6.0-cdh5.15.1/sbin

javaedge@JavaEdgedeMac-mini sbin % ./start-thriftserver.sh --master local --jars /Users/javaedge/.m2/repository/mysql/mysql-connector-java/8.0.15/mysql-connector-java-8.0.15.jar

starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /Users/javaedge/Downloads/soft/spark-2.4.3-bin-2.6.0-cdh5.15.1/logs/spark-javaedge-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-JavaEdgedeMac-mini.local.out



14.png


beeline

内置了一个客户端工具:


javaedge@JavaEdgedeMac-mini bin % ./beeline -u jdbc:hive2://localhost:10000

Connecting to jdbc:hive2://localhost:10000

log4j:WARN No appenders could be found for logger (org.apache.hive.jdbc.Utils).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Connected to: Spark SQL (version 2.4.3)

Driver: Hive JDBC (version 1.2.1.spark2)

Transaction isolation: TRANSACTION_REPEATABLE_READ

Beeline version 1.2.1.spark2 by Apache Hive

0: jdbc:hive2://localhost:10000>


当你执行一条命令后:

13.png



就能在 Web UI 看到该命令记录:

12.png



3 通过代码访问数据


总是手敲命令行肯定太慢了,我们更多是代码访问:


package com.javaedge.bigdata.chapter06

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

object JDBCClientApp {

 def main(args: Array[String]): Unit = {

   Class.forName("org.apache.hive.jdbc.HiveDriver")

   val conn: Connection = DriverManager.getConnection("jdbc:hive2://localhost:10000")

   val pstmt: PreparedStatement = conn.prepareStatement("show tables")

   val rs: ResultSet = pstmt.executeQuery()

   while (rs.next()) {

     println(rs.getObject(1) + " : " + rs.getObject(2))

   }

 }

}



最后打成 jar 包,扔到服务器定时运行即可执行作业啦。


ThiriftServer V.S Spark Application 例行作业

Thrift Server 独立的服务器应用程序,它允许多个客户端通过网络协议访问其上运行的 Thrift 服务。Thrift 服务通常是由一组 Thrift 定义文件定义的,这些文件描述了可以从客户端发送到服务器的请求和响应消息的数据结构和协议。Thrift Server 可以使用各种编程语言进行开发,包括 Java、C++、Python 等,并支持多种传输和序列化格式,例如 TSocket、TFramedTransport、TBinaryProtocol 等。使用 Thrift Server,您可以轻松地创建高性能、可伸缩和跨平台的分布式应用程序。


Spark Application,基于 Apache Spark 的应用程序,它使用 Spark 编写的 API 和库来处理大规模数据集。Spark Application 可以部署在本地计算机或云环境中,并且支持各种数据源和格式,如 Hadoop 分布式文件系统(HDFS)、Apache Cassandra、Apache Kafka 等。Spark Application 可以并行处理数据集,以加快数据处理速度,并提供了广泛的机器学习算法和图形处理功能。使用 Spark Application,您可以轻松地处理海量数据,提取有价值的信息和洞察,并帮助您做出更明智的业务决策。


因此,Thrift Server 和 Spark Application 适用不同的场景和应用程序:


需要创建一个分布式服务并为多个客户端提供接口,使用 Thrift Server

需要处理大规模数据集并使用分布式计算和机器学习算法来分析数据,使用 Spark Application

4 Spark 代码访问 Hive 数据

5 Spark SQL 函数实战

parallelize

SparkContext 一个方法,将一个本地数据集转为RDD。parallelize` 方法接受一个集合作为输入参数,并根据指定的并行度创建一个新的 RDD。


语法:


// data表示要转换为 RDD 的本地集合

// numSlices表示 RDD 的分区数,通常等于集群中可用的 CPU 核心数量。

val rdd =

sc.parallelize(data, numSlices)


将一个包含整数值的本地数组转换为RDD:


import org.apache.spark.{SparkConf, SparkContext}


// 创建 SparkConf 对象

val conf = new SparkConf().setAppName("ParallelizeExample").setMaster("local[*]")


// 创建 SparkContext 对象

val sc = new SparkContext(conf)


// 定义本地序列

val data = Seq(1, 2, 3, 4, 5)


// 使用 parallelize 方法创建 RDD

val rdd = sc.parallelize(data)


// 执行转换操作

val result = rdd.map(_ * 2)


// 显示输出结果

result.foreach(println)


创建了一个包含整数值的本地序列 data,然后使用 parallelize 方法将其转换为一个 RDD。接下来,我们对 RDD 进行转换操作,并打印输出结果。


使用 parallelize 方法时,请确保正确配置 Spark 应用程序,并设置正确 CPU 核心数量和内存大小。否则,可能会导致应用程序性能下降或崩溃。


5.1 内置函数

都在这:


11.png


统计 PV、UV 实例

package com.javaedge.bigdata.chapter06

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, SparkSession}

/**

* 内置函数

*/

object BuiltFunctionApp {

 def main(args: Array[String]): Unit = {

   val spark: SparkSession = SparkSession.builder()

     .master("local").appName("HiveSourceApp")

     .getOrCreate()


   // day  userid

   val userAccessLog = Array(

     "2016-10-01,1122",

     "2016-10-01,1122",

     "2016-10-01,1123",

     "2016-10-01,1124",

     "2016-10-01,1124",

     "2016-10-02,1122",

     "2016-10-02,1121",

     "2016-10-02,1123",

     "2016-10-02,1123"

   )

   import spark.implicits._

   // Array ==> RDD

   val userAccessRDD: RDD[String] = spark.sparkContext.parallelize(userAccessLog)

   val userAccessDF: DataFrame = userAccessRDD.map(x => {

     val splits: Array[String] = x.split(",")

     Log(splits(0), splits(1).toInt)

   }).toDF

   userAccessDF.show()

   import org.apache.spark.sql.functions._

   // select day, count(user_id) from xxx group by day;

   userAccessDF.groupBy("day").agg(count("userId").as("pv")).show()

   userAccessDF.groupBy("day").agg(countDistinct("userId").as("uv")).show()

   spark.stop()

 }

 private case class Log(day: String, userId: Int)

}



5.2 自定义函数

package com.javaedge.bigdata.chapter06

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, SparkSession}

/**

* 统计每个人爱好的个数

* pk:3

* jepson: 2

*

*

* 1)定义函数

* 2)注册函数

* 3)使用函数

*/

object UDFFunctionApp {

 def main(args: Array[String]): Unit = {

   val spark: SparkSession = SparkSession.builder()

     .master("local").appName("HiveSourceApp")

     .getOrCreate()

   import spark.implicits._

   val infoRDD: RDD[String] = spark.sparkContext.textFile(

     "/Users/javaedge/Downloads/sparksql-train/data/hobbies.txt")

   val infoDF: DataFrame = infoRDD.map(_.split("###")).map(x => {

     Hobbies(x(0), x(1))

   }).toDF

   infoDF.show(false)

   // TODO... 定义函数 和 注册函数

   spark.udf.register("hobby_num", (s: String) => s.split(",").size)

   infoDF.createOrReplaceTempView("hobbies")

   //TODO... 函数的使用

   spark.sql("select name, hobbies, hobby_num(hobbies) as hobby_count from hobbies").show(false)

   // select name, hobby_num(hobbies) from xxx

   spark.stop()

 }

 private case class Hobbies(name: String, hobbies: String)

}


output:

+------+----------------------+

|name  |hobbies               |

+------+----------------------+

|pk    |jogging,Coding,cooking|

|jepson|travel,dance          |

+------+----------------------+

+------+----------------------+-----------+

|name  |hobbies               |hobby_count|

+------+----------------------+-----------+

|pk    |jogging,Coding,cooking|3          |

|jepson|travel,dance          |2          |

+------+----------------------+-----------+



6 总结


通过上述示例代码,可以看到如何在 Java 中使用 Spark 整合 Hive。通过使用 Hive 的数据存储和查询功能,可以在 Spark 中高效地处理和分析数据。当然,还有许多其他功能和配置可以使用,例如设置 Spark 应用程序的资源分配、数据分区、数据格式转换等等。


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
SQL 数据库 开发者
MSSQL性能调优实战技巧:索引优化、SQL语句微调与并发控制策略
在Microsoft SQL Server(MSSQL)的管理与优化中,性能调优是一项复杂但至关重要的任务
|
12天前
|
SQL 存储 数据处理
"SQL触发器实战大揭秘:一键解锁数据自动化校验与更新魔法,让数据库管理从此告别繁琐,精准高效不再是梦!"
【8月更文挑战第31天】在数据库管理中,确保数据准确性和一致性至关重要。SQL触发器能自动执行数据校验与更新,显著提升工作效率。本文通过一个员工信息表的例子,详细介绍了如何利用触发器自动设定和校验薪资,确保其符合业务规则。提供的示例代码展示了在插入新记录时如何自动检查并调整薪资,以满足最低标准。这不仅减轻了数据库管理员的负担,还提高了数据处理的准确性和效率。触发器虽强大,但也需谨慎使用,以避免复杂性和性能问题。
23 1
|
2月前
|
SQL 安全 数据库
Python Web开发者必学:SQL注入、XSS、CSRF攻击与防御实战演练!
【7月更文挑战第26天】在 Python Web 开发中, 安全性至关重要。本文聚焦 SQL 注入、XSS 和 CSRF 这三大安全威胁,提供实战防御策略。SQL 注入可通过参数化查询和 ORM 框架来防范;XSS 则需 HTML 转义用户输入与实施 CSP;CSRF 防御依赖 CSRF 令牌和双重提交 Cookie。掌握这些技巧,能有效加固 Web 应用的安全防线。安全是持续的过程,需贯穿开发始终。
65 1
Python Web开发者必学:SQL注入、XSS、CSRF攻击与防御实战演练!
|
29天前
|
SQL 存储 分布式计算
|
29天前
|
SQL 存储 分布式计算
插入Hive表数据SQL
【8月更文挑战第10天】
|
8天前
|
SQL 安全 数据库
基于SQL Server事务日志的数据库恢复技术及实战代码详解
基于事务日志的数据库恢复技术是SQL Server中一个非常强大的功能,它能够帮助数据库管理员在数据丢失或损坏的情况下,有效地恢复数据。通过定期备份数据库和事务日志,并在需要时按照正确的步骤恢复,可以最大限度地减少数据丢失的风险。需要注意的是,恢复数据是一个需要谨慎操作的过程,建议在执行恢复操作之前,详细了解相关的操作步骤和注意事项,以确保数据的安全和完整。
19 0
|
1月前
|
SQL 物联网 数据处理
"颠覆传统,Hive SQL与Flink激情碰撞!解锁流批一体数据处理新纪元,让数据决策力瞬间爆表,你准备好了吗?"
【8月更文挑战第9天】数据时代,实时性和准确性至关重要。传统上,批处理与流处理各司其职,但Apache Flink打破了这一界限,尤其Flink与Hive SQL的结合,开创了流批一体的数据处理新时代。这不仅简化了数据处理流程,还极大提升了效率和灵活性。例如,通过Flink SQL,可以轻松实现流数据与批数据的融合分析,无需在两者间切换。这种融合不仅降低了技术门槛,还为企业提供了更强大的数据支持,无论是在金融、电商还是物联网领域,都将发挥巨大作用。
39 6
|
11天前
|
测试技术 Java
全面保障Struts 2应用质量:掌握单元测试与集成测试的关键策略
【8月更文挑战第31天】Struts 2 的测试策略结合了单元测试与集成测试。单元测试聚焦于单个组件(如 Action 类)的功能验证,常用 Mockito 模拟依赖项;集成测试则关注组件间的交互,利用 Cactus 等框架确保框架拦截器和 Action 映射等按预期工作。通过确保高测试覆盖率并定期更新测试用例,可以提升应用的整体稳定性和质量。
22 0
|
11天前
|
数据库 Java 监控
Struts 2 日志管理化身神秘魔法师,洞察应用运行乾坤,演绎奇幻篇章!
【8月更文挑战第31天】在软件开发中,了解应用运行状况至关重要。日志管理作为 Struts 2 应用的关键组件,记录着每个动作和决策,如同监控摄像头,帮助我们迅速定位问题、分析性能和使用情况,为优化提供依据。Struts 2 支持多种日志框架(如 Log4j、Logback),便于配置日志级别、格式和输出位置。通过在 Action 类中添加日志记录,我们能在开发过程中获取详细信息,及时发现并解决问题。合理配置日志不仅有助于调试,还能分析用户行为,提升应用性能和稳定性。
29 0