大数据Spark External DataSource 1

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 大数据Spark External DataSource

1 数据源与格式

在SparkSQL模块,提供一套完成API接口,用于方便读写外部数据源的的数据(从Spark 1.4版本提供),框架本身内置外部数据源:

在Spark 2.4版本中添加支持Image Source(图像数据源)和Avro Source。

数据分析处理中,数据可以分为结构化数据、非结构化数据及半结构化数据。


1)、结构化数据(Structured)

结构化数据源可提供有效的存储和性能。例如,Parquet和ORC等柱状格式使从列的子集中提取值变得更加容易。

基于行的存储格式(如Avro)可有效地序列化和存储提供存储优势的数据。然而,这些优点通常以灵活性为代价。如因结构的固定性,格式转变可能相对困难。

2)、非结构化数据(UnStructured)

相比之下,非结构化数据源通常是自由格式文本或二进制对象,其不包含标记或元数据以定义数据的结构。

报纸文章,医疗记录,图像,应用程序日志通常被视为非结构化数据。这些类型的源通常

要求数据周围的上下文是可解析的。

3)、半结构化数据(Semi-Structured)

半结构化数据源是按记录构建的,但不一定具有跨越所有记录的明确定义的全局模式。每个数据记录都使用其结构信息进行扩充。

半结构化数据格式的好处是,它们在表达数据时提供了最大的灵活性,因为每条记录都是自我描述的。但这些格式的主要缺点是它们会产生额外的解析开销,并且不是特别为ad-hoc(特定)查询而构建的。

2 加载/保存数据

SparkSQL提供一套通用外部数据源接口,方便用户从数据源加载和保存数据,例如从MySQL表中既可以加载读取数据:load/read,又可以保存写入数据:save/write。

由于SparkSQL没有内置支持从HBase表中加载和保存数据,但是只要实现外部数据源接口,也能像上面方式一样读取加载数据。

2.1 Load 加载数据

2.1.1 获取SparkSession对象

 /**
     * 获取SparkSession对象
     *
     * @return
     */
    static void createSpark() {
        properties = getProperties();
        spark = SparkSession.builder()
                .config("spark.local.dir", "/tmp/spark-tmp")
                .config("spark.driver.maxResultSize", "20g")
                .config("spark.sql.parquet.writeLegacyFormat", "true")
                .config("hbase.zookeeper.quorum", properties.getProperty("hbase.zookeeper.quorum"))
                .config("hbase.zookeeper.property.clientPort", properties.getProperty("hbase.zookeeper.property.clientPort"))
                .config("zookeeper.znode.parent", properties.getProperty("zookeeper.znode.parent"))
                .config("spark.redis.host", properties.getProperty("spark.redis.host"))
                .config("spark.redis.port", properties.getProperty("spark.redis.port"))
                .config("spark.redis.auth", properties.getProperty("spark.redis.auth"))
                .config("spark.redis.db", properties.getProperty("spark.redis.db"))
                .master("local[*]")
                .appName("toplion.offline")
                .enableHiveSupport()
                .getOrCreate();
        spark.sparkContext().setLogLevel(Level.ERROR.toString());
        spark.udf().register("removeEmoji", (UDF1<String, String>) col -> {
                    return EmojiParser.removeAllEmojis(col);
                }
                , DataTypes.StringType);
        spark.udf().register("addStr", new UDF3<Object, String, String, String>() {
                    @Override
                    public String call(Object o, String head, String tail) throws Exception {
                        return head + o.toString() + tail;
                    }
                }
                , DataTypes.StringType);
        spark.udf().register("addUUID", new UDF0() {
                    @Override
                    public String call() throws Exception {
                        return UUID.randomUUID().toString();
                    }
                }
                , DataTypes.StringType);
        SparkConf sparkConf = spark.sparkContext().getConf();
        redisConfig = RedisConfig.fromSparkConf(sparkConf);
        readWriteConfig =ReadWriteConfig.fromSparkConf(sparkConf);
        jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        redisContext = new RedisContext(spark.sparkContext());
    }

在SparkSQL中读取数据使用SparkSession读取,并且封装到数据结构Dataset/DataFrame中。

