我刚刚将Spark从2.1.0升级到2.2.1。有没有人遇到过dataframe.filter(…).collect()运行极其缓慢的情况?具体来说,是在collect之前先执行了filter的情形。单独的dataframe.collect似乎运行正常,但dataframe.filter(…).collect()却迟迟不返回。该DataFrame只包含2条记录,而且是在单元测试中运行的。当我退回到Spark 2.1.0时,速度又恢复正常。
我看过线程转储,找不到明显的原因。我已经努力确保我使用的所有库也使用Spark 2.2.1。
它似乎卡在了下面这个堆栈跟踪处:
scala.collection.mutable.FlatHashTable$class.addEntry(FlatHashTable.scala:151)
scala.collection.mutable.HashSet.addEntry(HashSet.scala:40)
scala.collection.mutable.FlatHashTable$class.addElem(FlatHashTable.scala:142)
scala.collection.mutable.HashSet.addElem(HashSet.scala:40)
scala.collection.mutable.HashSet.$plus$eq(HashSet.scala:59)
scala.collection.mutable.HashSet.$plus$eq(HashSet.scala:40)
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
scala.collection.mutable.AbstractSet.$plus$plus$eq(Set.scala:46)
scala.collection.mutable.HashSet.clone(HashSet.scala:83)
scala.collection.mutable.HashSet.clone(HashSet.scala:40)
org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus(ExpressionSet.scala:65)
org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus(ExpressionSet.scala:50)
scala.collection.SetLike$$anonfun$$plus$plus$1.apply(SetLike.scala:141)
scala.collection.SetLike$$anonfun$$plus$plus$1.apply(SetLike.scala:141)
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:316)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
scala.collection.TraversableOnce$class.$div$colon(TraversableOnce.scala:151)
scala.collection.AbstractTraversable.$div$colon(Traversable.scala:104)
scala.collection.SetLike$class.$plus$plus(SetLike.scala:141)
org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus$plus(ExpressionSet.scala:50)
org.apache.spark.sql.catalyst.plans.logical.UnaryNode$$anonfun$getAliasedConstraints$1.apply(LogicalPlan.scala:323)
org.apache.spark.sql.catalyst.plans.logical.UnaryNode$$anonfun$getAliasedConstraints$1.apply(LogicalPlan.scala:320)
scala.collection.immutable.List.foreach(List.scala:392)
org.apache.spark.sql.catalyst.plans.logical.UnaryNode.getAliasedConstraints(LogicalPlan.scala:320)
org.apache.spark.sql.catalyst.plans.logical.Project.validConstraints(basicLogicalOperators.scala:65)
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints$lzycompute(QueryPlan.scala:188) => holding Monitor(org.apache.spark.sql.catalyst.plans.logical.Aggregate@1129881457})
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints(QueryPlan.scala:188)
org.apache.spark.sql.catalyst.plans.logical.Aggregate.validConstraints(basicLogicalOperators.scala:555)
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints$lzycompute(QueryPlan.scala:188) => holding Monitor(org.apache.spark.sql.catalyst.plans.logical.Aggregate@1129881457})
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints(QueryPlan.scala:188)
org.apache.spark.sql.catalyst.plans.QueryPlan.getConstraints(QueryPlan.scala:196)
org.apache.spark.sql.catalyst.optimizer.PruneFilters$$anonfun$apply$16$$anonfun$25.apply(Optimizer.scala:717)
org.apache.spark.sql.catalyst.optimizer.PruneFilters$$anonfun$apply$16$$anonfun$25.apply(Optimizer.scala:716)
scala.collection.TraversableLike$$anonfun$partition$1.apply(TraversableLike.scala:314)
scala.collection.TraversableLike$$anonfun$partition$1.apply(TraversableLike.scala:314)
scala.collection.immutable.List.foreach(List.scala:392)
scala.collection.TraversableLike$class.partition(TraversableLike.scala:314)
scala.collection.AbstractTraversable.partition(Traversable.scala:104)
org.apache.spark.sql.catalyst.optimizer.PruneFilters$$anonfun$apply$16.applyOrElse(Optimizer.scala:716)
org.apache.spark.sql.catalyst.optimizer.PruneFilters$$anonfun$apply$16.applyOrElse(Optimizer.scala:705)
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
org.apache.spark.sql.catalyst.optimizer.PruneFilters.apply(Optimizer.scala:705)
org.apache.spark.sql.catalyst.optimizer.PruneFilters.apply(Optimizer.scala:704)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
scala.collection.immutable.List.foldLeft(List.scala:84)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
scala.collection.immutable.List.foreach(List.scala:392)
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78) => holding Monitor(org.apache.spark.sql.execution.QueryExecution@1193326176})
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84) => holding Monitor(org.apache.spark.sql.execution.QueryExecution@1193326176})
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89) => holding Monitor(org.apache.spark.sql.execution.QueryExecution@1193326176})
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:2837)
org.apache.spark.sql.Dataset.collect(Dataset.scala:2387)
问题已解决。原因在于spark.sql.constraintPropagation.enabled这个属性:在Spark 2.2.1中,它的默认值为true。堆栈跟踪表明,程序卡在了查询计划生成(优化器的约束推导)阶段。
简答:将上述属性设置为false。 spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。