运维开发网

scala – Spark UI DAG阶段已断开连接

运维开发网 https://www.qedev.com 2020-06-13 10:36 出处:网络 作者:运维开发网整理
我在spark- shell中运行了以下工作: val d = sc.parallelize(0 until 1000000).map(i => (i%100000, i)).persist d.join(d.reduceByKey(_ + _)).collect Spark UI显示了三个阶段.阶段4和5对应于d的计算,阶段6对应于收集动作的计算.由于d是持久的,我只期望两个阶段.然而,阶段5不
我在spark- shell中运行了以下工作:

val d = sc.parallelize(0 until 1000000).map(i => (i%100000, i)).persist
d.join(d.reduceByKey(_ + _)).collect

Spark UI显示了三个阶段.阶段4和5对应于d的计算,阶段6对应于收集动作的计算.由于d是持久的,我只期望两个阶段.然而,阶段5不存在与任何其他阶段的连接.

scala – Spark UI DAG阶段已断开连接

因此尝试在不使用持久化的情况下运行相同的计算,并且DAG看起来完全相同,除非没有指示RDD已被持久化的绿点.

scala – Spark UI DAG阶段已断开连接

我希望第11阶段的输出连接到第12阶段的输入,但事实并非如此.

看一下阶段描述,这些阶段似乎表明d是持久的,因为第5阶段有输入,但我仍然对第5阶段为什么存在感到困惑.

scala – Spark UI DAG阶段已断开连接

scala – Spark UI DAG阶段已断开连接

>输入RDD被缓存,缓存的部分不会被重新计算.

这可以通过简单的测试验证:

import org.apache.spark.SparkContext

def f(sc: SparkContext) = {
  val counter = sc.longAccumulator("counter")
  val rdd = sc.parallelize(0 until 100).map(i => {
    counter.add(1L)
    (i%10, i)
  }).persist
  rdd.join(rdd.reduceByKey(_ + _)).foreach(_ => ())
  counter.value
}

assert(f(spark.sparkContext) == 100)

>缓存不会从DAG中删除阶段.

如果数据被缓存对应的阶段can be marked as skipped但仍然是DAG的一部分.可以使用检查点截断谱系,但它不是同一个事物,它不会从可视化中删除阶段.

>输入阶段包含的不仅仅是缓存计算.

Spark阶段将可以链接的操作组合在一起而不执行shuffle.

虽然输入阶段的一部分是缓存的,但它并未涵盖准备shuffle文件所需的所有操作.这就是为什么你看不到跳过的任务.

>其余(分离)只是图形可视化的限制.

>如果您首先重新分区数据:

import org.apache.spark.HashPartitioner

val d = sc.parallelize(0 until 1000000)
  .map(i => (i%100000, i))
  .partitionBy(new HashPartitioner(20))

d.join(d.reduceByKey(_ + _)).collect

你会得到你最有可能寻找的DAG:

scala – Spark UI DAG阶段已断开连接

0

精彩评论

暂无评论...
验证码 换一张
取 消