大数据Spark External DataSource 2

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 大数据Spark External DataSource

7 rdbms 数据

回顾在SparkCore中读取MySQL表的数据通过JdbcRDD来读取的,在SparkSQL模块中提供对应接口,提供三种方式读取数据:

  • 方式一:单分区模式

  • 方式二:多分区模式,可以设置列的名称,作为分区字段及列的值范围和分区数目

  • 方式三:高度自由分区模式,通过设置条件语句设置分区数据及各个分区数据范围

当加载读取RDBMS表的数据量不大时,可以直接使用单分区模式加载;当数据量很多时,考

虑使用多分区及自由分区方式加载。

从RDBMS表中读取数据,需要设置连接数据库相关信息,基本属性选项如下:

范例演示:以MySQL数据库为例,加载订单表so数据,首先添加数据库驱动依赖包:

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.19</version>
</dependency>

完整演示代码如下:

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * 使用SparkSession从RDBMS 表中读取数据,此处以MySQL数据库为例
 */
object SparkSQLMySQL {
  def main(args: Array[String]): Unit = {
    // 在SparkSQL中,程序的同一入口为SparkSession实例对象,构建采用是建造者模式
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName("SparkSQLMySQL")
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    // 导入隐式转换
    import spark.implicits._
    // 连接数据库三要素信息
    val url: String = "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=ut
    f8 & useUnicode = true"
      val table: String = "db_shop.so"
      // 存储用户和密码等属性
      val props: Properties = new Properties ()
      props.put ("driver", "com.mysql.cj.jdbc.Driver")
      props.put ("user", "root")
      props.put ("password", "123456")
      // TODO: 从MySQL数据库表:销售订单表 so
      // def jdbc(url: String, table: String, properties: Properties): DataFrame
      val sosDF: DataFrame = spark.read.jdbc (url, table, props)
      println (s"Count = ${sosDF.count()}"
    )
    sosDF.printSchema()
    sosDF.show(10, truncate = false)
    // 关闭资源
    spark.stop()
  }
}

可以使用option方法设置连接数据库信息,而不使用Properties传递,代码如下:

// TODO: 使用option设置参数
val dataframe: DataFrame = spark.read
  .format("jdbc")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("url", "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true"
    .option("user", "root")
    .option("password", "123456")
    .option("dbtable", "db_shop.so")
    .load()
    dataframe.show(5, truncate = false)

7.1 获取table数据,自动添加数据库前缀

    /**
     * 初始化表配置
     */
    //    private void initTables() {
    //        String env = spark.conf().get("spark.datai.env", "test");
    //        this.tables = new ConfigLoader().getTables(env);
    //    }
    protected static Properties getProperties() {
        InputStream in = AbstractOfflineAnalysis.class.getClassLoader().getResourceAsStream("offline_dev.properties");//获取文件路径
        Properties myProperties = new Properties();
        try {
            myProperties.load(in);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return myProperties;
    }
    /**
     * 获取table数据,自动添加数据库前缀
     *
     * @param name
     *
     * @return
     */
    public Dataset<Row> table(String name) {
        if ("mysql".equals(properties.getProperty("db_from"))) {
            String url = properties.getProperty("url_from");
            String db = name.split("\\.")[0];
            name = name.split("\\.")[1];
            if (db.equalsIgnoreCase("iot")) {
                url = url.replace("dbName", "IoT_test");
            } else if (db.equalsIgnoreCase("common")) {
                url = url.replace("dbName", "common_test");
            }
            return spark.read().jdbc(url, "`" + name + "`", properties);
        }
        return spark.table(name);
    }

7.2 查询全部案例

    @Override
    public Dataset<Row> getDsAll() {
        Dataset<Row> account = table("iot.account");
        Dataset<Row> rechargeCard = table("iot.recharge_card")
                .select(
                        col("bind_account_id"),
                        col("number")
                );
        account = account.join(rechargeCard, account.col("id").equalTo(rechargeCard.col("bind_account_id")),"left")
                .withColumn("user_id", when(account.col("user_id").isNull(), rechargeCard.col("number")).otherwise(account.col("user_id")));
        Dataset<Row> dsAll = account
                .where(account.col("account_status").equalTo(0L))
                .select(
                        account.col("id"),
                        account.col("project_id"),
                        account.col("user_id"),
                        account.col("balance"),
                        account.col("given"),
                        account.col("created_date")
                );
        return dsAll;
    }

7.3 分析某一字段下的统计

    @Override
    public Dataset<Row> analysis(String start, String end, String dateTag, int days, Dataset<Row> dsAll) {
        dsAll = dsAll.where(col("created_date").between(start, end));
        Column[] colBusinessId = {
                col("project_id")
        };
        Dataset<Row> result = dsAll
                .groupBy(colBusinessId)
                .agg(
                        round(sum(col("balance")), 2).as("sum_balance"),//总余额
                        round(sum(col("given")), 2).as("sum_given"),//总余额
                        countDistinct(col("user_id")).as("user_count"),//用户数
                        countDistinct(when(col("balance").equalTo(0L), col("user_id"))).as("zero_balance_count"),//0余额用户数
                        countDistinct(when(col("given").equalTo(0L), col("user_id"))).as("zero_given_count")//0积分用户数
                )
                .withColumn("date", lit(dateTag))
                .withColumn("avg_user_balance", round(col("sum_balance").divide(col("user_count")), 2))//人均余额
                .withColumn("avg_user_given", round(col("sum_given").divide(col("user_count")), 2));//人均积分
        return result;
    }
}

8 hive 数据

Spark SQL模块从发展来说,从Apache Hive框架而来,发展历程:Hive(MapReduce)-> Shark

(Hive on Spark) -> Spark SQL(SchemaRDD -> DataFrame -> Dataset),所以SparkSQL天然无

缝集成Hive,可以加载Hive表数据进行分析。

官方文档:http://spark.apache.org/docs/2.4.5/sql-data-sources-hive-tables.html

8.1 spark-shell 集成 Hive

  • 第一步、当编译Spark源码时,需要指定集成Hive,命令如下:

官方文档: http://spark.apache.org/docs/2.4.5/building-spark.html#building-with-hive-and-jdbc-support

第二步、SparkSQL集成Hive本质就是:读取Hive框架元数据MetaStore,此处启动Hive MetaStore

  • 服务即可。
  1. Hive 元数据MetaStore读取方式:JDBC连接四要素和HiveMetaStore服务
  2. 启动Hive MetaStore 服务,脚本【metastore-start.sh】内容如下:
#!/bin/sh
HIVE_HOME=/export/server/hive
## 启动服务的时间
DATE_STR=`/bin/date '+%Y%m%d%H%M%S'`
# 日志文件名称(包含存储路径)
HIVE_SERVER2_LOG=${HIVE_HOME}/hivemetastore-${DATE_STR}.log
## 启动服务
/usr/bin/nohup ${HIVE_HOME}/bin/hive --service metastore > ${HIVE_SERVER2_LOG} 2>&1 &
  • 第三步、连接HiveMetaStore服务配置文件hive-site.xml,放于【$SPARK_HOME/conf】目录
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node1.oldlu.cn:9083</value>
</property>
</configuration>

将hive-site.xml配置发送到集群中所有Spark按照配置目录,此时任意机器启动应用都可以访问

Hive表数据。

  • 第四步、案例演示,读取Hive中db_hive.emp表数据,分析数据
  1. 其一、读取表的数据,使用DSL分析
  2. 其二、直接编写SQL语句

  3. 复杂SQL分析语句执行:
spark.sql("select e.ename, e.sal, d.dname from db_hive.emp e join db_hive.dept d on e.deptno = d.dept
no").show()

8.2 IDEA 集成 Hive

在IDEA中开发应用,集成Hive,读取表的数据进行分析,构建SparkSession时需要设置

HiveMetaStore服务器地址及集成Hive选项,首先添加MAVEN依赖包:

<!-- Spark SQL 与 Hive 集成 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>

范例演示代码如下:

import org.apache.spark.sql.SparkSession
/**
 * SparkSQL集成Hive,读取Hive表的数据进行分析
 */
object SparkSQLHive {
  def main(args: Array[String]): Unit = {
    // TODO: 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[4]")
      .config("spark.sql.shuffle.partitions", "4")
      // 指定Hive MetaStore服务地址
      .config("hive.metastore.uris", "thrift://node1.oldlu.cn:9083")
      // TODO: 表示集成Hive,读取Hive表的数据
      .enableHiveSupport()
      .getOrCreate()
    // 导入隐式转换
    import spark.implicits._
    // 导入函数库
    import org.apache.spark.sql.functions._
    // TODO: 读取Hive表的数据
    spark.sql(
      """
        |SELECT deptno, ROUND(AVG(sal), 2) AS avg_sal FROM db_hive.emp GROUP BY deptno
""".stripMargin)
      .show(10, truncate = false)
    println("===========================================================")
    import org.apache.spark.sql.functions._
    spark.read
      .table("db_hive.emp")
      .groupBy($"deptno")
      .agg(round(avg($"sal"), 2).alias("avg_sal"))
      .show(10, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}

运行程序结果如下:

8.3 Hive写入到Spark

package com.oldlu.datai;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.event.Level;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * @author :oldlu
 * @date :Created in 2020/12/2 10:07
 * @description:
 * @modified By:
 */
public class HiveExport {
    private void saveDataset(String[] tables, Properties mysqlProperties) {
        SparkSession spark = SparkSession.builder()
                .config("spark.local.dir", "/tmp/spark-tmp")
                .master("local")
                .appName("toplion.iot")
                .enableHiveSupport()
                .getOrCreate();
        spark.sparkContext().setLogLevel(Level.ERROR.toString());
        spark.sql("use iot");
        for (String table : tables) {
            Dataset<Row> tableDS = spark.table(table);
            tableDS.write().mode(SaveMode.Overwrite).jdbc(mysqlProperties.getProperty("url"), table, mysqlProperties);
            tableDS.show(999);
        }
    }
    public void start() {
        InputStream in = this.getClass().getClassLoader().getResourceAsStream("jdbc.properties");//获取文件路径
        Properties mysqlProperties = new Properties();
        try {
            mysqlProperties.load(in);
            String[] tables = {"account_s", "consume_s", "device_s", "repair_s", "users_s","feedback_s", "recharge_s"};
            //String[] tables = {"consume_s","account_s"};
            saveDataset(tables, mysqlProperties);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
5天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
5天前
|
SQL 分布式计算 大数据
MaxCompute产品使用合集之MaxCompute 支持 SHOW EXTERNAL TABLE 这样的语句吗
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5天前
|
分布式计算 DataWorks 大数据
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
|
5天前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之spark3.1.1通过resource目录下的conf文件配置,报错如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5天前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
143 0
|
5天前
|
分布式计算 大数据 Java
Spark 大数据实战:基于 RDD 的大数据处理分析
Spark 大数据实战:基于 RDD 的大数据处理分析
142 0
|
5天前
|
分布式计算 大数据 BI
MaxCompute产品使用合集之MaxCompute项目的数据是否可以被接入到阿里云的Quick BI中
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5天前
|
SQL 分布式计算 大数据
MaxCompute产品使用合集之怎样可以将大数据计算MaxCompute表的数据可以导出为本地文件
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5天前
|
分布式计算 DataWorks 数据库
DataWorks操作报错合集之DataWorks使用数据集成整库全增量同步oceanbase数据到odps的时候,遇到报错,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
25 0

热门文章

最新文章