
本文转载自:过往记忆大数据原文链接 R 是数据科学中最流行的计算机语言之一,专门用于统计分析和一些扩展,如用于数据处理和机器学习任务的 RStudio addins 和其他 R 包。此外,它使数据科学家能够轻松地可视化他们的数据集。 通过在 Apache Spark 中使用 SparkR,可以很容易地扩展 R 代码。要交互式地运行作业,可以通过运行 R shell 轻松地在分布式集群中运行 R 的作业。 当 SparkR 不需要与 R 进程交互时,其性能实际上与 Scala、Java 和 Python 等其他语言 API 相同。但是,当 SparkR 作业与本机 R 函数或数据类型交互时,会性能显著下降。 如果在 Spark 和 R 之间使用 Apache Arrow 来进行数据交换,其性能会有很大的提升。这篇博客文章概述了 SparkR 中 Spark 和 R 的交互,并对比了没有向量化执行和有向量化执行的性能差异。 Spark 和 R 交互 SparkR 不仅支持丰富的 ML 和类似 SQL 的 API 集合,而且还支持用于直接与 R 代码进行交互的一组 API。例如,Spark DataFrame 和 R DataFrame 之间的无缝转换以及在 Spark DataFrame 上以分布式的方式执行 R 内置函数。 在大多数情况下,Spark 中的其他语言 API 之间的性能实际上是一致的——例如,当用户代码依赖于 Spark UDF 或者 SQL API 时,执行过程完全在 JVM 中进行, I/O 方面没有任何性能损失。比如下面的两种调用时间都只需要一秒: // Scala API // ~1 second sql("SELECT id FROM range(2000000000)").filter("id > 10").count() # R API # ~1 second count(filter(sql("SELECT * FROM range(2000000000)"), "id > 10")) 但是,在需要执行 R 的内置函数或将其从 R 内置类型转换到其他语言类型的情况下,其性能将有很大不同,如下所示。 // Scala API val ds = (1L to 100000L).toDS // ~1 second ds.mapPartitions(iter => iter.filter(_ < 50000)).count() # R API df <- createDataFrame(lapply(seq(100000), function (e) list(value=e))) # ~15 seconds - 15 times slower count(dapply( df, function(x) as.data.frame(x[x$value < 50000,]), schema(df))) 上面其实仅仅是对每个分区中过滤出小于 50000 的数据,然后对其进行 count 操作,但是 SparkR 却比 Scala 编写的代码慢 15 倍! // Scala API // ~0.2 seconds val df = sql("SELECT * FROM range(1000000)").collect() # R API # ~8 seconds - 40 times slower df <- collect(sql("SELECT * FROM range(1000000)")) 上面这个例子情况更糟糕,其仅仅是将数据收集到 Driver 端,但是 SparkR 比 Scala 要慢 40 倍! 这是因为上面计算需要与 R 内置函数或数据类型交互的 API ,但是其实现效率不高。在 SparkR 中类似的函数还有六个: createDataFrame()collect()dapply()dapplyCollect()gapply()gapplyCollect()简单来说,createDataFrame() 和 collect() 需要在 JVM 和 R 之间进行序列化/反序列化,并且对数据进行转换,比如 Java 中的字符串需要转换成 R 中的 character。 原始实现(Native implementation) 上图中 SparkR DataFrame 的计算是分布在 Spark 集群上所有可用的节点上。如果不需要将数据以 R 的 data.frame 进行收集(collect)或不需要执行 R 内置函数,则在 Driver 或 executor 端不需要与 R 进程进行通信。但是当它需要使用 R 的 data.frame 或使用 R 的内置函数时,需要 Driver 或 executor 使用 sockets 使得 JVM 和 R 进行通信。 这需要在 JVM 和 R 直接对交换的数据进行序列化和反序列化操作,而这个操作的编码格式非常低效,完全没有考虑到现代 CPU 的设计,比如 CPU pipelining。 向量化执行(Vectorized implementation) 在 Apache Spark 3.0 中,SparkR 中引入了一种新的向量化(vectorized)实现,它利用 Apache Arrow 直接在 JVM 和 R 之间交换数据,且(反)序列化成本非常小,具体如下: 新的实现方式并没有在 JVM 和 R 之间使用低效的格式对数据逐行进行(反)序列化,而是利用 Apache Arrow 以高效的列格式进行流水线处理和单指令多数据(SIMD)。 新的矢量化 SparkR API 默认情况下未启用,但可以通过在 Apache Spark 3.0 中将 spark.sql.execution.arrow.sparkr.enabled 设置为 true 来启用。注意,dapplyCollect() 和 gapplyCollect() 矢量化操作尚未实现。建议使用 dapply() 和 gapply() 来替代。 基准测试结果 下面的基准测试使用的数据集为 500,000 条记录。分别测试使用和未使用矢量化的执行时间: 使用矢量化优化之后,collect() 和 createDataFrame() 性能分别大致提升 17 倍和 42x 倍;而对 dapply() 和 gapply(), 分别提升了43x 和 33x 。 从上面的启发可以看到,如果我们需要在不同系统之间进行数据交互,也可以使用 Apache Arrow。 阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入! 对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。 Apache Spark技术交流社区公众号,微信扫一扫关注
直播主题 TFPark: Distributed TensorFlow in Production on Apache Spark 讲师: 汪洋英特尔大数据团队的机器学习工程师,专注于分布式机器学习框架和应用。他是Analytics Zoo和BigDL的核心贡献者之一。 时间: 7月23日 19:00 观看直播方式: 扫描下方二维码入群,或届时进入直播间(回看链接)https://developer.aliyun.com/live/43484 直播介绍 TFPark是开源AI平台Analytics Zoo中一个模块,它的可以很方便让用户在Spark集群中分布式地进行TensorFlow模型的训练和推断。一方面,TFPark利用Spark将TensorFlow 定义的AI训练或推理任务无缝的嵌入到用户的大数据流水线中,而无需对现有集群做任何修改;另一方面TFPark屏蔽了复杂的分布式系统逻辑,可以将单机开发的AI应用轻松扩展到几十甚至上百节点上。本次分享将介绍TFPark的使用,内部实现以及在生产环境中的实际案例。
Apache Spark 社区作为全球最大的开源社区,也是Apache基金会旗下最流行的开源分布式内存式大数据处理引擎。他快速、易于使用的框架,允许你解决各种复杂的数据问题,无论是半结构化、结构化、流式,或机器学习、数据科学。即使拥有来自250多个组织的超过1000个贡献者,以及遍布全球570多个地方的超过30万个Spark Meetup社区成员,作为一名国内的Spark小白,我相信大家都有一个共同的痛点,就是国内Spark相关资料过少。为此我们筹办了一系列针对国内Spark开发者的活动,指路回顾 | SPARK + AI SUMMIT 2020 中文精华版线上峰会圆满结束(附PPT下载) 作为普惠Spark中华小当家系列活动的第二站,我们隆重推出第一期Spark学习训练营。由Spark 中文社区联合阿里云开发者社区联合打造,持续定期更新。第一期训练营邀请到了全 Apache Spark contributer 阵容,经过半个月对课程的精心打磨今天正式上线!限时免费抢报,速速来看→ 训练营时间 2020年7月20日-25日 开营名额 1500人,报满即止 报名截止时间 2020年7月19日18:00 报名要求 希望你有一定的开发基础,对 Spark感兴趣想进一步深入了解https://developer.aliyun.com/topic/trainingcamp/spark01 5天训练营你将收获 实战视角下对 Spark 底层架构、核心功能模块、社区生态等全方位解析,帮你建立对 Spark 应用场景与功能实现的系统认知。社区大佬们手把手实操演示,“家教”级辅导,社群答疑。 收获Spark 生产环境开发的基础技能,完成理论到实操的跨越,全程免费学 每天打卡成功,还能意外收获老师的“加餐”课程,还可以免费获得大礼包:包括 Spark summit 中文材料、电子书、实战精选案例等。 丰富多样的社区周边免费送,并有机会成为训练营志愿者,获得更多训练营直通车,坚持 5 天学习打卡还会获得Spark 定制马克杯哦~ 注:具体内容以报名页面信息为准 5天训练营课程表 DAY1第 1 讲:Apache Spark入门 简要介绍Apache Spark架构、DataFrames API等,涵盖Apache Spark框架的基本原理。 主讲嘉宾 周康,阿里巴巴技术专家, Apache Spark/Hadoop/Parquet contributor DAY2 第2讲 Spark SQL 介绍与实战主要介绍Spark SQL的基本架构,主要模块和重要特性,常见的SQL性能问题与优化手段。 主讲嘉宾 李呈祥,阿里巴巴高级技术专家,Apache Hive Committer, Apache Flink Committer DAY3第 3 讲:Spark for ETL and Data Science主要介绍如何用Spark来做ETL以及交互式数据分析的最佳实践,主讲嘉宾 章剑锋,阿里巴巴高级技术专家, Apache Tez、Livy 、Zeppelin PMC ,Apache Pig Committer DAY4 第4讲 Using Delta lake介绍Delta Lake的用户场景,如何创建、追加和更新数据到数据湖,如何使用 Delta Lake 构建一个数据分析管道等内容。主讲嘉宾 辛现银,阿里巴巴技术专家。 DAY5 第5讲 Spark tuning and Best Practices学生将在本节课中进行实操练习。主讲嘉宾 王宇,阿里巴巴达摩院高级算法专家 点击页面报名。报名截止日期:7月19日18:00 无论你是在校同学还在一线开发,我们期望永远保持好奇、保持天真、拥有学习的热情和活力。欢迎更多的开发者和Spark爱好者加入到Spark训练营中共同成长。 报名链接:https://developer.aliyun.com/topic/trainingcamp/spark01
本文转载自公众号: 大数据学习与分享原文链接 【前言:如果你经常使用Spark SQL进行数据的处理分析,那么对笛卡尔积的危害性一定不陌生,比如大量占用集群资源导致其他任务无法正常执行,甚至导致节点宕机。那么都有哪些情况会产生笛卡尔积,以及如何事前"预测"写的SQL会产生笛卡尔积从而避免呢?(以下不考虑业务需求确实需要笛卡尔积的场景)】 Spark SQL几种产生笛卡尔积的典型场景 首先来看一下在Spark SQL中产生笛卡尔积的几种典型SQL: join语句中不指定on条件 select * from test_partition1 join test_partition2; join语句中指定不等值连接 select * from test_partition1 t1 inner join test_partition2 t2 on t1.name <> t2.name; join语句on中用or指定连接条件 select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id or t1.name = t2.name; join语句on中用||指定连接条件 select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id || t1.name = t2.name; 除了上述举的几个典型例子,实际业务开发中产生笛卡尔积的原因多种多样。 同时需要注意,在一些SQL中即使满足了上述4种规则之一也不一定产生笛卡尔积。比如,对于join语句中指定不等值连接条件的下述SQL不会产生笛卡尔积:--在Spark SQL内部优化过程中针对join策略的选择,最终会通过SortMergeJoin进行处理。 select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id and t1.name<>t2.name; 此外,对于直接在SQL中使用cross join的方式,也不一定产生笛卡尔积。比如下述SQL: -- Spark SQL内部优化过程中选择了SortMergeJoin方式进行处理 select * from test_partition1 t1 cross join test_partition2 t2 on t1.id = t2.id; 但是如果cross join没有指定on条件同样会产生笛卡尔积。那么如何判断一个SQL是否产生了笛卡尔积呢? Spark SQL是否产生了笛卡尔积 以join语句不指定on条件产生笛卡尔积的SQL为例: -- test_partition1和test_partition2是Hive分区表 select * from test_partition1 join test_partition2; 通过Spark UI上SQL一栏查看上述SQL执行图,如下: 可以看出,因为该join语句中没有指定on连接查询条件,导致了CartesianProduct即笛卡尔积。再来看一下该join语句的逻辑计划和物理计划: == Parsed Logical Plan == 'GlobalLimit 1000 +- 'LocalLimit 1000 +- 'Project [*] +- 'UnresolvedRelation `t` == Analyzed Logical Plan == id: string, name: string, dt: string, id: string, name: string, dt: string GlobalLimit 1000 +- LocalLimit 1000 +- Project [id#84, name#85, dt#86, id#87, name#88, dt#89] +- SubqueryAlias `t` +- Project [id#84, name#85, dt#86, id#87, name#88, dt#89] +- Join Inner :- SubqueryAlias `default`.`test_partition1` : +- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86] +- SubqueryAlias `default`.`test_partition2` +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89] == Optimized Logical Plan == GlobalLimit 1000 +- LocalLimit 1000 +- Join Inner :- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86] +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89] == Physical Plan == CollectLimit 1000 +- CartesianProduct :- Scan hive default.test_partition1 [id#84, name#85, dt#86], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86] +- Scan hive default.test_partition2 [id#87, name#88, dt#89], HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89] 通过逻辑计划到物理计划,以及最终的物理计划选择CartesianProduct,可以分析得出该SQL最终确实产生了笛卡尔积。 Spark SQL中产生笛卡尔积的处理策略 在之前的文章中《Spark SQL如何选择join策略》已经介绍过,Spark SQL中主要有1.ExtractEquiJoinKeys(Broadcast Hash Join、Shuffle Hash Join、Sort Merge Join,这3种是我们比较熟知的Spark SQL join)和Without joining keys(CartesianProduct、BroadcastNestedLoopJoin)join策略。 那么,如何判断SQL是否产生了笛卡尔积就迎刃而解。在利用Spark SQL执行SQL任务时,通过查看SQL的执行图来分析是否产生了笛卡尔积。如果产生笛卡尔积,则将任务杀死,进行任务优化避免笛卡尔积。【不推荐。用户需要到Spark UI上查看执行图,并且需要对Spark UI界面功能等要了解,需要一定的专业性。(注意:这里之所以这样说,是因为Spark SQL是计算引擎,面向的用户角色不同,用户不一定对Spark本身了解透彻,但熟悉SQL。对于做平台的小伙伴儿,想必深有感触)】 2.分析Spark SQL的逻辑计划和物理计划,通过程序解析计划推断SQL最终是否选择了笛卡尔积执行策略。如果是,及时提示风险。 具体可以参考Spark SQL join策略选择的源码: def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { // --- BroadcastHashJoin -------------------------------------------------------------------- // broadcast hints were specified case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) if canBroadcastByHints(joinType, left, right) => val buildSide = broadcastSideByHints(joinType, left, right) Seq(joins.BroadcastHashJoinExec( leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right))) // broadcast hints were not specified, so need to infer it from size and configuration. case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) if canBroadcastBySizes(joinType, left, right) => val buildSide = broadcastSideBySizes(joinType, left, right) Seq(joins.BroadcastHashJoinExec( leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right))) // --- ShuffledHashJoin --------------------------------------------------------------------- case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right) && muchSmaller(right, left) || !RowOrdering.isOrderable(leftKeys) => Seq(joins.ShuffledHashJoinExec( leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right))) case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left) && muchSmaller(left, right) || !RowOrdering.isOrderable(leftKeys) => Seq(joins.ShuffledHashJoinExec( leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right))) // --- SortMergeJoin ------------------------------------------------------------ case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) if RowOrdering.isOrderable(leftKeys) => joins.SortMergeJoinExec( leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil // --- Without joining keys ------------------------------------------------------------ // Pick BroadcastNestedLoopJoin if one side could be broadcast case j @ logical.Join(left, right, joinType, condition) if canBroadcastByHints(joinType, left, right) => val buildSide = broadcastSideByHints(joinType, left, right) joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), buildSide, joinType, condition) :: Nil case j @ logical.Join(left, right, joinType, condition) if canBroadcastBySizes(joinType, left, right) => val buildSide = broadcastSideBySizes(joinType, left, right) joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), buildSide, joinType, condition) :: Nil // Pick CartesianProduct for InnerJoin case logical.Join(left, right, _: InnerLike, condition) => joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil case logical.Join(left, right, joinType, condition) => val buildSide = broadcastSide( left.stats.hints.broadcast, right.stats.hints.broadcast, left, right) // This join could be very slow or OOM joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), buildSide, joinType, condition) :: Nil // --- Cases where this strategy does not apply --------------------------------------------- case _ => Nil } 此外,在业务开发中,要不断总结归纳产生笛卡尔积的情况,形成知识文档,以便在后续业务开发中避免类似的情况出现。除了笛卡尔积效率比较低,BroadcastNestedLoopJoin效率也相对低效,尤其是当数据量大的时候还很容易造成driver端的OOM,这种情况也是需要极力避免的。 阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入! 对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。 Apache Spark技术交流社区公众号,微信扫一扫关注
作者:王涛,花名扬礼,阿里巴巴计算平台事业部 EMR 开发工程师. 目前从事开源大数据存储计算方面的开发和优化工作。 随着阿里云JindoFS SDK的全面放开使用,基于JindoFS SDK的阿里云数据迁移利器Jindo DistCp现在也全面面向用户开放使用。Jindo DistCp是阿里云E-MapReduce团队开发的大规模集群内部和集群之间分布式文件拷贝的工具。其使用MapReduce实现文件分发,错误处理和恢复,把文件和目录的列表作为map/reduce任务的输入,每个任务会完成源列表中部分文件的拷贝。目前全量支持hdfs->oss,hdfs->hdfs,oss->hdfs,oss->oss的数据拷贝场景,提供多种个性化拷贝参数和多种拷贝策略。重点优化hdfs到oss的数据拷贝,通过定制化CopyCommitter,实现No-Rename拷贝,并保证数据拷贝落地的一致性。功能全量对齐S3 DistCp和HDFS DistCp,性能较HDFS DistCp有较大提升,目标提供高效、稳定、安全的数据拷贝工具。本文主要介绍如何使用Jindo DistCp来进行基本文件拷贝,以及如何在不同场景下提高数据拷贝性能。值得一提的是,此前 Jindo DistCp 仅限于E-MapReduce产品内部使用,此次全方位面向整个阿里云OSS/HDFS用户放开,并提供官方维护和支持技术,欢迎广大用户集成和使用。 大数据和数据迁移工具 在传统大数据领域,我们经常使用HDFS作为底层存储,并且在HDFS存储大规模的数据。在进行数据迁移、数据拷贝的场景中,大家选择最常用的是Hadoop自带的DistCp工具,但是其不能很好利用对象存储系统如OSS的特性,导致效率低下并且不能最终保证一致性,提供的功能选项也比较简单,不能很好的满足用户的需求。此时一个高效、功能丰富的数据迁移工具成为影响软件搬栈、业务上云的重要影响因素。 Hadoop DistCp Hadoop DistCp是Hadoop集成的分布式数据迁移工具,提供了基本文件拷贝、覆盖拷贝、指定map并行度、log输出路径等功能。在Hadoop2x上对DistCp进行了部分优化例如拷贝策略的选择,默认使用 uniformsize(每个 map 会平衡文件大小)如果指定 dynamic,则会使用 DynamicInputFormat。这些功能优化了普通hdfs间数据拷贝,但是对于对象存储系统如OSS缺少数据写入方面的优化。 S3 DistCp S3 DistCp是AWS为S3提供的distcp工具, S3DistCp是Hadoop DistCp 的扩展,它进行了优化使得其可以和S3结合使用,并新增了一些实用功能。新增功能如增量复制文件、复制文件时指定压缩方式、根据模式进行数据聚合、按照文件清单进行拷贝等。S3 DistCp依靠S3对象存储系统,目前只能在AWS EMR内部使用,并不开放给普通用户。 Jindo DistCp Jindo DistCp是一个简单易用的分布式文件拷贝工具,目前主要用在E-Mapreduce集群内,主要提供hdfs到OSS的数据迁移服务,相比于Hadoop DistCp和S3 DistCp,Jindo DistCp做了很多优化以及新增了许多个性化功能,并且深度结合OSS对象存储的特性,定制化CopyCommitter,实现No-Rename拷贝,大大缩短上云数据迁移时间消耗。现在Jindo DistCp对外开放使用,我们可以使用该功能来进行上云数据迁移,获得OSS数据迁移利器。 为什么使用 Jindo DistCp? 1、效率高,在测试场景中最高可到1.59倍的加速。2、基本功能丰富,提供多种拷贝方式和场景优化策略。3、深度结合OSS,对文件提供直接归档和低频、压缩等操作。4、实现No-Rename拷贝,保证数据一致性。5、场景全面,可完全替代Hadoop DistCp,支持多Hadoop版本(如有问题可提issue) Jindo DistCp 兼容性如何? Jindo DistCp目前支持Hadoop2.7+和最新的Hadoop3.x,以两个不同的jar形式提供服务,依赖Hadoop环境并且不会和Hadoop DistCp产生冲突。在阿里云EMR内部可直接提供Jindo DistCp的服务,用户无需进行jar包下载。用户下载jar包后,再通过参数或者Hadoop配置文件配上oss的AK即可使用。 使用 Jindo DistCp 性能提升多少? 我们做了一个Jindo DistCp和Hadoop DistCp的性能对比,在这个测试中我们以hdfs到oss为主要场景,利用Hadoop自带的测试数据集TestDFSIO分别生成1000个10M、1000个500M、1000个1G大小的文件进行从hdfs拷贝数据到oss上的测试过程。 分析测试结果,可以看出Jindo DistCp相比Hadoop DistCp具有较大的性能提升,在测试场景中最高可达到1.59倍加速效果。 使用工具包 1. 下载jar包 我们去github repo下载最新的jar包 jindo-distcp-x.x.x.jar注意:目前Jar包只支持Linux、MacOS操作系统,因为SDK底层采用了native代码。 2. 配置OSS访问AK 您可以在命令中使用程序执行时指定--key、--secret、--endPoint参数选项来指定AK。 示例命令如下: hadoop jar jindo-distcp-2.7.3.jar --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --key yourkey --secret yoursecret --endPoint oss-cn-hangzhou.aliyuncs.com 您也可以将oss的ak、secret、endpoint预先配置在 hadoop的 core-site.xml 文件里 ,避免每次使用时临时填写ak。 <configuration> <property> <name>fs.jfs.cache.oss-accessKeyId</name> <value>xxx</value> </property> <property> <name>fs.jfs.cache.oss-accessKeySecret</name> <value>xxx</value> </property> <property> <name>fs.jfs.cache.oss-endpoint</name> <value>oss-cn-xxx.aliyuncs.com</value> </property> </configuration> 另外,我们推荐配置免密功能,避免明文保存accessKey,提高安全性。 使用手册 Jindo DistCp提供多种实用功能及其对应的参数选择,下面介绍参数含义及其示例 更多详细使用细节,请参考Jindo DistCp使用指南 联系我们 Jindo DistCp还在日益完善,后续会不断根据用户需求进行优化。欢迎大家下载使用Jindo DistCp,如果遇到任何问题,请随时联系阿里云E-Mapreduce团队,或者在github上提交issue,我们将尽快为您解答。 相关阅读 重磅:阿里云 JindoFS SDK 全面开放使用,OSS 文件各项操作性能得到大幅提升 JindoFS - 分层存储 EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework Jindo SQL 性能优化之 RuntimeFilter Plus JindoFS: 云上大数据的高性能数据湖存储方案 JindoFS概述:云原生的大数据计算存储分离方案 JindoFS解析 - 云上大数据高性能数据湖存储方案 后续我们也会在云栖社区和钉钉群分享更多的 Jindo 技术干货,欢迎有兴趣的同学加入 【阿里云EMR钉钉群】进行交流和技术分享。
2020年07月
2020年06月
2020年05月
是的,可以大致这样理解。是行级别的,但下面存储格式基本上还是以 Parquet/ORC 列式为主;delta 小文件要及时合并的,否则性能很差。数据库这个提法不一定好,因为并不会用于 OLTP;可以说是数据仓库,OLAP 场景为主的。关于这个区别,我的一篇文章里面讲得比较细。可以看看。
https://yq.aliyun.com/articles/699919?spm=a2c4e.11153959.0.0.4f427507ntu6fX
这个问题很高级,你们是不是已经在玩了?不过工作流的定义过程里面,必然会形成各个工作流节点之间的依赖关系,定义工作流本身就是定义各个节点和他们之间的上下游关系,也就形成了这些依赖关系。如果你问的是多个工作流之间是不是还可以形成更高层次的依赖关系,我没有深入去看,感觉目前还比较早一点,不一定已经支持了。
分享里面(4月28日钉钉群分享)提到的 Hydrogen 项目就是要系统支持这些深度学习框架的。Spark 3.0 会包含进去。你找到相关 SPIP,JIRA 和 PPT 挖一下。
如果您没有使用 EMR 的统一元数据库功能,可以关闭公网 IP。
理论上可以在不破坏集群环境的前提下安装。但是这些软件的运行可能会影响到集群的稳定可靠性,不建议进行此类操作。
目前还不能支持,用户要创建 EMR 集群需要在 EMR 控制台上来创建 ECS。
EMR 支持自动续费操作,支持 EMR 和 ECS 的自动续费。
续费操作请参考集群续费。经常会有用户反馈续费了但是还是会通知说没有续费。这是因为 EMR 现在有 2 块,一块是 EMR,一块是 ECS,大部分的用户都只是续费了 ECS 而没有续费 EMR。您可以打开续费界面查看 ECS 和 EMR 到期时间。
一般是用户的按量节点数量的上限到了。ECS 根据不同用户,按量节点上限是不一样的。需要用户去申请加大。如果确认不是上述的原因,还有一种可能是用户是没有创建的机型的权限,需要去 ECS 开通这个机型的使用权限。