Spark 入门(2):应用

一、 Spark 中的计算模型

1.1 Spark 中的几个主要基本概念

在 Spark 中,有几个基本概念是需要先了解的,了解这些基本概念,对于后续在学习和使用 Spark 过程中,能更容易理解一些。

Application:基于 Spark 的用户程序,即由用户编写的调用 Spark API 的应用程序,它由集群上的一个驱动(Driver)程序和多个执行器(Executor)程序组成。其中应用程序的入口为用户所定义的 main 方法。

SparkContext:是 Spark 所有功能的主要入口点,它是用户逻辑与 Spark 集群主要的交互接口。通过SparkContext,可以连接到集群管理器(Cluster Manager),能够直接与集群 Master 节点进行交互,并能够向 Master 节点申请计算资源,也能够将应用程序用到的 JAR 包或 Python 文件发送到多个执行器(Executor)节点上。

Cluster Manager:即集群管理器,它存在于 Master 进程中,主要用来对应用程序申请的资源进行管理。

Worker Node:任何能够在集群中能够运行 Spark 应用程序的节点。

Task:SparkContext发送到Executor节点上执行的一个工作单元。

Driver:也即驱动器节点,它是一个运行Applicationmain()函数并创建SparkContext的进程。Driver节点也负责提交Job,并将Job转化为Task,在各个Executor进程间协调 Task 的调度。Driver节点可以不运行于集群节点机器上。

Executor:也即执行器节点,它是在一个在工作节点(Worker Node)上为Application启动的进程,它能够运行 Task 并将数据保存在内存或磁盘存储中,也能够将结果数据返回给Driver

根据以上术语的描述,通过下图可以大致看到 Spark 程序在运行时的内部协调过程:


(图片来源:Cluster Mode Overview)


除了以上几个基本概念外,Spark 中还有几个比较重要的概念。

1.2 RDD

1.2.1 基本概念

即弹性分布式数据集(Resilient Distributed Datasets),是一种容错的、可以被并行操作的元素集合,它是 Spark 中最重要的一个概念,是 Spark 对所有数据处理的一种基本抽象。Spark 中的计算过程可以简单抽象为对 RDD 的创建、转换和返回操作结果的过程:

对于 Spark 的 RDD 计算抽象过程描述如下:

makeRDD:可以通过访问外部物理存储(如 HDFS),通过调用 SparkContext.textFile()方法来读取文件并创建一个 RDD,也可以对输入数据集合通过调用 SparkContext.parallelize()方法来创建一个 RDD。RDD 被创建后不可被改变,只可以对 RDD 执行 Transformation 及 Action 操作。

Transformation(转换):对已有的 RDD 中的数据执行计算进行转换,并产生新的 RDD,在这个过程中有时会产生中间 RDD。Spark 对于Transformation采用惰性计算机制,即在 Transformation 过程并不会立即计算结果,而是在 Action 才会执行计算过程。如mapfiltergroupByKey、cache等方法,只执行Transformation操作,而不计算结果。

Action(执行):对已有的 RDD 中的数据执行计算产生结果,将结果返回 Driver 程序或写入到外部物理存储(如 HDFS)。如reducecollectcountsaveAsTextFile等方法,会对 RDD 中的数据执行计算。

1.2.2 RDD 依赖关系

Spark 中 RDD 的每一次Transformation都会生成一个新的 RDD,这样 RDD 之间就会形成类似于流水线一样的前后依赖关系,在 Spark 中,依赖关系被定义为两种类型,分别是窄依赖和宽依赖:

窄依赖(NarrowDependency):每个父 RDD 的一个分区最多被子 RDD 的一个分区所使用,即 RDD 之间是一对一的关系。窄依赖的情况下,如果下一个 RDD 执行时,某个分区执行失败(数据丢失),只需要重新执行父 RDD 的对应分区即可进行数恢复。例如mapfilterunion等算子都会产生窄依赖。

宽依赖(WideDependency,或 ShuffleDependency):是指一个父 RDD 的分区会被子 RDD 的多个分区所使用,即 RDD 之间是一对多的关系。当遇到宽依赖操作时,数据会产生Shuffle,所以也称之为ShuffleDependency。宽依赖情况下,如果下一个 RDD 执行时,某个分区执行失败(数据丢失),则需要将父 RDD 的所有分区全部重新执行才能进行数据恢复。例如groupByKeyreduceByKeysortByKey等操作都会产生宽依赖。

