小文件处理方案汇总

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000次 1年
简介: 小文件处理方案汇总

大数据领域中的小文件问题,也是一个非常棘手的问题,仅次于数据倾斜问题,对于时间和性能能都是毁灭性打击。本文参考网上对于小文件问题的定义和常见系统的解决方案,给大家还原一个大数据系统中小文件问题的系统性解决方案。本文针对目前大数据领域主要的主要框架,讲解了小文件产生的原因和一些解决办法      

hive中小文件的处理方法

1. 使用 hive 自带的 concatenate 命令,自动合并小文件

大数据领域中的小文件问题,也是一个非常棘手的问题,仅次于数据倾斜问题,对于时间和性能能都是毁灭性打击。本文参考网上对于小文件问题的定义和常见系统的解决方案,给大家还原一个大数据系统中小文件问题的系统性解决方案。
本文针对目前大数据领域主要的主要框架,讲解了小文件产生的原因和一些解决办法      
hive中小文件的处理方法
 1. 使用 hive 自带的 concatenate 命令,自动合并小文件

注意:

1、concatenate 命令只支持 RCFILE 和 ORC 文件类型。

2、使用concatenate命令合并小文件时不能指定合并后的文件数量,但可以多次执行该命令。

3、当多次使用concatenate后文件数量不在变化,这个跟参数 mapreduce.input.fileinputformat.split.minsize=256mb 的设置有关,可设定每个文件的最小size。

2. 调整参数减少Map数量

#执行Map前进行小文件合并
#CombineHiveInputFormat底层是 Hadoop的 CombineFileInputFormat 方法
#此方法是在mapper中将多个文件合成一个split作为输入
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- 默认
#每个Map最大输入大小(这个值决定了合并后文件的数量)
set mapred.max.split.size=256000000;   -- 256M
#一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000;  -- 100M
#一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000;  -- 100M

设置map输出和reduce输出进行合并的相关参数:

#设置map端输出进行合并,默认为true
set hive.merge.mapfiles = true;
#设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true;
#设置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000;   -- 256M
#当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge
set hive.merge.smallfiles.avgsize=16000000;   -- 16M

启用压缩

# hive的查询结果输出是否进行压缩
set hive.exec.compress.output=true;
# MapReduce Job的结果输出是否使用压缩
set mapreduce.output.fileoutputformat.compress=true;

3. 减少Reduce的数量

#reduce 的个数决定了输出的文件的个数,所以可以调整reduce的个数控制hive表的文件数量,
#hive中的分区函数 distribute by 正好是控制MR中partition分区的,
#然后通过设置reduce的数量,结合分区函数让数据均衡的进入每个reduce即可。
#设置reduce的数量有两种方式,第一种是直接设置reduce个数
set mapreduce.job.reduces=10;
#第二种是设置每个reduce的大小,Hive会根据数据总大小猜测确定一个reduce个数
set hive.exec.reducers.bytes.per.reducer=5120000000; -- 默认是1G,设置为5G
#执行以下语句,将数据均衡的分配到reduce中
set mapreduce.job.reduces=10;
insert overwrite table A partition(dt)
select * from B
distribute by rand();
解释:如设置reduce数量为10,则使用 rand(), 随机生成一个数 x % 10 ,
这样数据就会随机进入 reduce 中,防止出现有的文件过大或过小

4、distribute by

insert overwrite table test [partition(hour=...)] select * from test distribute by floor (rand()*5);

5. 使用hadoop的archive将小文件归档

Hadoop Archive简称HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问

#用来控制归档是否可用
set hive.archive.enabled=true;
#通知Hive在创建归档时是否可以设置父目录
set hive.archive.har.parentdir.settable=true;
#控制需要归档文件的大小
set har.partfile.size=1099511627776;
#使用以下命令进行归档
ALTER TABLE A ARCHIVE PARTITION(dt='2020-12-24', hr='12');
#对已归档的分区恢复为原文件
ALTER TABLE A UNARCHIVE PARTITION(dt='2020-12-24', hr='12');

hive小文件调优实践_PONY LEE的博客-CSDN博客_hive设置文件大小

