大数据Spark External DataSource 2

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 大数据Spark External DataSource

7 rdbms 数据

回顾在SparkCore中读取MySQL表的数据通过JdbcRDD来读取的,在SparkSQL模块中提供对应接口,提供三种方式读取数据:

  • 方式一:单分区模式

  • 方式二:多分区模式,可以设置列的名称,作为分区字段及列的值范围和分区数目

  • 方式三:高度自由分区模式,通过设置条件语句设置分区数据及各个分区数据范围

当加载读取RDBMS表的数据量不大时,可以直接使用单分区模式加载;当数据量很多时,考

虑使用多分区及自由分区方式加载。

从RDBMS表中读取数据,需要设置连接数据库相关信息,基本属性选项如下:

范例演示:以MySQL数据库为例,加载订单表so数据,首先添加数据库驱动依赖包:

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.19</version>
</dependency>

完整演示代码如下:

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * 使用SparkSession从RDBMS 表中读取数据,此处以MySQL数据库为例
 */
object SparkSQLMySQL {
  def main(args: Array[String]): Unit = {
    // 在SparkSQL中,程序的同一入口为SparkSession实例对象,构建采用是建造者模式
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName("SparkSQLMySQL")
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    // 导入隐式转换
    import spark.implicits._
    // 连接数据库三要素信息
    val url: String = "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=ut
    f8 & useUnicode = true"
      val table: String = "db_shop.so"
      // 存储用户和密码等属性
      val props: Properties = new Properties ()
      props.put ("driver", "com.mysql.cj.jdbc.Driver")
      props.put ("user", "root")
      props.put ("password", "123456")
      // TODO: 从MySQL数据库表:销售订单表 so
      // def jdbc(url: String, table: String, properties: Properties): DataFrame
      val sosDF: DataFrame = spark.read.jdbc (url, table, props)
      println (s"Count = ${sosDF.count()}"
    )
    sosDF.printSchema()
    sosDF.show(10, truncate = false)
    // 关闭资源
    spark.stop()
  }
}

可以使用option方法设置连接数据库信息,而不使用Properties传递,代码如下:

// TODO: 使用option设置参数
val dataframe: DataFrame = spark.read
  .format("jdbc")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("url", "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true"
    .option("user", "root")
    .option("password", "123456")
    .option("dbtable", "db_shop.so")
    .load()
    dataframe.show(5, truncate = false)

7.1 获取table数据,自动添加数据库前缀

    /**
     * 初始化表配置
     */
    //    private void initTables() {
    //        String env = spark.conf().get("spark.datai.env", "test");
    //        this.tables = new ConfigLoader().getTables(env);
    //    }
    protected static Properties getProperties() {
        InputStream in = AbstractOfflineAnalysis.class.getClassLoader().getResourceAsStream("offline_dev.properties");//获取文件路径
        Properties myProperties = new Properties();
        try {
            myProperties.load(in);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return myProperties;
    }
    /**
     * 获取table数据,自动添加数据库前缀
     *
     * @param name
     *
     * @return
     */
    public Dataset<Row> table(String name) {
        if ("mysql".equals(properties.getProperty("db_from"))) {
            String url = properties.getProperty("url_from");
            String db = name.split("\\.")[0];
            name = name.split("\\.")[1];
            if (db.equalsIgnoreCase("iot")) {
                url = url.replace("dbName", "IoT_test");
            } else if (db.equalsIgnoreCase("common")) {
                url = url.replace("dbName", "common_test");
            }
            return spark.read().jdbc(url, "`" + name + "`", properties);
        }
        return spark.table(name);
    }

7.2 查询全部案例

    @Override
    public Dataset<Row> getDsAll() {
        Dataset<Row> account = table("iot.account");
        Dataset<Row> rechargeCard = table("iot.recharge_card")
                .select(
                        col("bind_account_id"),
                        col("number")
                );
        account = account.join(rechargeCard, account.col("id").equalTo(rechargeCard.col("bind_account_id")),"left")
                .withColumn("user_id", when(account.col("user_id").isNull(), rechargeCard.col("number")).otherwise(account.col("user_id")));
        Dataset<Row> dsAll = account
                .where(account.col("account_status").equalTo(0L))
                .select(
                        account.col("id"),
                        account.col("project_id"),
                        account.col("user_id"),
                        account.col("balance"),
                        account.col("given"),
                        account.col("created_date")
                );
        return dsAll;
    }

7.3 分析某一字段下的统计

    @Override
    public Dataset<Row> analysis(String start, String end, String dateTag, int days, Dataset<Row> dsAll) {
        dsAll = dsAll.where(col("created_date").between(start, end));
        Column[] colBusinessId = {
                col("project_id")
        };
        Dataset<Row> result = dsAll
                .groupBy(colBusinessId)
                .agg(
                        round(sum(col("balance")), 2).as("sum_balance"),//总余额
                        round(sum(col("given")), 2).as("sum_given"),//总余额
                        countDistinct(col("user_id")).as("user_count"),//用户数
                        countDistinct(when(col("balance").equalTo(0L), col("user_id"))).as("zero_balance_count"),//0余额用户数
                        countDistinct(when(col("given").equalTo(0L), col("user_id"))).as("zero_given_count")//0积分用户数
                )
                .withColumn("date", lit(dateTag))
                .withColumn("avg_user_balance", round(col("sum_balance").divide(col("user_count")), 2))//人均余额
                .withColumn("avg_user_given", round(col("sum_given").divide(col("user_count")), 2));//人均积分
        return result;
    }
}

8 hive 数据

Spark SQL模块从发展来说,从Apache Hive框架而来,发展历程:Hive(MapReduce)-> Shark

(Hive on Spark) -> Spark SQL(SchemaRDD -> DataFrame -> Dataset),所以SparkSQL天然无

缝集成Hive,可以加载Hive表数据进行分析。

官方文档:http://spark.apache.org/docs/2.4.5/sql-data-sources-hive-tables.html

8.1 spark-shell 集成 Hive

  • 第一步、当编译Spark源码时,需要指定集成Hive,命令如下:

官方文档: http://spark.apache.org/docs/2.4.5/building-spark.html#building-with-hive-and-jdbc-support

第二步、SparkSQL集成Hive本质就是:读取Hive框架元数据MetaStore,此处启动Hive MetaStore

  • 服务即可。
  1. Hive 元数据MetaStore读取方式:JDBC连接四要素和HiveMetaStore服务
  2. 启动Hive MetaStore 服务,脚本【metastore-start.sh】内容如下:
#!/bin/sh
HIVE_HOME=/export/server/hive
## 启动服务的时间
DATE_STR=`/bin/date '+%Y%m%d%H%M%S'`
# 日志文件名称(包含存储路径)
HIVE_SERVER2_LOG=${HIVE_HOME}/hivemetastore-${DATE_STR}.log
## 启动服务
/usr/bin/nohup ${HIVE_HOME}/bin/hive --service metastore > ${HIVE_SERVER2_LOG} 2>&1 &
  • 第三步、连接HiveMetaStore服务配置文件hive-site.xml,放于【$SPARK_HOME/conf】目录
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node1.oldlu.cn:9083</value>
</property>
</configuration>

将hive-site.xml配置发送到集群中所有Spark按照配置目录,此时任意机器启动应用都可以访问

Hive表数据。

  • 第四步、案例演示,读取Hive中db_hive.emp表数据,分析数据
  1. 其一、读取表的数据,使用DSL分析
  2. 其二、直接编写SQL语句

  3. 复杂SQL分析语句执行:
spark.sql("select e.ename, e.sal, d.dname from db_hive.emp e join db_hive.dept d on e.deptno = d.dept
no").show()

8.2 IDEA 集成 Hive

在IDEA中开发应用,集成Hive,读取表的数据进行分析,构建SparkSession时需要设置

HiveMetaStore服务器地址及集成Hive选项,首先添加MAVEN依赖包:

<!-- Spark SQL 与 Hive 集成 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>

范例演示代码如下:

import org.apache.spark.sql.SparkSession
/**
 * SparkSQL集成Hive,读取Hive表的数据进行分析
 */
object SparkSQLHive {
  def main(args: Array[String]): Unit = {
    // TODO: 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[4]")
      .config("spark.sql.shuffle.partitions", "4")
      // 指定Hive MetaStore服务地址
      .config("hive.metastore.uris", "thrift://node1.oldlu.cn:9083")
      // TODO: 表示集成Hive,读取Hive表的数据
      .enableHiveSupport()
      .getOrCreate()
    // 导入隐式转换
    import spark.implicits._
    // 导入函数库
    import org.apache.spark.sql.functions._
    // TODO: 读取Hive表的数据
    spark.sql(
      """
        |SELECT deptno, ROUND(AVG(sal), 2) AS avg_sal FROM db_hive.emp GROUP BY deptno
""".stripMargin)
      .show(10, truncate = false)
    println("===========================================================")
    import org.apache.spark.sql.functions._
    spark.read
      .table("db_hive.emp")
      .groupBy($"deptno")
      .agg(round(avg($"sal"), 2).alias("avg_sal"))
      .show(10, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}

运行程序结果如下:

8.3 Hive写入到Spark

package com.oldlu.datai;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.event.Level;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * @author :oldlu
 * @date :Created in 2020/12/2 10:07
 * @description:
 * @modified By:
 */
public class HiveExport {
    private void saveDataset(String[] tables, Properties mysqlProperties) {
        SparkSession spark = SparkSession.builder()
                .config("spark.local.dir", "/tmp/spark-tmp")
                .master("local")
                .appName("toplion.iot")
                .enableHiveSupport()
                .getOrCreate();
        spark.sparkContext().setLogLevel(Level.ERROR.toString());
        spark.sql("use iot");
        for (String table : tables) {
            Dataset<Row> tableDS = spark.table(table);
            tableDS.write().mode(SaveMode.Overwrite).jdbc(mysqlProperties.getProperty("url"), table, mysqlProperties);
            tableDS.show(999);
        }
    }
    public void start() {
        InputStream in = this.getClass().getClassLoader().getResourceAsStream("jdbc.properties");//获取文件路径
        Properties mysqlProperties = new Properties();
        try {
            mysqlProperties.load(in);
            String[] tables = {"account_s", "consume_s", "device_s", "repair_s", "users_s","feedback_s", "recharge_s"};
            //String[] tables = {"consume_s","account_s"};
            saveDataset(tables, mysqlProperties);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
28天前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
35 3
|
1月前
|
分布式计算 大数据 Apache
跨越界限:当.NET遇上Apache Spark,大数据世界的新篇章如何谱写?
【8月更文挑战第28天】随着信息时代的发展,大数据已成为推动企业决策、科研与技术创新的关键力量。Apache Spark凭借其卓越的分布式计算能力和多功能数据处理特性,在大数据领域占据重要地位。然而,对于.NET开发者而言,如何在Spark生态中发挥自身优势成为一个新课题。为此,微软与Apache Spark社区共同推出了.NET for Apache Spark,使开发者能用C#、F#等语言编写Spark应用,不仅保留了Spark的强大功能,还融合了.NET的强类型系统、丰富库支持及良好跨平台能力,极大地降低了学习门槛并拓展了.NET的应用范围。
49 3
|
1月前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
1月前
|
存储 分布式计算 供应链
Spark在供应链核算中应用问题之调整Spark读取ODPS离线表分区大小如何解决
Spark在供应链核算中应用问题之调整Spark读取ODPS离线表分区大小如何解决
|
28天前
|
Java Spring API
Spring框架与GraphQL的史诗级碰撞:颠覆传统,重塑API开发的未来传奇!
【8月更文挑战第31天】《Spring框架与GraphQL:构建现代API》介绍了如何结合Spring框架与GraphQL构建高效、灵活的API。首先通过引入`spring-boot-starter-data-graphql`等依赖支持GraphQL,然后定义查询和类型,利用`@GraphQLQuery`等注解实现具体功能。Spring的依赖注入和事务管理进一步增强了GraphQL服务的能力。示例展示了从查询到突变的具体实现,证明了Spring与GraphQL结合的强大潜力,适合现代API设计与开发。
52 0
|
1月前
|
大数据 RDMA
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
神龙大数据加速引擎MRACC问题之MRACC-Spark利用eRDMA近网络优化插件来提升性能如何解决
33 0
|
23天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
72 11
|
28天前
|
存储 分布式计算 大数据
MaxCompute 数据分区与生命周期管理
【8月更文第31天】随着大数据分析需求的增长,如何高效地管理和组织数据变得至关重要。阿里云的 MaxCompute(原名 ODPS)是一个专为海量数据设计的计算服务,它提供了丰富的功能来帮助用户管理和优化数据。本文将重点讨论 MaxCompute 中的数据分区策略和生命周期管理方法,并通过具体的代码示例来展示如何实施这些策略。
71 1
|
1月前
数据平台问题之在数据影响决策的过程中,如何实现“决策/行动”阶段
数据平台问题之在数据影响决策的过程中,如何实现“决策/行动”阶段
|
1月前
|
存储 监控 安全
大数据架构设计原则:构建高效、可扩展与安全的数据生态系统
【8月更文挑战第23天】大数据架构设计是一个复杂而系统的工程,需要综合考虑业务需求、技术选型、安全合规等多个方面。遵循上述设计原则,可以帮助企业构建出既高效又安全的大数据生态系统,为业务创新和决策支持提供强有力的支撑。随着技术的不断发展和业务需求的不断变化,持续优化和调整大数据架构也将成为一项持续的工作。