DataFrameReader专门用于加载load读取外部数据源的数据,基本格式如下:

SparkSQL模块本身自带支持读取外部数据源的数据:

总结起来三种类型数据,也是实际开发中常用的:

  • 第一类:文件格式数据
  1. 文本文件text、csv文件和json文件
  • 第二类:列式存储数据
  1. Parquet格式、ORC格式
  • 第三类:数据库表
  1. 关系型数据库RDBMS:MySQL、DB2、Oracle和MSSQL
  2. Hive仓库表

官方文档:http://spark.apache.org/docs/2.4.5/sql-data-sources-load-save-functions.html

此外加载文件数据时,可以直接使用SQL语句,指定文件存储格式和路径:

2.2 Save 保存数据

SparkSQL模块中可以从某个外部数据源读取数据,就能向某个外部数据源保存数据,提供相

应接口,通过DataFrameWrite类将数据进行保存。

与DataFrameReader类似,提供一套规则,将数据Dataset保存,基本格式如下:

SparkSQL模块内部支持保存数据源如下:

所以使用SpakrSQL分析数据时,从数据读取,到数据分析及数据保存,链式操作,更多就是

ETL操作。当将结果数据DataFrame/Dataset保存至Hive表中时,可以设置分区partition和分桶

bucket,形式如下:

2.2.1 写出至redis

需要的依赖:

spark-redis-2.4.1-jar-with-dependencies.jar

spark使用parallelize方法创建RDD