spark处理小文件

1、通过repartition或coalesce算子控制最后的DataSet的分区数(Spark Core)

注意repartition和coalesce的区别,具体可以参考文章《重要|Spark分区并行度决定机制》

2. spark自适应调整

社区在Spark2.3版本之后的AdaptiveExecute特性之中就能很好的解决Partition个数过多导致小文件过多的问题. 通过动态的评估Shuffle输入的个数(通过设置spark.sql.adaptive.shuffle.targetPostShuffleInputSize实现), 可以聚合多个Task任务, 减少Reduce的个数 。

使用方式:

# spark2.3
set spark.sql.adaptive.enabled=true
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=128MB
# spark3.0
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true

3.HINT方式

社区在Spark2.4版本之后引入HINT模式SPARK-24940, 可以由用户来指定最后分区的个数, 只要在SQL语句之中加入注释文件。

3.HINT方式
社区在Spark2.4版本之后引入HINT模式SPARK-24940, 可以由用户来指定最后分区的个数, 只要在SQL语句之中加入注释文件。

支持简单无Shuffle模式的Reparation

缺点:

需要人工干预, 设计Partition的个数, 而对于变化的场景来说, 难有一个固定的Partition个数无法处理Shuffle输出比过低的场景。

4.独立的小文件合并

处理Shuffle输出比过低的场景, 因此我们需要一种兜底方案: 直接读取HDFS上的数据, 进行合并操作。当插入任务完成之后, 新启动一个Job读取所有的数据, 然后根据设置的文件大小, 进行合并并会写到HDFS之中. 由于是直读直写的方式, 因此对于数据大小的评估是非常精确的, 因此可以很好的避免Shuffle输出比的问题.
优点:
彻底解决小文件问题
缺点:
引入新的一次Job过程, 性能会受影响, 特别对中型任务会有一定的影响(10秒左右)
使用方式:

set spark.sql.merge.enabled=true;
set spark.sql.merge.size.per.task=134217728; --128 * 1024 * 1024 bytes

性能优化: ORC和Parquet格式支持按行读取和按Stripe读取, Stripe读取可以认为是GroupRead, 由于不需要解析文件里面具体的数值, 因此可以按照Stripe粒度读取文件, 再写入文件之中, 以Stripe粒度合并文件。

set spark.sql.merge.mode=fast; -- 默认是pretty, 是逐行读写文件的, 性能较慢

fast模式目前只针对ORC格式做了优化性能较快, 其他格式会额外引入一次任务, 尤其在动态分区场景下, 性能会下降不少

5.spark自定义异步合并工具类