RDD 依赖关系如下图所示:

1.3 Partition

1.3.1 基本概念

partition(分区)是 Spark 中的重要概念,它是RDD的最小单元,RDD是由分布在各个节点上的partition 组成的。partition的数量决定了task的数量,每个task对应着一个partition

例如,使用 Spark 来读取本地文本文件内容,读取完后,这些内容将会被分成多个partition,这些partition就组成了一个RDD,同时这些partition可以分散到不同的机器上执行。RDD 的 partition 描述如下图所示:


(图片来源:Spark簡易操作


partition的数量可以在创建 RDD 时指定,如果未指定 RDD 的 partition 大小,则在创建 RDD 时,Spark 将使用默认值,默认值为spark.default.parallelism配置的参数。

1.3.2 Partition 数量影响及调整

Partition 数量的影响:

如果 partition 数量太少,则直接影响是计算资源不能被充分利用。例如分配 8 个核,但 partition 数量为 4,则将有一半的核没有利用到。

如果 partition 数量太多,计算资源能够充分利用,但会导致 task 数量过多,而 task 数量过多会影响执行效率,主要是 task 在序列化和网络传输过程带来较大的时间开销。

根据Spark RDD Programming Guide上的建议,集群节点的每个核分配 2-4 个partitions比较合理。

Partition 调整:

Spark 中主要有两种调整 partition 的方法:coalesce、repartition

参考 pyspark 中的函数定义:

def coalesce(self, numPartitions, shuffle=False):
"""
Return a new RDD that is reduced into \`numPartitions\` partitions.
"""

def repartition(self, numPartitions):
"""
*Return a new RDD that has exactly numPartitions partitions.*

*Can increase or decrease the level of parallelism in this RDD.*
*Internally, this uses a shuffle to redistribute data.*
*If you are decreasing the number of partitions in this RDD, consider*
*using \`coalesce\`, which can avoid performing a shuffle.*
*"""
return self.coalesce(numPartitions, shuffle=True)

从函数接口可以看到,reparation是直接调用coalesce(numPartitions, shuffle=True),不同的是,reparation函数可以增加或减少 partition 数量,调用repartition函数时,还会产生shuffle操作。而coalesce函数可以控制是否shuffle,但当shuffleFalse时,只能减小partition数,而无法增大。

1.4 Job

前面提到,RDD 支持两种类型的算子操作:TransformationAction。Spark 采用惰性机制,Transformation算子的代码不会被立即执行,只有当遇到第一个Action算子时,会生成一个Job,并执行前面的一系列Transformation操作。一个Job包含NTransformation和 1 个Action

而每个Job会分解成一系列可并行处理的Task,然后将Task分发到不同的Executor上运行,这也是 Spark 分布式执行的简要流程。

1.5 Stage

Spark 在对Job中的所有操作划分Stage时,一般会按照倒序进行,依据 RDD 之间的依赖关系(宽依赖或窄依赖)进行划分。即从Action开始,当遇到窄依赖类型的操作时,则划分到同一个执行阶段;遇到宽依赖操作,则划分一个新的执行阶段,且新的阶段为之前阶段的Parent,之前的阶段称作Child Stage,然后依次类推递归执行。Child Stage需要等待所有的Parent Stage执行完之后才可以执行,这时Stage之间根据依赖关系构成了一个大粒度的 DAG。

如下图所示,为一个复杂的 DAG Stage 划分示意图:


(图片来源:Job物理执行图


上图为一个 Job,该 Job 生成的 DAG 划分成了 3 个 Stage。上图的 Stage 划分过程是这样的:从最后的Action开始,从后往前推,当遇到操作为NarrowDependency时,则将该操作划分为同一个Stage,当遇到操作为ShuffleDependency时,则将该操作划分为新的一个Stage

1.6 Task

Task为一个Stage中的一个执行单元,也是 Spark 中的最小执行单元,一般来说,一个 RDD 有多少个Partition,就会有多少个Task,因为每一个Task 只是处理一个Partition上的数据。在一个Stage内,所有的 RDD 操作以串行的 Pipeline 方式,由一组并发的Task完成计算,这些Task的执行逻辑完全相同,只是作用于不同的Partition。每个Stage里面Task的数目由该Stage最后一个 RDD 的Partition 个数决定。

Spark 中Task分为两种类型,ShuffleMapTask 和 ResultTask,位于最后一个 Stage 的 Task 为 ResultTask,其他阶段的属于 ShuffleMapTask。ShuffleMapTask 和 ResultTask 分别类似于 Hadoop 中的 Map 和 Reduce。

二、Spark 调度原理

2.1 Spark 集群整体运行架构


(图片来源:SparkInternals-Overview


Spark 集群分为 Master 节点和 Worker 节点,相当于 Hadoop 的 Master 和 Slave 节点。Master 节点上常驻 Master 守护进程,负责管理全部的 Worker 节点。Worker 节点上常驻 Worker 守护进程,负责与 Master 节点通信并管理 Executors。

Driver 为用户编写的 Spark 应用程序所运行的进程。Driver 程序可以运行在 Master 节点上,也可运行在 Worker 节点上,还可运行在非 Spark 集群的节点上。

2.2 Spark 调度器

Spark 中主要有两种调度器:DAGScheduler 和 TaskScheduler,DAGScheduler 主要是把一个 Job 根据 RDD 间的依赖关系,划分为多个 Stage,对于划分后的每个 Stage 都抽象为一个由多个 Task 组成的任务集(TaskSet),并交给 TaskScheduler 来进行进一步的任务调度。TaskScheduler 负责对每个具体的 Task 进行调度。

2.2.1 DAGScheduler

当创建一个 RDD 时,每个 RDD 中包含一个或多个分区,当执行 Action 操作时,相应的产生一个 Job,而一个 Job 会根据 RDD 间的依赖关系分解为多个 Stage,每个 Stage 由多个 Task 组成(即 TaskSet),每个 Task 处理 RDD 中的一个 Partition。一个 Stage 里面所有分区的任务集合被包装为一个 TaskSet 交给 TaskScheduler 来进行任务调度。这个过程由是由 DAGScheduler 来完成的。DAGScheduler 对 RDD 的调度过程如下图所示:

(图片来源:Core Services behind Spark Job Execution

2.2.2 TaskScheduler

DAGScheduler 将一个 TaskSet 交给 TaskScheduler 后,TaskScheduler 会为每个 TaskSet 进行任务调度,Spark 中的任务调度分为两种:FIFO(先进先出)调度和 FAIR(公平调度)调度。

FIFO 调度:即谁先提交谁先执行,后面的任务需要等待前面的任务执行。这是 Spark 的默认的调度模式。

FAIR 调度:支持将作业分组到池中,并为每个池设置不同的调度权重,任务可以按照权重来决定执行顺序。

在 Spark 中使用哪种调度器可通过配置spark.scheduler.mode参数来设置,可选的参数有 FAIR 和 FIFO,默认是 FIFO。

FIFO 调度算法为 FIFOSchedulingAlgorithm,该算法的 comparator 方法的 Scala 源代码如下:

override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority  // priority实际为Job ID
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0
  }

根据以上代码,FIFO 调度算法实现的是:对于两个调度任务 s1 和 s2,首先比较两个任务的优先级(Job ID)大小,如果 priority1 比 priority2 小,那么返回 true,表示 s1 的优先级比 s2 的高。由于 Job ID 是顺序生成的,先生成的 Job ID 比较小,所以先提交的 Job 肯定比后提交的 Job 优先级高,也即先提交的 Job 会被先执行。

如果 s1 和 s2 的 priority 相同,表示为同一个 Job 的不同 Stage,则比较 Stage ID,Stage ID 小则优先级高。

FAIR 调度算法为 FairSchedulingAlgorithm,该算法的 comparator 方法的 Scala 源代码如下:

展开阅读全文

本文系作者在时代Java发表,未经许可,不得转载。

如有侵权,请联系nowjava@qq.com删除。

编辑于

关注时代Java

关注时代Java