JavaRDD和RDD的互相转换

  protected static RedisContext redisContext;
    protected void toRedisSET(RDD<String> vs, String setName) {
        redisContext.toRedisSET(vs, setName, 60 * 60 * 48, redisConfig, readWriteConfig);
    }
    protected void toRedisHASH(RDD<Tuple2<String, String>> kvs, String setName) {
        redisContext.toRedisHASH(kvs, setName, 60 * 60 * 48, redisConfig, readWriteConfig);
    }
    @Override
    protected void writeRedis(LocalDate startDate, LocalDate endDate, String group, String dateTag) {
        if ("all".equals(group)) {
            Dataset<Row> deviceConnection = table("iot.device_connection");
            Dataset<Row> device = table("iot.device");
            Dataset<Row> ds = deviceConnection.join(device, deviceConnection.col("device_id").equalTo(device.col("id")))
                    .where(col("created_time").lt(endDate.toString()))
                    .select(
                            //deviceConnection.col("device_id"),
                            device.col("project_id").as("projectId"),
                            device.col("identity").as("deviceIdentity"),
                            when(deviceConnection.col("type").equalTo(0),"ONLINE").otherwise("OFFLINE").as("currentType"),
                            deviceConnection.col("created_time").cast(DataTypes.LongType).as("lastUpdateDate")
                    ).withColumn("lastUpdateDate",col("lastUpdateDate").multiply(1000))
                    .orderBy(col("lastUpdateDate").desc());
            Dataset<Row> dsData = ds.groupBy(col("deviceIdentity"))
                    .agg(
                            first(col("lastUpdateDate")).as("lastUpdateDate"),
                            first(col("projectId")).as("projectId"),
                            first(col("currentType")).as("currentType")
                    )
                    //.withColumn("uuid", functions.callUDF("addStr", col("project_id"), lit(""), lit("")))
                    .withColumn("_class", lit("com.toplion.iot.device.domain.DeviceConnection"))
                    .withColumn("uuid", functions.callUDF("uuid"))
                    .withColumn("id", col("uuid"));
            List<String> list = dsData.toJSON().collectAsList();
            list.forEach(s -> {
                JSONObject r = JSONObject.parseObject(s);
                String uuidStr = r.getString("uuid");
                String projectIdStr = r.getString("projectId");
                String deviceIdentityStr = r.getString("deviceIdentity");
                String lastUpdateDateStr = r.getString("lastUpdateDate");
                String currentTypeStr = r.getString("currentType");
                JavaRDD<String> uuid = jsc.parallelize(Arrays.asList(new String[]{uuidStr}));
                toRedisSET(uuid.rdd(), "DeviceConnection:projectId:" + projectIdStr);
                toRedisSET(uuid.rdd(), "DeviceConnection:deviceIdentity:" + deviceIdentityStr);
                toRedisSET(uuid.rdd(), "DeviceConnection:lastUpdateDate:" + lastUpdateDateStr);
                toRedisSET(uuid.rdd(), "DeviceConnection:currentType:" + currentTypeStr);
                JavaRDD<String> idx = jsc.parallelize(Arrays.asList("DeviceConnection:lastUpdateDate:" + lastUpdateDateStr,
                        "DeviceConnection:projectId:" + projectIdStr,
                        "DeviceConnection:currentType:" + currentTypeStr,
                        "DeviceConnection:deviceIdentity:" + deviceIdentityStr)
                );
                toRedisSET(idx.rdd(), "DeviceConnection:" + uuidStr + ":idx");
            });
            dsData.write()
                    .format("org.apache.spark.sql.redis")
                    .option("table", "DeviceConnection")
                    .option("key.column", "uuid")
                    .option("ttl", 60 * 60 * 48)
                    .mode(SaveMode.Append)
                    .save();
            //todo 上面是根据设备连接记录,下面是设备表
            device = device.where(col("status").equalTo("0"));
            ds = device
                    .where(col("last_modified_date").lt(endDate.toString()))
                    .select(
                            device.col("id").as("device_id"),
                            device.col("project_id"),
                            device.col("identity"),
                            device.col("last_modified_date").as("created_time"),
                            device.col("network_status"),
                            device.col("working_status")
                    )
                    .orderBy(col("created_time").desc());
            dsData = ds.groupBy(col("project_id").as("projectId"))
                    .agg(
                            countDistinct(col("device_id")).as("count"),
                            countDistinct(when(col("network_status").equalTo("1"), col("device_id"))).as("online"),
                            countDistinct(when(col("working_status").equalTo("1"), col("device_id"))).as("working")
                    )
                    .withColumn("offline", col("count").minus(col("online")))
                    .withColumn("date", lit(dateTag))
                    //.withColumn("uuid", functions.callUDF("addStr", col("project_id"), lit(""), lit("")))
                    .withColumn("_class", lit("com.toplion.iot.device.domain.DeviceS"))
                    .withColumn("uuid", functions.callUDF("uuid"))
                    .withColumn("id", col("uuid"))
                    .withColumn("type", lit("0"));
            list = dsData.toJSON().collectAsList();
            list.forEach(s -> {
                JSONObject r = JSONObject.parseObject(s);
                String uuidStr=r.getString("uuid");
                String typeStr=r.getString("type");
                String dateStr=r.getString("date");
                String projectIdStr=r.getString("projectId");
                JavaRDD<String> uuid = jsc.parallelize(Arrays.asList(new String[]{uuidStr}));
                toRedisSET(uuid.rdd(), "DeviceS:type:" +typeStr);
                toRedisSET(uuid.rdd(), "DeviceS:date:" + dateStr);
                toRedisSET(uuid.rdd(), "DeviceS:projectId:" + projectIdStr);
                JavaRDD<String> idx = jsc.parallelize(Arrays.asList("DeviceS:date:" + dateStr,
                        "DeviceS:projectId:" + projectIdStr,
                        "DeviceS:type:" + typeStr));
                toRedisSET(idx.rdd(), "DeviceS:" + uuidStr + ":idx");
            });
            dsData.write()
                    .format("org.apache.spark.sql.redis")
                    .option("table", "DeviceS")
                    .option("key.column", "uuid")
                    .option("ttl", 60 * 60 * 48)
                    .mode(SaveMode.Append)
                    .save();
        }
    }

2.3 案例演示

加载json格式数据,提取name和age字段值,保存至Parquet列式存储文件。

// 加载json数据
val peopleDF = spark.read.format("json").load("/datas/resources/people.json")
val resultDF = peopleDF.select("name", "age")
// 保存数据至parquet
resultDF.write.format("parquet").save("/datas/people-parquet")

在spark-shell上执行上述语句,截图结果如下:

查看HDFS文件系统目录,数据已保存值parquet文件,并且使用snappy压缩。

2.4 保存模式(SaveMode)

将Dataset/DataFrame数据保存到外部存储系统中,考虑是否存在,存在的情况下的下如何

进行保存,DataFrameWriter中有一个mode方法指定模式:

通过源码发现SaveMode时枚举类,使用Java语言编写,如下四种保存模式:


第一种:Append 追加模式,当数据存在时,继续追加;

第二种:Overwrite 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;

第三种:ErrorIfExists 存在及报错;

第四种:Ignore 忽略,数据存在时不做任何操作;

实际项目依据具体业务情况选择保存模式,通常选择Append和Overwrite模式。

3 parquet 数据

SparkSQL模块中默认读取数据文件格式就是parquet列式存储数据,通过参数【spark.sql.sources.default】设置,默认值为【parquet】。

范例演示代码:直接load加载parquet数据和指定parquet格式加载数据。

import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * SparkSQL读取Parquet列式存储数据
 */
object SparkSQLParquet {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象,通过建造者模式创建
    val spark: SparkSession = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .getOrCreate()
    import spark.implicits._
    // TODO: 从LocalFS上读取parquet格式数据
    val usersDF: DataFrame = spark.read.parquet("datas/resources/users.parquet")
    usersDF.printSchema()
    usersDF.show(10, truncate = false)
    println("==================================================")
    // SparkSQL默认读取文件格式为parquet
    val df = spark.read.load("datas/resources/users.parquet")
    df.printSchema()
    df.show(10, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}

运行程序结果:

4 text 数据

SparkSession加载文本文件数据,提供两种方法,返回值分别为DataFrame和Dataset,前面【入门案例:词频统计WordCount】中已经使用,下面看一下方法声明:

可以看出textFile方法底层还是调用text方法,先加载数据封装到DataFrame中,再使用as[String]方法将DataFrame转换为Dataset,实际项目中推荐使用textFile方法,从Spark 2.0开始提供。无论是text方法还是textFile方法读取文本数据时,一行一行的加载数据,每行数据使用UTF-8编码的字符串,列名称为【value】。

范例演示:分别使用text和textFile方法加载数据。

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
 * SparkSQL加载文本文件数据,方法text和textFile
 */
object SparkSQLText {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象,通过建造者模式创建
    val spark: SparkSession = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .getOrCreate() // 底层实现:单例模式,创建SparkContext对象
    import spark.implicits._
    // TODO: text方法加载数据,封装至DataFrame中
    val dataframe: DataFrame = spark.read.text("datas/resources/people.txt")
    dataframe.printSchema()
    dataframe.show(10, truncate = false)
    println("=================================================")
    val dataset: Dataset[String] = spark.read.textFile("datas/resources/people.txt")
    dataset.printSchema()
    dataset.show(10, truncate = false)
    spark.stop() // 应用结束,关闭资源
  }
}

5 json 数据

实际项目中,有时处理数据以JSON格式存储的,尤其后续结构化流式模块:StructuredStreaming,从Kafka Topic消费数据很多时间是JSON个数据,封装到DataFrame中,需要解析提取字段的值。以读取github操作日志JSON数据为例,数据结构如下:

  • 1)、操作日志数据使用GZ压缩:2015-03-01-11.json.gz,先使用json方法读取。
// 构建SparkSession实例对象,通过建造者模式创建
val spark: SparkSession = SparkSession
  .builder()
  .appName(this.getClass.getSimpleName.stripSuffix("$"))
  .master("local[3]")
  // 底层实现:单例模式,创建SparkContext对象
  .getOrCreate()
import spark.implicits._
// TODO: 从LocalFS上读取json格式数据(压缩)
val jsonDF: DataFrame = spark.read.json("datas/json/2015-03-01-11.json.gz")
jsonDF.printSchema()
jsonDF.show(10, truncate = true)
  • 2)、使用textFile加载数据,对每条JSON格式字符串数据,使用SparkSQL函数库functions中自
    带get_json_obejct函数提取字段:id、type、public和created_at的值。
  1. 函数:get_json_obejct使用说明
  2. 核心代码