object CombineSmallFiles {
  /**
    * Step 1:读取指定目录下需要合并文件的目录和目录下的文件名,修改文件名后缀为.old:相当于记录为需要合并
    */
  def searchFiles(spark: SparkSession, hdfs: FileSystem, srcStr: String, fileType: String): Unit = {
    val srcPath = new Path(srcStr)
    val fileStatus = hdfs.listStatus(srcPath)
    var smallFileCount: Int = 0
    fileStatus.foreach(file => {
      // 对大小小于128M的文件进行标记
      if (!file.isDirectory && !file.getPath.getName.startsWith(".")
        && (hdfs.getContentSummary(file.getPath).getLength < 134217728)) {
        if (!file.getPath.getName.endsWith(".old")) {
          hdfs.rename(file.getPath, new Path(file.getPath + ".old"))
        }
        smallFileCount += 1
      } else if (file.isDirectory && !file.getPath.getName.startsWith(".")) {
        searchFiles(spark, hdfs, file.getPath.toUri.getPath, fileType)
      }
    })
    if (smallFileCount > 1) {
      combineSmallFile(spark, hdfs, srcPath, fileType)
    }
  }
  /**
    * 合并
    */
  def combineSmallFile(spark: SparkSession, hdfs: FileSystem, srcPath: Path, fileType: String): Unit = {
    val srcStr = srcPath.toUri.getPath
    val combineStr = srcStr + "/.combine"
    //如果因为程序中断导致combine遗留合并后的文件,则移动后清除
    moveCombineFileAndRemove(hdfs, srcStr, combineStr)
    // Step 2:获取目录下.old的文件,读取写入临时目录并生成文件.combine
    spark
      .read
      .format(fileType)
      .load(srcStr + "/*.old")
      .repartition(1)
      .write
      .format(fileType)
      .save(combineStr)
    // Step3:删除.old -> mv .combine下文件到源目录 -> 删除.combine
    hdfs.listStatus(srcPath).foreach(file => {
      // 对.old结尾的文件清除
      if (!file.isDirectory && file.getPath.getName.endsWith(".old")) {
        hdfs.delete(file.getPath, true)
      }
    })
    moveCombineFileAndRemove(hdfs, srcStr, combineStr)
  }
  /**
    * 移动合并文件并清除合并用的临时目录
    *
    * @param hdfs
    * @param srcStr
    * @param combineStr
    */
  def moveCombineFileAndRemove(hdfs: FileSystem, srcStr: String, combineStr: String): Unit = {
    val combinePath = new Path(combineStr)
    if (!hdfs.exists(combinePath)) return
    hdfs.listStatus(combinePath).foreach(combineFile => {
      if (combineFile.getPath.getName.startsWith("part-")) {
        hdfs.rename(combineFile.getPath, new Path(srcStr + "/" + combineFile.getPath.getName + ".combine"))
        hdfs.deleteOnExit(combineFile.getPath)
      }
    })
    hdfs.delete(new Path(combineStr), true)
  }
}

调用

/**
  * 运维程序:小文件合并
  */
object BT_OPS_CombineSmallFiles {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("BT_OPS_CombineSmallFiles")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "134217728")
      .enableHiveSupport()
      .getOrCreate()
    val hdfs: FileSystem = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val srcPath = "你的HDFS路径"
    searchFiles(spark, hdfs,srcPath, "orc")
    spark.stop
  }
}

6. 增加batch大小(spark streaming)

这种方法很容易理解,batch越大,从外部接收的event就越多,
内存积累的数据也就越多,那么输出的文件数也就回变少,
比如将上面例子中的batch时间从10s增加为100s,|
那么一个小时的文件数量就会减少到1152个。
但是此时延迟会比较大,不适合实时性要求高的场景。

7. 自己调用foreach去append

sparkstreaming提供的foreach这个outout类api,可以让我们自定义输出计算结果的方法。
那么我们其实也可以利用这个特性,那就是每个batch在要写文件时,并不是去生成一个新的文件流,而是把之前的文件打开。考虑这种方法的可行性,
首先,HDFS上的文件不支持修改,但是很多都支持追加,那么每个batch的每个partition就对应一个输出文件,每次都去追加这个partition对应的输出文件,
这样也可以实现减少文件数量的目的。这种方法要注意的就是不能无限制的追加,当判断一个文件已经达到某一个阈值时,就要产生一个新的文件进行追加了。

flink处理小文件

1.自定义 PartitionCommitPolicy

一个典型的 Parquet 文件合并的案例代码如下。我们可以通过

'sink.partition-commit.policy.kind' = 'metastore,success-file,custom', 
'sink.partition-commit.policy.class' = 'me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy

参数进行配置:

package me.lmagics.flinkexp.hiveintegration.util;
import org.apache.flink.hive.shaded.parquet.example.data.Group;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.GroupReadSupport;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.flink.hive.shaded.parquet.hadoop.util.HadoopInputFile;
import org.apache.flink.hive.shaded.parquet.schema.MessageType;
import org.apache.flink.table.filesystem.PartitionCommitPolicy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ParquetFileMergingCommitPolicy implements PartitionCommitPolicy {
  private static final Logger LOGGER = LoggerFactory.getLogger(ParquetFileMergingCommitPolicy.class);
  @Override
  public void commit(Context context) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    String partitionPath = context.partitionPath().getPath();
    List<Path> files = listAllFiles(fs, new Path(partitionPath), "part-");
    LOGGER.info("{} files in path {}", files.size(), partitionPath);
    MessageType schema = getParquetSchema(files, conf);
    if (schema == null) {
      return;
    }
    LOGGER.info("Fetched parquet schema: {}", schema.toString());
    Path result = merge(partitionPath, schema, files, fs);
    LOGGER.info("Files merged into {}", result.toString());
  }
  private List<Path> listAllFiles(FileSystem fs, Path dir, String prefix) throws IOException {
    List<Path> result = new ArrayList<>();
    RemoteIterator<LocatedFileStatus> dirIterator = fs.listFiles(dir, false);
    while (dirIterator.hasNext()) {
      LocatedFileStatus fileStatus = dirIterator.next();
      Path filePath = fileStatus.getPath();
      if (fileStatus.isFile() && filePath.getName().startsWith(prefix)) {
        result.add(filePath);
      }
    }
    return result;
  }
  private MessageType getParquetSchema(List<Path> files, Configuration conf) throws IOException {
    if (files.size() == 0) {
      return null;
    }
    HadoopInputFile inputFile = HadoopInputFile.fromPath(files.get(0), conf);
    ParquetFileReader reader = ParquetFileReader.open(inputFile);
    ParquetMetadata metadata = reader.getFooter();
    MessageType schema = metadata.getFileMetaData().getSchema();
    reader.close();
    return schema;
  }
  private Path merge(String partitionPath, MessageType schema, List<Path> files, FileSystem fs) throws IOException {
    Path mergeDest = new Path(partitionPath + "/result-" + System.currentTimeMillis() + ".parquet");
    ParquetWriter<Group> writer = ExampleParquetWriter.builder(mergeDest)
      .withType(schema)
      .withConf(fs.getConf())
      .withWriteMode(Mode.CREATE)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build();
    for (Path file : files) {
      ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
        .withConf(fs.getConf())
        .build();
      Group data;
      while((data = reader.read()) != null) {
        writer.write(data);
      }
      reader.close();
    }
    writer.close();
    for (Path file : files) {
      fs.delete(file, false);
    }
    return mergeDest;
  }
}

2. flink1.12之后新增的 table/SQL 参数配置

auto-compaction=true
compaction.file-size=1024M

3. flink datastream api

从 1.15 版本开始 FileSink 开始支持已经提交 pending 文件的合并,从而允许应用设置一个较小的时间周期并且避免生成大量的小文件。尤其是当用户使用 bulk 格式 的时候:这种格式要求用户必须在 checkpoint 的时候切换文件。

文件合并功能可以通过以下代码打开:

FileSink<Integer> fileSink=
        FileSink.forRowFormat(new Path(path),new SimpleStringEncoder<Integer>())
            .enableCompact(
                FileCompactStrategy.Builder.newBuilder()
                    .setNumCompactThreads(1024)
                    .enableCompactionOnCheckpoint(5)
                    .build(),
                new RecordWiseFileCompactor<>(
                    new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
            .build();

这一功能开启后,在文件转为 pending 状态与文件最终提交之间会进行文件合并。这些 pending 状态的文件将首先被提交为一个以 . 开头的 临时文件。这些文件随后将会按照用户指定的策略和合并方式进行合并并生成合并后的 pending 状态的文件。然后这些文件将被发送给 Committer 并提交为正式文件,在这之后,原始的临时文件也会被删除掉。

当开启文件合并功能时,用户需要指定 FileCompactStrategy 与 FileCompactor 。

FileCompactStrategy 指定何时以及哪些文件将被合并。目前有两个并行的条件:目标文件大小与间隔的 Checkpoint 数量。当目前缓存的文件的总大小达到指定的阈值,或自上次合并后经过的 Checkpoint 次数已经达到指定次数时, FileSink 将创建一个异步任务来合并当前缓存的文件。

FileCompactor 指定如何将给定的路径列表对应的文件进行合并将结果写入到文件中。根据如何写文件,它可以分为两类:

  • OutputStreamBasedFileCompactor : 用户将合并后的结果写入一个输出流中。通常在用户不希望或者无法从输入文件中读取记录时使用。这种类型的 CompactingFileWriter 的一个例子是 ConcatFileCompactor ,它直接将给定的文件进行合并并将结果写到输出流中。
  • RecordWiseFileCompactor :这种类型的 CompactingFileWriter 会逐条读出输入文件的记录用户,然后和FileWriter一样写入输出文件中。CompactingFileWriter 的一个例子是 RecordWiseFileCompactor ,它从给定的文件中读出记录并写出到 CompactingFileWriter 中。用户需要指定如何从原始文件中读出记录。

注意事项1 一旦启用了文件合并功能,此后若需要再关闭,必须在构建FileSink时显式调用disableCompact方法。

注意事项2 如果启用了文件合并功能,文件可见的时间会被延长。

iceberg处理小文件

 使用spark+iceberg合并小文件

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
/**
 * @author: zhushang
 * @create: 2021-04-02 14:30
 */
public class SparkCompaction {
    public static void main(String[] args) {
        TableIdentifier identifier = TableIdentifier.of(Namespace.of("db"), "table");
        Map<String, String> config = new HashMap<>();
        config.put("type", "iceberg");
        config.put("catalog-type", "hive");
        config.put("property-version", "1");
        config.put("warehouse", "warehouse");
        config.put("uri", "thrift://local:9083");
        config.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
        config.put("oss.endpoint", "https://xxx.aliyuncs.com");
        config.put("oss.access.key.id", "key");
        config.put("oss.access.key.secret", "secret");
        sparkSession();
        HiveCatalog hiveCatalog = new HiveCatalog(new Configuration());
        hiveCatalog.initialize("iceberg_hive_catalog", config);
        Table table = hiveCatalog.loadTable(identifier);
        Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(128 * 1024 * 1024).execute();
        Snapshot snapshot = table.currentSnapshot();
        if (snapshot != null) {
            table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
        }
    }
    private static void sparkSession() {
        SparkSession.builder()
                .master("local[*]")
                .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
                .config("spark.hadoop." + METASTOREURIS.varname, "localhost:9083")
                .config("spark.sql.warehouse.dir", "warehouse")
                .config("spark.executor.heartbeatInterval", "100000")
                .config("spark.network.timeoutInterval", "100000")
                .enableHiveSupport()
                .getOrCreate();
    }
}

使用flink+iceberg合并小文件

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.actions.Actions;
import java.util.HashMap;
import java.util.Map;
public class FlinkCompaction {
    public static void main(String[] args) throws Exception {
        ParameterTool tool = ParameterTool.fromArgs(args);
        Map<String, String> properties = new HashMap<>();
        properties.put("type", "iceberg");
        properties.put("catalog-type", "hive");
        properties.put("property-version", "1");
        properties.put("warehouse", tool.get("warehouse"));
        properties.put("uri", tool.get("uri"));
        if (tool.has("oss.endpoint")) {
            properties.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
            properties.put("oss.endpoint", tool.get("oss.endpoint"));
            properties.put("oss.access.key.id", tool.get("oss.access.key.id"));
            properties.put("oss.access.key.secret", tool.get("oss.access.key.secret"));
        }
        CatalogLoader loader =
                CatalogLoader.hive(tool.get("catalog"), new Configuration(), properties);
        Catalog catalog = loader.loadCatalog();
        TableIdentifier identifier =
                TableIdentifier.of(Namespace.of(tool.get("db")), tool.get("table"));
        Table table = catalog.loadTable(identifier);
        /**
        * 合并小文件,核心代码
        */
        Actions.forTable(table)
                .rewriteDataFiles()
                .maxParallelism(5)
                .targetSizeInBytes(128 * 1024 * 1024)
                .execute();
        Snapshot snapshot = table.currentSnapshot();
        if (snapshot != null) {
            table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
        }
    }
}

总结:无论spark还是flink写入iceberg,合并小文件的关键为:

Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(128 * 1024 * 1024).

    .targetSizeInBytes(128 * 1024 * 1024)参数决定了最终结果文件的大小。

    hudi合并小文件

    我们使用COPY_ON_WRITE表来演示Hudi如何自动处理文件大小特性。

    关键配置项如下:

    • hoodie.parquet.max.file.size数据文件最大大小,Hudi将试着维护文件大小到该指定值;
    • hoodie.parquet.small.file.limit小于该大小的文件均被视为小文件;
    • hoodie.copyonwrite.insert.split.size单文件中插入记录条数,此值应与单个文件中的记录数匹配(可以根据最大文件大小和每个记录大小来确定)









    相关实践学习
    基于MaxCompute的热门话题分析
    本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
    SaaS 模式云数据仓库必修课
    本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
    相关文章
    |
    存储 监控 前端开发
    前端文件流、切片下载和上传:优化文件传输效率与用户体验 【最全】
    文件传输是一个常见的需求。对于大文件的下载和上传,直接使用传统的方式可能会遇到性能和用户体验方面的问题。幸运的是,前端技术提供了一些高效的解决方案:文件流操作和切片下载与上传。本文将深入探讨这些技术,帮助你理解它们的原理和实现方法,以优化文件传输效率和提升用户体验。
    前端文件流、切片下载和上传:优化文件传输效率与用户体验 【最全】
    |
    1月前
    |
    运维
    【运维基础知识】用dos批处理批量替换文件中的某个字符串(本地单元测试通过,部分功能有待优化,欢迎指正)
    该脚本用于将C盘test目录下所有以t开头的txt文件中的字符串“123”批量替换为“abc”。通过创建批处理文件并运行,可实现自动化文本替换,适合初学者学习批处理脚本的基础操作与逻辑控制。
    128 56
    |
    2月前
    |
    存储 Linux Python
    文件处理的一些最佳实践
    文件处理的一些最佳实践
    53 0
    |
    6月前
    |
    C语言
    C 语言文件处理全攻略:创建、写入、追加操作解析
    在 C 语言中,您可以通过声明类型为 FILE 的指针,并使用 fopen() 函数来创建、打开、读取和写入文件:
    227 0
    |
    存储 前端开发 C#
    如何实现文件断点续传功能
    相信大家都使用过迅雷、电驴、百度云网盘等一类的工具,所有这些支持上传或下载的工具都有一个功能,那就是断点续传,也就是在你网络不佳传输断开时,传输会暂停,在网络恢复后,可以继续传输,从而避免数据的重复上传,以减少网络流量,提高效率。那么,你有仔细想过这其中的实现原理嘛?
    |
    运维
    运维小笔记:清理指定后缀名文件的 powerhsell 小脚本
    在运维值班每天都需要从系统导出这种数据,压缩好放在工作电脑上,解压用脚本做汇总。但是长期都没删除各个日期下的压缩包。几年下来,有上千个目录,也不知道哪些目录中有没有删除的压缩包。一个一个手删太累了,不妨做个 powershell 小脚本一键搞定吧。
    104 0
    |
    编译器 数据库 C语言
    文件处理操作
    文件处理操作
    91 0
    文件处理操作
    |
    安全 Linux Go
    Brename - 一个便捷跨平台批重命名文件/目录的命令行小工具
    brename是用Go编程语言实现的,支持跨平台Linxu和Windows用户使用。
    135 0
    |
    固态存储 测试技术 Linux
    文件IO操作开发笔记(二):使用Cpp的ofstream对磁盘文件存储进行性能测试以及测试工具
    在做到个别项目对日志要求较高,要求并行写入的数据较多,尽管写入数据的线程放在子线程,仍然会造成界面程序的假死(实际上Qt还是在跑,只是磁盘消耗超过瓶颈,造成假死(注意:控制台还能看到打印输出,linux则能看到打印输出)。 本篇升级了测试工具,并且测试了ofstream在USB3.0和M.2SSD上的写入性能。
    文件IO操作开发笔记(二):使用Cpp的ofstream对磁盘文件存储进行性能测试以及测试工具
    |
    存储 关系型数据库 MySQL
    【C-文件操作】一文教你如何将代码的数据持久化(一)
    【C-文件操作】一文教你如何将代码的数据持久化
    113 0
    【C-文件操作】一文教你如何将代码的数据持久化(一)