大数据Spark External DataSource 2

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据Spark External DataSource

7 rdbms 数据

回顾在SparkCore中读取MySQL表的数据通过JdbcRDD来读取的,在SparkSQL模块中提供对应接口,提供三种方式读取数据:

  • 方式一:单分区模式

  • 方式二:多分区模式,可以设置列的名称,作为分区字段及列的值范围和分区数目

  • 方式三:高度自由分区模式,通过设置条件语句设置分区数据及各个分区数据范围

当加载读取RDBMS表的数据量不大时,可以直接使用单分区模式加载;当数据量很多时,考

虑使用多分区及自由分区方式加载。

从RDBMS表中读取数据,需要设置连接数据库相关信息,基本属性选项如下:

范例演示:以MySQL数据库为例,加载订单表so数据,首先添加数据库驱动依赖包:

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.19</version>
</dependency>

完整演示代码如下:

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * 使用SparkSession从RDBMS 表中读取数据,此处以MySQL数据库为例
 */
object SparkSQLMySQL {
  def main(args: Array[String]): Unit = {
    // 在SparkSQL中,程序的同一入口为SparkSession实例对象,构建采用是建造者模式
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName("SparkSQLMySQL")
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    // 导入隐式转换
    import spark.implicits._
    // 连接数据库三要素信息
    val url: String = "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=ut
    f8 & useUnicode = true"
      val table: String = "db_shop.so"
      // 存储用户和密码等属性
      val props: Properties = new Properties ()
      props.put ("driver", "com.mysql.cj.jdbc.Driver")
      props.put ("user", "root")
      props.put ("password", "123456")
      // TODO: 从MySQL数据库表:销售订单表 so
      // def jdbc(url: String, table: String, properties: Properties): DataFrame
      val sosDF: DataFrame = spark.read.jdbc (url, table, props)
      println (s"Count = ${sosDF.count()}"
    )
    sosDF.printSchema()
    sosDF.show(10, truncate = false)
    // 关闭资源
    spark.stop()
  }
}

可以使用option方法设置连接数据库信息,而不使用Properties传递,代码如下:

// TODO: 使用option设置参数
val dataframe: DataFrame = spark.read
  .format("jdbc")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("url", "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true"
    .option("user", "root")
    .option("password", "123456")
    .option("dbtable", "db_shop.so")
    .load()
    dataframe.show(5, truncate = false)

7.1 获取table数据,自动添加数据库前缀

    /**
     * 初始化表配置
     */
    //    private void initTables() {
    //        String env = spark.conf().get("spark.datai.env", "test");
    //        this.tables = new ConfigLoader().getTables(env);
    //    }
    protected static Properties getProperties() {
        InputStream in = AbstractOfflineAnalysis.class.getClassLoader().getResourceAsStream("offline_dev.properties");//获取文件路径
        Properties myProperties = new Properties();
        try {
            myProperties.load(in);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return myProperties;
    }
    /**
     * 获取table数据,自动添加数据库前缀
     *
     * @param name
     *
     * @return
     */
    public Dataset<Row> table(String name) {
        if ("mysql".equals(properties.getProperty("db_from"))) {
            String url = properties.getProperty("url_from");
            String db = name.split("\\.")[0];
            name = name.split("\\.")[1];
            if (db.equalsIgnoreCase("iot")) {
                url = url.replace("dbName", "IoT_test");
            } else if (db.equalsIgnoreCase("common")) {
                url = url.replace("dbName", "common_test");
            }
            return spark.read().jdbc(url, "`" + name + "`", properties);
        }
        return spark.table(name);
    }

7.2 查询全部案例

    @Override
    public Dataset<Row> getDsAll() {
        Dataset<Row> account = table("iot.account");
        Dataset<Row> rechargeCard = table("iot.recharge_card")
                .select(
                        col("bind_account_id"),
                        col("number")
                );
        account = account.join(rechargeCard, account.col("id").equalTo(rechargeCard.col("bind_account_id")),"left")
                .withColumn("user_id", when(account.col("user_id").isNull(), rechargeCard.col("number")).otherwise(account.col("user_id")));
        Dataset<Row> dsAll = account
                .where(account.col("account_status").equalTo(0L))
                .select(
                        account.col("id"),
                        account.col("project_id"),
                        account.col("user_id"),
                        account.col("balance"),
                        account.col("given"),
                        account.col("created_date")
                );
        return dsAll;
    }

7.3 分析某一字段下的统计

    @Override
    public Dataset<Row> analysis(String start, String end, String dateTag, int days, Dataset<Row> dsAll) {
        dsAll = dsAll.where(col("created_date").between(start, end));
        Column[] colBusinessId = {
                col("project_id")
        };
        Dataset<Row> result = dsAll
                .groupBy(colBusinessId)
                .agg(
                        round(sum(col("balance")), 2).as("sum_balance"),//总余额
                        round(sum(col("given")), 2).as("sum_given"),//总余额
                        countDistinct(col("user_id")).as("user_count"),//用户数
                        countDistinct(when(col("balance").equalTo(0L), col("user_id"))).as("zero_balance_count"),//0余额用户数
                        countDistinct(when(col("given").equalTo(0L), col("user_id"))).as("zero_given_count")//0积分用户数
                )
                .withColumn("date", lit(dateTag))
                .withColumn("avg_user_balance", round(col("sum_balance").divide(col("user_count")), 2))//人均余额
                .withColumn("avg_user_given", round(col("sum_given").divide(col("user_count")), 2));//人均积分
        return result;
    }
}

8 hive 数据

Spark SQL模块从发展来说,从Apache Hive框架而来,发展历程:Hive(MapReduce)-> Shark

(Hive on Spark) -> Spark SQL(SchemaRDD -> DataFrame -> Dataset),所以SparkSQL天然无

缝集成Hive,可以加载Hive表数据进行分析。

官方文档:http://spark.apache.org/docs/2.4.5/sql-data-sources-hive-tables.html

8.1 spark-shell 集成 Hive

  • 第一步、当编译Spark源码时,需要指定集成Hive,命令如下:

官方文档: http://spark.apache.org/docs/2.4.5/building-spark.html#building-with-hive-and-jdbc-support

第二步、SparkSQL集成Hive本质就是:读取Hive框架元数据MetaStore,此处启动Hive MetaStore

  • 服务即可。
  1. Hive 元数据MetaStore读取方式:JDBC连接四要素和HiveMetaStore服务
  2. 启动Hive MetaStore 服务,脚本【metastore-start.sh】内容如下:
#!/bin/sh
HIVE_HOME=/export/server/hive
## 启动服务的时间
DATE_STR=`/bin/date '+%Y%m%d%H%M%S'`
# 日志文件名称(包含存储路径)
HIVE_SERVER2_LOG=${HIVE_HOME}/hivemetastore-${DATE_STR}.log
## 启动服务
/usr/bin/nohup ${HIVE_HOME}/bin/hive --service metastore > ${HIVE_SERVER2_LOG} 2>&1 &
  • 第三步、连接HiveMetaStore服务配置文件hive-site.xml,放于【$SPARK_HOME/conf】目录
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node1.oldlu.cn:9083</value>
</property>
</configuration>

将hive-site.xml配置发送到集群中所有Spark按照配置目录,此时任意机器启动应用都可以访问

Hive表数据。

  • 第四步、案例演示,读取Hive中db_hive.emp表数据,分析数据
  1. 其一、读取表的数据,使用DSL分析
  2. 其二、直接编写SQL语句

  3. 复杂SQL分析语句执行:
spark.sql("select e.ename, e.sal, d.dname from db_hive.emp e join db_hive.dept d on e.deptno = d.dept
no").show()

8.2 IDEA 集成 Hive

在IDEA中开发应用,集成Hive,读取表的数据进行分析,构建SparkSession时需要设置

HiveMetaStore服务器地址及集成Hive选项,首先添加MAVEN依赖包:

<!-- Spark SQL 与 Hive 集成 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>

范例演示代码如下:

import org.apache.spark.sql.SparkSession
/**
 * SparkSQL集成Hive,读取Hive表的数据进行分析
 */
object SparkSQLHive {
  def main(args: Array[String]): Unit = {
    // TODO: 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[4]")
      .config("spark.sql.shuffle.partitions", "4")
      // 指定Hive MetaStore服务地址
      .config("hive.metastore.uris", "thrift://node1.oldlu.cn:9083")
      // TODO: 表示集成Hive,读取Hive表的数据
      .enableHiveSupport()
      .getOrCreate()
    // 导入隐式转换
    import spark.implicits._
    // 导入函数库
    import org.apache.spark.sql.functions._
    // TODO: 读取Hive表的数据
    spark.sql(
      """
        |SELECT deptno, ROUND(AVG(sal), 2) AS avg_sal FROM db_hive.emp GROUP BY deptno
""".stripMargin)
      .show(10, truncate = false)
    println("===========================================================")
    import org.apache.spark.sql.functions._
    spark.read
      .table("db_hive.emp")
      .groupBy($"deptno")
      .agg(round(avg($"sal"), 2).alias("avg_sal"))
      .show(10, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}

运行程序结果如下:

8.3 Hive写入到Spark

package com.oldlu.datai;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.event.Level;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * @author :oldlu
 * @date :Created in 2020/12/2 10:07
 * @description:
 * @modified By:
 */
public class HiveExport {
    private void saveDataset(String[] tables, Properties mysqlProperties) {
        SparkSession spark = SparkSession.builder()
                .config("spark.local.dir", "/tmp/spark-tmp")
                .master("local")
                .appName("toplion.iot")
                .enableHiveSupport()
                .getOrCreate();
        spark.sparkContext().setLogLevel(Level.ERROR.toString());
        spark.sql("use iot");
        for (String table : tables) {
            Dataset<Row> tableDS = spark.table(table);
            tableDS.write().mode(SaveMode.Overwrite).jdbc(mysqlProperties.getProperty("url"), table, mysqlProperties);
            tableDS.show(999);
        }
    }
    public void start() {
        InputStream in = this.getClass().getClassLoader().getResourceAsStream("jdbc.properties");//获取文件路径
        Properties mysqlProperties = new Properties();
        try {
            mysqlProperties.load(in);
            String[] tables = {"account_s", "consume_s", "device_s", "repair_s", "users_s","feedback_s", "recharge_s"};
            //String[] tables = {"consume_s","account_s"};
            saveDataset(tables, mysqlProperties);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
3月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
231 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
154 6
|
3月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
183 2
|
3月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
178 1
|
3月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
3月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
97 1
|
27天前
|
SQL 数据可视化 大数据
从数据小白到大数据达人:一步步成为数据分析专家
从数据小白到大数据达人:一步步成为数据分析专家
210 92
|
3月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
729 7
|
3月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
97 2
|
1月前
|
分布式计算 Shell MaxCompute
odps测试表及大量数据构建测试
odps测试表及大量数据构建测试