val githubDS: Dataset[String] = spark.read.textFile("datas/json/2015-03-01-11.json.gz")
githubDS.printSchema() // value 字段名称,类型就是String
githubDS.show(1)
// TODO:使用SparkSQL自带函数,针对JSON格式数据解析的函数
import org.apache.spark.sql.functions._
// 获取如下四个字段的值:id、type、public和created_at
val gitDF: DataFrame = githubDS.select(
  get_json_object($"value", "$.id").as("id"),
  get_json_object($"value", "$.type").as("type"),
  get_json_object($"value", "$.public").as("public"),
  get_json_object($"value", "$.created_at").as("created_at")
)
gitDF.printSchema()
gitDF.show(10, truncate = false)

运行结果:

范例演示完整代码:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
 * SparkSQL读取JSON格式文本数据
 */
object SparkSQLJson {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象,通过建造者模式创建
    val spark: SparkSession = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      // 底层实现:单例模式,创建SparkContext对象
      .getOrCreate()
    import spark.implicits._
    // TODO: 从LocalFS上读取json格式数据(压缩)
    val jsonDF: DataFrame = spark.read.json("datas/json/2015-03-01-11.json.gz")
    jsonDF.printSchema()
    jsonDF.show(10, truncate = true)
    println("===================================================")
    val githubDS: Dataset[String] = spark.read.textFile("datas/json/2015-03-01-11.json.gz")
    githubDS.printSchema() // value 字段名称,类型就是String
    githubDS.show(1)
    // TODO:使用SparkSQL自带函数,针对JSON格式数据解析的函数
    import org.apache.spark.sql.functions._
    // 获取如下四个字段的值:id、type、public和created_at
    val gitDF: DataFrame = githubDS.select(
      get_json_object($"value", "$.id").as("id"),
      get_json_object($"value", "$.type").as("type"),
      get_json_object($"value", "$.public").as("public"),
      get_json_object($"value", "$.created_at").as("created_at")
    )
    gitDF.printSchema()
    gitDF.show(10, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}

6 csv 数据

在机器学习中,常常使用的数据存储在csv/tsv文件格式中,所以SparkSQL中也支持直接读取格式数据,从2.0版本开始内置数据源。关于CSV/TSV格式数据说明:

SparkSQL中读取CSV格式数据,可以设置一些选项,重点选项:

  • 1)、分隔符:sep
  1. 默认值为逗号,必须单个字符
  • 2)、数据文件首行是否是列名称:header
  1. 默认值为false,如果数据文件首行是列名称,设置为true
  • 3)、是否自动推断每个列的数据类型:inferSchema
  1. 默认值为false,可以设置为true
    官方提供案例:

    当读取CSV/TSV格式数据文件首行是否是列名称,读取数据方式(参数设置)不一样的 。
  • 第一点:首行是列的名称,如下方式读取数据文件
// TODO: 读取TSV格式数据
val ratingsDF: DataFrame = spark.read
  // 设置每行数据各个字段之间的分隔符, 默认值为 逗号
  .option("sep", "\t")
  // 设置数据文件首行为列名称,默认值为 false
  .option("header", "true")
  // 自动推荐数据类型,默认值为false
  .option("inferSchema", "true")
  // 指定文件的路径
  .csv("datas/ml-100k/u.dat")
ratingsDF.printSchema()
ratingsDF.show(10, truncate = false)
  • 第二点:首行不是列的名称,如下方式读取数据(设置Schema信息)
// 定义Schema信息
val schema = StructType(
  StructField("user_id", IntegerType, nullable = true) ::
    StructField("movie_id", IntegerType, nullable = true) ::
    StructField("rating", DoubleType, nullable = true) ::
    StructField("timestamp", StringType, nullable = true) :: Nil
)
// TODO: 读取TSV格式数据
val mlRatingsDF: DataFrame = spark.read
  // 设置每行数据各个字段之间的分隔符, 默认值为 逗号
  .option("sep", "\t")
  // 指定Schema信息
  .schema(schema)
  // 指定文件的路径
  .csv("datas/ml-100k/u.data")
mlRatingsDF.printSchema()
mlRatingsDF.show(5, truncate = false)

将DataFrame数据保存至CSV格式文件,演示代码如下:

/**
 * 将电影评分数据保存为CSV格式数据
 */
mlRatingsDF
  // 降低分区数,此处设置为1,将所有数据保存到一个文件中
  .coalesce(1)
  .write
  // 设置保存模式,依据实际业务场景选择,此处为覆写
  .mode(SaveMode.Overwrite)
  .option("sep", ",")
  // TODO: 建议设置首行为列名
  .option("header", "true")
  .csv("datas/ml-csv-" + System.nanoTime())

范例演示完整代码SparkSQLCsv.scala如下:

import org.apache.spark.SparkContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
 * SparkSQL 读取CSV/TSV格式数据:
 * i). 指定Schema信息
 * ii). 是否有header设置
 */
object SparkSQLCsv {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
      .appName(SparkSQLCsv.getClass.getSimpleName)
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._
    // 获取SparkContext实例对象
    val sc: SparkContext = spark.sparkContext
    /**
     * 实际企业数据分析中
     * csv\tsv格式数据,每个文件的第一行(head, 首行),字段的名称(列名)
     */
    // TODO: 读取TSV格式数据
    val ratingsDF: DataFrame = spark.read
      // 设置每行数据各个字段之间的分隔符, 默认值为 逗号
      .option("sep", "\t")
      // 设置数据文件首行为列名称,默认值为 false
      .option("header", "true")
      // 自动推荐数据类型,默认值为false
      .option("inferSchema", "true")
      // 指定文件的路径
      .csv("datas/ml-100k/u.dat")
    ratingsDF.printSchema()
    ratingsDF.show(10, truncate = false)
    println("=======================================================")
    // 定义Schema信息
    val schema = StructType(
      StructField("user_id", IntegerType, nullable = true) ::
        StructField("movie_id", IntegerType, nullable = true) ::
        StructField("rating", DoubleType, nullable = true) ::
        StructField("timestamp", StringType, nullable = true) :: Nil
    )
    // TODO: 读取TSV格式数据
    val mlRatingsDF: DataFrame = spark.read
      // 设置每行数据各个字段之间的分隔符, 默认值为 逗号
      .option("sep", "\t")
      // 指定Schema信息
      .schema(schema)
      // 指定文件的路径
      .csv("datas/ml-100k/u.data")
    mlRatingsDF.printSchema()
    mlRatingsDF.show(5, truncate = false)
    println("=======================================================")
    /**
     * 将电影评分数据保存为CSV格式数据
     */
    mlRatingsDF
      // 降低分区数,此处设置为1,将所有数据保存到一个文件中
      .coalesce(1)
      .write
      // 设置保存模式,依据实际业务场景选择,此处为覆写
      .mode(SaveMode.Overwrite)
      .option("sep", ",")
      // TODO: 建议设置首行为列名
      .option("header", "true")
      .csv("datas/ml-csv-" + System.nanoTime())
    // 关闭资源
    spark.stop()
  }
}
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
6天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
6天前
|
SQL 分布式计算 大数据
MaxCompute产品使用合集之MaxCompute 支持 SHOW EXTERNAL TABLE 这样的语句吗
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6天前
|
分布式计算 DataWorks 大数据
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
|
6天前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之spark3.1.1通过resource目录下的conf文件配置,报错如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6天前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
143 0
|
6天前
|
分布式计算 大数据 Java
Spark 大数据实战:基于 RDD 的大数据处理分析
Spark 大数据实战:基于 RDD 的大数据处理分析
142 0
|
6天前
|
分布式计算 大数据 BI
MaxCompute产品使用合集之MaxCompute项目的数据是否可以被接入到阿里云的Quick BI中
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6天前
|
SQL 分布式计算 大数据
MaxCompute产品使用合集之怎样可以将大数据计算MaxCompute表的数据可以导出为本地文件
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6天前
|
分布式计算 DataWorks 数据库
DataWorks操作报错合集之DataWorks使用数据集成整库全增量同步oceanbase数据到odps的时候,遇到报错,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
25 0

热门文章

最新文章